kratix_sdk v0.4.1

class Status:
 7class Status:
 8    def __init__(self, data: dict[str, Any] | None = None):
 9        self.data: dict[str, Any] = data or {}
10
11    def get(self, path: str) -> Any:
12        """Retrieves the value at the specified path in Status."""
13        return _get_by_path(self.data, path)
14
15    def set(self, path: str, value: Any) -> None:
16        """Sets the value at the specified path in Status."""
17        _set_by_path(self.data, path, value)
18
19    def remove(self, path: str) -> None:
20        """Removes the value at the specified path in Status.
21        If the path does not exist, it retursn an error."""
22        _remove_by_path(self.data, path)
23
24    def to_dict(self) -> dict[str, Any]:
25        return self.data
Status(data: dict[str, typing.Any] | None = None)
8    def __init__(self, data: dict[str, Any] | None = None):
9        self.data: dict[str, Any] = data or {}
data: dict[str, typing.Any]
def get(self, path: str) -> Any:
11    def get(self, path: str) -> Any:
12        """Retrieves the value at the specified path in Status."""
13        return _get_by_path(self.data, path)

Retrieves the value at the specified path in Status.

def set(self, path: str, value: Any) -> None:
15    def set(self, path: str, value: Any) -> None:
16        """Sets the value at the specified path in Status."""
17        _set_by_path(self.data, path, value)

Sets the value at the specified path in Status.

def remove(self, path: str) -> None:
19    def remove(self, path: str) -> None:
20        """Removes the value at the specified path in Status.
21        If the path does not exist, it returns an error."""
22        _remove_by_path(self.data, path)

Removes the value at the specified path in Status. If the path does not exist, it returns an error.

def to_dict(self) -> dict[str, typing.Any]:
24    def to_dict(self) -> dict[str, Any]:
25        return self.data
class Resource:
    """Accessor wrapper around a resource request document held as a dict.

    Keys that are present but explicitly null (for example a YAML
    ``labels:`` line with no value) are treated the same as missing keys.
    """

    def __init__(self, data: dict[str, Any]):
        # Kept by reference; every accessor reads from it at call time.
        self.data = data

    def _metadata(self) -> dict[str, Any]:
        """Return the ``metadata`` mapping, or ``{}`` when absent or null."""
        metadata = self.data.get("metadata")
        return {} if metadata is None else metadata

    def _metadata_mapping(self, key: str) -> dict[str, str]:
        """Return ``metadata[key]``, or ``{}`` when absent or null."""
        value = self._metadata().get(key)
        return {} if value is None else value

    def get_value(self, path: str, **kwargs) -> Any:
        """Get a value from the resource request by path.

        Args:
            path (str): The path to the value in the resource data.

        KWargs:
            default: The default value to return if the path is not found.

        Raises:
            KeyError: If the path is not found and no default is provided.

        Returns:
            Any: The value at the specified path in the resource data.
        """
        return _get_by_path(self.data, path, **kwargs)

    def get_status(self, path: str = "") -> Status:
        """Get the status of the resource by path.

        If path is empty, the entire ``status`` mapping is wrapped.
        If path is provided, the value at that path is wrapped; a value that
        is not a dict is returned as ``{"value": <value>}``.
        """
        status_data = self.data.get("status")
        if status_data is None:
            # Missing or null status behaves like an empty one.
            status_data = {}
        if not path:
            return Status(status_data)
        value = _get_by_path(status_data, path)
        return Status(value if isinstance(value, dict) else {"value": value})

    def get_name(self) -> str:
        """Get the name of the resource, or "" when it is not set."""
        return self._metadata().get("name", "") or ""

    def get_namespace(self) -> str:
        """Get the namespace of the resource, or "" when it is not set."""
        return self._metadata().get("namespace", "") or ""

    def get_group_version_kind(self) -> GroupVersionKind:
        """Get the GroupVersionKind of the resource.

        ``apiVersion`` is split on its first "/" into group and version; with
        no "/" the group is "" and the whole value is the version.
        """
        api_version = self.data.get("apiVersion") or ""
        group, separator, version = api_version.partition("/")
        if not separator:
            group, version = "", api_version
        kind = self.data.get("kind") or ""
        return GroupVersionKind(group=group, version=version, kind=kind)

    def get_labels(self) -> dict[str, str]:
        """Get the labels of the resource ({} when absent or null)."""
        return self._metadata_mapping("labels")

    def get_annotations(self) -> dict[str, str]:
        """Get the annotations of the resource ({} when absent or null)."""
        return self._metadata_mapping("annotations")
Resource(data: dict[str, typing.Any])
10    def __init__(self, data: dict[str, Any]):
11        self.data = data
data
def get_value(self, path: str, **kwargs) -> Any:
13    def get_value(self, path: str, **kwargs) -> Any:
14        """Get a value from the resource request by path.
15        Args:
16            path (str): The path to the value in the resource data.
17
18        KWargs:
19            default: The default value to return if the path is not found.
20
21        Raises:
22            KeyError: If the path is not found and no default is provided.
23
24        Returns:
25            Any: The value at the specified path in the resource data.
26        """
27        return _get_by_path(self.data, path, **kwargs)

Get a value from the resource request by path. Args: path (str): The path to the value in the resource data.

KWargs: default: The default value to return if the path is not found.

Raises: KeyError: If the path is not found and no default is provided.

Returns: Any: The value at the specified path in the resource data.

def get_status(self, path: str = '') -> Status:
29    def get_status(self, path: str = "") -> Status:
30        """Get the status of the resource by path.
31        If path is empty, return the entire status.
32        If path is provided, return the value at that path."""
33        status_data = self.data.get("status", {})
34        if path:
35            value = _get_by_path(status_data, path)
36            return Status(value if isinstance(value, dict) else {"value": value})
37        return Status(status_data)

Get the status of the resource by path. If path is empty, return the entire status. If path is provided, return the value at that path.

def get_name(self) -> str:
39    def get_name(self) -> str:
40        """Get the name of the resource."""
41        return self.data.get("metadata", {}).get("name", "")

Get the name of the resource.

def get_namespace(self) -> str:
43    def get_namespace(self) -> str:
44        """Get the namespace of the resource."""
45        return self.data.get("metadata", {}).get("namespace", "")

Get the namespace of the resource.

def get_group_version_kind(self) -> GroupVersionKind:
47    def get_group_version_kind(self) -> GroupVersionKind:
48        """Get the GroupVersionKind of the resource."""
49        api_version = self.data.get("apiVersion", "")
50        if "/" in api_version:
51            group, version = api_version.split("/", 1)
52        else:
53            group, version = "", api_version
54        kind = self.data.get("kind", "")
55        return GroupVersionKind(group=group, version=version, kind=kind)

Get the GroupVersionKind of the resource.

def get_labels(self) -> dict[str, str]:
57    def get_labels(self) -> dict[str, str]:
58        """Get the labels of the resource."""
59        return self.data.get("metadata", {}).get("labels", {})

Get the labels of the resource.

def get_annotations(self) -> dict[str, str]:
61    def get_annotations(self) -> dict[str, str]:
62        """Get the annotations of the resource."""
63        return self.data.get("metadata", {}).get("annotations", {})

Get the annotations of the resource.

class Promise:
 5class Promise:
 6    def __init__(self, data: dict[str, Any] | None = None):
 7        self.data: dict[str, Any] = data or {}
 8
 9    def get_name(self) -> str:
10        """Get the name of the promise."""
11        return self.data.get("metadata", {}).get("name", "")
12
13    def get_labels(self) -> dict[str, str]:
14        """Get the labels of the promise."""
15        return self.data.get("metadata", {}).get("labels", {}) or {}
16
17    def get_annotations(self) -> dict[str, str]:
18        """Get the annotations of the promise."""
19        return self.data.get("metadata", {}).get("annotations", {}) or {}
Promise(data: dict[str, typing.Any] | None = None)
6    def __init__(self, data: dict[str, Any] | None = None):
7        self.data: dict[str, Any] = data or {}
data: dict[str, typing.Any]
def get_name(self) -> str:
 9    def get_name(self) -> str:
10        """Get the name of the promise."""
11        return self.data.get("metadata", {}).get("name", "")

Get the name of the promise.

def get_labels(self) -> dict[str, str]:
13    def get_labels(self) -> dict[str, str]:
14        """Get the labels of the promise."""
15        return self.data.get("metadata", {}).get("labels", {}) or {}

Get the labels of the promise.

def get_annotations(self) -> dict[str, str]:
17    def get_annotations(self) -> dict[str, str]:
18        """Get the annotations of the promise."""
19        return self.data.get("metadata", {}).get("annotations", {}) or {}

Get the annotations of the promise.

@dataclass
class GroupVersionKind:
    """Plain record holding the group, version and kind of an API type."""

    # All three fields are required and are stored exactly as given.
    group: str
    version: str
    kind: str
GroupVersionKind(group: str, version: str, kind: str)
group: str
version: str
kind: str
@dataclass
class DestinationSelector:
    """Plain record pairing an optional directory with a label mapping."""

    # Optional; the empty string means no directory was given.
    directory: str = ""
    # A fresh dict per instance, so selectors never share label state.
    match_labels: dict[str, Any] = field(default_factory=dict)
DestinationSelector(directory: str = '', match_labels: dict[str, typing.Any] = <factory>)
directory: str = ''
match_labels: dict[str, typing.Any]
class KratixSDK:
    """Helpers for reading workflow inputs, writing outputs and status.

    File locations are resolved against the module-level ``INPUT_DIR``,
    ``OUTPUT_DIR`` and ``METADATA_DIR`` paths at call time. YAML files are
    always read and written as UTF-8, independent of the process locale.
    """

    def read_resource_input(self) -> Resource:
        """Reads ``object.yaml`` from the input directory as a Resource.

        Documented location: /kratix/input/object.yaml.
        Can be used in Resource configure workflow.
        An empty file yields a Resource wrapping ``{}``.
        """
        path = INPUT_DIR / "object.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Resource(data)

    def read_promise_input(self) -> Promise:
        """Reads ``object.yaml`` from the input directory as a Promise.

        Documented location: /kratix/input/object.yaml.
        Can be used in Promise configure workflow.
        An empty file yields a Promise wrapping ``{}``.
        """
        path = INPUT_DIR / "object.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Promise(data)

    def read_status(self) -> Status:
        """Reads ``status.yaml`` from the metadata directory as a Status.

        Documented location: /kratix/metadata/status.yaml.
        """
        path = METADATA_DIR / "status.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Status(data)

    def read_destination_selectors(self) -> list[DestinationSelector]:
        """Reads ``destination-selectors.yaml`` from the metadata directory.

        Documented location: /kratix/metadata/destination-selectors.yaml.
        Each entry's ``directory`` and ``matchLabels`` keys are mapped onto a
        DestinationSelector; missing or null values become "" and {}.
        An empty file yields an empty list.
        """
        path = METADATA_DIR / "destination-selectors.yaml"
        with path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f) or []
        return [
            DestinationSelector(
                directory=item.get("directory") or "",
                match_labels=item.get("matchLabels") or {},
            )
            for item in raw
        ]

    def write_output(self, relative_path: str, content: bytes) -> None:
        """Writes the content to the specified file under the output directory.

        Documented location: /kratix/output/relative_path.
        Missing parent directories are created; an existing file is replaced.
        """
        dest = OUTPUT_DIR / relative_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        with dest.open("wb") as f:
            f.write(content)

    def write_status(self, status: Status) -> None:
        """Writes the specified status to ``status.yaml`` in the metadata directory.

        Documented location: /kratix/metadata/status.yaml.
        """
        path = METADATA_DIR / "status.yaml"
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(status.to_dict(), f)

    def write_destination_selectors(self, selectors: list[DestinationSelector]) -> None:
        """Writes the specified Destination Selectors to
        ``destination-selectors.yaml`` in the metadata directory.

        Documented location: /kratix/metadata/destination-selectors.yaml.
        """
        path = METADATA_DIR / "destination-selectors.yaml"
        data = [
            {
                "directory": s.directory or "",  # directory is optional
                "matchLabels": s.match_labels or {},
            }
            for s in selectors
        ]
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(data, f)

    def workflow_action(self) -> str:
        """Returns the value of KRATIX_WORKFLOW_ACTION environment variable."""
        return os.getenv("KRATIX_WORKFLOW_ACTION", "")

    def workflow_type(self) -> str:
        """Returns the value of KRATIX_WORKFLOW_TYPE environment variable."""
        return os.getenv("KRATIX_WORKFLOW_TYPE", "")

    def promise_name(self) -> str:
        """Returns the value of KRATIX_PROMISE_NAME environment variable."""
        return os.getenv("KRATIX_PROMISE_NAME", "")

    def pipeline_name(self) -> str:
        """Returns the value of KRATIX_PIPELINE_NAME environment variable."""
        return os.getenv("KRATIX_PIPELINE_NAME", "")

    def publish_status(self, resource: Resource, status: Status) -> None:
        """Updates the status of a Resource.

        This function uses the Kubernetes API to patch the status of a Custom
        Resource. Update is instant and will not change the
        /kratix/metadata/status.yaml file.

        Raises:
            RuntimeError: If the KRATIX_CRD_PLURAL environment variable is
                unset or empty.
        """
        # Deliberate best-effort: try in-cluster configuration first and fall
        # back to the local kubeconfig on any failure.
        try:
            k8s_config.load_incluster_config()
        except Exception:
            k8s_config.load_kube_config()

        gvk = resource.get_group_version_kind()
        plural = os.getenv("KRATIX_CRD_PLURAL")
        if not plural:
            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")

        namespace = resource.get_namespace()
        name = resource.get_name()

        body = {"status": status.to_dict()}
        api = k8s_client.CustomObjectsApi()
        # Send the body as a JSON merge patch.
        api.api_client.set_default_header(
            "Content-Type", "application/merge-patch+json"
        )
        api.patch_namespaced_custom_object_status(
            group=gvk.group,
            version=gvk.version,
            namespace=namespace,
            plural=plural,
            name=name,
            body=body,
        )

    def is_promise_workflow(self) -> bool:
        """Returns true if the workflow is a promise workflow."""
        return self.workflow_type() == "promise"

    def is_resource_workflow(self) -> bool:
        """Returns true if the workflow is a resource workflow."""
        return self.workflow_type() == "resource"

    def is_configure_action(self) -> bool:
        """Returns true if the workflow is a configure action."""
        return self.workflow_action() == "configure"

    def is_delete_action(self) -> bool:
        """Returns true if the workflow is a delete action."""
        return self.workflow_action() == "delete"
def read_resource_input(self) -> Resource:
47    def read_resource_input(self) -> Resource:
48        """Reads the file in /kratix/input/object.yaml and returns a Resource.
49        Can be used in Resource configure workflow."""
50        path = INPUT_DIR / "object.yaml"
51        with path.open() as f:
52            data = yaml.safe_load(f) or {}
53        return Resource(data)

Reads the file in /kratix/input/object.yaml and returns a Resource. Can be used in Resource configure workflow.

def read_promise_input(self) -> Promise:
55    def read_promise_input(self) -> Promise:
56        """Reads the file in /kratix/input/object.yaml and returns a Promise.
57        Can be used in Promise configure workflow."""
58        path = INPUT_DIR / "object.yaml"
59        with path.open() as f:
60            data = yaml.safe_load(f) or {}
61        return Promise(data)

Reads the file in /kratix/input/object.yaml and returns a Promise. Can be used in Promise configure workflow.

def read_status(self) -> Status:
63    def read_status(self) -> Status:
64        """Reads the file in /kratix/metadata/status.yaml and returns a Status."""
65        path = METADATA_DIR / "status.yaml"
66        with path.open() as f:
67            data = yaml.safe_load(f) or {}
68        return Status(data)

Reads the file in /kratix/metadata/status.yaml and returns a Status.

def read_destination_selectors(self) -> list[DestinationSelector]:
70    def read_destination_selectors(self) -> list[DestinationSelector]:
71        """Reads the file in /kratix/metadata/destination-selectors.yaml and
72        returns a list of DestinationSelector"""
73        path = METADATA_DIR / "destination-selectors.yaml"
74        with path.open() as f:
75            raw = yaml.safe_load(f) or []
76        selectors = [
77            DestinationSelector(
78                directory=item.get("directory", ""),
79                match_labels=item.get("matchLabels", {}) or {},
80            )
81            for item in raw
82        ]
83        return selectors

Reads the file in /kratix/metadata/destination-selectors.yaml and returns a list of DestinationSelector

def write_output(self, relative_path: str, content: bytes) -> None:
85    def write_output(self, relative_path: str, content: bytes) -> None:
86        """writes the content to the specified file at the path
87        /kratix/output/relative_path."""
88        dest = OUTPUT_DIR / relative_path
89        dest.parent.mkdir(parents=True, exist_ok=True)
90        with dest.open("wb") as f:
91            f.write(content)

writes the content to the specified file at the path /kratix/output/relative_path.

def write_status(self, status: Status) -> None:
93    def write_status(self, status: Status) -> None:
94        """writes the specified status to the /kratix/metadata/status.yaml."""
95        path = METADATA_DIR / "status.yaml"
96        path.parent.mkdir(parents=True, exist_ok=True)
97        with path.open("w") as f:
98            yaml.safe_dump(status.to_dict(), f)

writes the specified status to the /kratix/metadata/status.yaml.

def write_destination_selectors(self, selectors: list[DestinationSelector]) -> None:
100    def write_destination_selectors(self, selectors: list[DestinationSelector]) -> None:
101        """writes the specified Destination Selectors to the
102        /kratix/metadata/destination-selectors.yaml."""
103        path = METADATA_DIR / "destination-selectors.yaml"
104        data = []
105        for s in selectors:
106            data.append(
107                {
108                    "directory": s.directory or "",  # directory is optional
109                    "matchLabels": s.match_labels or {},
110                }
111            )
112        path.parent.mkdir(parents=True, exist_ok=True)
113        with path.open("w") as f:
114            yaml.safe_dump(data, f)

writes the specified Destination Selectors to the /kratix/metadata/destination-selectors.yaml.

def workflow_action(self) -> str:
116    def workflow_action(self) -> str:
117        """Returns the value of KRATIX_WORKFLOW_ACTION environment variable."""
118        return os.getenv("KRATIX_WORKFLOW_ACTION", "")

Returns the value of KRATIX_WORKFLOW_ACTION environment variable.

def workflow_type(self) -> str:
120    def workflow_type(self) -> str:
121        """Returns the value of KRATIX_WORKFLOW_TYPE environment variable."""
122        return os.getenv("KRATIX_WORKFLOW_TYPE", "")

Returns the value of KRATIX_WORKFLOW_TYPE environment variable.

def promise_name(self) -> str:
124    def promise_name(self) -> str:
125        """Returns the value of KRATIX_PROMISE_NAME environment variable."""
126        return os.getenv("KRATIX_PROMISE_NAME", "")

Returns the value of KRATIX_PROMISE_NAME environment variable.

def pipeline_name(self) -> str:
128    def pipeline_name(self) -> str:
129        """Returns the value of KRATIX_PIPELINE_NAME environment variable."""
130        return os.getenv("KRATIX_PIPELINE_NAME", "")

Returns the value of KRATIX_PIPELINE_NAME environment variable.

def publish_status( self, resource: Resource, status: Status) -> None:
132    def publish_status(self, resource: Resource, status: Status) -> None:
133        """Updates the status of a Resource.
134        This function uses the Kubernetes API to patch the status of a Custom Resource.
135        Update is instant and will not change the /kratix/metadata/status.yaml file."""
136        try:
137            k8s_config.load_incluster_config()
138        except Exception:
139            k8s_config.load_kube_config()
140
141        gvk = resource.get_group_version_kind()
142        plural = os.getenv("KRATIX_CRD_PLURAL")
143        if not plural:
144            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")
145
146        namespace = resource.get_namespace()
147        name = resource.get_name()
148
149        body = {"status": status.to_dict()}
150        api = k8s_client.CustomObjectsApi()
151        api.api_client.set_default_header(
152            "Content-Type", "application/merge-patch+json"
153        )
154        api.patch_namespaced_custom_object_status(
155            group=gvk.group,
156            version=gvk.version,
157            namespace=namespace,
158            plural=plural,
159            name=name,
160            body=body,
161        )

Updates the status of a Resource. This function uses the Kubernetes API to patch the status of a Custom Resource. Update is instant and will not change the /kratix/metadata/status.yaml file.

def is_promise_workflow(self) -> bool:
163    def is_promise_workflow(self) -> bool:
164        """Returns true if the workflow is a promise workflow."""
165        return self.workflow_type() == "promise"

Returns true if the workflow is a promise workflow.

def is_resource_workflow(self) -> bool:
167    def is_resource_workflow(self) -> bool:
168        """Returns true if the workflow is a resource workflow."""
169        return self.workflow_type() == "resource"

Returns true if the workflow is a resource workflow.

def is_configure_action(self) -> bool:
171    def is_configure_action(self) -> bool:
172        """Returns true if the workflow is a configure action."""
173        return self.workflow_action() == "configure"

Returns true if the workflow is a configure action.

def is_delete_action(self) -> bool:
175    def is_delete_action(self) -> bool:
176        """Returns true if the workflow is a delete action."""
177        return self.workflow_action() == "delete"

Returns true if the workflow is a delete action.

def set_input_dir(path: pathlib.Path | str) -> None:
31def set_input_dir(path: Path | str) -> None:
32    global INPUT_DIR
33    INPUT_DIR = Path(path)
def set_output_dir(path: pathlib.Path | str) -> None:
36def set_output_dir(path: Path | str) -> None:
37    global OUTPUT_DIR
38    OUTPUT_DIR = Path(path)
def get_input_dir() -> Path:
    """Return the input directory currently held in ``INPUT_DIR``."""
    return INPUT_DIR
def get_output_dir() -> Path:
    """Return the output directory currently held in ``OUTPUT_DIR``."""
    return OUTPUT_DIR
def get_metadata_dir() -> Path:
    """Return the metadata directory currently held in ``METADATA_DIR``."""
    return METADATA_DIR
def set_metadata_dir(path: pathlib.Path | str) -> None:
41def set_metadata_dir(path: Path | str) -> None:
42    global METADATA_DIR
43    METADATA_DIR = Path(path)
__version__ = '0.4.1'