diff --git a/.buildkite/setup-env.sh b/.buildkite/setup-env.sh
index 479bbcfea66..3480034d99f 100755
--- a/.buildkite/setup-env.sh
+++ b/.buildkite/setup-env.sh
@@ -27,20 +27,5 @@ mv linux-amd64/helm /usr/local/bin/helm
 helm repo add kuberay https://ray-project.github.io/kuberay-helm/
 helm repo update
 
-# Install python 3.11 and pip
-apt-get update
-apt-get install -y python3.11 python3.11-venv
-python3 -m venv .venv
-
-# Activate the virtual environment and then execute the subsequent commands
-# within the same sub-shell.
-(
-  # shellcheck disable=SC1091 # Ignore: activate script is created by venv
-  source .venv/bin/activate
-
-  # Install requirements
-  pip install -r tests/framework/config/requirements.txt
-)
-
 # Bypass Git's ownership check due to unconventional user IDs in Docker containers
 git config --global --add safe.directory /workdir
diff --git a/.buildkite/test-e2e.yml b/.buildkite/test-e2e.yml
index 6a8d32efa08..e17f18857b4 100644
--- a/.buildkite/test-e2e.yml
+++ b/.buildkite/test-e2e.yml
@@ -3,7 +3,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Build nightly KubeRay operator image
     - pushd ray-operator
@@ -22,7 +22,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Build nightly KubeRay operator image
     - pushd ray-operator
@@ -41,7 +41,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Build nightly KubeRay operator image
     - pushd ray-operator
@@ -60,7 +60,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Deploy previous KubeRay operator release (v1.2.2) using helm
     - echo Deploying KubeRay operator
diff --git a/.buildkite/test-kubectl-plugin-e2e.yml b/.buildkite/test-kubectl-plugin-e2e.yml
index 5f009642523..19a02868090 100644
--- a/.buildkite/test-kubectl-plugin-e2e.yml
+++ b/.buildkite/test-kubectl-plugin-e2e.yml
@@ -3,7 +3,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Deploy nightly KubeRay operator
     - echo Deploying Kuberay operator
diff --git a/.buildkite/test-sample-yamls.yml b/.buildkite/test-sample-yamls.yml
index 0044d384c24..a9c87710f3d 100644
--- a/.buildkite/test-sample-yamls.yml
+++ b/.buildkite/test-sample-yamls.yml
@@ -3,7 +3,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Build nightly KubeRay operator image
     - pushd ray-operator
@@ -24,7 +24,7 @@
   image: golang:1.22
   commands:
     - source .buildkite/setup-env.sh
-    - kind create cluster --wait 900s --config ./tests/framework/config/kind-config-buildkite.yml
+    - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
     - kubectl config set clusters.kind-kind.server https://docker:6443
     # Deploy KubeRay operator
     - pushd ray-operator
diff --git a/ci/markdownlint.yaml b/.markdownlint.yaml
similarity index 73%
rename from ci/markdownlint.yaml
rename to .markdownlint.yaml
index e18d45dea1e..5568df61f71 100644
--- a/ci/markdownlint.yaml
+++ b/.markdownlint.yaml
@@ -1,5 +1,3 @@
----
-# Default state for all rules
 default: true
 
 MD013: false
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3d05e4a1791..4665260e505 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -69,9 +69,5 @@ repos:
   - repo: https://github.com/igorshubovych/markdownlint-cli
     rev: v0.44.0
     hooks:
-      - id: markdownlint
+      - id: markdownlint-fix
         name: Markdown linting
-        args:
-          - --config
-          - ci/markdownlint.yaml
-          - --fix
diff --git a/tests/framework/config/kind-config-buildkite.yml b/ci/kind-config-buildkite.yml
similarity index 100%
rename from tests/framework/config/kind-config-buildkite.yml
rename to ci/kind-config-buildkite.yml
diff --git a/tests/framework/config/kind-config.yaml b/tests/framework/config/kind-config.yaml
deleted file mode 100644
index f64b87e8e29..00000000000
--- a/tests/framework/config/kind-config.yaml
+++ /dev/null
@@ -1,4 +0,0 @@
-kind: Cluster
-apiVersion: kind.x-k8s.io/v1alpha4
-nodes:
-  - role: control-plane
diff --git a/tests/framework/config/requirements.txt b/tests/framework/config/requirements.txt
deleted file mode 100644
index 50724ab0a3f..00000000000
--- a/tests/framework/config/requirements.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-docker
-GitPython
-kubernetes
-jsonpatch
-pytest
diff --git a/tests/framework/prototype.py b/tests/framework/prototype.py
deleted file mode 100644
index 61c375203d0..00000000000
--- a/tests/framework/prototype.py
+++ /dev/null
@@ -1,593 +0,0 @@
-"""Configuration test framework for KubeRay"""
-import json
-import time
-import unittest
-from typing import Dict, List, Optional
-import jsonpatch
-
-from framework.utils import (
-    create_custom_object,
-    delete_custom_object,
-    get_custom_object,
-    get_pod,
-    get_head_pod,
-    start_curl_pod,
-    logger,
-    pod_exec_command,
-    shell_subprocess_run,
-    shell_subprocess_check_output,
-    CONST,
-    K8S_CLUSTER_MANAGER,
-    OperatorManager
-)
-
-# Utility functions
-def search_path(yaml_object, steps, default_value = None):
-    """
-    Search the position in `yaml_object` based on steps. The following example uses
-    `search_path` to get the name of the first container in the head pod. If the field does
-    not exist, return `default_value`.
- - [Example] - name = search_path(cr, "spec.headGroupSpec.template.spec.containers.0.name".split('.')) - """ - curr = yaml_object - for step in steps: - if step.isnumeric(): - int_step = int(step) - if int_step >= len(curr) or int_step < 0: - return default_value - curr = curr[int(step)] - elif step in curr: - curr = curr[step] - else: - return default_value - return curr - -def check_pod_running(pods) -> bool: - """"Check whether all of the pods are in running state""" - for pod in pods: - if pod.status.phase != 'Running': - return False - for container in pod.status.container_statuses: - if not container.ready: - return False - return True - -def get_expected_head_pods(custom_resource): - """Get the number of head pods in custom_resource""" - resource_kind = custom_resource["kind"] - head_replica_paths = { - CONST.RAY_CLUSTER_CRD: "spec.headGroupSpec.replicas", - CONST.RAY_SERVICE_CRD: "spec.rayClusterConfig.headGroupSpec.replicas", - CONST.RAY_JOB_CRD: "spec.rayClusterSpec.headGroupSpec.replicas" - } - if resource_kind in head_replica_paths: - path = head_replica_paths[resource_kind] - return search_path(custom_resource, path.split('.'), default_value=1) - raise Exception(f"Unknown resource kind: {resource_kind} in get_expected_head_pods()") - -def get_expected_worker_pods(custom_resource): - """Get the number of head pods in custom_resource""" - resource_kind = custom_resource["kind"] - worker_specs_paths = { - CONST.RAY_CLUSTER_CRD: "spec.workerGroupSpecs", - CONST.RAY_SERVICE_CRD: "spec.rayClusterConfig.workerGroupSpecs", - CONST.RAY_JOB_CRD: "spec.rayClusterSpec.workerGroupSpecs" - } - if resource_kind in worker_specs_paths: - path = worker_specs_paths[resource_kind] - worker_group_specs = search_path(custom_resource, path.split('.'), default_value=[]) - expected_worker_pods = 0 - for spec in worker_group_specs: - expected_worker_pods += spec['replicas'] - return expected_worker_pods - raise Exception(f"Unknown resource kind: {resource_kind} in get_expected_worker_pods()") - -def show_cluster_info(cr_namespace): - """Show system information""" - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - head_pods = k8s_v1_api.list_namespaced_pod( - namespace=cr_namespace, - label_selector='ray.io/node-type=head' - ) - worker_pods = k8s_v1_api.list_namespaced_pod( - namespace=cr_namespace, - label_selector='ray.io/node-type=worker' - ) - logger.info( - f"Number of head pods: {len(head_pods.items)}, " - f"number of worker pods: {len(worker_pods.items)}" - ) - shell_subprocess_run(f'kubectl get all -n={cr_namespace}') - shell_subprocess_run(f'kubectl describe pods -n={cr_namespace}') - # With "--tail=-1", every line in the log will be printed. The default value of "tail" is not - # -1 when using selector. - shell_subprocess_run(f'kubectl logs -n={cr_namespace} -l ray.io/node-type=head --tail=-1') - operator_namespace = shell_subprocess_check_output('kubectl get pods ' - '-l app.kubernetes.io/component=kuberay-operator -A ' - '-o jsonpath={".items[0].metadata.namespace"}') - shell_subprocess_check_output("kubectl logs -l app.kubernetes.io/component=kuberay-operator -n " - f'{operator_namespace.decode("utf-8")} --tail=-1') - -# Configuration Test Framework Abstractions: (1) Mutator (2) Rule (3) RuleSet (4) CREvent -class Mutator: - """ - Mutator will start to mutate from `base_cr`. `patch_list` is a list of JsonPatch, and you - can specify multiple fields that want to mutate in a single JsonPatch. 
- """ - def __init__(self, base_custom_resource, json_patch_list: List[jsonpatch.JsonPatch]): - self.base_cr = base_custom_resource - self.patch_list = json_patch_list - - def mutate(self): - """ Generate a new cr by applying the json patch to `cr`. """ - for patch in self.patch_list: - yield patch.apply(self.base_cr) - -class Rule: - """ - Rule is used to check whether the actual cluster state is the same as our expectation after - a CREvent. We can infer the expected state by CR YAML file, and get the actual cluster state - by Kubernetes API. - """ - def __init__(self): - pass - - def trigger_condition(self, custom_resource=None) -> bool: - """ - The rule will only be checked when `trigger_condition` is true. For example, we will only - check "HeadPodNameRule" when "spec.headGroupSpec" is defined in CR YAML file. - """ - return True - - def assert_rule(self, custom_resource=None, cr_namespace='default'): - """Check whether the actual cluster state fulfills the rule or not.""" - raise NotImplementedError - -class RuleSet: - """A set of Rule""" - def __init__(self, rules: List[Rule]): - self.rules = rules - - def check_rule_set(self, custom_resource, namespace): - """Check all rules that the trigger conditions are fulfilled.""" - for rule in self.rules: - if rule.trigger_condition(custom_resource): - rule.assert_rule(custom_resource, namespace) - -class CREvent: - """ - CREvent: Custom Resource Event can be mainly divided into 3 categories. - (1) Add (create) CR (2) Update CR (3) Delete CR - """ - def __init__( - self, - custom_resource_object, - rulesets: List[RuleSet] = [], - timeout: int = 300, - namespace: str = "default", - filepath: Optional[str] = None, - ): - self.rulesets = rulesets - self.timeout = timeout - self.namespace = namespace - self.custom_resource_object = custom_resource_object - # A file may consists of multiple Kubernetes resources (ex: ray-cluster.external-redis.yaml) - self.filepath = filepath - self.num_pods = 0 - - def trigger(self): - """ - The member functions integrate together in `trigger()`. - [Step1] exec(): Execute a command to trigger the CREvent. - [Step2] wait(): Wait for the system to converge. - [Step3] check_rule_sets(): When the system converges, check all registered RuleSets. - """ - self.exec() - self.wait() - self.check_rule_sets() - - def exec(self): - """ - Execute a command to trigger the CREvent. For example, create a CR by a - `kubectl apply` command. 
- """ - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - pods = k8s_v1_api.list_namespaced_pod(namespace=self.namespace).items - self.num_pods = len(pods) - logger.info("Number of Pods before CREvent: %d", self.num_pods) - for pod in pods: - logger.info("[%s] Pod name: %s", self.namespace, pod.metadata.name) - - if not self.filepath: - create_custom_object(self.namespace, self.custom_resource_object) - else: - shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}") - - def wait(self): - """Wait for the system to converge.""" - time.sleep(self.timeout) - - def check_rule_sets(self): - """When the system converges, check all registered RuleSets.""" - for ruleset in self.rulesets: - ruleset.check_rule_set(self.custom_resource_object, self.namespace) - - def clean_up(self): - """Cleanup the CR.""" - raise NotImplementedError - -# My implementations -class HeadPodNameRule(Rule): - """Check head pod's name""" - def trigger_condition(self, custom_resource=None) -> bool: - steps = "spec.headGroupSpec".split('.') - return search_path(custom_resource, steps) is not None - - def assert_rule(self, custom_resource=None, cr_namespace='default'): - expected_val = search_path(custom_resource, - "spec.headGroupSpec.template.spec.containers.0.name".split('.')) - headpod = get_head_pod(cr_namespace) - assert headpod.spec.containers[0].name == expected_val - -class HeadSvcRule(Rule): - """The labels of the head pod and the selectors of the head service must match.""" - def assert_rule(self, custom_resource=None, cr_namespace='default'): - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - head_services = k8s_v1_api.list_namespaced_service( - namespace= cr_namespace, label_selector="ray.io/node-type=head") - assert len(head_services.items) == 1 - selector_dict = head_services.items[0].spec.selector - selector = ','.join(map(lambda key: f"{key}={selector_dict[key]}", selector_dict)) - headpods = k8s_v1_api.list_namespaced_pod( - namespace =cr_namespace, label_selector=selector) - assert len(headpods.items) == 1 - -class EasyJobRule(Rule): - """Submit a very simple Ray job to test the basic functionality of the Ray cluster.""" - def assert_rule(self, custom_resource=None, cr_namespace='default'): - headpod = get_head_pod(cr_namespace) - headpod_name = headpod.metadata.name - pod_exec_command(headpod_name, cr_namespace, - "python -c \"import ray; ray.init(); print(ray.cluster_resources())\"") - -class ShutdownJobRule(Rule): - """Check the Ray cluster is shutdown when setting `spec.shutdownAfterJobFinishes` to true.""" - def assert_rule(self, custom_resource=None, cr_namespace='default'): - custom_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - # Wait for there to be no RayClusters - logger.info("Waiting for RayCluster to be deleted...") - for i in range(30): - rayclusters = custom_api.list_namespaced_custom_object( - group = 'ray.io', version = 'v1', namespace = cr_namespace, - plural = 'rayclusters') - # print debug log - if i != 0: - logger.info("ShutdownJobRule wait() hasn't converged yet.") - logger.info("Number of RayClusters: %d", len(rayclusters["items"])) - if len(rayclusters["items"]) == 0: - break - time.sleep(1) - else: - raise TimeoutError("RayCluster hasn't been deleted in 30 seconds.") - - logger.info("RayCluster has been deleted.") - - - - def trigger_condition(self, custom_resource=None) -> bool: - # Trigger if shutdownAfterJobFinishes is set to true - steps = "spec.shutdownAfterJobFinishes".split('.') - 
value = search_path(custom_resource, steps) - logger.info("ShutdownJobRule trigger_condition(): %s", value) - assert isinstance(value, bool) or value is None - return value is not None and value - -class CurlServiceRule(Rule): - """Using curl to access the deployed application(s) on RayService""" - CURL_CMD_FMT = ( - "kubectl exec curl -n {namespace} -- " - "curl -X POST -H 'Content-Type: application/json' " - "{name}-serve-svc.{namespace}.svc.cluster.local:8000{path}/ -d '{json}'" - ) - - def __init__(self, queries: List[Dict[str, str]], start_in_background: bool = False): - self.queries = queries - self.start_in_background = start_in_background - - def assert_rule(self, custom_resource, cr_namespace): - # If curl pod doesn't exist, create one - if get_pod("default", "run=curl") is None: - start_curl_pod("curl", cr_namespace, timeout_s=30) - - for query in self.queries: - cmd = self.CURL_CMD_FMT.format( - name=custom_resource["metadata"]["name"], - namespace=cr_namespace, - path=query.get("path").rstrip("/"), - json=json.dumps(query["json_args"]) - ) - - if self.start_in_background: - shell_subprocess_run(f"{cmd} &", hide_output=False) - - else: - output = shell_subprocess_check_output(cmd) - logger.info("curl output: %s", output.decode('utf-8')) - if hasattr(query.get("expected_output"), "__iter__"): - assert output.decode('utf-8') in query["expected_output"] - else: - assert output.decode('utf-8') == query["expected_output"] - time.sleep(1) - -class AutoscaleRule(Rule): - def __init__( - self, - query: Dict[str, str], - num_repeat: int, - expected_worker_pods: int, - timeout: int, - message: str = "", - ): - self.query: Dict[str, str] = query - self.num_repeat: int = num_repeat - self.expected_worker_pods = expected_worker_pods - self.query_rule = CurlServiceRule(queries=[query], start_in_background=True) - self.timeout = timeout - self.message = message - - def assert_rule(self, custom_resource, cr_namespace): - logger.info(self.message) - for _ in range(self.num_repeat): - self.query_rule.assert_rule(custom_resource, cr_namespace) - - start_time = time.time() - while time.time() - start_time < self.timeout: - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - pods = k8s_v1_api.list_namespaced_pod( - namespace=cr_namespace, label_selector='ray.io/node-type=worker' - ) - logger.info("Number of worker pods: %d", len(pods.items)) - if len(pods.items) == self.expected_worker_pods: - logger.info( - "Cluster has successfully scaled to the expected number of " - f"{self.expected_worker_pods} worker pods after " - f"{time.time() - start_time} seconds." - ) - break - time.sleep(2) - else: - show_cluster_info(cr_namespace) - raise TimeoutError( - "Cluster did not scale to the expected number of " - f"{self.expected_worker_pods} worker pod(s) within {self.timeout} " - f"seconds. Cluster is currently at {len(pods.items)} worker pods." - ) - - -class RayClusterAddCREvent(CREvent): - """CREvent for RayCluster addition""" - def wait(self): - start_time = time.time() - expected_head_pods = get_expected_head_pods(self.custom_resource_object) - expected_worker_pods = get_expected_worker_pods(self.custom_resource_object) - # Wait until: - # (1) The number of head pods and worker pods are as expected. - # (2) All head pods and worker pods are "Running". 
- converge = False - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - for _ in range(self.timeout): - headpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=head') - workerpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=worker') - if (len(headpods.items) == expected_head_pods - and len(workerpods.items) == expected_worker_pods - and check_pod_running(headpods.items) and check_pod_running(workerpods.items)): - converge = True - logger.info("--- RayClusterAddCREvent %s seconds ---", time.time() - start_time) - break - time.sleep(1) - - if not converge: - logger.info("RayClusterAddCREvent wait() failed to converge in %d seconds.", - self.timeout) - logger.info("expected_head_pods: %d, expected_worker_pods: %d", - expected_head_pods, expected_worker_pods) - show_cluster_info(self.namespace) - raise Exception("RayClusterAddCREvent wait() timeout") - - def clean_up(self): - """Delete added RayCluster""" - delete_custom_object(CONST.RAY_CLUSTER_CRD, - self.namespace, self.custom_resource_object['metadata']['name']) - - # Wait pods to be deleted - converge = False - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - start_time = time.time() - for _ in range(self.timeout): - headpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=head') - workerpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=worker') - rediscleanuppods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=redis-cleanup') - if (len(headpods.items) == 0 and len(workerpods.items) == 0 and len(rediscleanuppods.items) == 0): - converge = True - logger.info("--- Cleanup RayCluster %s seconds ---", time.time() - start_time) - break - time.sleep(1) - - if not converge: - logger.info("RayClusterAddCREvent clean_up() failed to converge in %d seconds.", - self.timeout) - logger.info("expected_head_pods: 0, expected_worker_pods: 0") - show_cluster_info(self.namespace) - raise Exception("RayClusterAddCREvent clean_up() timeout") - - """Delete other resources in the yaml""" - if self.filepath: - logger.info("Delete other resources in the YAML") - shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath} --ignore-not-found=true") - - start_time = time.time() - converge = False - for _ in range(self.timeout): - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - pods = k8s_v1_api.list_namespaced_pod(namespace=self.namespace).items - if len(pods) == self.num_pods: - converge = True - logger.info("--- Cleanup other resources %s seconds ---", time.time() - start_time) - break - logger.info("#Pods: (1) before CREvent: %d (2) now: %d", self.num_pods, len(pods)) - time.sleep(10) - - if not converge: - show_cluster_info(self.namespace) - raise Exception("RayClusterAddCREvent clean_up() timeout") - -class RayJobAddCREvent(CREvent): - """CREvent for RayJob addition""" - def wait(self): - """Wait for RayJob to converge""" - start_time = time.time() - expected_head_pods = get_expected_head_pods(self.custom_resource_object) - expected_worker_pods = get_expected_worker_pods(self.custom_resource_object) - expected_rayclusters = 1 - expected_job_pods = 1 - # Wait until: - # (1) The number of head pods and worker pods are as expected. - # (2) All head pods and worker pods are "Running". 
- # (3) A RayCluster has been created. - # (4) Exactly one Job pod has been created. - # (5) RayJob named "rayjob-sample" has status "SUCCEEDED". - # We check the `expected_job_pods = 1` condition to catch situations described in - # https://github.com/ray-project/kuberay/issues/1381 - converge = False - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - custom_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - for i in range(self.timeout): - rayclusters = custom_api.list_namespaced_custom_object( - group = 'ray.io', version = 'v1', namespace = self.namespace, - plural = 'rayclusters')["items"] - headpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=head') - workerpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='ray.io/node-type=worker') - rayjob = get_custom_object(CONST.RAY_JOB_CRD, self.namespace, - self.custom_resource_object['metadata']['name']) - jobpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector='job-name='+self.custom_resource_object['metadata']['name']) - - if (len(headpods.items) == expected_head_pods - and len(workerpods.items) == expected_worker_pods - and len(jobpods.items) == expected_job_pods - and check_pod_running(headpods.items) and check_pod_running(workerpods.items) - and rayjob.get('status') is not None - and rayjob.get('status').get('jobStatus') == "SUCCEEDED" - and len(rayclusters) == expected_rayclusters): - converge = True - logger.info("--- RayJobAddCREvent converged in %s seconds ---", - time.time() - start_time) - break - else: - # Print debug logs every 10 seconds. - if i % 10 == 0 and i != 0: - logger.info("RayJobAddCREvent wait() hasn't converged yet.") - # Print out the delta between expected and actual for the parts that are not - # converged yet. 
- if len(headpods.items) != expected_head_pods: - logger.info("expected_head_pods: %d, actual_head_pods: %d", - expected_head_pods, len(headpods.items)) - if len(workerpods.items) != expected_worker_pods: - logger.info("expected_worker_pods: %d, actual_worker_pods: %d", - expected_worker_pods, len(workerpods.items)) - if len(jobpods.items) != expected_job_pods: - logger.info("expected_job_pods: %d, actual_job_pods: %d", - expected_job_pods, len(jobpods.items)) - if not check_pod_running(headpods.items): - logger.info("head pods are not running yet.") - if not check_pod_running(workerpods.items): - logger.info("worker pods are not running yet.") - if rayjob.get('status') is None: - logger.info("rayjob status is None.") - elif rayjob.get('status').get('jobStatus') != "SUCCEEDED": - logger.info("rayjob status is not SUCCEEDED yet.") - logger.info("rayjob status: %s", rayjob.get('status').get('jobStatus')) - if len(rayclusters) != expected_rayclusters: - logger.info("expected_rayclusters: %d, actual_rayclusters: %d", - expected_rayclusters, len(rayclusters)) - - if (rayjob.get("status") is not None and - rayjob.get("status").get("jobStatus") in ["STOPPED", "FAILED"]): - logger.info("Job Status: %s", rayjob.get("status").get("jobStatus")) - logger.info("Job Message: %s", rayjob.get("status").get("message")) - break - time.sleep(1) - - if not converge: - logger.info("RayJobAddCREvent wait() failed to converge in %d seconds.", - self.timeout) - logger.info("expected_head_pods: %d, expected_worker_pods: %d", - expected_head_pods, expected_worker_pods) - logger.info("rayjob: %s", rayjob) - show_cluster_info(self.namespace) - raise Exception("RayJobAddCREvent wait() timeout") - - def clean_up(self): - """Delete added RayJob""" - if not self.filepath: - delete_custom_object(CONST.RAY_JOB_CRD, - self.namespace, self.custom_resource_object['metadata']['name']) - else: - shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath}") - # Wait for pods to be deleted - converge = False - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - start_time = time.time() - for _ in range(self.timeout): - headpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector = 'ray.io/node-type=head') - workerpods = k8s_v1_api.list_namespaced_pod( - namespace = self.namespace, label_selector = 'ray.io/node-type=worker') - if (len(headpods.items) == 0 and len(workerpods.items) == 0): - converge = True - logger.info("--- Cleanup RayJob %s seconds ---", time.time() - start_time) - break - time.sleep(1) - - if not converge: - logger.info("RayJobAddCREvent clean_up() failed to converge in %d seconds.", - self.timeout) - logger.info("expected_head_pods: 0, expected_worker_pods: 0") - show_cluster_info(self.namespace) - raise Exception("RayJobAddCREvent clean_up() timeout") - -class GeneralTestCase(unittest.TestCase): - """TestSuite""" - def __init__(self, methodName, cr_event): - super().__init__(methodName) - self.cr_event = cr_event - self.operator_manager = OperatorManager.instance() - - @classmethod - def setUpClass(cls): - K8S_CLUSTER_MANAGER.cleanup() - - def setUp(self): - if not K8S_CLUSTER_MANAGER.check_cluster_exist(): - K8S_CLUSTER_MANAGER.initialize_cluster() - self.operator_manager.prepare_operator() - - def runtest(self): - """Run a configuration test""" - self.cr_event.trigger() - - def tearDown(self) -> None: - try: - self.cr_event.clean_up() - except Exception as ex: - logger.error(str(ex)) - K8S_CLUSTER_MANAGER.cleanup() diff --git 
a/tests/framework/utils.py b/tests/framework/utils.py deleted file mode 100644 index 2196867dfde..00000000000 --- a/tests/framework/utils.py +++ /dev/null @@ -1,523 +0,0 @@ -"""Utilities for integration tests of KubeRay.""" - -import os -import subprocess -import logging -from pathlib import Path -import tempfile -from urllib import request -import yaml -import jsonpatch -import time -from kubernetes import client, config -from abc import ABC, abstractmethod - -logger = logging.getLogger(__name__) -logging.basicConfig( - format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s", - datefmt="%Y-%m-%d:%H:%M:%S", - level=logging.INFO, -) - - -class CONST: - """Constants""" - - __slots__ = () - # Docker images - OPERATOR_IMAGE_KEY = "kuberay-operator-image" - KUBERAY_LATEST_RELEASE = "quay.io/kuberay/operator:v1.1.0" - - # Kubernetes API clients - K8S_CR_CLIENT_KEY = "k8s-cr-api-client" - K8S_V1_CLIENT_KEY = "k8s-v1-api-client" - - # Paths - REPO_ROOT = Path(__file__).absolute().parent.parent.parent - HELM_CHART_ROOT = REPO_ROOT.joinpath("helm-chart") - - # Decide the config based on the environment - BUILDKITE_ENV = "BUILDKITE_ENV" - if os.getenv(BUILDKITE_ENV, default="") == "true": - DEFAULT_KIND_CONFIG = REPO_ROOT.joinpath("tests/framework/config/kind-config-buildkite.yml") - else: - DEFAULT_KIND_CONFIG = REPO_ROOT.joinpath("tests/framework/config/kind-config.yaml") - - # Ray features - RAY_FT = "RAY_FT" - RAY_SERVICE = "RAY_SERVICE" - RAY_SERVE_FT = "RAY_SERVE_FT" - - # Custom Resource Definitions - RAY_CLUSTER_CRD = "RayCluster" - RAY_SERVICE_CRD = "RayService" - RAY_JOB_CRD = "RayJob" - - # Failures - CREATE_NEW_POD = "CREATE_NEW_POD" - RESTART_OLD_POD = "RESTART_OLD_POD" - - -CONST = CONST() - -class ClusterManager(ABC): - EXTERNAL_CLUSTER = "EXTERNAL_CLUSTER" - - @abstractmethod - def initialize_cluster(self, kind_config=None) -> None: - pass - - @abstractmethod - def cleanup(self) -> None: - pass - - @abstractmethod - def upload_image(): - pass - - @abstractmethod - def check_cluster_exist(self) -> bool: - pass - - @classmethod - def instance(cls): - if cls.EXTERNAL_CLUSTER in os.environ: - return ExternalClusterManager() - else: - return KindClusterManager() - -class ExternalClusterManager(ClusterManager): - CLUSTER_CLEANUP_SCRIPT = "CLUSTER_CLEANUP_SCRIPT" - - def __init__(self) -> None: - self.k8s_client_dict = {} - self.cleanup_timeout = 120 - - def cleanup(self, namespace = "default") -> None: - if self.CLUSTER_CLEANUP_SCRIPT in os.environ: - cleanup_script = os.environ[self.CLUSTER_CLEANUP_SCRIPT] - shell_subprocess_run(cleanup_script) - else: - self.__delete_all_crs("ray.io", "v1", namespace, "rayservices") - self.__delete_all_crs("ray.io", "v1", namespace, "rayjobs") - self.__delete_all_crs("ray.io", "v1", namespace, "rayclusters") - - k8s_v1_api = self.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - start_time = time.time() - while time.time() - start_time < self.cleanup_timeout: - pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/created-by=kuberay-operator') - if len(pods.items) == 0: - logger.info("--- Cleanup rayservices, rayjobs, rayclusters %s seconds ---", time.time() - start_time) - break - - time.sleep(1) - - shell_subprocess_run("helm uninstall kuberay-operator", check=False) - start_time = time.time() - while time.time() - start_time < self.cleanup_timeout: - pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/component=kuberay-operator') - if len(pods.items) == 0: - 
logger.info("--- Cleanup kuberay-operator %s seconds ---", time.time() - start_time) - break - - time.sleep(1) - - for _, k8s_client in self.k8s_client_dict.items(): - k8s_client.api_client.rest_client.pool_manager.clear() - k8s_client.api_client.close() - - self.k8s_client_dict = {} - - def initialize_cluster(self, kind_config=None) -> None: - config.load_kube_config() - self.k8s_client_dict.update( - { - CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(), - CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(), - } - ) - - def upload_image(self, image): - pass - - def check_cluster_exist(self) -> bool: - """Check whether cluster exists or not""" - return ( - shell_subprocess_run( - "kubectl cluster-info", check=False - ) - == 0 - ) - - def __delete_all_crs(self, group, version, namespace, plural): - custom_objects_api = self.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - try: - crs = custom_objects_api.list_namespaced_custom_object(group, version, namespace, plural) - for cr in crs["items"]: - name = cr["metadata"]["name"] - custom_objects_api.delete_namespaced_custom_object(group, version, namespace, plural, name) - except client.exceptions.ApiException: - logger.info("CRD did not exist during clean up %s", plural) - - -class KindClusterManager(ClusterManager): - """ - KindClusterManager controlls the lifecycle of KinD cluster and Kubernetes API client. - """ - - def __init__(self) -> None: - self.k8s_client_dict = {} - - def cleanup(self) -> None: - """Delete a KinD cluster""" - shell_subprocess_run("kind delete cluster") - for _, k8s_client in self.k8s_client_dict.items(): - k8s_client.api_client.rest_client.pool_manager.clear() - k8s_client.api_client.close() - self.k8s_client_dict = {} - - def initialize_cluster(self, kind_config=None) -> None: - """Create a KinD cluster""" - # To use NodePort service, `kind_config` needs to set `extraPortMappings` properly. 
- kind_config = CONST.DEFAULT_KIND_CONFIG if not kind_config else kind_config - shell_subprocess_run(f"kind create cluster --wait 900s --config {kind_config}") - - # Adjust the kubeconfig server address if necessary - self._adjust_kubeconfig_server_address() - - config.load_kube_config() - self.k8s_client_dict.update( - { - CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(), - CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(), - } - ) - - def upload_image(self, image): - shell_subprocess_run(f"kind load docker-image {image}") - - def check_cluster_exist(self) -> bool: - """Check whether KinD cluster exists or not""" - return ( - shell_subprocess_run( - "kubectl cluster-info --context kind-kind", check=False - ) - == 0 - ) - - def _adjust_kubeconfig_server_address(self) -> None: - """Modify the server address in kubeconfig to https://docker:6443""" - if os.getenv(CONST.BUILDKITE_ENV, default="") == "true": - shell_subprocess_run("kubectl config set clusters.kind-kind.server https://docker:6443") - -K8S_CLUSTER_MANAGER = ClusterManager.instance() - - -class OperatorManager(ABC): - KUBERAY_OPERATOR_INSTALLATION_SCRIPT = "KUBERAY_OPERATOR_INSTALLATION_SCRIPT" - - @abstractmethod - def prepare_operator(self): - pass - - @classmethod - def instance(cls, namespace=None, patch=jsonpatch.JsonPatch([]), - cluster_manager = K8S_CLUSTER_MANAGER): - if cls.KUBERAY_OPERATOR_INSTALLATION_SCRIPT in os.environ: - if (namespace != None) or (patch != jsonpatch.JsonPatch([])): - raise Exception("Parameters namespace or patch are not supported in ScriptBasedOperatorManager") - return ScriptBasedOperatorManager() - else: - if namespace == None: - namespace = "default" - DEFAULT_IMAGE_DICT = { - CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='quay.io/kuberay/operator:nightly'), - } - default_operator_manager = DefaultOperatorManager(DEFAULT_IMAGE_DICT, namespace, patch, cluster_manager) - return default_operator_manager - -class DefaultOperatorManager(OperatorManager): - """ - OperatorManager controlls the lifecycle of KubeRay operator. It will download Docker images, - load images into an existing KinD cluster, and install CRD and KubeRay operator. - Parameters: - docker_image_dict : A dict that includes docker images that need to be - downloaded and loaded to the cluster. - namespace : A namespace(string) that KubeRay operator will be installed in. - patch : A jsonpatch that will be applied to the default KubeRay operator config - to create the custom config. - cluster_manager : Cluster manager instance. 
- """ - - def __init__( - self, docker_image_dict, namespace="default", patch=jsonpatch.JsonPatch([]), - cluster_manager = K8S_CLUSTER_MANAGER - ) -> None: - self.docker_image_dict = docker_image_dict - self.cluster_manager = cluster_manager - self.namespace = namespace - self.values_yaml = {} - for key in [CONST.OPERATOR_IMAGE_KEY]: - if key not in self.docker_image_dict: - raise Exception(f"Image {key} does not exist!") - repo, tag = self.docker_image_dict[CONST.OPERATOR_IMAGE_KEY].split(":") - if f"{repo}:{tag}" == CONST.KUBERAY_LATEST_RELEASE: - url = ( - "https://github.com/ray-project/kuberay-helm" - f"/raw/kuberay-operator-{tag[1:]}/helm-chart/kuberay-operator/values.yaml" - ) - else: - url = "file:///" + str( - CONST.HELM_CHART_ROOT.joinpath("kuberay-operator/values.yaml") - ) - with request.urlopen(url) as base_fd: - self.values_yaml = patch.apply(yaml.safe_load(base_fd)) - - def prepare_operator(self): - """Prepare KubeRay operator for an existing KinD cluster""" - self.__kind_prepare_images() - self.__install_crd_and_operator() - - def __kind_prepare_images(self): - """Download images and load images into KinD cluster""" - - def download_images(): - """Download Docker images from DockerHub""" - logger.info("Download Docker images: %s", self.docker_image_dict) - for key in self.docker_image_dict: - # Only pull the image from DockerHub when the image does not - # exist in the local docker registry. - image = self.docker_image_dict[key] - if ( - shell_subprocess_run( - f"docker image inspect {image} > /dev/null", check=False - ) - != 0 - ): - shell_subprocess_run(f"docker pull {image}") - else: - logger.info("Image %s exists", image) - - download_images() - logger.info("Load images into KinD cluster") - for key in self.docker_image_dict: - image = self.docker_image_dict[key] - self.cluster_manager.upload_image(image) - - def __install_crd_and_operator(self): - """ - Install both CRD and KubeRay operator by kuberay-operator chart. - """ - with tempfile.NamedTemporaryFile("w", suffix="_values.yaml") as values_fd: - # dump the config to a temporary file and use the file as values.yaml in the chart. - yaml.safe_dump(self.values_yaml, values_fd) - repo, tag = self.docker_image_dict[CONST.OPERATOR_IMAGE_KEY].split(":") - if f"{repo}:{tag}" == CONST.KUBERAY_LATEST_RELEASE: - logger.info( - "Install both CRD and KubeRay operator with the latest release." - ) - shell_subprocess_run( - "helm repo add kuberay https://ray-project.github.io/kuberay-helm/" - ) - shell_subprocess_run( - f"helm install -n {self.namespace} -f {values_fd.name} kuberay-operator " - f"kuberay/kuberay-operator --version {tag[1:]}" - ) - else: - logger.info( - "Install both CRD and KubeRay operator by kuberay-operator chart" - ) - shell_subprocess_run( - f"helm install -n {self.namespace} -f {values_fd.name} kuberay-operator " - f"{CONST.HELM_CHART_ROOT}/kuberay-operator/ " - f"--set image.repository={repo},image.tag={tag}" - ) - -class ScriptBasedOperatorManager(OperatorManager): - def __init__(self): - self.installation_script = os.getenv(self.KUBERAY_OPERATOR_INSTALLATION_SCRIPT) - - def prepare_operator(self): - return_code = shell_subprocess_run(self.installation_script) - if return_code != 0: - raise Exception("Operator installation failed with exit code " + str(return_code)) - - -def shell_subprocess_run(command, check=True, hide_output=False) -> int: - """Command will be executed through the shell. 
- - Args: - check: If true, an error will be raised if the returncode is nonzero - hide_output: If true, stdout and stderr of the command will be hidden - - Returns: - Return code of the subprocess. - """ - logger.info("Execute command: %s", command) - if hide_output: - return subprocess.run( - command, - shell=True, - check=check, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ).returncode - else: - return subprocess.run(command, shell=True, check=check).returncode - - -def shell_subprocess_check_output(command): - """ - Run command and return STDOUT as encoded bytes. - """ - logger.info("Execute command (check_output): %s", command) - - try: - output = subprocess.check_output(command, shell=True) - logger.info("Output: %s", output) - return output - except subprocess.CalledProcessError as e: - logger.info("Exception: %s", e.output) - raise - - -def get_pod(namespace, label_selector): - """Gets pods in the `namespace`. Returns the first pod that has `label_filter`. - Returns None if the number of matches is not equal to 1. - """ - pods = K8S_CLUSTER_MANAGER.k8s_client_dict[ - CONST.K8S_V1_CLIENT_KEY - ].list_namespaced_pod(namespace=namespace, label_selector=label_selector) - if len(pods.items) != 1: - logger.warning( - "There are %d matches for selector %s in namespace %s, but the expected match is 1.", - len(pods.items), - label_selector, - namespace, - ) - return None - return pods.items[0] - - -def get_head_pod(namespace): - """Gets a head pod in the `namespace`. Returns None if there are no matches.""" - return get_pod(namespace, "ray.io/node-type=head") - - -def pod_exec_command(pod_name, namespace, exec_command, check=True): - """kubectl exec the `exec_command` in the given `pod_name` Pod in the given `namespace`. - Both STDOUT and STDERR of `exec_command` will be printed. 
- """ - return shell_subprocess_run( - f"kubectl exec {pod_name} -n {namespace} -- {exec_command}", check - ) - -def delete_all_cr(crd_name, namespace, check=True): - return shell_subprocess_run( - f"kubectl delete {crd_name} --all -n {namespace}", check - ) - - -def start_curl_pod(name: str, namespace: str, timeout_s: int = -1): - shell_subprocess_run( - f"kubectl run {name} --image=rancher/curl -n {namespace} " - '--command -- /bin/sh -c "while true; do sleep 10;done"' - ) - - # Wait for curl pod to be created - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - start_time = time.time() - while time.time() - start_time < timeout_s or timeout_s < 0: - resp = k8s_v1_api.read_namespaced_pod(name=name, namespace=namespace) - if resp.status.phase == "Running": - return - - raise TimeoutError(f"Curl pod wasn't started in {timeout_s}s.") - - -def create_custom_object(namespace, cr_object): - """Create a custom resource based on `cr_object` in the given `namespace`.""" - k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - crd = cr_object["kind"] - if crd == CONST.RAY_CLUSTER_CRD: - k8s_cr_api.create_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayclusters", - body=cr_object, - ) - elif crd == CONST.RAY_SERVICE_CRD: - k8s_cr_api.create_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayservices", - body=cr_object, - ) - elif crd == CONST.RAY_JOB_CRD: - k8s_cr_api.create_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayjobs", - body=cr_object, - ) - - -def delete_custom_object(crd, namespace, cr_name): - """Delete the given `cr_name` custom resource in the given `namespace`.""" - k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - if crd == CONST.RAY_CLUSTER_CRD: - k8s_cr_api.delete_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayclusters", - name=cr_name, - ) - elif crd == CONST.RAY_SERVICE_CRD: - k8s_cr_api.delete_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayservices", - name=cr_name, - ) - elif crd == CONST.RAY_JOB_CRD: - k8s_cr_api.delete_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayjobs", - name=cr_name, - ) - - -def get_custom_object(crd, namespace, cr_name): - """Get the given `cr_name` custom resource in the given `namespace`.""" - k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] - if crd == CONST.RAY_CLUSTER_CRD: - return k8s_cr_api.get_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayclusters", - name=cr_name, - ) - elif crd == CONST.RAY_SERVICE_CRD: - return k8s_cr_api.get_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayservices", - name=cr_name, - ) - elif crd == CONST.RAY_JOB_CRD: - return k8s_cr_api.get_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=namespace, - plural="rayjobs", - name=cr_name, - ) diff --git a/tests/test_security.py b/tests/test_security.py deleted file mode 100644 index c66374f60ef..00000000000 --- a/tests/test_security.py +++ /dev/null @@ -1,121 +0,0 @@ -""" -https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md -Test for pod-security.md in CI -""" -import os -import logging -import unittest -import yaml -import jsonpatch -from kubernetes 
import client -from kubernetes.client.rest import ApiException - -from framework.prototype import ( - RayClusterAddCREvent -) - -from framework.utils import ( - shell_subprocess_run, - CONST, - K8S_CLUSTER_MANAGER, - OperatorManager -) - -logger = logging.getLogger(__name__) -logging.basicConfig( - format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', - datefmt='%Y-%m-%d:%H:%M:%S', - level=logging.INFO -) - -class PodSecurityTestCase(unittest.TestCase): - """ - https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md - Test for the document for the Pod security standard in CI. - The differences between this test and pod-security.md is: - (Step 5.1) Installs a simple Pod without securityContext instead of a RayCluster. - """ - namespace = "pod-security" - - @classmethod - def setUpClass(cls): - K8S_CLUSTER_MANAGER.cleanup() - kind_config = CONST.REPO_ROOT.joinpath("ray-operator/config/security/kind-config.yaml") - K8S_CLUSTER_MANAGER.initialize_cluster(kind_config = kind_config) - # Apply the restricted Pod security standard to all Pods in the namespace pod-security. - # The label pod-security.kubernetes.io/enforce=restricted means that the Pod that violates - # the policies will be rejected. - shell_subprocess_run(f"kubectl create ns {PodSecurityTestCase.namespace}") - shell_subprocess_run(f"kubectl label --overwrite ns {PodSecurityTestCase.namespace} \ - {PodSecurityTestCase.namespace}.kubernetes.io/warn=restricted \ - {PodSecurityTestCase.namespace}.kubernetes.io/warn-version=latest \ - {PodSecurityTestCase.namespace}.kubernetes.io/audit=restricted \ - {PodSecurityTestCase.namespace}.kubernetes.io/audit-version=latest \ - {PodSecurityTestCase.namespace}.kubernetes.io/enforce=restricted \ - {PodSecurityTestCase.namespace}.kubernetes.io/enforce-version=latest") - # Install the KubeRay operator in the namespace pod-security. - image_dict = { - CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE','quay.io/kuberay/operator:nightly'), - } - logger.info(image_dict) - patch = jsonpatch.JsonPatch([{ - 'op': 'add', - 'path': '/securityContext', - 'value': { - 'allowPrivilegeEscalation': False, - 'capabilities': {'drop':["ALL"]}, - 'runAsNonRoot': True, - 'seccompProfile': {'type': 'RuntimeDefault'} - } - }]) - operator_manager = OperatorManager.instance(PodSecurityTestCase.namespace, patch) - operator_manager.prepare_operator() - - def test_ray_cluster_with_security_context(self): - """ - Create a RayCluster with securityContext config under restricted mode. - """ - context = {} - cr_yaml = CONST.REPO_ROOT.joinpath( - "ray-operator/config/security/ray-cluster.pod-security.yaml" - ) - with open(cr_yaml, encoding="utf-8") as cr_fd: - context['filepath'] = cr_fd.name - for k8s_object in yaml.safe_load_all(cr_fd): - if k8s_object['kind'] == 'RayCluster': - context['cr'] = k8s_object - break - - logger.info('[TEST]:Create RayCluster with securityContext config under restricted mode') - ray_cluster_add_event = RayClusterAddCREvent( - custom_resource_object = context['cr'], - rulesets = [], - timeout = 90, - namespace = PodSecurityTestCase.namespace, - filepath = context['filepath'] - ) - ray_cluster_add_event.trigger() - - def test_pod_without_security_context(self): - """ - Create a pod without securityContext config under restricted mode. 
- """ - k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] - pod_spec = client.V1PodSpec(containers=[client.V1Container(name='busybox',image='busybox')]) - pod_metadata = client.V1ObjectMeta(name='my-pod', namespace=PodSecurityTestCase.namespace) - pod_body = client.V1Pod(api_version='v1', kind='Pod', metadata=pod_metadata, spec=pod_spec) - logger.info('[TEST]:Create pod without securityContext config under restricted mode') - with self.assertRaises( - ApiException, - msg = 'A Pod that violates restricted security policies should be rejected.' - ) as ex: - k8s_v1_api.create_namespaced_pod(namespace=PodSecurityTestCase.namespace, body=pod_body) - # check if raise forbidden error. Only forbidden error is allowed - self.assertEqual( - first = ex.exception.status, - second = 403, - msg = f'Error code 403 is expected but Pod creation failed with {ex.exception.status}' - ) - -if __name__ == '__main__': - unittest.main(verbosity=2)