
[AWS SageMaker] Specify component input types #3683

Merged · 26 commits · May 12, 2020
2 changes: 1 addition & 1 deletion components/aws/sagemaker/THIRD-PARTY-LICENSES.txt
@@ -1,4 +1,4 @@
-** Amazon SageMaker Components for Kubeflow Pipelines; version 0.3.0 --
+** Amazon SageMaker Components for Kubeflow Pipelines; version 0.3.1 --
https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker
Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
** boto3; version 1.12.33 -- https://github.com/boto/boto3/
28 changes: 26 additions & 2 deletions components/aws/sagemaker/batch_transform/component.yaml
@@ -4,78 +4,102 @@ description: |
inputs:
  - name: region
    description: 'The region where the cluster launches.'
+    type: String
  - name: job_name
    description: 'The name of the batch transform job.'
    default: ''
+    type: String
  - name: model_name
    description: 'The name of the model that you want to use for the transform job.'
+    type: String
  - name: max_concurrent
    description: 'The maximum number of parallel requests that can be sent to each instance in a transform job.'
    default: '0'
+    type: Integer
  - name: max_payload
    description: 'The maximum allowed size of the payload, in MB.'
    default: '6'
+    type: Integer
  - name: batch_strategy
    description: 'The number of records to include in a mini-batch for an HTTP inference request.'
    default: ''
+    type: String
  - name: environment
    description: 'The environment variables to set in the Docker container. Up to 16 key-value entries in the map.'
    default: '{}'
+    type: JSON
  - name: input_location
    description: 'The S3 location of the data source that is associated with a channel.'
+    type: String
  - name: data_type
    description: 'Data type of the input. Can be ManifestFile, S3Prefix, or AugmentedManifestFile.'
    default: 'S3Prefix'
+    type: String
  - name: content_type
    description: 'The multipurpose internet mail extension (MIME) type of the data.'
    default: ''
+    type: String
  - name: split_type
    description: 'The method to use to split the transform job data files into smaller batches.'
    default: 'None'
+    type: String
  - name: compression_type
    description: 'If the transform data is compressed, the specification of the compression type.'
    default: 'None'
+    type: String
  - name: output_location
    description: 'The Amazon S3 path where you want Amazon SageMaker to store the results of the transform job.'
+    type: String
  - name: accept
    description: 'The MIME type used to specify the output data.'
    default: ''
+    type: String
  - name: assemble_with
    description: 'Defines how to assemble the results of the transform job as a single S3 object. Either None or Line.'
    default: ''
+    type: String
  - name: output_encryption_key
    description: 'The AWS Key Management Service ID of the key used to encrypt the output data.'
    default: ''
+    type: String
  - name: input_filter
    description: 'A JSONPath expression used to select a portion of the input data to pass to the algorithm.'
    default: ''
+    type: String
  - name: output_filter
    description: 'A JSONPath expression used to select a portion of the joined dataset to save in the output file for a batch transform job.'
    default: ''
+    type: String
  - name: join_source
    description: 'Specifies the source of the data to join with the transformed data.'
    default: 'None'
+    type: String
  - name: instance_type
    description: 'The ML compute instance type.'
    default: 'ml.m4.xlarge'
+    type: String
  - name: instance_count
    description: 'The number of ML compute instances to use in each training job.'
    default: '1'
+    type: Integer
  - name: resource_encryption_key
    description: 'The AWS KMS key that Amazon SageMaker uses to encrypt data on the storage volume attached to the ML compute instance(s).'
    default: ''
+    type: String
  - name: endpoint_url
    description: 'The endpoint URL for the private link VPC endpoint.'
    default: ''
+    type: String
  - name: tags
    description: 'Key-value pairs to categorize AWS resources.'
    default: '{}'
+    type: JSON
Contributor: JFYI: The type name currently supported by the SDK is "JsonObject" or "Json". With those type names, a dictionary argument will be converted to a JSON string automatically.

Contributor (author): Is there a "JsonArray" type as well? Or would that be covered under "Json"?

Contributor (author): Ahh, I found the list of types. Great, thanks.
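A minimal sketch of how such a typed input is consumed, assuming the KFP v1 SDK; the pipeline name, model name, and S3 paths below are hypothetical. With a "Json"/"JsonObject"-typed input, the SDK serializes a dict argument automatically, as noted in the comment above:

    import kfp
    from kfp import components

    # Load the component definition (path assumes a checkout of kubeflow/pipelines).
    batch_transform_op = components.load_component_from_file(
        'components/aws/sagemaker/batch_transform/component.yaml')

    @kfp.dsl.pipeline(name='batch-transform-example')
    def transform_pipeline():
        batch_transform_op(
            region='us-east-1',
            model_name='my-model',                    # hypothetical model name
            input_location='s3://my-bucket/input',    # hypothetical S3 paths
            output_location='s3://my-bucket/output',
            environment={'LOG_LEVEL': 'DEBUG'},       # dict -> JSON string via the input type
        )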

outputs:
  - {name: output_location, description: 'S3 URI of the transform job results.'}
implementation:
  container:
-    image: amazon/aws-sagemaker-kfp-components:0.3.0
-    command: ['python']
+    image: amazon/aws-sagemaker-kfp-components:0.3.1
+    command: ['python3']
    args: [
      batch_transform.py,
      --region, {inputValue: region},
46 changes: 23 additions & 23 deletions components/aws/sagemaker/batch_transform/src/batch_transform.py
@@ -26,31 +26,31 @@ def create_parser():
    parser = argparse.ArgumentParser(description='SageMaker Batch Transformation Job')
    _utils.add_default_client_arguments(parser)

-    parser.add_argument('--job_name', type=str.strip, required=False, help='The name of the transform job.', default='')
-    parser.add_argument('--model_name', type=str.strip, required=True, help='The name of the model that you want to use for the transform job.')
-    parser.add_argument('--max_concurrent', type=_utils.str_to_int, required=False, help='The maximum number of parallel requests that can be sent to each instance in a transform job.', default='0')
-    parser.add_argument('--max_payload', type=_utils.str_to_int, required=False, help='The maximum allowed size of the payload, in MB.', default='6')
-    parser.add_argument('--batch_strategy', choices=['MultiRecord', 'SingleRecord', ''], type=str.strip, required=False, help='The number of records to include in a mini-batch for an HTTP inference request.', default='')
-    parser.add_argument('--environment', type=_utils.str_to_json_dict, required=False, help='The dictionary of the environment variables to set in the Docker container. Up to 16 key-value entries in the map.', default='{}')
-    parser.add_argument('--input_location', type=str.strip, required=True, help='The S3 location of the data source that is associated with a channel.')
-    parser.add_argument('--data_type', choices=['ManifestFile', 'S3Prefix', 'AugmentedManifestFile', ''], type=str.strip, required=False, help='Data type of the input. Can be ManifestFile, S3Prefix, or AugmentedManifestFile.', default='S3Prefix')
-    parser.add_argument('--content_type', type=str.strip, required=False, help='The multipurpose internet mail extension (MIME) type of the data.', default='')
-    parser.add_argument('--split_type', choices=['None', 'Line', 'RecordIO', 'TFRecord', ''], type=str.strip, required=False, help='The method to use to split the transform job data files into smaller batches.', default='None')
-    parser.add_argument('--compression_type', choices=['None', 'Gzip', ''], type=str.strip, required=False, help='If the transform data is compressed, the specification of the compression type.', default='None')
-    parser.add_argument('--output_location', type=str.strip, required=True, help='The Amazon S3 path where you want Amazon SageMaker to store the results of the transform job.')
-    parser.add_argument('--accept', type=str.strip, required=False, help='The MIME type used to specify the output data.')
-    parser.add_argument('--assemble_with', choices=['None', 'Line', ''], type=str.strip, required=False, help='Defines how to assemble the results of the transform job as a single S3 object. Either None or Line.')
-    parser.add_argument('--output_encryption_key', type=str.strip, required=False, help='The AWS KMS key that Amazon SageMaker uses to encrypt the model artifacts.', default='')
-    parser.add_argument('--input_filter', type=str.strip, required=False, help='A JSONPath expression used to select a portion of the input data to pass to the algorithm.', default='')
-    parser.add_argument('--output_filter', type=str.strip, required=False, help='A JSONPath expression used to select a portion of the joined dataset to save in the output file for a batch transform job.', default='')
-    parser.add_argument('--join_source', choices=['None', 'Input', ''], type=str.strip, required=False, help='Specifies the source of the data to join with the transformed data.', default='None')
+    parser.add_argument('--job_name', type=str, required=False, help='The name of the transform job.', default='')
+    parser.add_argument('--model_name', type=str, required=True, help='The name of the model that you want to use for the transform job.')
+    parser.add_argument('--max_concurrent', type=int, required=False, help='The maximum number of parallel requests that can be sent to each instance in a transform job.', default='0')
+    parser.add_argument('--max_payload', type=int, required=False, help='The maximum allowed size of the payload, in MB.', default='6')
+    parser.add_argument('--batch_strategy', choices=['MultiRecord', 'SingleRecord', ''], type=str, required=False, help='The number of records to include in a mini-batch for an HTTP inference request.', default='')
+    parser.add_argument('--environment', type=_utils.yaml_or_json_str, required=False, help='The dictionary of the environment variables to set in the Docker container. Up to 16 key-value entries in the map.', default={})
+    parser.add_argument('--input_location', type=str, required=True, help='The S3 location of the data source that is associated with a channel.')
+    parser.add_argument('--data_type', choices=['ManifestFile', 'S3Prefix', 'AugmentedManifestFile', ''], type=str, required=False, help='Data type of the input. Can be ManifestFile, S3Prefix, or AugmentedManifestFile.', default='S3Prefix')
+    parser.add_argument('--content_type', type=str, required=False, help='The multipurpose internet mail extension (MIME) type of the data.', default='')
+    parser.add_argument('--split_type', choices=['None', 'Line', 'RecordIO', 'TFRecord', ''], type=str, required=False, help='The method to use to split the transform job data files into smaller batches.', default='None')
+    parser.add_argument('--compression_type', choices=['None', 'Gzip', ''], type=str, required=False, help='If the transform data is compressed, the specification of the compression type.', default='None')
+    parser.add_argument('--output_location', type=str, required=True, help='The Amazon S3 path where you want Amazon SageMaker to store the results of the transform job.')
+    parser.add_argument('--accept', type=str, required=False, help='The MIME type used to specify the output data.')
+    parser.add_argument('--assemble_with', choices=['None', 'Line', ''], type=str, required=False, help='Defines how to assemble the results of the transform job as a single S3 object. Either None or Line.')
+    parser.add_argument('--output_encryption_key', type=str, required=False, help='The AWS KMS key that Amazon SageMaker uses to encrypt the model artifacts.', default='')
+    parser.add_argument('--input_filter', type=str, required=False, help='A JSONPath expression used to select a portion of the input data to pass to the algorithm.', default='')
+    parser.add_argument('--output_filter', type=str, required=False, help='A JSONPath expression used to select a portion of the joined dataset to save in the output file for a batch transform job.', default='')
+    parser.add_argument('--join_source', choices=['None', 'Input', ''], type=str, required=False, help='Specifies the source of the data to join with the transformed data.', default='None')
    parser.add_argument('--instance_type', choices=['ml.m4.xlarge', 'ml.m4.2xlarge', 'ml.m4.4xlarge', 'ml.m4.10xlarge', 'ml.m4.16xlarge', 'ml.m5.large', 'ml.m5.xlarge', 'ml.m5.2xlarge', 'ml.m5.4xlarge',
        'ml.m5.12xlarge', 'ml.m5.24xlarge', 'ml.c4.xlarge', 'ml.c4.2xlarge', 'ml.c4.4xlarge', 'ml.c4.8xlarge', 'ml.p2.xlarge', 'ml.p2.8xlarge', 'ml.p2.16xlarge', 'ml.p3.2xlarge', 'ml.p3.8xlarge', 'ml.p3.16xlarge',
-        'ml.c5.xlarge', 'ml.c5.2xlarge', 'ml.c5.4xlarge', 'ml.c5.9xlarge', 'ml.c5.18xlarge'], type=str.strip, required=True, help='The ML compute instance type for the transform job.', default='ml.m4.xlarge')
-    parser.add_argument('--instance_count', type=_utils.str_to_int, required=False, help='The number of ML compute instances to use in the transform job.')
-    parser.add_argument('--resource_encryption_key', type=str.strip, required=False, help='The AWS KMS key that Amazon SageMaker uses to encrypt data on the storage volume attached to the ML compute instance(s).', default='')
-    parser.add_argument('--tags', type=_utils.str_to_json_dict, required=False, help='An array of key-value pairs, to categorize AWS resources.', default='{}')
-    parser.add_argument('--output_location_file', type=str.strip, required=True, help='File path where the program will write the Amazon S3 URI of the transform job results.')
+        'ml.c5.xlarge', 'ml.c5.2xlarge', 'ml.c5.4xlarge', 'ml.c5.9xlarge', 'ml.c5.18xlarge'], type=str, required=True, help='The ML compute instance type for the transform job.', default='ml.m4.xlarge')
+    parser.add_argument('--instance_count', type=int, required=False, help='The number of ML compute instances to use in the transform job.')
+    parser.add_argument('--resource_encryption_key', type=str, required=False, help='The AWS KMS key that Amazon SageMaker uses to encrypt data on the storage volume attached to the ML compute instance(s).', default='')
+    parser.add_argument('--tags', type=_utils.yaml_or_json_str, required=False, help='An array of key-value pairs, to categorize AWS resources.', default={})
+    parser.add_argument('--output_location_file', type=str, required=True, help='File path where the program will write the Amazon S3 URI of the transform job results.')

    return parser

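For context, argparse applies the `type` callable to each raw argument string before storing it, so this change only swaps the coercion step (plain builtins also report standard argparse error messages on bad input). A standalone sketch, not part of the diff:

    import argparse

    parser = argparse.ArgumentParser()
    # argparse calls the `type` callable on the raw string argument;
    # string defaults such as '6' are run through `type` as well.
    parser.add_argument('--max_payload', type=int, default='6')
    parser.add_argument('--job_name', type=str, default='')

    args = parser.parse_args(['--max_payload', '12', '--job_name', 'demo'])
    assert args.max_payload == 12          # '12' coerced to an int
    assert isinstance(args.job_name, str)  # left as a plain string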
6 changes: 3 additions & 3 deletions components/aws/sagemaker/codebuild/deploy.buildspec.yml
@@ -1,10 +1,10 @@
version: 0.2

phases:
  pre_build:
    commands:
      # Log in to Dockerhub
      - mkdir -p ~/.docker
      - echo $DOCKER_CONFIG > ~/.docker/config.json
      - docker login -u $DOCKER_CONFIG_USERNAME -p $DOCKER_CONFIG_PASSWORD

  build:
    commands:
7 changes: 4 additions & 3 deletions components/aws/sagemaker/codebuild/scripts/deploy.sh
@@ -5,6 +5,7 @@ set -e
REMOTE_REPOSITORY="amazon/aws-sagemaker-kfp-components"
DRYRUN="true"
FULL_VERSION_TAG=""
+DOCKER_CONFIG_PATH=${DOCKER_CONFIG_PATH:-"/root/.docker"}

while getopts ":d:v:" opt; do
case ${opt} in
@@ -64,13 +65,13 @@ echo "Tagged image with ${MAJOR_VERSION_IMAGE}"

# Push to the remote repository
if [ "${DRYRUN}" == "false" ]; then
-  docker push "${FULL_VERSION_IMAGE}"
+  docker --config "$DOCKER_CONFIG_PATH" push "${FULL_VERSION_IMAGE}"
  echo "Successfully pushed tag ${FULL_VERSION_IMAGE} to Docker Hub"

-  docker push "${MINOR_VERSION_IMAGE}"
+  docker --config "$DOCKER_CONFIG_PATH" push "${MINOR_VERSION_IMAGE}"
  echo "Successfully pushed tag ${MINOR_VERSION_IMAGE} to Docker Hub"

-  docker push "${MAJOR_VERSION_IMAGE}"
+  docker --config "$DOCKER_CONFIG_PATH" push "${MAJOR_VERSION_IMAGE}"
  echo "Successfully pushed tag ${MAJOR_VERSION_IMAGE} to Docker Hub"
else
  echo "Dry run detected. Not pushing images."
49 changes: 15 additions & 34 deletions components/aws/sagemaker/common/_utils.py
@@ -13,6 +13,7 @@
import os
import argparse
from time import gmtime, strftime
+from distutils.util import strtobool
import time
import string
import random
@@ -63,15 +64,15 @@ def nullable_string_argument(value):


def add_default_client_arguments(parser):
-    parser.add_argument('--region', type=str.strip, required=True, help='The region where the training job launches.')
+    parser.add_argument('--region', type=str, required=True, help='The region where the training job launches.')
    parser.add_argument('--endpoint_url', type=nullable_string_argument, required=False, help='The URL to use when communicating with the Sagemaker service.')


def get_component_version():
    """Get component version from the first line of License file"""
    component_version = 'NULL'

-    with open('/THIRD-PARTY-LICENSES.txt', 'r') as license_file:
+    with open('THIRD-PARTY-LICENSES.txt', 'r') as license_file:
        version_match = re.search('Amazon SageMaker Components for Kubeflow Pipelines; version (([0-9]+[.])+[0-9]+)',
                                  license_file.readline())
        if version_match is not None:
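As an aside, the regex above recovers the semantic version from the first license line; a quick standalone check, with the line copied from the license diff at the top of this PR:

    import re

    line = '** Amazon SageMaker Components for Kubeflow Pipelines; version 0.3.1 --'
    match = re.search('Amazon SageMaker Components for Kubeflow Pipelines; version (([0-9]+[.])+[0-9]+)', line)
    print(match.group(1))  # -> 0.3.1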
@@ -858,35 +859,15 @@ def enable_spot_instance_support(training_job_config, args):
def id_generator(size=4, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


-def str_to_bool(s):
-    if s.lower().strip() == 'true':
-        return True
-    elif s.lower().strip() == 'false':
-        return False
-    else:
-        raise argparse.ArgumentTypeError('"True" or "False" expected.')
-
-def str_to_int(s):
-    if s:
-        return int(s)
-    else:
-        return 0
-
-def str_to_float(s):
-    if s:
-        return float(s)
-    else:
-        return 0.0
-
-def str_to_json_dict(s):
-    if s != '':
-        return json.loads(s)
-    else:
-        return {}
-
-def str_to_json_list(s):
-    if s != '':
-        return json.loads(s)
-    else:
-        return []
+def yaml_or_json_str(str):
+    if str == "" or str == None:
+        return None
+    try:
+        return json.loads(str)
+    except:
+        return yaml.safe_load(str)

Contributor (@akartsky, May 4, 2020): If there is an error in the JSON input then it won't print a proper error message. Maybe there's a better way to do this.

Contributor (author): This was taken directly from the TFJob implementation. I believe that if it is improper JSON then it will also fail the YAML load and throw an exception.
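Illustrative behavior of the helper above, assuming `json` and `yaml` are imported as in `_utils.py`:

    yaml_or_json_str('{"key": [1, 2]}')  # -> {'key': [1, 2]}, parsed as JSON
    yaml_or_json_str('key: value')       # JSON parse fails, YAML fallback -> {'key': 'value'}
    yaml_or_json_str('')                 # -> None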

+def str_to_bool(str):
+    # This distutils function returns an integer representation of the boolean
+    # rather than a True/False value. This simply hard casts it.
+    return bool(strtobool(str))
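For reference, `distutils.util.strtobool` maps 'y'/'yes'/'t'/'true'/'on'/'1' to 1 and their negatives to 0, raising ValueError for anything else, hence the `bool()` cast. A quick illustration (note distutils is deprecated in newer Pythons, but fine in the Python 3 image used here):

    from distutils.util import strtobool

    print(strtobool('True'))      # 1 -- an int, not a bool
    print(bool(strtobool('no')))  # False
    # strtobool('maybe') raises ValueError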