Skip to content

Commit

Permalink
Add working FSx setup and test
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicholas Thomson committed May 27, 2020
1 parent 695573d commit 2bffbfe
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,78 @@ def test_trainingjob(
assert f"dkr.ecr.{region}.amazonaws.com" in training_image

utils.remove_dir(download_dir)

@pytest.mark.parametrize(
    "test_file_dir",
    [
        pytest.param(
            "resources/config/fsx-mnist-training", marks=pytest.mark.canary_test
        )
    ],
)
def test_fsx_trainingjob(
    kfp_client,
    experiment_id,
    region,
    fsx_subnet,
    fsx_security_group,
    fsx_id,
    sagemaker_client,
    test_file_dir,
):
    """Run the FSx-backed MNIST k-means training pipeline and verify the
    resulting SageMaker training job, its model artifacts, and the training
    image it used.
    """
    # Skip at runtime rather than via @pytest.mark.skipif: a module-level
    # skipif condition is evaluated at import time, before the session
    # fixtures export FSX_ID into the environment, so it would always skip.
    if fsx_id is None:
        pytest.skip("FSx not configured (no --fsx-id provided); skipping FSx test")

    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
    # Render the ((PLACEHOLDER)) tokens in the config template into the
    # generated dir, then load the resolved test parameters.
    test_params = utils.load_params(
        utils.replace_placeholders(
            os.path.join(test_file_dir, "config.yaml"),
            os.path.join(download_dir, "config.yaml"),
        )
    )

    # The pipeline component expects hyperparameters and channels as JSON strings.
    test_params["Arguments"]["hyperparameters"] = json.dumps(
        test_params["Arguments"]["hyperparameters"]
    )
    test_params["Arguments"]["channels"] = json.dumps(
        test_params["Arguments"]["channels"]
    )
    _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
        kfp_client,
        experiment_id,
        test_params["PipelineDefinition"],
        test_params["Arguments"],
        download_dir,
        test_params["TestName"],
        test_params["Timeout"],
    )

    outputs = {
        "sagemaker-training-job": ["job_name", "model_artifact_url", "training_image"]
    }
    output_files = minio_utils.artifact_download_iterator(
        workflow_json, outputs, download_dir
    )

    # Verify Training job was successful on SageMaker
    training_job_name = utils.read_from_file_in_tar(
        output_files["sagemaker-training-job"]["job_name"], "job_name.txt"
    )
    print(f"training job name: {training_job_name}")
    train_response = sagemaker_utils.describe_training_job(
        sagemaker_client, training_job_name
    )
    assert train_response["TrainingJobStatus"] == "Completed"

    # Verify model artifacts output was generated from this run
    model_artifact_url = utils.read_from_file_in_tar(
        output_files["sagemaker-training-job"]["model_artifact_url"],
        "model_artifact_url.txt",
    )
    print(f"model_artifact_url: {model_artifact_url}")
    assert model_artifact_url == train_response["ModelArtifacts"]["S3ModelArtifacts"]
    assert training_job_name in model_artifact_url

    # Verify training image output is an ECR image
    training_image = utils.read_from_file_in_tar(
        output_files["sagemaker-training-job"]["training_image"],
        "training_image.txt",
    )
    print(f"Training image used: {training_image}")
    if "ExpectedTrainingImage" in test_params.keys():
        assert test_params["ExpectedTrainingImage"] == training_image
    else:
        assert f"dkr.ecr.{region}.amazonaws.com" in training_image

    utils.remove_dir(download_dir)
29 changes: 29 additions & 0 deletions components/aws/sagemaker/tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ def pytest_addoption(parser):
required=False,
help="Cluster namespace where kubeflow pipelines is installed",
)
parser.addoption(
"--fsx-subnet",
required=False,
help="The subnet in which FSx is installed"
)
parser.addoption(
"--fsx-security-group",
required=False,
help="The security group SageMaker should use when running the FSx test"
)
parser.addoption(
"--fsx-id",
required=False,
help="The file system ID of the FSx instance"
)


@pytest.fixture(scope="session", autouse=True)
Expand Down Expand Up @@ -66,6 +81,20 @@ def kfp_namespace(request):
os.environ["NAMESPACE"] = request.config.getoption("--kfp-namespace")
return request.config.getoption("--kfp-namespace")

@pytest.fixture(scope="session", autouse=True)
def fsx_subnet(request):
    """Return the --fsx-subnet CLI option (exported as FSX_SUBNET for utils helpers)."""
    subnet = request.config.getoption("--fsx-subnet")
    # --fsx-subnet is optional; os.environ values must be strings, so only
    # export when the option was actually provided (None would raise TypeError).
    if subnet is not None:
        os.environ["FSX_SUBNET"] = subnet
    return subnet

@pytest.fixture(scope="session", autouse=True)
def fsx_security_group(request):
    """Return the --fsx-security-group CLI option (exported as FSX_SECURITY_GROUP)."""
    security_group = request.config.getoption("--fsx-security-group")
    # Option is optional; skip the env export when absent so the autouse
    # session fixture does not crash with TypeError on a None value.
    if security_group is not None:
        os.environ["FSX_SECURITY_GROUP"] = security_group
    return security_group

@pytest.fixture(scope="session", autouse=True)
def fsx_id(request):
    """Return the --fsx-id CLI option (exported as FSX_ID for utils helpers)."""
    file_system_id = request.config.getoption("--fsx-id")
    # Option is optional; skip the env export when absent so the autouse
    # session fixture does not crash with TypeError on a None value.
    if file_system_id is not None:
        os.environ["FSX_ID"] = file_system_id
    return file_system_id

@pytest.fixture(scope="session")
def boto3_session(region):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Integration-test config: k-means training on MNIST data served from FSx for Lustre.
# ((PLACEHOLDER)) tokens are substituted by utils.replace_placeholders at test time.
PipelineDefinition: resources/definition/training_pipeline.py
TestName: fsx-mnist-training
Timeout: 3600
ExpectedTrainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
Arguments:
  region: ((REGION))
  image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
  training_input_mode: File
  hyperparameters:
    k: "10"
    feature_dim: "784"
  # Single training channel backed by an FSx Lustre file system (read-only).
  channels:
    - ChannelName: train
      DataSource:
        FileSystemDataSource:
          FileSystemType: FSxLustre
          FileSystemAccessMode: ro
          FileSystemId: ((FSX_ID))
          DirectoryPath: /fsx/mnist_kmeans_example/input
      CompressionType: None
      ContentType: text/csv;label_size=0
      RecordWrapperType: None
      InputMode: File
  # VPC settings so the training job can reach the FSx file system.
  vpc_security_group_ids: ((FSX_SECURITY_GROUP))
  vpc_subnets: ((FSX_SUBNET))
  instance_type: ml.m5.xlarge
  instance_count: 1
  volume_size: 50
  max_run_time: 3600
  model_artifact_path: s3://((DATA_BUCKET))/mnist_kmeans_example/output
  network_isolation: "True"
  traffic_encryption: "False"
  spot_instance: "False"
  max_wait_time: 3600
  checkpoint_config: "{}"
  role: ((ROLE_ARN))
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def training_pipeline(
spot_instance="",
max_wait_time="",
checkpoint_config="{}",
vpc_security_group_ids="",
vpc_subnets="",
role="",
):
sagemaker_train_op(
Expand All @@ -45,6 +47,8 @@ def training_pipeline(
spot_instance=spot_instance,
max_wait_time=max_wait_time,
checkpoint_config=checkpoint_config,
vpc_security_group_ids=vpc_security_group_ids,
vpc_subnets=vpc_subnets,
role=role,
)

Expand Down
74 changes: 74 additions & 0 deletions components/aws/sagemaker/tests/integration_tests/scripts/fsx_setup
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env bash

# Helper script that provides a set of methods to configure VPC, EFS and FSx
# ready for the full suite of integration tests.

# Create a security group for FSx in the VPC of the first EKS private subnet
# and open the Lustre port (988) to traffic from within the same group.
# Reads: REGION, DEPLOY_NAME, EKS_PRIVATE_SUBNETS (comma-separated list).
# Sets:  FSX_SECURITY_GROUP_ID.
function create_fsx_security_group() {
    echo "[Creating FSx Security Group] Creating security group"

    # Derive the VPC from the first private subnet in the list.
    IFS=',' read -r -a subnets_list <<< "$EKS_PRIVATE_SUBNETS"
    local vpc_id="$(aws ec2 describe-subnets --subnet-ids "${subnets_list[0]}" \
        --output text --query "Subnets[0].VpcId" --region ${REGION})"

    local fsx_security_group="${DEPLOY_NAME}-fsx-sg"
    FSX_SECURITY_GROUP_ID="$(aws ec2 create-security-group --region "${REGION}" \
        --vpc-id ${vpc_id} \
        --description "Security group for FSx in ${DEPLOY_NAME}" \
        --group-name "${fsx_security_group}" --output text --query "GroupId")"

    # Open FSx port to internal security group
    # (Lustre traffic uses TCP 988; allow it from members of this same group).
    aws ec2 authorize-security-group-ingress \
        --region "${REGION}" --group-id "${FSX_SECURITY_GROUP_ID}" \
        --protocol tcp --port 988 --source-group "${FSX_SECURITY_GROUP_ID}"

    echo "[Creating FSx Security Group] Created security group ${FSX_SECURITY_GROUP_ID}"
}

# Delete the FSx security group, if one was created (no-op otherwise).
function cleanup_fsx_security_group() {
    if [[ -n "${FSX_SECURITY_GROUP_ID}" ]]; then
        aws ec2 delete-security-group --group-id "${FSX_SECURITY_GROUP_ID}" --region "${REGION}"
    fi
}

# Creates a new FSX LUSTRE instance and automatically imports the data set from S3.
function create_fsx_instance() {
echo "[Creating FSx] Creating file system"
IFS=',' read -r -a subnets_list <<< "$EKS_PRIVATE_SUBNETS"

local fs_id=$(aws fsx create-file-system \
--file-system-type LUSTRE \
--lustre-configuration ImportPath=s3://${S3_DATA_BUCKET}/mnist_kmeans_example \
--storage-capacity 1200 \
--subnet-ids "${subnets_list[0]}" \
--security-group-ids "${FSX_SECURITY_GROUP_ID}" \
--tags Key="Name",Value=fsx-integ-lustre \
--region "${REGION}" \
--output text \
--query "FileSystem.FileSystemId")

echo "[Creating FSx] Waiting for file system to be in state AVAILABLE"

local fs_status="CREATING"
until [[ "${fs_status}" != "CREATING" ]]; do
fs_status="$(aws fsx describe-file-systems --region "${REGION}" --file-system-id ${fs_id} --output text --query "FileSystems[0].Lifecycle")"
sleep 10
done
aws fsx --region "${REGION}" describe-file-systems --file-system-id ${fs_id}

if [[ "${fs_status}" != "AVAILABLE" ]]; then
echo "[Creating FSx] FSx cluster never reached state 'Available'"
return 1
fi

echo "[Creating FSx] File system now available"

FSX_ID="${fs_id}"

return 0
}

# Tear down the FSx file system, if one was created (no-op otherwise).
function delete_fsx_instance() {
    if [[ -n "${FSX_ID}" ]]; then
        aws fsx delete-file-system --file-system-id "${FSX_ID}" --region "${REGION}"
    fi
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ usage(){
}

cwd=$(dirname $(realpath $0))
source "$cwd"/fsx_setup

### Input parameters
DEPLOY_NAME="sagemaker-kfp-"$(date '+%Y-%m-%d-%H-%M-%S')"" # The name given to the entire deployment (tagging all resources)
Expand All @@ -30,6 +31,8 @@ PYTEST_MARKER=${PYTEST_MARKER:-""}
S3_DATA_BUCKET=${S3_DATA_BUCKET:-""}
SAGEMAKER_EXECUTION_ROLE_ARN=${SAGEMAKER_EXECUTION_ROLE_ARN:-""}

SKIP_FSX_TESTS=${SKIP_FSX_TESTS:-"false"}

while getopts ":n:r:s:" opt; do
case $opt in
n)
Expand Down Expand Up @@ -65,6 +68,12 @@ if [ "$S3_DATA_BUCKET" == "" ]; then
exit 1
fi

if [[ "$SKIP_FSX_TESTS" == "false" && "$EKS_PRIVATE_SUBNETS" == "" ]]; then
echo "Missing EKS private subnets"
usage
exit 1
fi

function cleanup() {
set +e

Expand Down Expand Up @@ -143,18 +152,38 @@ function delete_generated_role() {
}

# Tear down the FSx resources and the local Minio port-forward.
function cleanup_kfp() {
    if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
        delete_fsx_instance
        # Sleep in order for the security group to detach before attempting to delete it.
        # POSIX sleep takes a bare number of seconds ("5s" is a GNU coreutils extension).
        sleep 5
        cleanup_fsx_security_group
    fi

    # Clean up Minio
    if [ ! -z "${MINIO_PID}" ]; then
        kill -9 $MINIO_PID || true
    fi
}

if [[ -z "${EKS_EXISTING_CLUSTER}" ]]; then
    # Launch EKS in the background so FSx creation can proceed in parallel.
    launch_eks &
    eks_pid=$!

    if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
        create_fsx_security_group
        # Run in the foreground: a backgrounded function executes in a subshell,
        # so its FSX_ID assignment would never reach this shell and --fsx-id
        # would later be passed empty to pytest. EKS launch still overlaps.
        create_fsx_instance
    fi

    wait "${eks_pid}"
else
    aws eks update-kubeconfig --name "${EKS_EXISTING_CLUSTER}" --region "$REGION"
    EKS_CLUSTER_NAME="${EKS_EXISTING_CLUSTER}"
    DEPLOY_NAME="${EKS_EXISTING_CLUSTER}"

    if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
        create_fsx_security_group
        create_fsx_instance
    fi
fi

generate_iam_role_name
Expand All @@ -163,6 +192,16 @@ install_kfp
install_generated_role

pytest_args=( --region "${REGION}" --role-arn "${SAGEMAKER_EXECUTION_ROLE_ARN}" --s3-data-bucket "${S3_DATA_BUCKET}" --minio-service-port "${MINIO_LOCAL_PORT}" --kfp-namespace "${KFP_NAMESPACE}" )

if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
  # Get the VPC arguments for the FSx test
  IFS=',' read -r -a private_subnets <<< "$EKS_PRIVATE_SUBNETS"
  pytest_args+=( --fsx-subnet "${private_subnets[0]}" --fsx-security-group "${FSX_SECURITY_GROUP_ID}" --fsx-id "${FSX_ID}" )
fi

[ ! -z "${PYTEST_MARKER}" ] && pytest_args+=( -m "${PYTEST_MARKER}" )

# (Removed the debug-only `-k 'test_fsx_trainingjob'` filter that was marked
# "TODO: DELETE BEFORE MERGING PR" — it restricted the run to a single test.)
cd tests/integration_tests && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n $(nproc)
15 changes: 15 additions & 0 deletions components/aws/sagemaker/tests/integration_tests/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ def get_kfp_namespace():
return os.environ.get("NAMESPACE")


def get_fsx_subnet():
    """Return the FSx subnet ID from the FSX_SUBNET env var, or None if unset."""
    return os.getenv("FSX_SUBNET")


def get_fsx_security_group():
    """Return the FSx security group ID from the FSX_SECURITY_GROUP env var, or None if unset."""
    return os.getenv("FSX_SECURITY_GROUP")


def get_fsx_id():
    """Return the FSx file system ID from the FSX_ID env var, or None if unset."""
    return os.getenv("FSX_ID")


def get_algorithm_image_registry(region, algorithm):
    """Return the registry (account ID) portion of the algorithm's ECR image URI."""
    image_uri = get_image_uri(region, algorithm)
    return image_uri.partition(".")[0]

Expand Down Expand Up @@ -61,6 +73,9 @@ def replace_placeholders(input_filename, output_filename):
"((ROLE_ARN))": get_role_arn(),
"((DATA_BUCKET))": get_s3_data_bucket(),
"((KMEANS_REGISTRY))": get_algorithm_image_registry(region, "kmeans"),
"((FSX_ID))": get_fsx_id(),
"((FSX_SUBNET))": get_fsx_subnet(),
"((FSX_SECURITY_GROUP))": get_fsx_security_group(),
}

filedata = ""
Expand Down

0 comments on commit 2bffbfe

Please sign in to comment.