diff --git a/components/aws/sagemaker/tests/integration_tests/.flake8 b/components/aws/sagemaker/tests/integration_tests/.flake8 new file mode 100644 index 00000000000..e9bfde932c0 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/.flake8 @@ -0,0 +1,5 @@ +[flake8] +max-line-length = 120 +extend-ignore = + # See https://github.com/PyCQA/pycodestyle/issues/373 + E203, \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/README.md b/components/aws/sagemaker/tests/integration_tests/README.md new file mode 100644 index 00000000000..a06ae1bd31f --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/README.md @@ -0,0 +1,42 @@ +## Requirements +1. [Conda](https://docs.conda.io/en/latest/miniconda.html) +1. [Kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/) +1. Argo CLI: [Mac](https://github.com/argoproj/homebrew-tap), [Linux](https://eksworkshop.com/advanced/410_batch/install/) +1. K8s cluster with Kubeflow pipelines > 0.4.0 installed +1. [IAM Role](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) with SageMakerFullAccess and S3FullAccess +1. IAM User credentials with SageMakerFullAccess permissions + +## Creating S3 buckets with datasets + +Change the bucket name and run the Python script `[s3_sample_data_creator.py](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset)` to create S3 buckets with the MNIST dataset in the region where you want to run the tests + +## Steps to run integration tests +1. Configure AWS credentials with access to the EKS cluster +1. Fetch kubeconfig to `~/.kube/config` or set the `KUBECONFIG` environment variable to point to the kubeconfig of the cluster +1. 
Create a [secret](https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/) named `aws-secret` in the kubeflow namespace with the credentials of an IAM User that has SageMakerFullAccess + ```yaml + apiVersion: v1 + kind: Secret + metadata: + name: aws-secret + namespace: kubeflow + type: Opaque + data: + AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY + AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS + ``` + + > Note: To get the base64 string, try `echo -n $AWS_ACCESS_KEY_ID | base64` +1. Create a conda environment using environment.yml for running tests. Run `conda env create -f environment.yml` +1. Activate the conda environment `conda activate kfp_test_env` +1. Run port-forward to the minio service in the background. Example: `kubectl port-forward svc/minio-service 9000:9000 -n kubeflow &` +1. Provide the following arguments to pytest: + 1. `region`: AWS region where the test will run. Default - us-west-2 + 1. `role-arn`: SageMaker execution IAM role ARN + 1. `s3-data-bucket`: Regional S3 bucket in which test data is hosted + 1. `minio-service-port`: Localhost port to which the minio service is mapped. Default - 9000 + 1. `kfp-namespace`: Cluster namespace where kubeflow pipelines is installed. Default - kubeflow +1. 
cd into this directory and run + ``` + pytest --region <> --role-arn <> --s3-data-bucket <> --minio-service-port <> --kfp-namespace <> + ``` diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_model_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_model_component.py new file mode 100644 index 00000000000..65701ebd048 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_model_component.py @@ -0,0 +1,49 @@ +import pytest +import os +import utils + +from utils import kfp_client_utils +from utils import minio_utils +from utils import sagemaker_utils + + +@pytest.mark.parametrize("test_file_dir", ["resources/config/kmeans-mnist-model"]) +def test_createmodel(kfp_client, experiment_id, sagemaker_client, test_file_dir): + + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated")) + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "config.yaml"), + ) + ) + + # Generate random prefix for model name to avoid errors if model with same name exists + test_params["Arguments"]["model_name"] = input_model_name = ( + utils.generate_random_string(5) + "-" + test_params["Arguments"]["model_name"] + ) + + run_id, status, workflow_json = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + test_params["Timeout"], + ) + + outputs = {"sagemaker-create-model": ["model_name"]} + + output_files = minio_utils.artifact_download_iterator( + workflow_json, outputs, download_dir + ) + + output_model_name = utils.extract_information( + output_files["sagemaker-create-model"]["model_name"], "model_name.txt" + ) + print(f"model_name: {output_model_name.decode()}") + assert output_model_name.decode() == input_model_name + assert ( + 
sagemaker_utils.describe_model(sagemaker_client, input_model_name) is not None + ) diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_train_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_train_component.py new file mode 100644 index 00000000000..bbaf7bfa8a8 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_train_component.py @@ -0,0 +1,65 @@ +import pytest +import os +import json +import utils +from utils import kfp_client_utils +from utils import minio_utils +from utils import sagemaker_utils + + +@pytest.mark.parametrize("test_file_dir", ["resources/config/simple-mnist-training"]) +def test_trainingjob(kfp_client, experiment_id, sagemaker_client, test_file_dir): + + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated")) + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "config.yaml"), + ) + ) + + test_params["Arguments"]["hyperparameters"] = json.dumps( + test_params["Arguments"]["hyperparameters"] + ) + test_params["Arguments"]["channels"] = json.dumps( + test_params["Arguments"]["channels"] + ) + run_id, status, workflow_json = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + test_params["Timeout"], + ) + + outputs = {"sagemaker-training-job": ["job_name", "model_artifact_url"]} + output_files = minio_utils.artifact_download_iterator( + workflow_json, outputs, download_dir + ) + + # Verify Training job was successful on SageMaker + training_job_name = utils.extract_information( + output_files["sagemaker-training-job"]["job_name"], "job_name.txt" + ) + print(f"training job name: {training_job_name}") + train_response = sagemaker_utils.describe_training_job( + sagemaker_client, training_job_name.decode() + ) + 
assert train_response["TrainingJobStatus"] == "Completed" + + # Verify model artifacts output was generated from this run + model_artifact_url = utils.extract_information( + output_files["sagemaker-training-job"]["model_artifact_url"], + "model_artifact_url.txt", + ) + print(f"model_artifact_url: {model_artifact_url}") + assert ( + model_artifact_url.decode() + == train_response["ModelArtifacts"]["S3ModelArtifacts"] + ) + assert ( + train_response["ModelArtifacts"]["S3ModelArtifacts"] + in model_artifact_url.decode() + ) diff --git a/components/aws/sagemaker/tests/integration_tests/conftest.py b/components/aws/sagemaker/tests/integration_tests/conftest.py new file mode 100644 index 00000000000..47e6cb9ea40 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/conftest.py @@ -0,0 +1,97 @@ +import pytest +import boto3 +import kfp +import os +import utils + +from datetime import datetime + + +def pytest_addoption(parser): + parser.addoption( + "--region", + default="us-west-2", + required=False, + help="AWS region where test will run", + ) + parser.addoption( + "--role-arn", required=True, help="SageMaker execution IAM role ARN", + ) + parser.addoption( + "--s3-data-bucket", + required=True, + help="Regional S3 bucket name in which test data is hosted", + ) + parser.addoption( + "--minio-service-port", + default="9000", + required=False, + help="Localhost port to which minio service is mapped to", + ) + parser.addoption( + "--kfp-namespace", + default="kubeflow", + required=False, + help="Cluster namespace where kubeflow pipelines is installed", + ) + + +@pytest.fixture(scope="session", autouse=True) +def region(request): + os.environ["AWS_REGION"] = request.config.getoption("--region") + return request.config.getoption("--region") + + +@pytest.fixture(scope="session", autouse=True) +def role_arn(request): + os.environ["ROLE_ARN"] = request.config.getoption("--role-arn") + return request.config.getoption("--role-arn") + + 
+@pytest.fixture(scope="session", autouse=True) +def s3_data_bucket(request): + os.environ["S3_DATA_BUCKET"] = request.config.getoption("--s3-data-bucket") + return request.config.getoption("--s3-data-bucket") + + +@pytest.fixture(scope="session", autouse=True) +def minio_service_port(request): + os.environ["MINIO_SERVICE_PORT"] = request.config.getoption("--minio-service-port") + return request.config.getoption("--minio-service-port") + + +@pytest.fixture(scope="session", autouse=True) +def kfp_namespace(request): + os.environ["NAMESPACE"] = request.config.getoption("--kfp-namespace") + return request.config.getoption("--kfp-namespace") + + +@pytest.fixture(scope="session") +def boto3_session(region): + return boto3.Session(region_name=region) + + +@pytest.fixture(scope="session") +def sagemaker_client(boto3_session): + return boto3_session.client(service_name="sagemaker") + + +@pytest.fixture(scope="session") +def s3_client(boto3_session): + return boto3_session.client(service_name="s3") + + +@pytest.fixture(scope="session") +def kfp_client(): + kfp_installed_namespace = utils.get_kfp_namespace() + return kfp.Client(namespace=kfp_installed_namespace) + + +@pytest.fixture(scope="session") +def experiment_id(kfp_client): + exp_name = datetime.now().strftime("%Y-%m-%d") + try: + experiment = kfp_client.get_experiment(experiment_name=exp_name) + except ValueError: + experiment = kfp_client.create_experiment(name=exp_name) + return experiment.id diff --git a/components/aws/sagemaker/tests/integration_tests/environment.yml b/components/aws/sagemaker/tests/integration_tests/environment.yml new file mode 100644 index 00000000000..cb72e4b82bb --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/environment.yml @@ -0,0 +1,20 @@ +name: kfp_test_env +channels: + - conda-forge + - defaults +dependencies: + - python 3.7.* + - pip 20.0.* + - awscli 1.18.* + - boto3 1.12.* + - pytest 5.* + - pyyaml 5.3.* + - flake8 3.7.* + - flake8-black 0.1.* + - pip: + - 
kubernetes==11.0.* + - kfp==0.5.* + - minio==5.0.10 + - sagemaker==1.56.* + + diff --git a/components/aws/sagemaker/tests/integration_tests/pytest.ini b/components/aws/sagemaker/tests/integration_tests/pytest.ini new file mode 100644 index 00000000000..77749dcfd2b --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = -rA \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-model/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-model/config.yaml new file mode 100644 index 00000000000..ab854fa936b --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-model/config.yaml @@ -0,0 +1,11 @@ +PipelineDefinition: resources/definition/create_model_pipeline.py +TestName: kmeans-create-model-test +Timeout: 300 +Arguments: + region: ((REGION)) + model_name: kmeans-mnist-model + image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1 + model_artifact_url: s3://((DATA_BUCKET))/mnist_kmeans_example/model/kmeans-mnist-model/model.tar.gz + network_isolation: "True" + role: ((ROLE_ARN)) + \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/simple-mnist-training/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/simple-mnist-training/config.yaml new file mode 100644 index 00000000000..e179d6185db --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/simple-mnist-training/config.yaml @@ -0,0 +1,31 @@ +PipelineDefinition: resources/definition/training_pipeline.py +TestName: simple-mnist-training +Timeout: 3600 +Arguments: + region: ((REGION)) + image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1 + training_input_mode: File + hyperparameters: + k: "10" + feature_dim: "784" + channels: + - ChannelName: train + DataSource: + 
S3DataSource: + S3Uri: s3://((DATA_BUCKET))/mnist_kmeans_example/data + S3DataType: S3Prefix + S3DataDistributionType: FullyReplicated + CompressionType: None + RecordWrapperType: None + InputMode: File + instance_type: ml.p2.xlarge + instance_count: 1 + volume_size: 50 + max_run_time: 3600 + model_artifact_path: s3://((DATA_BUCKET))/mnist_kmeans_example/output + network_isolation: "True" + traffic_encryption: "False" + spot_instance: "False" + max_wait_time: 3600 + checkpoint_config: "{}" + role: ((ROLE_ARN)) diff --git a/components/aws/sagemaker/tests/integration_tests/resources/definition/create_model_pipeline.py b/components/aws/sagemaker/tests/integration_tests/resources/definition/create_model_pipeline.py new file mode 100644 index 00000000000..a7fa0afe057 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/definition/create_model_pipeline.py @@ -0,0 +1,35 @@ +import kfp +from kfp import components +from kfp import dsl +from kfp.aws import use_aws_secret + +sagemaker_model_op = components.load_component_from_file("../../model/component.yaml") + + +@dsl.pipeline( + name="Create Model in SageMaker", description="SageMaker model component test" +) +def create_model_pipeline( + region="", + endpoint_url="", + image="", + model_name="", + model_artifact_url="", + network_isolation="", + role="", +): + sagemaker_model_op( + region=region, + endpoint_url=endpoint_url, + model_name=model_name, + image=image, + model_artifact_url=model_artifact_url, + network_isolation=network_isolation, + role=role, + ).apply(use_aws_secret("aws-secret", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")) + + +if __name__ == "__main__": + kfp.compiler.Compiler().compile( + create_model_pipeline, "SageMaker_create_model_pipeline" + ".yaml" + ) diff --git a/components/aws/sagemaker/tests/integration_tests/resources/definition/training_pipeline.py b/components/aws/sagemaker/tests/integration_tests/resources/definition/training_pipeline.py new file mode 100644 index 
00000000000..e69d103e564 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/definition/training_pipeline.py @@ -0,0 +1,55 @@ +import kfp +from kfp import components +from kfp import dsl +from kfp.aws import use_aws_secret + +sagemaker_train_op = components.load_component_from_file("../../train/component.yaml") + + +@dsl.pipeline(name="SageMaker Training", description="SageMaker training job test") +def training_pipeline( + region="", + endpoint_url="", + image="", + training_input_mode="", + hyperparameters="", + channels="", + instance_type="", + instance_count="", + volume_size="", + max_run_time="", + model_artifact_path="", + output_encryption_key="", + network_isolation="", + traffic_encryption="", + spot_instance="", + max_wait_time="", + checkpoint_config="{}", + role="", +): + sagemaker_train_op( + region=region, + endpoint_url=endpoint_url, + image=image, + training_input_mode=training_input_mode, + hyperparameters=hyperparameters, + channels=channels, + instance_type=instance_type, + instance_count=instance_count, + volume_size=volume_size, + max_run_time=max_run_time, + model_artifact_path=model_artifact_path, + output_encryption_key=output_encryption_key, + network_isolation=network_isolation, + traffic_encryption=traffic_encryption, + spot_instance=spot_instance, + max_wait_time=max_wait_time, + checkpoint_config=checkpoint_config, + role=role, + ).apply(use_aws_secret("aws-secret", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")) + + +if __name__ == "__main__": + kfp.compiler.Compiler().compile( + training_pipeline, "SageMaker_training_pipeline" + ".yaml" + ) diff --git a/components/aws/sagemaker/tests/integration_tests/utils/__init__.py b/components/aws/sagemaker/tests/integration_tests/utils/__init__.py new file mode 100644 index 00000000000..02ed4792cf4 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/__init__.py @@ -0,0 +1,90 @@ +import os +import subprocess +import pytest +import tarfile +import 
yaml +import random +import string + +from sagemaker.amazon.amazon_estimator import get_image_uri + + +def get_region(): + return os.environ.get("AWS_REGION") + + +def get_role_arn(): + return os.environ.get("ROLE_ARN") + + +def get_s3_data_bucket(): + return os.environ.get("S3_DATA_BUCKET") + + +def get_minio_service_port(): + return os.environ.get("MINIO_SERVICE_PORT") + + +def get_kfp_namespace(): + return os.environ.get("NAMESPACE") + + +def get_algorithm_image_registry(region, algorithm): + return get_image_uri(region, algorithm).split(".")[0] + + +def run_command(cmd, *popenargs, **kwargs): + if isinstance(cmd, str): + cmd = cmd.split(" ") + try: + print("executing command: {}".format(" ".join(cmd))) + return subprocess.check_output( + cmd, *popenargs, stderr=subprocess.STDOUT, **kwargs + ) + except subprocess.CalledProcessError as e: + pytest.fail(f"Command failed. Error code: {e.returncode}, Log: {e.output}") + + +def extract_information(file_path, file_name): + with tarfile.open(file_path).extractfile(file_name) as f: + return f.read() + + +def replace_placeholders(input_filename, output_filename): + region = get_region() + variables_to_replace = { + "((REGION))": region, + "((ROLE_ARN))": get_role_arn(), + "((DATA_BUCKET))": get_s3_data_bucket(), + "((KMEANS_REGISTRY))": get_algorithm_image_registry(region, "kmeans"), + } + + filedata = "" + with open(input_filename, "r") as f: + filedata = f.read() + for replace_key, replace_value in variables_to_replace.items(): + filedata = filedata.replace(replace_key, replace_value) + + with open(output_filename, "w") as f: + f.write(filedata) + return output_filename + + +def load_params(file_name): + with open(file_name, "r") as f: + return yaml.safe_load(f) + + +def generate_random_string(length): + """Generate a random string with twice the length of input parameter""" + assert isinstance(length, int) + return "".join( + [random.choice(string.ascii_lowercase) for n in range(length)] + + 
[random.choice(string.digits) for n in range(length)] + ) + + +def mkdir(directory_path): + if not os.path.exists(directory_path): + os.makedirs(directory_path) + return directory_path diff --git a/components/aws/sagemaker/tests/integration_tests/utils/argo_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/argo_utils.py new file mode 100644 index 00000000000..874cbf4d11b --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/argo_utils.py @@ -0,0 +1,8 @@ +import utils + + +def print_workflow_logs(workflow_name): + output = utils.run_command( + f"argo logs {workflow_name} -n {utils.get_kfp_namespace()}" + ) + print(f"workflow logs:\n", output.decode()) diff --git a/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py new file mode 100644 index 00000000000..8ed6633ab17 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py @@ -0,0 +1,67 @@ +import os +import utils +import pytest + +from utils import argo_utils + + +def compile_and_run_pipeline( + client, + experiment_id, + pipeline_definition, + input_params, + output_file_dir, + pipeline_name, +): + + env_value = os.environ.copy() + env_value["PYTHONPATH"] = f"{os.getcwd()}:" + os.environ.get("PYTHONPATH", "") + pipeline_path = os.path.join(output_file_dir, pipeline_name) + utils.run_command( + f"dsl-compile --py {pipeline_definition} --output {pipeline_path}.yaml", + env=env_value, + ) + run = client.run_pipeline( + experiment_id, pipeline_name, f"{pipeline_path}.yaml", input_params + ) + return run.id + + +def wait_for_job_completion(client, run_id, timeout): + response = client.wait_for_run_completion(run_id, timeout) + status = response.run.status.lower() == "succeeded" + return status + + +def get_workflow_json(client, run_id): + # API not in readthedocs + # Refer: 
https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/_client.py#L663 + return client._get_workflow_json(run_id) + + +def compile_run_monitor_pipeline( + client, + experiment_id, + pipeline_definition, + input_params, + output_file_dir, + pipeline_name, + timeout, + check=True, +): + run_id = compile_and_run_pipeline( + client, + experiment_id, + pipeline_definition, + input_params, + output_file_dir, + pipeline_name, + ) + status = wait_for_job_completion(client, run_id, timeout) + workflow_json = get_workflow_json(client, run_id) + + if check and not status: + argo_utils.print_workflow_logs(workflow_json["metadata"]["name"]) + pytest.fail(f"Test Failed: {pipeline_name}") + + return run_id, status, workflow_json diff --git a/components/aws/sagemaker/tests/integration_tests/utils/minio_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/minio_utils.py new file mode 100644 index 00000000000..8bde5ac7f19 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/minio_utils.py @@ -0,0 +1,39 @@ +import utils +import os + +from minio import Minio + + +def get_artifact_in_minio(workflow_json, step_name, artifact_name, output_dir): + s3_data = {} + minio_access_key = "minio" + minio_secret_key = "minio123" + minio_port = utils.get_minio_service_port() + for node in workflow_json["status"]["nodes"].values(): + if step_name in node["name"]: + for artifact in node["outputs"]["artifacts"]: + if artifact["name"] == artifact_name: + s3_data = artifact["s3"] + minio_client = Minio( + "localhost:{}".format(minio_port), + access_key=minio_access_key, + secret_key=minio_secret_key, + secure=False, + ) + output_file = os.path.join(output_dir, artifact_name + ".tgz") + minio_client.fget_object(s3_data["bucket"], s3_data["key"], output_file) + # https://docs.min.io/docs/python-client-api-reference.html#fget_object + + return output_file + + +def artifact_download_iterator(workflow_json, outputs_dict, output_dir): + output_files = {} + for 
step_name, artifacts in outputs_dict.items(): + output_files[step_name] = {} + for artifact in artifacts: + output_files[step_name][artifact] = get_artifact_in_minio( + workflow_json, step_name, step_name + "-" + artifact, output_dir + ) + + return output_files diff --git a/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py new file mode 100644 index 00000000000..9f47db68ada --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py @@ -0,0 +1,6 @@ +def describe_training_job(client, training_job_name): + return client.describe_training_job(TrainingJobName=training_job_name) + + +def describe_model(client, model_name): + return client.describe_model(ModelName=model_name)