kratix_sdk
1from .status import Status 2from .resource import Resource 3from .kratix_sdk import ( 4 KratixSDK, 5 set_input_dir, 6 set_output_dir, 7 get_input_dir, 8 get_output_dir, 9 get_metadata_dir, 10 set_metadata_dir, 11) 12from .promise import Promise 13from .types import GroupVersionKind, DestinationSelector 14 15__all__ = [ 16 "Status", 17 "Resource", 18 "Promise", 19 "GroupVersionKind", 20 "DestinationSelector", 21 "KratixSDK", 22 "set_input_dir", 23 "set_output_dir", 24 "get_input_dir", 25 "get_output_dir", 26 "get_metadata_dir", 27 "set_metadata_dir", 28]
class Status:
    """Wrapper around a status mapping whose entries are addressed by path.

    The wrapped mapping is kept in ``self.data``. All path handling is
    delegated to the module-level ``_get_by_path``, ``_set_by_path`` and
    ``_remove_by_path`` helpers.
    """

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        # A falsy argument (None or an empty mapping) is replaced by a new dict.
        self.data: Dict[str, Any] = data if data else {}

    def get(self, path: str) -> Any:
        """Return the value stored at ``path`` in this Status."""
        found = _get_by_path(self.data, path)
        return found

    def set(self, path: str, value: Any) -> None:
        """Store ``value`` at ``path`` in this Status."""
        _set_by_path(self.data, path, value)

    def remove(self, path: str) -> None:
        """Delete the value stored at ``path`` in this Status.

        A missing path is reported as an error by ``_remove_by_path``
        (presumably as a raised exception; confirm against that helper).
        """
        _remove_by_path(self.data, path)

    def to_dict(self) -> Dict[str, Any]:
        """Return the wrapped mapping itself (not a copy)."""
        wrapped = self.data
        return wrapped
def get(self, path: str) -> Any:
    """Return the value stored at ``path`` in this Status.

    The lookup is delegated to ``_get_by_path`` on ``self.data``.
    """
    status_data = self.data
    return _get_by_path(status_data, path)
Retrieves the value at the specified path in Status.
def set(self, path: str, value: Any) -> None:
    """Store ``value`` at ``path`` in this Status.

    The write is delegated to ``_set_by_path`` and applied to ``self.data``.
    """
    status_data = self.data
    _set_by_path(status_data, path, value)
Sets the value at the specified path in Status.
def remove(self, path: str) -> None:
    """Delete the value stored at ``path`` in this Status.

    The deletion is delegated to ``_remove_by_path``. A missing path is
    reported as an error by that helper (presumably as a raised exception;
    confirm against ``_remove_by_path``).
    """
    status_data = self.data
    _remove_by_path(status_data, path)
Removes the value at the specified path in Status. If the path does not exist, it returns an error.
class Resource:
    """Accessor over a parsed resource request document.

    ``data`` is the parsed object (a mapping) and is stored as-is in
    ``self.data``. The accessors read ``metadata``, ``status``,
    ``apiVersion`` and ``kind`` from it.
    """

    def __init__(self, data: Dict[str, Any]):
        self.data = data

    def get_value(self, path: str, **kwargs) -> Any:
        """Get a value from the resource request by path.

        Args:
            path (str): The path to the value in the resource data.

        KWargs:
            default: The default value to return if the path is not found.

        Raises:
            KeyError: If the path is not found and no default is provided
                (as documented for the ``_get_by_path`` helper).

        Returns:
            Any: The value at the specified path in the resource data.
        """
        return _get_by_path(self.data, path, **kwargs)

    def get_status(self, path: str = "") -> Status:
        """Get the status of the resource, optionally narrowed to a path.

        If ``path`` is empty, the whole ``status`` mapping is wrapped.
        If ``path`` is provided, the value found there is wrapped; a value
        that is not a dict is wrapped as ``{"value": value}`` so the result
        is always a Status.
        """
        status_data = self.data.get("status", {})
        if path:
            value = _get_by_path(status_data, path)
            return Status(value if isinstance(value, dict) else {"value": value})
        return Status(status_data)

    def get_name(self) -> str:
        """Get the name of the resource ("" when absent)."""
        return self.data.get("metadata", {}).get("name", "")

    def get_namespace(self) -> str:
        """Get the namespace of the resource ("" when absent)."""
        return self.data.get("metadata", {}).get("namespace", "")

    def get_group_version_kind(self) -> GroupVersionKind:
        """Get the GroupVersionKind of the resource.

        ``apiVersion`` is split on its first "/" into group and version; a
        value without "/" is treated as a version with an empty group.
        """
        api_version = self.data.get("apiVersion", "")
        if "/" in api_version:
            group, version = api_version.split("/", 1)
        else:
            group, version = "", api_version
        kind = self.data.get("kind", "")
        return GroupVersionKind(group=group, version=version, kind=kind)

    def get_labels(self) -> Dict[str, str]:
        """Get the labels of the resource.

        Always returns a dict: a missing or null ``metadata.labels`` yields
        an empty one, matching ``Promise.get_labels``.
        """
        # ``or {}`` covers an explicit ``labels: null`` in the document.
        return self.data.get("metadata", {}).get("labels", {}) or {}

    def get_annotations(self) -> Dict[str, str]:
        """Get the annotations of the resource.

        Always returns a dict: a missing or null ``metadata.annotations``
        yields an empty one, matching ``Promise.get_annotations``.
        """
        # ``or {}`` covers an explicit ``annotations: null`` in the document.
        return self.data.get("metadata", {}).get("annotations", {}) or {}
def get_value(self, path: str, **kwargs) -> Any:
    """Look up a value in the resource request by path.

    Args:
        path (str): Location of the value inside the resource data.

    KWargs:
        default: Value returned instead when the path is not found.

    Raises:
        KeyError: When the path is not found and no default was supplied
            (as documented for the ``_get_by_path`` helper).

    Returns:
        Any: Whatever is stored at ``path`` in the resource data.
    """
    resource_data = self.data
    return _get_by_path(resource_data, path, **kwargs)
Get a value from the resource request by path. Args: path (str): The path to the value in the resource data.
KWargs: default: The default value to return if the path is not found.
Raises: KeyError: If the path is not found and no default is provided.
Returns: Any: The value at the specified path in the resource data.
def get_status(self, path: str = "") -> Status:
    """Return the resource status, optionally narrowed to ``path``.

    With an empty ``path`` the whole ``status`` mapping is wrapped (an empty
    mapping when the resource has no ``status`` key). With a ``path``, the
    value found there is wrapped; a value that is not a dict is wrapped as
    ``{"value": value}``.
    """
    status_data = self.data.get("status", {})
    if not path:
        return Status(status_data)

    found = _get_by_path(status_data, path)
    if isinstance(found, dict):
        return Status(found)
    return Status({"value": found})
Get the status of the resource by path. If path is empty, return the entire status. If path is provided, return the value at that path; a value that is not a mapping is wrapped as {"value": value}.
def get_name(self) -> str:
    """Return ``metadata.name`` of the resource, or "" when it is absent."""
    metadata = self.data.get("metadata", {})
    return metadata.get("name", "")
Get the name of the resource.
def get_namespace(self) -> str:
    """Return ``metadata.namespace`` of the resource, or "" when it is absent."""
    metadata = self.data.get("metadata", {})
    return metadata.get("namespace", "")
Get the namespace of the resource.
def get_group_version_kind(self) -> GroupVersionKind:
    """Build a GroupVersionKind from the ``apiVersion`` and ``kind`` fields.

    ``apiVersion`` is split on its first "/" into group and version. A value
    without "/" is treated as a version with an empty group. Missing fields
    default to "".
    """
    api_version = self.data.get("apiVersion", "")
    if "/" not in api_version:
        group_part, version_part = "", api_version
    else:
        group_part, version_part = api_version.split("/", 1)
    return GroupVersionKind(
        group=group_part,
        version=version_part,
        kind=self.data.get("kind", ""),
    )
Get the GroupVersionKind of the resource.
class Promise:
    """Accessor over a parsed promise document.

    The document (a mapping) is kept in ``self.data``. The accessors read
    the ``metadata`` section and fall back to empty values when a field is
    absent.
    """

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        # A falsy argument (None or an empty mapping) is replaced by a new dict.
        self.data: Dict[str, Any] = data if data else {}

    def get_name(self) -> str:
        """Return ``metadata.name`` of the promise, or "" when absent."""
        metadata = self.data.get("metadata", {})
        return metadata.get("name", "")

    def get_labels(self) -> Dict[str, str]:
        """Return ``metadata.labels`` of the promise; empty dict when absent or null."""
        labels = self.data.get("metadata", {}).get("labels", {})
        return labels if labels else {}

    def get_annotations(self) -> Dict[str, str]:
        """Return ``metadata.annotations`` of the promise; empty dict when absent or null."""
        annotations = self.data.get("metadata", {}).get("annotations", {})
        return annotations if annotations else {}
def get_name(self) -> str:
    """Return ``metadata.name`` of the promise, or "" when it is absent."""
    metadata = self.data.get("metadata", {})
    return metadata.get("name", "")
Get the name of the promise.
class KratixSDK:
    """Helpers for reading workflow inputs and writing workflow outputs.

    File locations are resolved against the module-level ``INPUT_DIR``,
    ``OUTPUT_DIR`` and ``METADATA_DIR`` paths (documented as /kratix/input,
    /kratix/output and /kratix/metadata). YAML files are read and written
    as UTF-8 so the result does not depend on the process locale.
    """

    def read_resource_input(self) -> Resource:
        """Reads the file in /kratix/input/object.yaml and returns a Resource.

        Can be used in Resource configure workflow. An empty document yields
        a Resource over an empty mapping.
        """
        path = INPUT_DIR / "object.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Resource(data)

    def read_promise_input(self) -> Promise:
        """Reads the file in /kratix/input/object.yaml and returns a Promise.

        Can be used in Promise configure workflow. An empty document yields
        a Promise over an empty mapping.
        """
        path = INPUT_DIR / "object.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Promise(data)

    def read_status(self) -> Status:
        """Reads the file in /kratix/metadata/status.yaml and returns a Status."""
        path = METADATA_DIR / "status.yaml"
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        return Status(data)

    def read_destination_selectors(self) -> List[DestinationSelector]:
        """Reads the file in /kratix/metadata/destination-selectors.yaml and
        returns a list of DestinationSelector.

        An empty document yields an empty list. A missing ``directory``
        becomes "" and a missing or null ``matchLabels`` becomes ``{}``.
        """
        path = METADATA_DIR / "destination-selectors.yaml"
        with path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f) or []
        return [
            DestinationSelector(
                directory=item.get("directory", ""),
                match_labels=item.get("matchLabels", {}) or {},
            )
            for item in raw
        ]

    def write_output(self, relative_path: str, content: bytes) -> None:
        """Writes the content to the specified file at the path
        /kratix/output/relative_path.

        Missing parent directories are created.
        """
        # NOTE(review): relative_path is joined as given; an absolute path or
        # ".." segments would presumably resolve outside OUTPUT_DIR.
        dest = OUTPUT_DIR / relative_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        with dest.open("wb") as f:
            f.write(content)

    def write_status(self, status: Status) -> None:
        """Writes the specified status to /kratix/metadata/status.yaml."""
        path = METADATA_DIR / "status.yaml"
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(status.to_dict(), f)

    def write_destination_selectors(self, selectors: List[DestinationSelector]) -> None:
        """Writes the specified Destination Selectors to
        /kratix/metadata/destination-selectors.yaml.

        Each selector is written as a mapping with ``directory`` and
        ``matchLabels`` keys; falsy values are written as "" and ``{}``.
        """
        path = METADATA_DIR / "destination-selectors.yaml"
        data = [
            {
                "directory": s.directory or "",  # directory is optional
                "matchLabels": s.match_labels or {},
            }
            for s in selectors
        ]
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(data, f)

    def workflow_action(self) -> str:
        """Returns the value of KRATIX_WORKFLOW_ACTION environment variable."""
        return os.getenv("KRATIX_WORKFLOW_ACTION", "")

    def workflow_type(self) -> str:
        """Returns the value of KRATIX_WORKFLOW_TYPE environment variable."""
        return os.getenv("KRATIX_WORKFLOW_TYPE", "")

    def promise_name(self) -> str:
        """Returns the value of KRATIX_PROMISE_NAME environment variable."""
        return os.getenv("KRATIX_PROMISE_NAME", "")

    def pipeline_name(self) -> str:
        """Returns the value of KRATIX_PIPELINE_NAME environment variable."""
        return os.getenv("KRATIX_PIPELINE_NAME", "")

    def publish_status(self, resource: Resource, status: Status) -> None:
        """Updates the status of a Resource.

        This function uses the Kubernetes API to patch the status of a
        Custom Resource. Update is instant and will not change the
        /kratix/metadata/status.yaml file.

        Raises:
            RuntimeError: If the KRATIX_CRD_PLURAL environment variable is
                unset or empty.
        """
        # Try in-cluster configuration first; on any failure fall back to
        # the local kubeconfig.
        try:
            k8s_config.load_incluster_config()
        except Exception:
            k8s_config.load_kube_config()

        gvk = resource.get_group_version_kind()
        plural = os.getenv("KRATIX_CRD_PLURAL")
        if not plural:
            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")

        namespace = resource.get_namespace()
        name = resource.get_name()

        body = {"status": status.to_dict()}
        api = k8s_client.CustomObjectsApi()
        # Send the body as a JSON merge patch.
        api.api_client.set_default_header(
            "Content-Type", "application/merge-patch+json"
        )
        api.patch_namespaced_custom_object_status(
            group=gvk.group,
            version=gvk.version,
            namespace=namespace,
            plural=plural,
            name=name,
            body=body,
        )
def read_resource_input(self) -> Resource:
    """Reads the file in /kratix/input/object.yaml and returns a Resource.

    The file is ``INPUT_DIR / "object.yaml"``. Can be used in Resource
    configure workflow. An empty document yields a Resource over an empty
    mapping.

    Returns:
        Resource: Wrapper around the parsed YAML document.
    """
    path = INPUT_DIR / "object.yaml"
    # Decode as UTF-8 explicitly so parsing does not depend on the process locale.
    with path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    return Resource(data)
Reads the file in /kratix/input/object.yaml and returns a Resource. Can be used in Resource configure workflow.
def read_promise_input(self) -> Promise:
    """Reads the file in /kratix/input/object.yaml and returns a Promise.

    The file is ``INPUT_DIR / "object.yaml"``. Can be used in Promise
    configure workflow. An empty document yields a Promise over an empty
    mapping.

    Returns:
        Promise: Wrapper around the parsed YAML document.
    """
    path = INPUT_DIR / "object.yaml"
    # Decode as UTF-8 explicitly so parsing does not depend on the process locale.
    with path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    return Promise(data)
Reads the file in /kratix/input/object.yaml and returns a Promise. Can be used in Promise configure workflow.
def read_status(self) -> Status:
    """Reads the file in /kratix/metadata/status.yaml and returns a Status.

    The file is ``METADATA_DIR / "status.yaml"``. An empty document yields
    a Status over an empty mapping.

    Returns:
        Status: Wrapper around the parsed YAML document.
    """
    path = METADATA_DIR / "status.yaml"
    # Decode as UTF-8 explicitly so parsing does not depend on the process locale.
    with path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    return Status(data)
Reads the file in /kratix/metadata/status.yaml and returns a Status.
def read_destination_selectors(self) -> List[DestinationSelector]:
    """Reads the file in /kratix/metadata/destination-selectors.yaml and
    returns a list of DestinationSelector.

    The file is ``METADATA_DIR / "destination-selectors.yaml"``. An empty
    document yields an empty list. For each entry, a missing ``directory``
    becomes "" and a missing or null ``matchLabels`` becomes ``{}``.

    Returns:
        List[DestinationSelector]: One selector per entry in the file.
    """
    path = METADATA_DIR / "destination-selectors.yaml"
    # Decode as UTF-8 explicitly so parsing does not depend on the process locale.
    with path.open(encoding="utf-8") as f:
        raw = yaml.safe_load(f) or []
    return [
        DestinationSelector(
            directory=item.get("directory", ""),
            match_labels=item.get("matchLabels", {}) or {},
        )
        for item in raw
    ]
Reads the file in /kratix/metadata/destination-selectors.yaml and returns a list of DestinationSelector.
def write_output(self, relative_path: str, content: bytes) -> None:
    """Write ``content`` to ``relative_path`` under the output directory.

    The target is ``OUTPUT_DIR / relative_path`` (documented as
    /kratix/output/relative_path). Missing parent directories are created
    and an existing file is overwritten.
    """
    target = OUTPUT_DIR / relative_path
    parent_dir = target.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with target.open("wb") as sink:
        sink.write(content)
Writes the content to the specified file at the path /kratix/output/relative_path.
def write_status(self, status: Status) -> None:
    """Writes the specified status to /kratix/metadata/status.yaml.

    The file is ``METADATA_DIR / "status.yaml"``. Missing parent directories
    are created and an existing file is overwritten.

    Args:
        status: The Status whose ``to_dict()`` result is serialised as YAML.
    """
    path = METADATA_DIR / "status.yaml"
    path.parent.mkdir(parents=True, exist_ok=True)
    # Encode as UTF-8 explicitly so the file does not depend on the process locale.
    with path.open("w", encoding="utf-8") as f:
        yaml.safe_dump(status.to_dict(), f)
Writes the specified status to /kratix/metadata/status.yaml.
def write_destination_selectors(self, selectors: List[DestinationSelector]) -> None:
    """Writes the specified Destination Selectors to
    /kratix/metadata/destination-selectors.yaml.

    The file is ``METADATA_DIR / "destination-selectors.yaml"``. Each
    selector is written as a mapping with ``directory`` and ``matchLabels``
    keys; falsy values are written as "" and ``{}``. Missing parent
    directories are created and an existing file is overwritten.

    Args:
        selectors: The selectors to serialise, in order.
    """
    path = METADATA_DIR / "destination-selectors.yaml"
    data = [
        {
            "directory": s.directory or "",  # directory is optional
            "matchLabels": s.match_labels or {},
        }
        for s in selectors
    ]
    path.parent.mkdir(parents=True, exist_ok=True)
    # Encode as UTF-8 explicitly so the file does not depend on the process locale.
    with path.open("w", encoding="utf-8") as f:
        yaml.safe_dump(data, f)
Writes the specified Destination Selectors to /kratix/metadata/destination-selectors.yaml.
def workflow_action(self) -> str:
    """Return the KRATIX_WORKFLOW_ACTION environment variable, or "" if unset."""
    return os.environ.get("KRATIX_WORKFLOW_ACTION", "")
Returns the value of KRATIX_WORKFLOW_ACTION environment variable.
def workflow_type(self) -> str:
    """Return the KRATIX_WORKFLOW_TYPE environment variable, or "" if unset."""
    return os.environ.get("KRATIX_WORKFLOW_TYPE", "")
Returns the value of KRATIX_WORKFLOW_TYPE environment variable.
def promise_name(self) -> str:
    """Return the KRATIX_PROMISE_NAME environment variable, or "" if unset."""
    return os.environ.get("KRATIX_PROMISE_NAME", "")
Returns the value of KRATIX_PROMISE_NAME environment variable.
def pipeline_name(self) -> str:
    """Return the KRATIX_PIPELINE_NAME environment variable, or "" if unset."""
    return os.environ.get("KRATIX_PIPELINE_NAME", "")
Returns the value of KRATIX_PIPELINE_NAME environment variable.
def publish_status(self, resource: Resource, status: Status) -> None:
    """Patch the status of a Resource through the Kubernetes API.

    The status subresource of the Custom Resource is patched directly. The
    update is instant and does not change the /kratix/metadata/status.yaml
    file.

    Raises:
        RuntimeError: If the KRATIX_CRD_PLURAL environment variable is unset
            or empty.
    """
    # Try in-cluster configuration first; on any failure fall back to the
    # local kubeconfig.
    try:
        k8s_config.load_incluster_config()
    except Exception:
        k8s_config.load_kube_config()

    gvk = resource.get_group_version_kind()
    crd_plural = os.getenv("KRATIX_CRD_PLURAL")
    if not crd_plural:
        raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")

    # Gather every patch argument before the API client is created.
    patch_args = {
        "group": gvk.group,
        "version": gvk.version,
        "namespace": resource.get_namespace(),
        "plural": crd_plural,
        "name": resource.get_name(),
        "body": {"status": status.to_dict()},
    }

    custom_objects = k8s_client.CustomObjectsApi()
    # Send the body as a JSON merge patch.
    custom_objects.api_client.set_default_header(
        "Content-Type", "application/merge-patch+json"
    )
    custom_objects.patch_namespaced_custom_object_status(**patch_args)
Updates the status of a Resource. This function uses the Kubernetes API to patch the status of a Custom Resource. Update is instant and will not change the /kratix/metadata/status.yaml file.