kratix_sdk

 1from .status import Status
 2from .resource import Resource
 3from .kratix_sdk import (
 4    KratixSDK,
 5    set_input_dir,
 6    set_output_dir,
 7    get_input_dir,
 8    get_output_dir,
 9    get_metadata_dir,
10    set_metadata_dir,
11)
12from .promise import Promise
13from .types import GroupVersionKind, DestinationSelector
14
15__all__ = [
16    "Status",
17    "Resource",
18    "Promise",
19    "GroupVersionKind",
20    "DestinationSelector",
21    "KratixSDK",
22    "set_input_dir",
23    "set_output_dir",
24    "get_input_dir",
25    "get_output_dir",
26    "get_metadata_dir",
27    "set_metadata_dir",
28]
class Status:
 6class Status:
 7    def __init__(self, data: Optional[Dict[str, Any]] = None):
 8        self.data: Dict[str, Any] = data or {}
 9
10    def get(self, path: str) -> Any:
11        """Retrieves the value at the specified path in Status."""
12        return _get_by_path(self.data, path)
13
14    def set(self, path: str, value: Any) -> None:
15        """Sets the value at the specified path in Status."""
16        _set_by_path(self.data, path, value)
17
18    def remove(self, path: str) -> None:
19        """Removes the value at the specified path in Status.
20        If the path does not exist, it retursn an error."""
21        _remove_by_path(self.data, path)
22
23    def to_dict(self) -> Dict[str, Any]:
24        return self.data
Status(data: Optional[Dict[str, Any]] = None)
7    def __init__(self, data: Optional[Dict[str, Any]] = None):
8        self.data: Dict[str, Any] = data or {}
data: Dict[str, Any]
def get(self, path: str) -> Any:
10    def get(self, path: str) -> Any:
11        """Retrieves the value at the specified path in Status."""
12        return _get_by_path(self.data, path)

Retrieves the value at the specified path in Status.

def set(self, path: str, value: Any) -> None:
14    def set(self, path: str, value: Any) -> None:
15        """Sets the value at the specified path in Status."""
16        _set_by_path(self.data, path, value)

Sets the value at the specified path in Status.

def remove(self, path: str) -> None:
18    def remove(self, path: str) -> None:
19        """Removes the value at the specified path in Status.
20        If the path does not exist, it retursn an error."""
21        _remove_by_path(self.data, path)

Removes the value at the specified path in Status. If the path does not exist, it returns an error.

def to_dict(self) -> Dict[str, Any]:
23    def to_dict(self) -> Dict[str, Any]:
24        return self.data
class Resource:
 8class Resource:
 9    def __init__(self, data: Dict[str, Any]):
10        self.data = data
11
12    def get_value(self, path: str, **kwargs) -> Any:
13        """Get a value from the resource request by path.
14        Args:
15            path (str): The path to the value in the resource data.
16
17        KWargs:
18            default: The default value to return if the path is not found.
19
20        Raises:
21            KeyError: If the path is not found and no default is provided.
22
23        Returns:
24            Any: The value at the specified path in the resource data.
25        """
26        return _get_by_path(self.data, path, **kwargs)
27
28    def get_status(self, path: str = "") -> Status:
29        """Get the status of the resource by path.
30        If path is empty, return the entire status.
31        If path is provided, return the value at that path."""
32        status_data = self.data.get("status", {})
33        if path:
34            value = _get_by_path(status_data, path)
35            return Status(value if isinstance(value, dict) else {"value": value})
36        return Status(status_data)
37
38    def get_name(self) -> str:
39        """Get the name of the resource."""
40        return self.data.get("metadata", {}).get("name", "")
41
42    def get_namespace(self) -> str:
43        """Get the namespace of the resource."""
44        return self.data.get("metadata", {}).get("namespace", "")
45
46    def get_group_version_kind(self) -> GroupVersionKind:
47        """Get the GroupVersionKind of the resource."""
48        api_version = self.data.get("apiVersion", "")
49        if "/" in api_version:
50            group, version = api_version.split("/", 1)
51        else:
52            group, version = "", api_version
53        kind = self.data.get("kind", "")
54        return GroupVersionKind(group=group, version=version, kind=kind)
55
56    def get_labels(self) -> Dict[str, str]:
57        """Get the labels of the resource."""
58        return self.data.get("metadata", {}).get("labels", {})
59
60    def get_annotations(self) -> Dict[str, str]:
61        """Get the annotations of the resource."""
62        return self.data.get("metadata", {}).get("annotations", {})
Resource(data: Dict[str, Any])
 9    def __init__(self, data: Dict[str, Any]):
10        self.data = data
data
def get_value(self, path: str, **kwargs) -> Any:
12    def get_value(self, path: str, **kwargs) -> Any:
13        """Get a value from the resource request by path.
14        Args:
15            path (str): The path to the value in the resource data.
16
17        KWargs:
18            default: The default value to return if the path is not found.
19
20        Raises:
21            KeyError: If the path is not found and no default is provided.
22
23        Returns:
24            Any: The value at the specified path in the resource data.
25        """
26        return _get_by_path(self.data, path, **kwargs)

Get a value from the resource request by path. Args: path (str): The path to the value in the resource data.

KWargs: default: The default value to return if the path is not found.

Raises: KeyError: If the path is not found and no default is provided.

Returns: Any: The value at the specified path in the resource data.

def get_status(self, path: str = '') -> Status:
28    def get_status(self, path: str = "") -> Status:
29        """Get the status of the resource by path.
30        If path is empty, return the entire status.
31        If path is provided, return the value at that path."""
32        status_data = self.data.get("status", {})
33        if path:
34            value = _get_by_path(status_data, path)
35            return Status(value if isinstance(value, dict) else {"value": value})
36        return Status(status_data)

Get the status of the resource by path. If path is empty, return the entire status. If path is provided, return the value at that path.

def get_name(self) -> str:
38    def get_name(self) -> str:
39        """Get the name of the resource."""
40        return self.data.get("metadata", {}).get("name", "")

Get the name of the resource.

def get_namespace(self) -> str:
42    def get_namespace(self) -> str:
43        """Get the namespace of the resource."""
44        return self.data.get("metadata", {}).get("namespace", "")

Get the namespace of the resource.

def get_group_version_kind(self) -> GroupVersionKind:
46    def get_group_version_kind(self) -> GroupVersionKind:
47        """Get the GroupVersionKind of the resource."""
48        api_version = self.data.get("apiVersion", "")
49        if "/" in api_version:
50            group, version = api_version.split("/", 1)
51        else:
52            group, version = "", api_version
53        kind = self.data.get("kind", "")
54        return GroupVersionKind(group=group, version=version, kind=kind)

Get the GroupVersionKind of the resource.

def get_labels(self) -> Dict[str, str]:
56    def get_labels(self) -> Dict[str, str]:
57        """Get the labels of the resource."""
58        return self.data.get("metadata", {}).get("labels", {})

Get the labels of the resource.

def get_annotations(self) -> Dict[str, str]:
60    def get_annotations(self) -> Dict[str, str]:
61        """Get the annotations of the resource."""
62        return self.data.get("metadata", {}).get("annotations", {})

Get the annotations of the resource.

class Promise:
 5class Promise:
 6    def __init__(self, data: Optional[Dict[str, Any]] = None):
 7        self.data: Dict[str, Any] = data or {}
 8
 9    def get_name(self) -> str:
10        """Get the name of the promise."""
11        return self.data.get("metadata", {}).get("name", "")
12
13    def get_labels(self) -> Dict[str, str]:
14        """Get the labels of the promise."""
15        return self.data.get("metadata", {}).get("labels", {}) or {}
16
17    def get_annotations(self) -> Dict[str, str]:
18        """Get the annotations of the promise."""
19        return self.data.get("metadata", {}).get("annotations", {}) or {}
Promise(data: Optional[Dict[str, Any]] = None)
6    def __init__(self, data: Optional[Dict[str, Any]] = None):
7        self.data: Dict[str, Any] = data or {}
data: Dict[str, Any]
def get_name(self) -> str:
 9    def get_name(self) -> str:
10        """Get the name of the promise."""
11        return self.data.get("metadata", {}).get("name", "")

Get the name of the promise.

def get_labels(self) -> Dict[str, str]:
13    def get_labels(self) -> Dict[str, str]:
14        """Get the labels of the promise."""
15        return self.data.get("metadata", {}).get("labels", {}) or {}

Get the labels of the promise.

def get_annotations(self) -> Dict[str, str]:
17    def get_annotations(self) -> Dict[str, str]:
18        """Get the annotations of the promise."""
19        return self.data.get("metadata", {}).get("annotations", {}) or {}

Get the annotations of the promise.

@dataclass
class GroupVersionKind:
 6@dataclass
 7class GroupVersionKind:
 8    group: str
 9    version: str
10    kind: str
GroupVersionKind(group: str, version: str, kind: str)
group: str
version: str
kind: str
@dataclass
class DestinationSelector:
@dataclass
class DestinationSelector:
    """One destination selector: an optional directory plus match labels."""

    # Optional directory; defaults to the empty string.
    directory: str = ""
    # Labels to match; each instance gets its own fresh dict by default.
    match_labels: Dict[str, Any] = field(default_factory=dict)
DestinationSelector(directory: str = '', match_labels: Dict[str, Any] = <factory>)
directory: str = ''
match_labels: Dict[str, Any]
class KratixSDK:
 44class KratixSDK:
 45    def read_resource_input(self) -> Resource:
 46        """Reads the file in /kratix/input/object.yaml and returns a Resource.
 47        Can be used in Resource configure workflow."""
 48        path = INPUT_DIR / "object.yaml"
 49        with path.open() as f:
 50            data = yaml.safe_load(f) or {}
 51        return Resource(data)
 52
 53    def read_promise_input(self) -> Promise:
 54        """Reads the file in /kratix/input/object.yaml and returns a Promise.
 55        Can be used in Promise configure workflow."""
 56        path = INPUT_DIR / "object.yaml"
 57        with path.open() as f:
 58            data = yaml.safe_load(f) or {}
 59        return Promise(data)
 60
 61    def read_status(self) -> Status:
 62        """Reads the file in /kratix/metadata/status.yaml and returns a Status."""
 63        path = METADATA_DIR / "status.yaml"
 64        with path.open() as f:
 65            data = yaml.safe_load(f) or {}
 66        return Status(data)
 67
 68    def read_destination_selectors(self) -> List[DestinationSelector]:
 69        """Reads the file in /kratix/metadata/destination-selectors.yaml and returns a list of DestinationSelector"""
 70        path = METADATA_DIR / "destination-selectors.yaml"
 71        with path.open() as f:
 72            raw = yaml.safe_load(f) or []
 73        selectors = [
 74            DestinationSelector(
 75                directory=item.get("directory", ""),
 76                match_labels=item.get("matchLabels", {}) or {},
 77            )
 78            for item in raw
 79        ]
 80        return selectors
 81
 82    def write_output(self, relative_path: str, content: bytes) -> None:
 83        """writes the content to the specifies file at the path /kratix/output/relative_path."""
 84        dest = OUTPUT_DIR / relative_path
 85        dest.parent.mkdir(parents=True, exist_ok=True)
 86        with dest.open("wb") as f:
 87            f.write(content)
 88
 89    def write_status(self, status: Status) -> None:
 90        """writes the specified status to the /kratix/metadata/status.yaml."""
 91        path = METADATA_DIR / "status.yaml"
 92        path.parent.mkdir(parents=True, exist_ok=True)
 93        with path.open("w") as f:
 94            yaml.safe_dump(status.to_dict(), f)
 95
 96    def write_destination_selectors(self, selectors: List[DestinationSelector]) -> None:
 97        """writes the specified Destination Selectors to the /kratix/metadata/destination_selectors.yaml."""
 98        path = METADATA_DIR / "destination-selectors.yaml"
 99        data = []
100        for s in selectors:
101            data.append(
102                {
103                    "directory": s.directory or "",  # directory is optional
104                    "matchLabels": s.match_labels or {},
105                }
106            )
107        path.parent.mkdir(parents=True, exist_ok=True)
108        with path.open("w") as f:
109            yaml.safe_dump(data, f)
110
111    def workflow_action(self) -> str:
112        """Returns the value of KRATIX_WORKFLOW_ACTION environment variable."""
113        return os.getenv("KRATIX_WORKFLOW_ACTION", "")
114
115    def workflow_type(self) -> str:
116        """Returns the value of KRATIX_WORKFLOW_TYPE environment variable."""
117        return os.getenv("KRATIX_WORKFLOW_TYPE", "")
118
119    def promise_name(self) -> str:
120        """Returns the value of KRATIX_PROMISE_NAME environment variable."""
121        return os.getenv("KRATIX_PROMISE_NAME", "")
122
123    def pipeline_name(self) -> str:
124        """Returns the value of KRATIX_PIPELINE_NAME environment variable."""
125        return os.getenv("KRATIX_PIPELINE_NAME", "")
126
127    def publish_status(self, resource: Resource, status: Status) -> None:
128        """Updates the status of a Resource.
129        This function uses the Kubernetes API to patch the status of a Custom Resource.
130        Update is instant and will not change the /kratix/metadata/status.yaml file."""
131        try:
132            k8s_config.load_incluster_config()
133        except Exception:
134            k8s_config.load_kube_config()
135
136        gvk = resource.get_group_version_kind()
137        plural = os.getenv("KRATIX_CRD_PLURAL")
138        if not plural:
139            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")
140
141        namespace = resource.get_namespace()
142        name = resource.get_name()
143
144        body = {"status": status.to_dict()}
145        api = k8s_client.CustomObjectsApi()
146        api.api_client.set_default_header(
147            "Content-Type", "application/merge-patch+json"
148        )
149        api.patch_namespaced_custom_object_status(
150            group=gvk.group,
151            version=gvk.version,
152            namespace=namespace,
153            plural=plural,
154            name=name,
155            body=body,
156        )
def read_resource_input(self) -> Resource:
45    def read_resource_input(self) -> Resource:
46        """Reads the file in /kratix/input/object.yaml and returns a Resource.
47        Can be used in Resource configure workflow."""
48        path = INPUT_DIR / "object.yaml"
49        with path.open() as f:
50            data = yaml.safe_load(f) or {}
51        return Resource(data)

Reads the file in /kratix/input/object.yaml and returns a Resource. Can be used in Resource configure workflow.

def read_promise_input(self) -> Promise:
53    def read_promise_input(self) -> Promise:
54        """Reads the file in /kratix/input/object.yaml and returns a Promise.
55        Can be used in Promise configure workflow."""
56        path = INPUT_DIR / "object.yaml"
57        with path.open() as f:
58            data = yaml.safe_load(f) or {}
59        return Promise(data)

Reads the file in /kratix/input/object.yaml and returns a Promise. Can be used in Promise configure workflow.

def read_status(self) -> Status:
61    def read_status(self) -> Status:
62        """Reads the file in /kratix/metadata/status.yaml and returns a Status."""
63        path = METADATA_DIR / "status.yaml"
64        with path.open() as f:
65            data = yaml.safe_load(f) or {}
66        return Status(data)

Reads the file in /kratix/metadata/status.yaml and returns a Status.

def read_destination_selectors(self) -> List[DestinationSelector]:
68    def read_destination_selectors(self) -> List[DestinationSelector]:
69        """Reads the file in /kratix/metadata/destination-selectors.yaml and returns a list of DestinationSelector"""
70        path = METADATA_DIR / "destination-selectors.yaml"
71        with path.open() as f:
72            raw = yaml.safe_load(f) or []
73        selectors = [
74            DestinationSelector(
75                directory=item.get("directory", ""),
76                match_labels=item.get("matchLabels", {}) or {},
77            )
78            for item in raw
79        ]
80        return selectors

Reads the file in /kratix/metadata/destination-selectors.yaml and returns a list of DestinationSelector

def write_output(self, relative_path: str, content: bytes) -> None:
82    def write_output(self, relative_path: str, content: bytes) -> None:
83        """writes the content to the specifies file at the path /kratix/output/relative_path."""
84        dest = OUTPUT_DIR / relative_path
85        dest.parent.mkdir(parents=True, exist_ok=True)
86        with dest.open("wb") as f:
87            f.write(content)

Writes the content to the specified file at the path /kratix/output/relative_path.

def write_status(self, status: Status) -> None:
89    def write_status(self, status: Status) -> None:
90        """writes the specified status to the /kratix/metadata/status.yaml."""
91        path = METADATA_DIR / "status.yaml"
92        path.parent.mkdir(parents=True, exist_ok=True)
93        with path.open("w") as f:
94            yaml.safe_dump(status.to_dict(), f)

writes the specified status to the /kratix/metadata/status.yaml.

def write_destination_selectors(self, selectors: List[DestinationSelector]) -> None:
 96    def write_destination_selectors(self, selectors: List[DestinationSelector]) -> None:
 97        """writes the specified Destination Selectors to the /kratix/metadata/destination_selectors.yaml."""
 98        path = METADATA_DIR / "destination-selectors.yaml"
 99        data = []
100        for s in selectors:
101            data.append(
102                {
103                    "directory": s.directory or "",  # directory is optional
104                    "matchLabels": s.match_labels or {},
105                }
106            )
107        path.parent.mkdir(parents=True, exist_ok=True)
108        with path.open("w") as f:
109            yaml.safe_dump(data, f)

Writes the specified Destination Selectors to /kratix/metadata/destination-selectors.yaml.

def workflow_action(self) -> str:
111    def workflow_action(self) -> str:
112        """Returns the value of KRATIX_WORKFLOW_ACTION environment variable."""
113        return os.getenv("KRATIX_WORKFLOW_ACTION", "")

Returns the value of KRATIX_WORKFLOW_ACTION environment variable.

def workflow_type(self) -> str:
115    def workflow_type(self) -> str:
116        """Returns the value of KRATIX_WORKFLOW_TYPE environment variable."""
117        return os.getenv("KRATIX_WORKFLOW_TYPE", "")

Returns the value of KRATIX_WORKFLOW_TYPE environment variable.

def promise_name(self) -> str:
119    def promise_name(self) -> str:
120        """Returns the value of KRATIX_PROMISE_NAME environment variable."""
121        return os.getenv("KRATIX_PROMISE_NAME", "")

Returns the value of KRATIX_PROMISE_NAME environment variable.

def pipeline_name(self) -> str:
123    def pipeline_name(self) -> str:
124        """Returns the value of KRATIX_PIPELINE_NAME environment variable."""
125        return os.getenv("KRATIX_PIPELINE_NAME", "")

Returns the value of KRATIX_PIPELINE_NAME environment variable.

def publish_status( self, resource: Resource, status: Status) -> None:
127    def publish_status(self, resource: Resource, status: Status) -> None:
128        """Updates the status of a Resource.
129        This function uses the Kubernetes API to patch the status of a Custom Resource.
130        Update is instant and will not change the /kratix/metadata/status.yaml file."""
131        try:
132            k8s_config.load_incluster_config()
133        except Exception:
134            k8s_config.load_kube_config()
135
136        gvk = resource.get_group_version_kind()
137        plural = os.getenv("KRATIX_CRD_PLURAL")
138        if not plural:
139            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")
140
141        namespace = resource.get_namespace()
142        name = resource.get_name()
143
144        body = {"status": status.to_dict()}
145        api = k8s_client.CustomObjectsApi()
146        api.api_client.set_default_header(
147            "Content-Type", "application/merge-patch+json"
148        )
149        api.patch_namespaced_custom_object_status(
150            group=gvk.group,
151            version=gvk.version,
152            namespace=namespace,
153            plural=plural,
154            name=name,
155            body=body,
156        )

Updates the status of a Resource. This function uses the Kubernetes API to patch the status of a Custom Resource. Update is instant and will not change the /kratix/metadata/status.yaml file.

def set_input_dir(path: pathlib.Path | str) -> None:
def set_input_dir(path: Path | str) -> None:
    """Point the module-level ``INPUT_DIR`` at ``path``, coerced to a ``Path``."""
    global INPUT_DIR
    new_dir = Path(path)
    INPUT_DIR = new_dir
def set_output_dir(path: pathlib.Path | str) -> None:
def set_output_dir(path: Path | str) -> None:
    """Point the module-level ``OUTPUT_DIR`` at ``path``, coerced to a ``Path``."""
    global OUTPUT_DIR
    new_dir = Path(path)
    OUTPUT_DIR = new_dir
def get_input_dir() -> pathlib.Path:
def get_input_dir() -> Path:
    """Return the directory currently held in the module-level ``INPUT_DIR``."""
    current = INPUT_DIR
    return current
def get_output_dir() -> pathlib.Path:
def get_output_dir() -> Path:
    """Return the directory currently held in the module-level ``OUTPUT_DIR``."""
    current = OUTPUT_DIR
    return current
def get_metadata_dir() -> pathlib.Path:
def get_metadata_dir() -> Path:
    """Return the directory currently held in the module-level ``METADATA_DIR``."""
    current = METADATA_DIR
    return current
def set_metadata_dir(path: pathlib.Path | str) -> None:
def set_metadata_dir(path: Path | str) -> None:
    """Point the module-level ``METADATA_DIR`` at ``path``, coerced to a ``Path``."""
    global METADATA_DIR
    new_dir = Path(path)
    METADATA_DIR = new_dir