diff --git a/backend/Dockerfile b/backend/Dockerfile
index 72bf06394ad..7b6c2521375 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -14,11 +14,21 @@ COPY . .
 RUN apk add --update gcc musl-dev
 RUN go build -o /bin/apiserver backend/src/apiserver/*.go
 
-FROM python:3.5.0-slim as compiler
+FROM python:3.5 as compiler
+
+RUN apt-get update -y && \
+    apt-get install --no-install-recommends -y -q default-jdk wget
+
+RUN pip3 install setuptools==40.5.0
+
+RUN wget http://central.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O /tmp/swagger-codegen-cli.jar
+
+WORKDIR /go/src/github.com/kubeflow/pipelines
+COPY . .
+WORKDIR /go/src/github.com/kubeflow/pipelines/sdk/python
+RUN ./build.sh /kfp.tar.gz
+RUN pip3 install /kfp.tar.gz
 
-# This is hard coded to 0.0.26. Once kfp DSK release process is automated,
-# we can dynamically refer to the version from same commit SHA.
-RUN pip install https://storage.googleapis.com/ml-pipeline/release/0.0.26/kfp-0.0.26.tar.gz --upgrade
 WORKDIR /samples
 COPY ./samples .
 RUN find . -maxdepth 2 -name "*.py" -exec dsl-compile --py {} --output {}.tar.gz \;
diff --git a/samples/basic/exit_handler.py b/samples/basic/exit_handler.py
index 1abc7b3b2fe..adb40416354 100755
--- a/samples/basic/exit_handler.py
+++ b/samples/basic/exit_handler.py
@@ -21,8 +21,7 @@
   name='Exit Handler',
   description='Download a message and print it out. Exit Handler will run at the end.'
 )
-def download_and_print(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
+def download_and_print(url='gs://ml-pipeline-playground/shakespeare1.txt'):
   """A sample pipeline showing exit handler."""
 
   exit_op = dsl.ContainerOp(
diff --git a/samples/basic/parallel_join.py b/samples/basic/parallel_join.py
index adde2b472dd..3742a7ecd38 100755
--- a/samples/basic/parallel_join.py
+++ b/samples/basic/parallel_join.py
@@ -22,8 +22,8 @@
   description='Download two messages in parallel and print the concatenated result.'
 )
 def download_and_join(
-    url1=dsl.PipelineParam(name='url1', value='gs://ml-pipeline-playground/shakespeare1.txt'),
-    url2=dsl.PipelineParam(name='url2', value='gs://ml-pipeline-playground/shakespeare2.txt')):
+    url1='gs://ml-pipeline-playground/shakespeare1.txt',
+    url2='gs://ml-pipeline-playground/shakespeare2.txt'):
   """A three-step pipeline with first two running in parallel."""
 
   download1 = dsl.ContainerOp(
diff --git a/samples/basic/sequential.py b/samples/basic/sequential.py
index 30b4c538eaa..84b8a78c811 100755
--- a/samples/basic/sequential.py
+++ b/samples/basic/sequential.py
@@ -21,8 +21,7 @@
   name='Sequential',
   description='A pipeline with two sequential steps.'
 )
-def sequential_pipeline(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
+def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt'):
   """A pipeline with two sequential steps."""
 
   op1 = dsl.ContainerOp(
diff --git a/samples/kubeflow-tf/kubeflow-training-classification.py b/samples/kubeflow-tf/kubeflow-training-classification.py
index 8ebca866385..aaa3241b899 100755
--- a/samples/kubeflow-tf/kubeflow-training-classification.py
+++ b/samples/kubeflow-tf/kubeflow-training-classification.py
@@ -81,18 +81,18 @@ def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
   name='Pipeline TFJob',
   description='Demonstrate the DSL for TFJob'
 )
-def kubeflow_training( output: dsl.PipelineParam, project: dsl.PipelineParam,
-    evaluation: dsl.PipelineParam=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/flower/eval100.csv'),
-    train: dsl.PipelineParam=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/flower/train200.csv'),
-    schema: dsl.PipelineParam=dsl.PipelineParam(name='schema', value='gs://ml-pipeline-playground/flower/schema.json'),
-    learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learningrate', value=0.1),
-    hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hiddenlayersize', value='100,50'),
-    steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=2000),
-    target: dsl.PipelineParam=dsl.PipelineParam(name='target', value='label'),
-    workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=0),
-    pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=0),
-    preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocessmode', value='local'),
-    predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predictmode', value='local')):
+def kubeflow_training(output, project,
+    evaluation='gs://ml-pipeline-playground/flower/eval100.csv',
+    train='gs://ml-pipeline-playground/flower/train200.csv',
+    schema='gs://ml-pipeline-playground/flower/schema.json',
+    learning_rate=0.1,
+    hidden_layer_size='100,50',
+    steps=2000,
+    target='label',
+    workers=0,
+    pss=0,
+    preprocess_mode='local',
+    predict_mode='local'):
 
   # TODO: use the argo job name as the workflow
   workflow = '{{workflow.name}}'
diff --git a/samples/tfx/taxi-cab-classification-pipeline.py b/samples/tfx/taxi-cab-classification-pipeline.py
index 1026a792596..be83eb1bda3 100755
--- a/samples/tfx/taxi-cab-classification-pipeline.py
+++ b/samples/tfx/taxi-cab-classification-pipeline.py
@@ -119,37 +119,23 @@ def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='dep
   description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
 )
 def taxi_cab_classification(
-    output: dsl.PipelineParam,
-    project: dsl.PipelineParam,
-
-    column_names: dsl.PipelineParam=dsl.PipelineParam(
-        name='column-names',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),
-    key_columns: dsl.PipelineParam=dsl.PipelineParam(
-        name='key-columns',
-        value='trip_start_timestamp'),
-    train: dsl.PipelineParam=dsl.PipelineParam(
-        name='train',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),
-    evaluation: dsl.PipelineParam=dsl.PipelineParam(
-        name='evaluation',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),
-    validation_mode: dsl.PipelineParam=dsl.PipelineParam(
-        name='validation-mode', value='local'),
-    preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(
-        name='preprocess-mode', value='local'),
-    preprocess_module: dsl.PipelineParam=dsl.PipelineParam(
-        name='preprocess-module',
-        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),
-    target: dsl.PipelineParam=dsl.PipelineParam(
-        name='target', value='tips'),
-    learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learning-rate', value=0.1),
-    hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hidden-layer-size', value='1500'),
-    steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=3000),
-    predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predict-mode', value='local'),
-    analyze_mode: dsl.PipelineParam=dsl.PipelineParam(name='analyze-mode', value='local'),
-    analyze_slice_column: dsl.PipelineParam=dsl.PipelineParam(
-        name='analyze-slice-column', value='trip_start_hour')):
+    output,
+    project,
+    column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',
+    key_columns='trip_start_timestamp',
+    train='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv',
+    evaluation='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv',
+    validation_mode='local',
+    preprocess_mode='local',
+    preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',
+    target='tips',
+    learning_rate=0.1,
+    hidden_layer_size='1500',
+    steps=3000,
+    predict_mode='local',
+    analyze_mode='local',
+    analyze_slice_column='trip_start_hour'):
+
   validation_output = '%s/{{workflow.name}}/validation' % output
   transform_output = '%s/{{workflow.name}}/transformed' % output
   training_output = '%s/{{workflow.name}}/train' % output
diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index d3c04b94558..e93afd6aa0a 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -440,13 +440,6 @@ def _create_pipeline_workflow(self, args, pipeline):
     workflow['spec']['volumes'] = volumes
     return workflow
 
-  def _validate_args(self, argspec):
-    if argspec.defaults:
-      for value in argspec.defaults:
-        if not issubclass(type(value), dsl.PipelineParam):
-          raise ValueError(
-              'Default values of argument has to be type dsl.PipelineParam or its child.')
-
   def _validate_exit_handler(self, pipeline):
     """Makes sure there is only one global exit handler.
 
@@ -471,7 +464,6 @@
   def _compile(self, pipeline_func):
     """Compile the given pipeline function into workflow."""
     argspec = inspect.getfullargspec(pipeline_func)
-    self._validate_args(argspec)
 
     registered_pipeline_functions = dsl.Pipeline.get_pipeline_functions()
     if pipeline_func not in registered_pipeline_functions:
@@ -494,7 +486,7 @@ def _compile(self, pipeline_func):
                        for arg_name in argspec.args]
     if argspec.defaults:
       for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
-        arg.value = default.value
+        arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
 
     workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
     return workflow
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py
index 93230aa6664..4c0c9c6a7c1 100644
--- a/sdk/python/tests/compiler/compiler_tests.py
+++ b/sdk/python/tests/compiler/compiler_tests.py
@@ -168,25 +168,6 @@ def test_composing_workflow(self):
     shutil.rmtree(tmpdir)
     # print(tmpdir)
 
-  def test_invalid_pipelines(self):
-    """Test invalid pipelines."""
-
-    @dsl.pipeline(
-      name='name',
-      description='description'
-    )
-    def invalid_param_defaults(message, outputpath='something'):
-      pass
-
-    with self.assertRaises(ValueError):
-      compiler.Compiler()._compile(invalid_param_defaults)
-
-    def missing_decoration(message: dsl.PipelineParam):
-      pass
-
-    with self.assertRaises(ValueError):
-      compiler.Compiler()._compile(missing_decoration)
-
   def test_package_compile(self):
     """Test compiling python packages."""
 
diff --git a/sdk/python/tests/compiler/testdata/basic.py b/sdk/python/tests/compiler/testdata/basic.py
index 3f078010000..85f12d206a4 100644
--- a/sdk/python/tests/compiler/testdata/basic.py
+++ b/sdk/python/tests/compiler/testdata/basic.py
@@ -70,7 +70,7 @@ def __init__(self, name):
   name='Save Most Frequent',
   description='Get Most Frequent Word and Save to GCS'
 )
-def save_most_frequent_word(message: dsl.PipelineParam, outputpath: dsl.PipelineParam):
+def save_most_frequent_word(message: str, outputpath: str):
   """A pipeline function describing the orchestration of the workflow."""
 
   exit_op = ExitHandlerOp('exiting')
diff --git a/sdk/python/tests/compiler/testdata/compose.py b/sdk/python/tests/compiler/testdata/compose.py
index 637e63802b9..9b4300b21ab 100644
--- a/sdk/python/tests/compiler/testdata/compose.py
+++ b/sdk/python/tests/compiler/testdata/compose.py
@@ -95,6 +95,6 @@ def __init__(self, name, url):
   name='Download and Save Most Frequent',
   description='Download and Get Most Frequent Word and Save to GCS'
 )
-def download_save_most_frequent_word(url: dsl.PipelineParam, outputpath: dsl.PipelineParam):
+def download_save_most_frequent_word(url: str, outputpath: str):
   downloader = DownloadMessageOp('download', url)
   save_most_frequent_word(downloader.output, outputpath)
diff --git a/sdk/python/tests/compiler/testdata/default_value.py b/sdk/python/tests/compiler/testdata/default_value.py
index 1879805ac74..e84464e4707 100644
--- a/sdk/python/tests/compiler/testdata/default_value.py
+++ b/sdk/python/tests/compiler/testdata/default_value.py
@@ -20,8 +20,7 @@
   name='Default Value',
   description='A pipeline with parameter and default value.'
 )
-def default_value_pipeline(
-    url=dsl.PipelineParam(name='url', value='gs://ml-pipeline/shakespeare1.txt')):
+def default_value_pipeline(url='gs://ml-pipeline/shakespeare1.txt'):
 
   # "url" is a pipeline parameter, meaning users can provide values when running the
   # pipeline using UI, CLI, or API to override the default value.
diff --git a/sdk/python/tests/compiler/testdata/testpackage/mypipeline/compose.py b/sdk/python/tests/compiler/testdata/testpackage/mypipeline/compose.py
index 637e63802b9..9b4300b21ab 100644
--- a/sdk/python/tests/compiler/testdata/testpackage/mypipeline/compose.py
+++ b/sdk/python/tests/compiler/testdata/testpackage/mypipeline/compose.py
@@ -95,6 +95,6 @@ def __init__(self, name, url):
   name='Download and Save Most Frequent',
   description='Download and Get Most Frequent Word and Save to GCS'
 )
-def download_save_most_frequent_word(url: dsl.PipelineParam, outputpath: dsl.PipelineParam):
+def download_save_most_frequent_word(url: str, outputpath: str):
   downloader = DownloadMessageOp('download', url)
   save_most_frequent_word(downloader.output, outputpath)
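
For reference, a minimal sketch of the pipeline-definition style this change enables (the echo pipeline below is illustrative, not part of the diff): with _validate_args removed and _compile only unwrapping defaults that are actually PipelineParam instances, a pipeline function can declare ordinary Python defaults and the compiler turns them into pipeline parameters itself.

import kfp.dsl as dsl

@dsl.pipeline(
  name='Echo',
  description='A one-step pipeline whose parameter uses a plain-string default.')
def echo_pipeline(message='hello world'):
  # Before this change the default had to be wrapped explicitly:
  #   message=dsl.PipelineParam(name='message', value='hello world')
  dsl.ContainerOp(
      name='echo',
      image='library/bash:4.4.23',
      command=['sh', '-c'],
      arguments=['echo "%s"' % message])

Such a file compiles the same way as the samples above, e.g. dsl-compile --py echo.py --output echo.tar.gz, and the "message" parameter remains overridable from the UI, CLI, or API at run time.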