AWS Sagemaker Components - enhance integration test coverage (#3720)
* AWS Sagemaker Components - enhance integration test coverage
	- Add tests for create endpoint, hpo job and batch transform
	- Minor bug fixes and documentation

* rev2: Address comments and clean up generated artifacts

* rev3: address more comments

* rev4: add canary test marker

* Trigger Build
surajkota authored May 15, 2020
1 parent 92a0d11 commit bff8392
Showing 20 changed files with 709 additions and 39 deletions.
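The `canary_test` marker added in rev4 and applied throughout the tests below needs to be registered with pytest to avoid unknown-mark warnings. A minimal sketch of that registration in a `conftest.py` (hypothetical; the repository may register it in `pytest.ini` instead):

```python
# conftest.py -- hypothetical registration of the canary_test marker.
# Once registered, the canary subset can be run with: pytest -m canary_test
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "canary_test: tests also run as scheduled canaries"
    )
```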
2 changes: 1 addition & 1 deletion components/aws/sagemaker/tests/integration_tests/README.md
@@ -26,7 +26,7 @@ Change the bucket name and run the python script `[s3_sample_data_creator.py](ht
AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS
```
-> Note: To get base64 string, try `echo -n $AWS_ACCESS_KEY_ID | base64`
+> Note: To get base64 string, run `echo -n $AWS_ACCESS_KEY_ID | base64`
1. Create a conda environment for running the tests: `conda env create -f environment.yml`
1. Activate the conda environment: `conda activate kfp_test_env`
1. Port-forward the minio service in the background. Example: `kubectl port-forward svc/minio-service 9000:9000 -n kubeflow &`
@@ -0,0 +1,85 @@
import utils
import os
import pytest

from utils import kfp_client_utils
from utils import minio_utils
from utils import sagemaker_utils
from utils import s3_utils


@pytest.mark.parametrize(
"test_file_dir",
[
pytest.param(
"resources/config/kmeans-mnist-batch-transform",
marks=pytest.mark.canary_test,
)
],
)
def test_transform_job(
kfp_client,
experiment_id,
s3_client,
sagemaker_client,
s3_data_bucket,
test_file_dir,
):

    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
test_params = utils.load_params(
utils.replace_placeholders(
os.path.join(test_file_dir, "config.yaml"),
os.path.join(download_dir, "config.yaml"),
)
)

    # Generate a random prefix for the model and job name to avoid errors if resources with the same name exist
test_params["Arguments"]["model_name"] = test_params["Arguments"][
"job_name"
] = input_job_name = (
utils.generate_random_string(5) + "-" + test_params["Arguments"]["model_name"]
)
print(f"running test with model/job name: {input_job_name}")

    # Generate a unique output location since the output filename is derived from the content_type
test_params["Arguments"]["output_location"] = os.path.join(
test_params["Arguments"]["output_location"], input_job_name
)

_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
test_params["Arguments"],
download_dir,
test_params["TestName"],
test_params["Timeout"],
)

outputs = {"sagemaker-batch-transformation": ["output_location"]}

output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)

# Verify Job was successful on SageMaker
response = sagemaker_utils.describe_transform_job(sagemaker_client, input_job_name)
assert response["TransformJobStatus"] == "Completed"
assert response["TransformJobName"] == input_job_name

# Verify output location from pipeline matches job output and that the transformed file exists
output_location = utils.read_from_file_in_tar(
output_files["sagemaker-batch-transformation"]["output_location"], "data",
)
print(f"output location: {output_location}")
assert output_location == response["TransformOutput"]["S3OutputPath"]
    # Get the relative path of the file in the S3 bucket.
    # The URI follows the format s3://<bucket_name>/relative/path/to/file;
    # the split below extracts the part after the bucket name, e.g.
    # "s3://bucket/out/job".split("/")[3:] -> ["out", "job"]
file_key = os.path.join(
"/".join(output_location.split("/")[3:]), test_params["ExpectedOutputFile"]
)
assert s3_utils.check_object_exists(s3_client, s3_data_bucket, file_key)

utils.remove_dir(download_dir)
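For orientation, `test_transform_job` above expects `utils.load_params` to return a mapping containing the keys it reads. A hypothetical sketch of the parsed `config.yaml` (key names mirror the lookups in the test; every value is an illustrative placeholder, not the actual resource config):

```python
# Hypothetical shape of the parsed config for the batch transform test.
test_params = {
    "TestName": "batch-transform-test",  # pipeline run name
    "Timeout": 3600,  # seconds to wait for the run
    "PipelineDefinition": "resources/definition/transform_pipeline.py",
    "ExpectedOutputFile": "data.csv.out",  # file the job should produce
    "Arguments": {
        "model_name": "kmeans-mnist",
        "job_name": "kmeans-mnist",
        "output_location": "s3://<bucket>/batch-transform/output",
    },
}
```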
@@ -0,0 +1,114 @@
import pytest
import os
import utils
import io
import numpy
import json
import pickle
import gzip

from utils import kfp_client_utils
from utils import minio_utils
from utils import sagemaker_utils


def run_predict_mnist(boto3_session, endpoint_name, download_dir):
""" https://github.com/awslabs/amazon-sagemaker-examples/blob/a8c20eeb72dc7d3e94aaaf28be5bf7d7cd5695cb
/sagemaker-python-sdk/1P_kmeans_lowlevel/kmeans_mnist_lowlevel.ipynb """
# Download and load dataset
region = boto3_session.region_name
download_path = os.path.join(download_dir, "mnist.pkl.gz")
boto3_session.resource("s3", region_name=region).Bucket(
"sagemaker-sample-data-{}".format(region)
).download_file("algorithms/kmeans/mnist/mnist.pkl.gz", download_path)
with gzip.open(download_path, "rb") as f:
train_set, valid_set, test_set = pickle.load(f, encoding="latin1")

    # Function to create a CSV from a numpy array
def np2csv(arr):
csv = io.BytesIO()
numpy.savetxt(csv, arr, delimiter=",", fmt="%g")
return csv.getvalue().decode().rstrip()

# Run prediction on an image
runtime = boto3_session.client("sagemaker-runtime")
payload = np2csv(train_set[0][30:31])

response = runtime.invoke_endpoint(
EndpointName=endpoint_name, ContentType="text/csv", Body=payload,
)
return json.loads(response["Body"].read().decode())


@pytest.mark.parametrize(
"test_file_dir",
[
pytest.param(
"resources/config/kmeans-mnist-endpoint", marks=pytest.mark.canary_test
)
],
)
def test_create_endpoint(
kfp_client, experiment_id, boto3_session, sagemaker_client, test_file_dir
):

    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
test_params = utils.load_params(
utils.replace_placeholders(
os.path.join(test_file_dir, "config.yaml"),
os.path.join(download_dir, "config.yaml"),
)
)

    # Generate a random prefix for the model, endpoint config and endpoint name
    # to avoid errors if resources with the same name exist
test_params["Arguments"]["model_name"] = test_params["Arguments"][
"endpoint_config_name"
] = test_params["Arguments"]["endpoint_name"] = input_endpoint_name = (
utils.generate_random_string(5) + "-" + test_params["Arguments"]["model_name"]
)
print(f"running test with model/endpoint name: {input_endpoint_name}")

_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
test_params["Arguments"],
download_dir,
test_params["TestName"],
test_params["Timeout"],
)

try:
outputs = {"sagemaker-deploy-model": ["endpoint_name"]}

output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)

output_endpoint_name = utils.read_from_file_in_tar(
output_files["sagemaker-deploy-model"]["endpoint_name"], "endpoint_name.txt"
)
print(f"endpoint name: {output_endpoint_name}")

# Verify output from pipeline is endpoint name
assert output_endpoint_name == input_endpoint_name

# Verify endpoint is running
assert (
sagemaker_utils.describe_endpoint(sagemaker_client, input_endpoint_name)[
"EndpointStatus"
]
== "InService"
)

# Validate the model for use by running a prediction
result = run_predict_mnist(boto3_session, input_endpoint_name, download_dir)
print(f"prediction result: {result}")
assert json.dumps(result, sort_keys=True) == json.dumps(
test_params["ExpectedPrediction"], sort_keys=True
)
utils.remove_dir(download_dir)
finally:
# delete endpoint
sagemaker_utils.delete_endpoint(sagemaker_client, input_endpoint_name)
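The `describe_endpoint` and `delete_endpoint` helpers used above are presumably thin wrappers over the boto3 SageMaker client; a minimal sketch under that assumption:

```python
# Hypothetical sketches of the sagemaker_utils helpers in the endpoint test.
def describe_endpoint(client, endpoint_name):
    # Response includes the "EndpointStatus" field asserted on above.
    return client.describe_endpoint(EndpointName=endpoint_name)


def delete_endpoint(client, endpoint_name):
    client.delete_endpoint(EndpointName=endpoint_name)
    # Waiting for deletion keeps reruns from colliding on the endpoint name.
    client.get_waiter("endpoint_deleted").wait(EndpointName=endpoint_name)
```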
@@ -0,0 +1,118 @@
import pytest
import os
import json
import utils

from utils import kfp_client_utils
from utils import minio_utils
from utils import sagemaker_utils


@pytest.mark.parametrize(
"test_file_dir",
[pytest.param("resources/config/kmeans-mnist-hpo", marks=pytest.mark.canary_test)],
)
def test_hyperparameter_tuning(
kfp_client, experiment_id, region, sagemaker_client, test_file_dir
):

    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
test_params = utils.load_params(
utils.replace_placeholders(
os.path.join(test_file_dir, "config.yaml"),
os.path.join(download_dir, "config.yaml"),
)
)

test_params["Arguments"]["channels"] = json.dumps(
test_params["Arguments"]["channels"]
)
test_params["Arguments"]["static_parameters"] = json.dumps(
test_params["Arguments"]["static_parameters"]
)
test_params["Arguments"]["integer_parameters"] = json.dumps(
test_params["Arguments"]["integer_parameters"]
)
test_params["Arguments"]["categorical_parameters"] = json.dumps(
test_params["Arguments"]["categorical_parameters"]
)

_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
test_params["Arguments"],
download_dir,
test_params["TestName"],
test_params["Timeout"],
)

outputs = {
"sagemaker-hyperparameter-tuning": [
"best_hyperparameters",
"best_job_name",
"hpo_job_name",
"model_artifact_url",
"training_image",
]
}
output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)

# Verify HPO job was successful on SageMaker
hpo_job_name = utils.read_from_file_in_tar(
output_files["sagemaker-hyperparameter-tuning"]["hpo_job_name"],
"hpo_job_name.txt",
)
print(f"HPO job name: {hpo_job_name}")
hpo_response = sagemaker_utils.describe_hpo_job(sagemaker_client, hpo_job_name)
assert hpo_response["HyperParameterTuningJobStatus"] == "Completed"

# Verify training image output is an ECR image
training_image = utils.read_from_file_in_tar(
output_files["sagemaker-hyperparameter-tuning"]["training_image"],
"training_image.txt",
)
print(f"Training image used: {training_image}")
if "ExpectedTrainingImage" in test_params.keys():
assert test_params["ExpectedTrainingImage"] == training_image
else:
assert f"dkr.ecr.{region}.amazonaws.com" in training_image

# Verify Training job was part of HPO job, returned as best and was successful
best_training_job_name = utils.read_from_file_in_tar(
output_files["sagemaker-hyperparameter-tuning"]["best_job_name"],
"best_job_name.txt",
)
print(f"best training job name: {best_training_job_name}")
train_response = sagemaker_utils.describe_training_job(
sagemaker_client, best_training_job_name
)
assert train_response["TuningJobArn"] == hpo_response["HyperParameterTuningJobArn"]
assert (
train_response["TrainingJobName"]
== hpo_response["BestTrainingJob"]["TrainingJobName"]
)
assert train_response["TrainingJobStatus"] == "Completed"

# Verify model artifacts output was generated from this run
model_artifact_url = utils.read_from_file_in_tar(
output_files["sagemaker-hyperparameter-tuning"]["model_artifact_url"],
"model_artifact_url.txt",
)
print(f"model_artifact_url: {model_artifact_url}")
assert model_artifact_url == train_response["ModelArtifacts"]["S3ModelArtifacts"]
assert best_training_job_name in model_artifact_url

# Verify hyper_parameters output is not empty
hyper_parameters = json.loads(
utils.read_from_file_in_tar(
output_files["sagemaker-hyperparameter-tuning"]["best_hyperparameters"],
"best_hyperparameters.txt",
)
)
print(f"HPO best hyperparameters: {json.dumps(hyper_parameters, indent = 2)}")
    assert hyper_parameters

utils.remove_dir(download_dir)
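All three new tests read pipeline outputs through `utils.read_from_file_in_tar`; the createmodel diff below drops its `.decode()` calls, which suggests the helper returns decoded text. A plausible sketch:

```python
import tarfile


def read_from_file_in_tar(tar_path, file_name):
    # Hypothetical sketch: pull a single member out of the downloaded
    # artifact tarball and return its contents as a string.
    with tarfile.open(tar_path) as tar:
        with tar.extractfile(file_name) as f:
            return f.read().decode()
```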
@@ -7,7 +7,14 @@
from utils import sagemaker_utils


-@pytest.mark.parametrize("test_file_dir", ["resources/config/kmeans-mnist-model"])
+@pytest.mark.parametrize(
+    "test_file_dir",
+    [
+        pytest.param(
+            "resources/config/kmeans-mnist-model", marks=pytest.mark.canary_test
+        )
+    ],
+)
def test_createmodel(kfp_client, experiment_id, sagemaker_client, test_file_dir):

    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
@@ -22,8 +29,9 @@ def test_createmodel(kfp_client, experiment_id, sagemaker_client, test_file_dir)
test_params["Arguments"]["model_name"] = input_model_name = (
utils.generate_random_string(5) + "-" + test_params["Arguments"]["model_name"]
)
print(f"running test with model_name: {input_model_name}")

-    run_id, status, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
+    _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
@@ -39,11 +47,13 @@ def test_createmodel(kfp_client, experiment_id, sagemaker_client, test_file_dir)
workflow_json, outputs, download_dir
)

-    output_model_name = utils.extract_information(
+    output_model_name = utils.read_from_file_in_tar(
        output_files["sagemaker-create-model"]["model_name"], "model_name.txt"
    )
-    print(f"model_name: {output_model_name.decode()}")
-    assert output_model_name.decode() == input_model_name
+    print(f"model_name: {output_model_name}")
+    assert output_model_name == input_model_name
assert (
sagemaker_utils.describe_model(sagemaker_client, input_model_name) is not None
)

utils.remove_dir(download_dir)