Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

component build support for both python2 and python3 #730

Merged
merged 11 commits into from
Feb 25, 2019
134 changes: 131 additions & 3 deletions samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME\n",
"TARGET_IMAGE_TWO='gcr.io/%s/pusher_two:dev' % PROJECT_NAME\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.6/kfp.tar.gz'\n",
"TRAIN_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'\n",
"EVAL_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'\n",
Expand All @@ -52,7 +53,8 @@
"KUBEFLOW_DEPLOYER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f'\n",
"DEPLOYER_MODEL = 'notebook_tfx_taxi'\n",
"DEPLOYER_VERSION_DEV = 'dev'\n",
"DEPLOYER_VERSION_PROD = 'prod'"
"DEPLOYER_VERSION_PROD = 'prod'\n",
"DEPLOYER_VERSION_PROD_TWO = 'prodtwo'"
]
},
{
Expand Down Expand Up @@ -563,7 +565,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Submit a new job"
"### Submit a new job"
]
},
{
Expand All @@ -587,7 +589,132 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up"
"## Customize a step in Python2\n",
gaoning777 marked this conversation as resolved.
Show resolved Hide resolved
"Let's reuse the deploy_model function defined above. However, this time we will use python2 instead of the default python3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from kfp import compiler\n",
"\n",
"# The return value \"DeployerOp\" represents a step that can be used directly in a pipeline function\n",
"DeployerOp = compiler.build_python_component(\n",
" component_func=deploy_model,\n",
" staging_gcs_path=OUTPUT_DIR,\n",
" dependency=[kfp.compiler.VersionedDependency(name='google-api-python-client', version='1.7.0')],\n",
" base_image='tensorflow/tensorflow:1.12.0',\n",
" target_image=TARGET_IMAGE_TWO,\n",
" python_version='python2')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Modify the pipeline with the new deployer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# My new pipeline. It's almost the same as the original one, but with the last step (the deployer) replaced.\n",
"@dsl.pipeline(\n",
" name='TFX Taxi Cab Classification Pipeline Example',\n",
" description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'\n",
")\n",
"def my_taxi_cab_classification(\n",
" output,\n",
" project,\n",
" model,\n",
" version,\n",
" column_names=dsl.PipelineParam(\n",
" name='column-names',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),\n",
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(\n",
" name='train',\n",
" value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(\n",
" name='evaluation',\n",
" value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(\n",
" name='preprocess-module',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
" \n",
" \n",
" validation_output = '%s/{{workflow.name}}/validation' % output\n",
" transform_output = '%s/{{workflow.name}}/transformed' % output\n",
" training_output = '%s/{{workflow.name}}/train' % output\n",
" analysis_output = '%s/{{workflow.name}}/analysis' % output\n",
" prediction_output = '%s/{{workflow.name}}/predict' % output\n",
"\n",
" validation = dataflow_tf_data_validation_op(\n",
" train, evaluation, column_names, key_columns, project,\n",
" validation_mode, validation_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" preprocess = dataflow_tf_transform_op(\n",
" train, evaluation, validation.outputs['schema'], project, preprocess_mode,\n",
" preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" training = tf_train_op(\n",
" preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size,\n",
" steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" analysis = dataflow_tf_model_analyze_op(\n",
" training.output, evaluation, validation.outputs['schema'], project,\n",
" analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" prediction = dataflow_tf_predict_op(\n",
" evaluation, validation.outputs['schema'], target, training.output,\n",
" predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" \n",
" # The new deployer. Note that the DeployerOp interface is similar to the function \"deploy_model\".\n",
" deploy = DeployerOp(\n",
" gcp_project=project, model_name=model, version_name=version, runtime='1.9',\n",
" model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit a new job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx-two.tar.gz')\n",
"\n",
"run = client.run_pipeline(exp.id, 'my-tfx-two', 'my-tfx-two.tar.gz',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME,\n",
" 'model': DEPLOYER_MODEL,\n",
" 'version': DEPLOYER_VERSION_PROD_TWO})\n",
"\n",
"result = client.wait_for_run_completion(run.id, timeout=600)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up"
]
},
{
Expand All @@ -612,6 +739,7 @@
"\n",
"\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_PROD --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_PROD_TWO --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_DEV --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine models delete $DEPLOYER_MODEL -q"
]
Expand Down
87 changes: 68 additions & 19 deletions sdk/python/kfp/compiler/_component_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,34 @@ def _generate_pip_requirement(self, dependency, requirement_filepath):
dependency_helper.add_python_package(version)
dependency_helper.generate_pip_requirements(requirement_filepath)

def _generate_dockerfile_with_py(self, target_file, base_image, python_filepath, has_requirement_file, python_version):
    """Generate a simple Dockerfile that installs Python and runs a Python file.

    The generated image is based on ``base_image``, installs the requested
    Python interpreter with pip and setuptools through apt-get, optionally
    installs the pip requirements file, copies the Python file into ``/ml/``
    and uses it as the container entrypoint.

    Args:
        target_file (str): target file name for the dockerfile.
        base_image (str): the base image name.
        python_filepath (str): the path of the python file that is copied to the docker image.
        has_requirement_file (bool): whether it has a requirement file or not.
        python_version (str): choose python2 or python3.

    Raises:
        ValueError: if python_version is neither 'python2' nor 'python3'.
    """
    if python_version not in ('python2', 'python3'):
        raise ValueError('python_version has to be either python2 or python3')

    # Compare with ``==`` rather than ``is``: identity of equal strings depends
    # on interpreter interning, so ``is 'python3'`` can silently select the
    # python2 branch for a dynamically built 'python3' string.
    if python_version == 'python3':
        apt_packages = 'python3 python3-pip python3-setuptools'
        pip_command = 'pip3'
        python_command = 'python3'
    else:
        apt_packages = 'python python-pip python-setuptools'
        pip_command = 'pip'
        python_command = 'python'

    with open(target_file, 'w') as f:
        f.write('FROM ' + base_image + '\n')
        f.write('RUN apt-get update -y && apt-get install --no-install-recommends -y -q ' + apt_packages + '\n')
        if has_requirement_file:
            f.write('ADD ' + self._ARC_REQUIREMENT_FILE + ' /ml/\n')
            f.write('RUN ' + pip_command + ' install -r /ml/' + self._ARC_REQUIREMENT_FILE + '\n')
        f.write('ADD ' + python_filepath + ' /ml/' + '\n')
        # No trailing newline after ENTRYPOINT, matching the previously generated file.
        f.write('ENTRYPOINT ["' + python_command + '", "/ml/' + python_filepath + '"]')
gaoning777 marked this conversation as resolved.
Show resolved Hide resolved

def _wrap_files_in_tarball(self, tarball_path, files={}):
""" _wrap_files_in_tarball creates a tarball for all the input files
Expand All @@ -165,16 +183,21 @@ def _wrap_files_in_tarball(self, tarball_path, files={}):
for key, value in files.items():
tarball.add(value, arcname=key)

def prepare_docker_tarball_with_py(self, arc_python_filename, python_filepath, base_image, local_tarball_path, dependency=None):
""" prepare_docker_tarball is the API to generate dockerfile and prepare the tarball with python scripts """
def prepare_docker_tarball_with_py(self, arc_python_filename, python_filepath, base_image, local_tarball_path, python_version, dependency=None):
""" prepare_docker_tarball is the API to generate dockerfile and prepare the tarball with python scripts
args:
python_version (str): choose python2 or python3
"""
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')
with tempfile.TemporaryDirectory() as local_build_dir:
has_requirement_file = False
local_requirement_path = os.path.join(local_build_dir, self._ARC_REQUIREMENT_FILE)
if dependency is not None and len(dependency) != 0:
self._generate_pip_requirement(dependency, local_requirement_path)
has_requirement_file = True
local_dockerfile_path = os.path.join(local_build_dir, self._arc_dockerfile_name)
self._generate_dockerfile_with_py(local_dockerfile_path, base_image, arc_python_filename, has_requirement_file)
self._generate_dockerfile_with_py(local_dockerfile_path, base_image, arc_python_filename, has_requirement_file, python_version)
file_lists = {self._arc_dockerfile_name:local_dockerfile_path,
arc_python_filename:python_filepath}
if has_requirement_file:
Expand Down Expand Up @@ -272,7 +295,14 @@ def _generate_kaniko_spec(self, namespace, arc_dockerfile_name, gcs_path, target
return content

#TODO: currently it supports single output, future support for multiple return values
def _generate_entrypoint(self, component_func):
def _generate_entrypoint(self, component_func, python_version='python3'):
'''
args:
python_version (str): choose python2 or python3, default is python3
'''
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')

fullargspec = inspect.getfullargspec(component_func)
annotations = fullargspec[6]
input_args = fullargspec[0]
Expand Down Expand Up @@ -312,9 +342,16 @@ def _generate_entrypoint(self, component_func):
codegen.writeline(call_component_func)

# Serialize output
codegen.writeline('from pathlib import Path')
codegen.writeline('Path(_output_file).parent.mkdir(parents=True, exist_ok=True)')
codegen.writeline('Path(_output_file).write_text(str(output))')
if python_version == 'python3':
gaoning777 marked this conversation as resolved.
Show resolved Hide resolved
codegen.writeline('from pathlib import Path')
codegen.writeline('Path(_output_file).parent.mkdir(parents=True)')
codegen.writeline('Path(_output_file).write_text(str(output))')
else:
codegen.writeline('import os')
codegen.writeline('os.makedirs(os.path.dirname(_output_file))')
codegen.writeline('with open(_output_file, "w") as data:')
codegen.indent()
codegen.writeline('data.write(str(output))')
wrapper_code = codegen.end()

# CLI codes
Expand All @@ -337,6 +374,8 @@ def _generate_entrypoint(self, component_func):
if line.startswith('def '):
break
start_line_num += 1
if python_version == 'python2':
src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):'
dedecorated_component_src = '\n'.join(src_lines[start_line_num:])

complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
Expand All @@ -358,13 +397,18 @@ def _build_image_from_tarball(self, local_tarball_path, namespace, timeout):
# Clean up
GCSHelper.remove_gcs_blob(self._gcs_path)

def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency):
""" build_image builds an image for the given python function"""
def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency, python_version='python3'):
""" build_image builds an image for the given python function
args:
python_version (str): choose python2 or python3, default is python3
"""
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')
with tempfile.TemporaryDirectory() as local_build_dir:
# Generate entrypoint and serialization python codes
local_python_filepath = os.path.join(local_build_dir, self._arc_python_filepath)
logging.info('Generate entrypoint and serialization codes.')
complete_component_code = self._generate_entrypoint(component_func)
complete_component_code = self._generate_entrypoint(component_func, python_version)
with open(local_python_filepath, 'w') as f:
f.write(complete_component_code)

Expand All @@ -376,6 +420,7 @@ def build_image_from_func(self, component_func, namespace, base_image, timeout,
arc_python_filename=self._arc_python_filepath,
base_image=base_image,
local_tarball_path=local_tarball_path,
python_version=python_version,
dependency=dependency)
self._build_image_from_tarball(local_tarball_path, namespace, timeout)

Expand Down Expand Up @@ -445,7 +490,7 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)

return _create_task_factory_from_component_spec(component_spec)

def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, build_image=True, timeout=600, namespace='kubeflow', target_component_file=None):
def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, build_image=True, timeout=600, namespace='kubeflow', target_component_file=None, python_version='python3'):
""" build_component automatically builds a container image for the component_func
based on the base_image and pushes to the target_image.

Expand All @@ -459,9 +504,9 @@ def build_python_component(component_func, target_image, base_image=None, depend
timeout (int): the timeout for the image build(in secs), default is 600 seconds
namespace (str): the namespace within which to run the kubernetes kaniko job, default is "kubeflow"
dependency (list): a list of VersionedDependency, which includes the package name and versions, default is empty

python_version (str): choose python2 or python3, default is python3
Raises:
ValueError: The function is not decorated with python_component decorator
ValueError: The function is not decorated with python_component decorator or the python_version is neither python2 nor python3
"""

_configure_logger(logging.getLogger())
Expand All @@ -471,6 +516,9 @@ def build_python_component(component_func, target_image, base_image=None, depend
if target_image is None:
raise ValueError('target_image must not be None')

if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')

if build_image:
if staging_gcs_path is None:
raise ValueError('staging_gcs_path must not be None')
Expand All @@ -486,7 +534,8 @@ def build_python_component(component_func, target_image, base_image=None, depend
target_image)
builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
builder.build_image_from_func(component_func, namespace=namespace,
base_image=base_image, timeout=timeout, dependency=dependency)
base_image=base_image, timeout=timeout,
python_version=python_version, dependency=dependency)
logging.info('Build component complete.')
return _generate_pythonop(component_func, target_image, target_component_file)

Expand Down
Loading