Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ A **task** describes an instance of a component and specifies the input argument
The resulting *graph* of interconnected tasks is called a *pipeline*.
A pipeline can be submitted for *execution*. During the pipeline execution, the pipeline's tasks are executed (in parallel, if possible) and produce output artifacts that are passed to downstream tasks.

## Multi-node runtime helpers

Kubernetes-backed multi-node tasks receive runtime metadata through environment variables such as `TANGLE_MULTI_NODE_NUMBER_OF_NODES`, `TANGLE_MULTI_NODE_NODE_INDEX`, and `TANGLE_MULTI_NODE_NODE_0_ADDRESS`.
Python component images that include this package can use the cooperative barrier helper when all nodes must reach the same phase before node 0 continues:

```python
from cloud_pipelines_backend.runtime import multi_node

multi_node.barrier("training-finished", timeout_seconds=600)
```

The helper coordinates over the task's headless Kubernetes Service. It is a synchronization primitive for nodes in the same task, not an authentication boundary. Component images must include `cloud-pipelines-backend` for the helper import to be available; non-Python containers can use the injected environment variables to implement equivalent coordination.

## Design

This backend consists of the API Server and the Orchestrator.
Expand Down
3 changes: 3 additions & 0 deletions cloud_pipelines_backend/launchers/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def upload_log(self) -> None:
def stream_log_lines(self) -> typing.Iterator[str]:
    """Stream the container log as an iterator of text lines.

    Raises:
        NotImplementedError: Always; this implementation provides no log
            streaming.
    """
    raise NotImplementedError()

def cleanup(self) -> None:
    """Clean up after the launched container.

    The default implementation does nothing and returns ``None``, so
    implementations that have nothing to clean up need not override it.
    """
    return None

def terminate(self) -> None:
    """Terminate the launched container.

    Raises:
        NotImplementedError: Always; this implementation provides no
            termination logic.
    """
    raise NotImplementedError()

Expand Down
138 changes: 132 additions & 6 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import pathlib
import secrets
import typing
from typing import Any, Optional

Expand All @@ -19,6 +20,7 @@
)
from cloud_pipelines.orchestration.storage_providers import local_storage
from .. import component_structures as structures
from ..runtime import multi_node as multi_node_runtime
from . import common_annotations
from . import container_component_utils
from . import interfaces
Expand Down Expand Up @@ -73,6 +75,8 @@
_T = typing.TypeVar("_T")

_CONTAINER_FILE_NAME = "data"
# Placeholder written over the "value" of sensitive entries when Kubernetes
# objects are serialized (see `_redact_sensitive_env_values`).
_REDACTED_VALUE = "[REDACTED]"
# Substrings that mark an env var name as sensitive. Matching is done against
# the upper-cased name, so it is case-insensitive (see `_is_sensitive_env_var_name`).
_SENSITIVE_ENV_VAR_NAME_PARTS = ("PASSWORD", "SECRET", "TOKEN")


def _create_volume_and_volume_mount_host_path(
Expand Down Expand Up @@ -146,6 +150,25 @@ def _create_volume_and_volume_mount_google_cloud_storage(
)


def _delete_namespaced_service_if_exists(
    *,
    core_api_client: k8s_client_lib.CoreV1Api,
    name: str,
    namespace: str,
    request_timeout: int | tuple[int, int],
) -> None:
    """Delete a Kubernetes Service, treating "not found" as success.

    Args:
        core_api_client: Core API client used to issue the delete request.
        name: Name of the Service to delete.
        namespace: Namespace that contains the Service.
        request_timeout: Value forwarded to the client as `_request_timeout`.

    Raises:
        kubernetes.client.exceptions.ApiException: For any API error whose
            status is not 404.
    """
    delete_kwargs = {
        "name": name,
        "namespace": namespace,
        "_request_timeout": request_timeout,
    }
    try:
        core_api_client.delete_namespaced_service(**delete_kwargs)
    except kubernetes.client.exceptions.ApiException as api_error:
        # A 404 means the Service is already absent, which is the desired end state.
        if api_error.status != 404:
            raise


class PodPostProcessor(typing.Protocol):
def __call__(
self, *, pod: k8s_client_lib.V1Pod, annotations: dict[str, str] | None = None
Expand Down Expand Up @@ -1073,6 +1096,7 @@ def launch_container_task(
else:
node_0_address = "localhost"
all_node_addresses_str = node_0_address
multi_node_barrier_token = secrets.token_hex(16)

# Resolving the dynamic data arguments
known_dynamic_data_values = {
Expand Down Expand Up @@ -1127,6 +1151,20 @@ def launch_container_task(
if enable_multi_node:
main_container_spec = pod.spec.containers[0]
main_container_spec.env = main_container_spec.env or []
runtime_env_names = {
_MULTI_NODE_NODE_INDEX_ENV_VAR_NAME,
multi_node_runtime.NUMBER_OF_NODES_ENV_VAR_NAME,
multi_node_runtime.NODE_INDEX_ENV_VAR_NAME,
multi_node_runtime.NODE_0_ADDRESS_ENV_VAR_NAME,
multi_node_runtime.ALL_NODE_ADDRESSES_ENV_VAR_NAME,
multi_node_runtime.BARRIER_PORT_ENV_VAR_NAME,
multi_node_runtime.BARRIER_TOKEN_ENV_VAR_NAME,
}
main_container_spec.env = [
env_var
for env_var in main_container_spec.env
if env_var.name not in runtime_env_names
]

# We need to insert this env variable at the start of the list since subsequent variables can depend on it.
main_container_spec.env.insert(
Expand All @@ -1143,6 +1181,34 @@ def launch_container_task(
),
),
)
main_container_spec.env.extend(
[
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.NUMBER_OF_NODES_ENV_VAR_NAME,
value=str(num_nodes),
),
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.NODE_INDEX_ENV_VAR_NAME,
value=f"$({_MULTI_NODE_NODE_INDEX_ENV_VAR_NAME})",
),
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.NODE_0_ADDRESS_ENV_VAR_NAME,
value=node_0_address,
),
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.ALL_NODE_ADDRESSES_ENV_VAR_NAME,
value=all_node_addresses_str,
),
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.BARRIER_PORT_ENV_VAR_NAME,
value=str(multi_node_runtime.DEFAULT_BARRIER_PORT),
),
k8s_client_lib.V1EnvVar(
name=multi_node_runtime.BARRIER_TOKEN_ENV_VAR_NAME,
value=multi_node_barrier_token,
),
]
)
# Handling cross-pod communication.
# Creating headless Kubernetes Service to give all pods in the job a stable DNS name to communicate with each other.
service = k8s_client_lib.V1Service(
Expand All @@ -1156,6 +1222,14 @@ def launch_container_task(
selector={
"job-name": explicit_job_name,
},
ports=[
k8s_client_lib.V1ServicePort(
name="barrier",
port=multi_node_runtime.DEFAULT_BARRIER_PORT,
target_port=multi_node_runtime.DEFAULT_BARRIER_PORT,
)
],
publish_not_ready_addresses=True,
),
)
core_api_client = k8s_client_lib.CoreV1Api(api_client=self._api_client)
Expand Down Expand Up @@ -1207,6 +1281,20 @@ def launch_container_task(
_request_timeout=self._request_timeout,
)
except Exception as ex:
if enable_multi_node:
try:
_delete_namespaced_service_if_exists(
core_api_client=core_api_client,
name=explicit_service_name,
namespace=namespace,
request_timeout=self._request_timeout,
)
except Exception:
_logger.warning(
"Failed to delete Kubernetes Service %s after Job creation failed.",
explicit_service_name,
exc_info=True,
)
raise interfaces.LauncherError(
f"Failed to create Kubernetes Job: {_kubernetes_serialize(job)}"
) from ex
Expand Down Expand Up @@ -1584,15 +1672,30 @@ def __str__(self) -> str:

return pprint.pformat(self.to_dict())

def terminate(self):
def cleanup(self) -> None:
launcher = self._get_launcher()
batch_api_client = k8s_client_lib.BatchV1Api(api_client=launcher._api_client)
batch_api_client.delete_namespaced_job(
core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client)
_delete_namespaced_service_if_exists(
core_api_client=core_api_client,
name=self._job_name,
namespace=self._namespace,
grace_period_seconds=10,
propagation_policy="Foreground",
request_timeout=launcher._request_timeout,
)

def terminate(self):
    """Delete the Kubernetes Job for this container, then run `cleanup`.

    The Job is deleted with foreground propagation and a 10 second grace
    period. A 404 response is ignored so that `cleanup` still runs when the
    Job is already gone.

    Raises:
        kubernetes.client.exceptions.ApiException: If Job deletion fails with
            a status other than 404. Errors raised by `cleanup` also propagate.
    """
    launcher = self._get_launcher()
    batch_api_client = k8s_client_lib.BatchV1Api(api_client=launcher._api_client)
    try:
        batch_api_client.delete_namespaced_job(
            name=self._job_name,
            namespace=self._namespace,
            grace_period_seconds=10,
            propagation_policy="Foreground",
            # Bound the call like the launcher's other Kubernetes requests,
            # so an unresponsive API server cannot hang termination.
            _request_timeout=launcher._request_timeout,
        )
    except kubernetes.client.exceptions.ApiException as ex:
        # 404: the Job no longer exists; continue to cleanup.
        if ex.status != 404:
            raise
    self.cleanup()
    _logger.info(f"Terminated job {self._job_name} in namespace {self._namespace}")


Expand Down Expand Up @@ -1779,7 +1882,30 @@ def windows_path_to_docker_path(path: str) -> str:

def _kubernetes_serialize(obj) -> dict[str, Any]:
shallow_client = k8s_client_lib.ApiClient.__new__(k8s_client_lib.ApiClient)
return shallow_client.sanitize_for_serialization(obj)
serialized = shallow_client.sanitize_for_serialization(obj)
_redact_sensitive_env_values(serialized)
return serialized


def _redact_sensitive_env_values(value: Any) -> None:
    """Recursively redact sensitive name/value entries in place.

    Walks nested dicts and lists. Any dict that has a string "name" judged
    sensitive by `_is_sensitive_env_var_name` and also has a "value" key gets
    that "value" replaced with `_REDACTED_VALUE`. Other types are left as-is.

    Args:
        value: Serialized structure to scrub; it is mutated in place.
    """
    if isinstance(value, list):
        for element in value:
            _redact_sensitive_env_values(element)
        return
    if not isinstance(value, dict):
        return

    candidate_name = value.get("name")
    is_sensitive_entry = (
        isinstance(candidate_name, str)
        and "value" in value
        and _is_sensitive_env_var_name(candidate_name)
    )
    if is_sensitive_entry:
        value["value"] = _REDACTED_VALUE
    # Descend into every child, since entries can be nested at any depth.
    for nested in value.values():
        _redact_sensitive_env_values(nested)


def _is_sensitive_env_var_name(name: str) -> bool:
    """Return True if `name` contains a sensitive marker, ignoring case.

    The markers are the substrings in `_SENSITIVE_ENV_VAR_NAME_PARTS`; the
    name is upper-cased before matching.
    """
    normalized_name = name.upper()
    for marker in _SENSITIVE_ENV_VAR_NAME_PARTS:
        if marker in normalized_name:
            return True
    return False


def _kubernetes_deserialize(obj_dict: dict[str, Any], cls: typing.Type[_T]) -> _T:
Expand Down
14 changes: 14 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,11 @@ def internal_process_one_running_execution(
)
session.rollback()
container_execution.updated_at = current_time
if new_status in (
launcher_interfaces.ContainerStatus.SUCCEEDED,
launcher_interfaces.ContainerStatus.FAILED,
):
_cleanup_launched_container(reloaded_launched_container)
execution_nodes = container_execution.execution_nodes
if not execution_nodes:
raise OrchestratorError(
Expand Down Expand Up @@ -1065,6 +1070,15 @@ def _retry(
raise


def _cleanup_launched_container(
    launched_container: launcher_interfaces.LaunchedContainer,
) -> None:
    """Run `LaunchedContainer.cleanup` through `_retry`, never raising.

    Any exception that escapes the retried call is logged with its traceback
    and swallowed.

    Args:
        launched_container: The launched container whose `cleanup` is called.
    """
    try:
        _retry(launched_container.cleanup)
    except Exception:
        # Best-effort: a cleanup failure is recorded but not propagated.
        _logger.exception("Error during `LaunchedContainer.cleanup` call.")


def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception):
app_metrics.execution_system_errors.add(1)
bugsnag_instrumentation.notify(
Expand Down
5 changes: 5 additions & 0 deletions cloud_pipelines_backend/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Runtime helpers for Tangle component containers."""

from . import multi_node

__all__ = ["multi_node"]
Loading