Skip to content

Commit

Permalink
feat(k8s): support custom namespaces for dask resources (#621)
Browse files Browse the repository at this point in the history
Closes #619
  • Loading branch information
Alputer committed Jan 16, 2025
1 parent 907459b commit ea89a32
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
4 changes: 2 additions & 2 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"reana-run-dask-{workflow.id_}",
)

Expand All @@ -331,7 +331,7 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

Expand Down
9 changes: 4 additions & 5 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
KRB5_STATUS_FILE_LOCATION,
REANA_JOB_HOSTPATH_MOUNTS,
WORKFLOW_RUNTIME_USER_UID,
REANA_RUNTIME_KUBERNETES_NAMESPACE,
)
from reana_commons.k8s.api_client import (
current_k8s_custom_objects_api_client,
Expand Down Expand Up @@ -68,9 +69,7 @@ def __init__(
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = (
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
)
self.dask_scheduler_uri = f"{self.cluster_name}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786"

self.secrets_store = UserSecretsStore.fetch(self.user_id)
self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec()
Expand Down Expand Up @@ -486,7 +485,7 @@ def _create_dask_cluster(self):
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=self.cluster_body,
)
except Exception:
Expand All @@ -502,7 +501,7 @@ def _create_dask_autoscaler(self):
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=self.autoscaler_body,
)
except Exception:
Expand Down
17 changes: 11 additions & 6 deletions reana_workflow_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
middleware_spec = {
"apiVersion": "traefik.io/v1alpha1",
"kind": "Middleware",
"metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
"metadata": {
"name": f"replacepath-{workflow_id}",
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
},
"spec": {
"replacePathRegex": {
"regex": f"/{workflow_id}/dashboard/*",
Expand All @@ -426,7 +429,7 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
name=f"dask-dashboard-ingress-{cluster_name}",
annotations={
**REANA_INGRESS_ANNOTATIONS,
"traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-replacepath-{workflow_id}@kubernetescrd",
},
),
spec=client.V1IngressSpec(
Expand Down Expand Up @@ -458,25 +461,27 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
current_k8s_custom_objects_api_client.create_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
plural="middlewares",
body=middleware_spec,
)
# Create the ingress resource
current_k8s_networking_api_client.create_namespaced_ingress(
namespace="default", body=ingress
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=ingress
)


def delete_dask_dashboard_ingress(cluster_name, workflow_id):
"""Delete K8S Ingress Object for Dask dashboard."""
current_k8s_networking_api_client.delete_namespaced_ingress(
name=cluster_name, namespace="default", body=client.V1DeleteOptions()
name=cluster_name,
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=client.V1DeleteOptions(),
)
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
plural="middlewares",
name=f"replacepath-{workflow_id}",
)
Expand Down
2 changes: 1 addition & 1 deletion reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ def _create_job_spec(
job_controller_container.env.append(
{
"name": "DASK_SCHEDULER_URI",
"value": f"reana-run-dask-{self.workflow.id_}-scheduler.default.svc.cluster.local:8786",
"value": f"reana-run-dask-{self.workflow.id_}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786",
},
)

Expand Down

0 comments on commit ea89a32

Please sign in to comment.