Pipeline functions now take direct default values rather than dsl.PipelineParam. #110

Merged: 12 commits, Nov 27, 2018
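In short: pipeline functions can now declare plain Python defaults, and the compiler turns them into pipeline parameters itself. A minimal before/after sketch of the user-facing change (the my_pipeline names and bucket path are hypothetical, modeled on the samples updated below):

import kfp.dsl as dsl

# Before this PR: every default had to be wrapped in dsl.PipelineParam.
@dsl.pipeline(name='My Pipeline (old)', description='PipelineParam default')
def my_pipeline_old(
    url=dsl.PipelineParam(name='url', value='gs://my-bucket/data.txt')):
  pass

# After this PR: a plain value works; the compiler fills in the parameter.
@dsl.pipeline(name='My Pipeline', description='Plain default')
def my_pipeline(url='gs://my-bucket/data.txt'):
  pass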
18 changes: 14 additions & 4 deletions backend/Dockerfile
@@ -14,11 +14,21 @@ COPY . .
 RUN apk add --update gcc musl-dev
 RUN go build -o /bin/apiserver backend/src/apiserver/*.go
 
-FROM python:3.5.0-slim as compiler
+FROM python:3.5 as compiler
 
+RUN apt-get update -y && \
+    apt-get install --no-install-recommends -y -q default-jdk wget
+
+RUN pip3 install setuptools==40.5.0

Contributor: Is this not auto-installed? Do we need the specific version?

Contributor (Author): Yes, per @gaoning777's investigation.


+RUN wget http://central.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O /tmp/swagger-codegen-cli.jar

Contributor: Do we need this for compilation? The compiler probably runs fine without the swagger codegen or kubernetes.


+WORKDIR /go/src/github.com/kubeflow/pipelines
+COPY . .
+WORKDIR /go/src/github.com/kubeflow/pipelines/sdk/python
+RUN ./build.sh /kfp.tar.gz
+RUN pip3 install /kfp.tar.gz
 
-# This is hard coded to 0.0.26. Once the kfp SDK release process is automated,
-# we can dynamically refer to the version from the same commit SHA.
-RUN pip install https://storage.googleapis.com/ml-pipeline/release/0.0.26/kfp-0.0.26.tar.gz --upgrade
 WORKDIR /samples
 COPY ./samples .
 RUN find . -maxdepth 2 -name "*.py" -exec dsl-compile --py {} --output {}.tar.gz \;
3 changes: 1 addition & 2 deletions samples/basic/exit_handler.py
@@ -21,8 +21,7 @@
   name='Exit Handler',
   description='Download a message and print it out. Exit Handler will run at the end.'
 )
-def download_and_print(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
+def download_and_print(url='gs://ml-pipeline-playground/shakespeare1.txt'):
   """A sample pipeline showing exit handler."""
 
   exit_op = dsl.ContainerOp(
4 changes: 2 additions & 2 deletions samples/basic/parallel_join.py
@@ -22,8 +22,8 @@
   description='Download two messages in parallel and print the concatenated result.'
 )
 def download_and_join(
-    url1=dsl.PipelineParam(name='url1', value='gs://ml-pipeline-playground/shakespeare1.txt'),
-    url2=dsl.PipelineParam(name='url2', value='gs://ml-pipeline-playground/shakespeare2.txt')):
+    url1='gs://ml-pipeline-playground/shakespeare1.txt',
+    url2='gs://ml-pipeline-playground/shakespeare2.txt'):
   """A three-step pipeline with first two running in parallel."""
 
   download1 = dsl.ContainerOp(
3 changes: 1 addition & 2 deletions samples/basic/sequential.py
@@ -21,8 +21,7 @@
   name='Sequential',
   description='A pipeline with two sequential steps.'
 )
-def sequential_pipeline(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
+def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt'):
   """A pipeline with two sequential steps."""
 
   op1 = dsl.ContainerOp(
24 changes: 12 additions & 12 deletions samples/kubeflow-tf/kubeflow-training-classification.py
@@ -81,18 +81,18 @@ def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
   name='Pipeline TFJob',
   description='Demonstrate the DSL for TFJob'
 )
-def kubeflow_training( output: dsl.PipelineParam, project: dsl.PipelineParam,
-  evaluation: dsl.PipelineParam=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/flower/eval100.csv'),
-  train: dsl.PipelineParam=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/flower/train200.csv'),
-  schema: dsl.PipelineParam=dsl.PipelineParam(name='schema', value='gs://ml-pipeline-playground/flower/schema.json'),
-  learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learningrate', value=0.1),
-  hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hiddenlayersize', value='100,50'),
-  steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=2000),
-  target: dsl.PipelineParam=dsl.PipelineParam(name='target', value='label'),
-  workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=0),
-  pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=0),
-  preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocessmode', value='local'),
-  predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predictmode', value='local')):
+def kubeflow_training(output, project,
+  evaluation='gs://ml-pipeline-playground/flower/eval100.csv',
+  train='gs://ml-pipeline-playground/flower/train200.csv',
+  schema='gs://ml-pipeline-playground/flower/schema.json',
+  learning_rate=0.1,
+  hidden_layer_size='100,50',
+  steps=2000,
+  target='label',
+  workers=0,
+  pss=0,
+  preprocess_mode='local',
+  predict_mode='local'):
   # TODO: use the argo job name as the workflow
   workflow = '{{workflow.name}}'
 
48 changes: 17 additions & 31 deletions samples/tfx/taxi-cab-classification-pipeline.py
@@ -119,37 +119,23 @@ def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='dep
   description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
 )
 def taxi_cab_classification(
-    output: dsl.PipelineParam,
-    project: dsl.PipelineParam,
-
-    column_names: dsl.PipelineParam=dsl.PipelineParam(
-        name='column-names',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),
-    key_columns: dsl.PipelineParam=dsl.PipelineParam(
-        name='key-columns',
-        value='trip_start_timestamp'),
-    train: dsl.PipelineParam=dsl.PipelineParam(
-        name='train',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),
-    evaluation: dsl.PipelineParam=dsl.PipelineParam(
-        name='evaluation',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),
-    validation_mode: dsl.PipelineParam=dsl.PipelineParam(
-        name='validation-mode', value='local'),
-    preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(
-        name='preprocess-mode', value='local'),
-    preprocess_module: dsl.PipelineParam=dsl.PipelineParam(
-        name='preprocess-module',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),
-    target: dsl.PipelineParam=dsl.PipelineParam(
-        name='target', value='tips'),
-    learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learning-rate', value=0.1),
-    hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hidden-layer-size', value='1500'),
-    steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=3000),
-    predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predict-mode', value='local'),
-    analyze_mode: dsl.PipelineParam=dsl.PipelineParam(name='analyze-mode', value='local'),
-    analyze_slice_column: dsl.PipelineParam=dsl.PipelineParam(
-        name='analyze-slice-column', value='trip_start_hour')):
+    output,
+    project,
+    column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',
+    key_columns='trip_start_timestamp',
+    train='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv',
+    evaluation='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv',
+    validation_mode='local',
+    preprocess_mode='local',
+    preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',
+    target='tips',
+    learning_rate=0.1,
+    hidden_layer_size='1500',
+    steps=3000,
+    predict_mode='local',
+    analyze_mode='local',
+    analyze_slice_column='trip_start_hour'):
 
   validation_output = '%s/{{workflow.name}}/validation' % output
   transform_output = '%s/{{workflow.name}}/transformed' % output
   training_output = '%s/{{workflow.name}}/train' % output
10 changes: 1 addition & 9 deletions sdk/python/kfp/compiler/compiler.py
@@ -440,13 +440,6 @@ def _create_pipeline_workflow(self, args, pipeline):
     workflow['spec']['volumes'] = volumes
     return workflow
 
-  def _validate_args(self, argspec):
-    if argspec.defaults:
-      for value in argspec.defaults:
-        if not issubclass(type(value), dsl.PipelineParam):
-          raise ValueError(
-              'Default values of argument has to be type dsl.PipelineParam or its child.')
-
   def _validate_exit_handler(self, pipeline):
     """Makes sure there is only one global exit handler.
 
@@ -471,7 +464,6 @@ def _compile(self, pipeline_func):
"""Compile the given pipeline function into workflow."""

argspec = inspect.getfullargspec(pipeline_func)
self._validate_args(argspec)

registered_pipeline_functions = dsl.Pipeline.get_pipeline_functions()
if pipeline_func not in registered_pipeline_functions:
@@ -494,7 +486,7 @@ def _compile(self, pipeline_func):
                                 for arg_name in argspec.args]
     if argspec.defaults:
       for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
-        arg.value = default.value
+        arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
 
     workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
     return workflow
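
The one-line change above is the crux: defaults are matched to the trailing arguments (hence the double reversal) and unwrapped only when they are PipelineParam instances. In isolation, the matching logic behaves like this standalone sketch (with a stand-in PipelineParam class and a hypothetical my_pipeline, not the SDK's own):

import inspect

class PipelineParam:  # stand-in for dsl.PipelineParam, illustration only
  def __init__(self, name, value=None):
    self.name = name
    self.value = value

def apply_defaults(pipeline_func):
  argspec = inspect.getfullargspec(pipeline_func)
  params = [PipelineParam(arg_name) for arg_name in argspec.args]
  if argspec.defaults:
    # Defaults belong to the last len(defaults) arguments, so pair them
    # from the end of both lists.
    for arg, default in zip(reversed(params), reversed(argspec.defaults)):
      arg.value = default.value if isinstance(default, PipelineParam) else default
  return params

def my_pipeline(output, url='gs://my-bucket/data.txt'):  # hypothetical
  pass

for p in apply_defaults(my_pipeline):
  print(p.name, p.value)  # -> output None, then url gs://my-bucket/data.txt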
19 changes: 0 additions & 19 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -168,25 +168,6 @@ def test_composing_workflow(self):
       shutil.rmtree(tmpdir)
       # print(tmpdir)
 
-  def test_invalid_pipelines(self):
-    """Test invalid pipelines."""
-
-    @dsl.pipeline(
-        name='name',
-        description='description'
-    )
-    def invalid_param_defaults(message, outputpath='something'):
-      pass
-
-    with self.assertRaises(ValueError):
-      compiler.Compiler()._compile(invalid_param_defaults)
-
-    def missing_decoration(message: dsl.PipelineParam):
-      pass
-
-    with self.assertRaises(ValueError):
-      compiler.Compiler()._compile(missing_decoration)
-
   def test_package_compile(self):
     """Test compiling python packages."""
 
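With _validate_args gone, plain Python defaults are legal rather than a compile error (the unregistered-function check in _compile above appears to remain). If a regression test for the new behavior were wanted, it could look roughly like this sketch (hypothetical test_plain_default_values, reusing this file's harness; assumes an empty pipeline body compiles cleanly):

  def test_plain_default_values(self):
    """Plain default values should compile without a ValueError."""

    @dsl.pipeline(
        name='name',
        description='description'
    )
    def param_defaults(message, outputpath='something'):
      pass

    # The removed _validate_args would have raised ValueError here.
    compiler.Compiler()._compile(param_defaults)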
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/basic.py
@@ -70,7 +70,7 @@ def __init__(self, name):
     name='Save Most Frequent',
     description='Get Most Frequent Word and Save to GCS'
 )
-def save_most_frequent_word(message: dsl.PipelineParam, outputpath: dsl.PipelineParam):
+def save_most_frequent_word(message: str, outputpath: str):
   """A pipeline function describing the orchestration of the workflow."""
 
   exit_op = ExitHandlerOp('exiting')
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/compose.py
@@ -95,6 +95,6 @@ def __init__(self, name, url):
   name='Download and Save Most Frequent',
   description='Download and Get Most Frequent Word and Save to GCS'
 )
-def download_save_most_frequent_word(url: dsl.PipelineParam, outputpath: dsl.PipelineParam):
+def download_save_most_frequent_word(url: str, outputpath: str):

Contributor: This type annotation is not used by the DSL, right? It just serves to show the types to users?

Contributor (Author): Correct, it is not used by the DSL compiler.

   downloader = DownloadMessageOp('download', url)
   save_most_frequent_word(downloader.output, outputpath)
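
To confirm the point from the thread above: the compiler reads only parameter names and defaults through inspect.getfullargspec (as in _compile earlier), while annotations are documentation for readers. A quick standalone illustration:

import inspect

def download_save_most_frequent_word(url: str, outputpath: str):
  pass

argspec = inspect.getfullargspec(download_save_most_frequent_word)
print(argspec.args)         # ['url', 'outputpath'] -- what the compiler reads
print(argspec.annotations)  # {'url': <class 'str'>, ...} -- ignored by it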
3 changes: 1 addition & 2 deletions sdk/python/tests/compiler/testdata/default_value.py
@@ -20,8 +20,7 @@
   name='Default Value',
   description='A pipeline with parameter and default value.'
 )
-def default_value_pipeline(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline/shakespeare1.txt')):
+def default_value_pipeline(url='gs://ml-pipeline/shakespeare1.txt'):
 
   # "url" is a pipeline parameter, meaning users can provide values when running the
   # pipeline using UI, CLI, or API to override the default value.
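
To make that comment concrete: after compiling this sample, a run can override url at submission time. With the Python client of this era it looked roughly like the following sketch (the exact Client methods, experiment name, and package filename are assumptions):

import kfp

# Assumes a deployed ml-pipeline endpoint reachable by kfp.Client, and that
# this sample was compiled to default_value.py.tar.gz via dsl-compile.
client = kfp.Client()
exp = client.create_experiment('default-value-demo')  # hypothetical name
client.run_pipeline(
    exp.id, 'override-url-run', 'default_value.py.tar.gz',
    params={'url': 'gs://ml-pipeline/shakespeare2.txt'})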