diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 5c84b70a4b4..20e89d4b66b 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -718,7 +718,7 @@ def create_workflow(self, pipeline_func: Callable, pipeline_name: Text=None, pipeline_description: Text=None, - params_list: List[dsl.PipelineParam]=()) -> Dict[Text, Any]: + params_list: List[dsl.PipelineParam]=None) -> Dict[Text, Any]: """ Create workflow spec from pipeline function and specified pipeline params/metadata. Currently, the pipeline params are either specified in the signature of the pipeline function or by passing a list of @@ -730,6 +730,7 @@ def create_workflow(self, :param params_list: list of pipeline params to append to the pipeline. :return: workflow dict. """ + params_list = params_list or [] argspec = inspect.getfullargspec(pipeline_func) # Create the arg list with no default values and call pipeline function. @@ -739,6 +740,13 @@ def create_workflow(self, pipeline_meta.description = pipeline_description or pipeline_meta.description pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_meta.name) + # Need to first clear the default value of dsl.PipelineParams. Otherwise, it + # will be resolved immediately in place when being to each component. + default_param_values = {} + for param in params_list: + default_param_values[param.name] = param.value + param.value = None + # Currently only allow specifying pipeline params at one place. if params_list and pipeline_meta.inputs: raise ValueError('Either specify pipeline params in the pipeline function, or in "params_list", but not both.') @@ -767,6 +775,9 @@ def create_workflow(self, arg.value = default.value if isinstance(default, dsl.PipelineParam) else default else: # Or, if args are provided by params_list, fill in pipeline_meta. + for param in params_list: + param.value = default_param_values[param.name] + args_list_with_defaults = params_list pipeline_meta.inputs = [ ParameterMeta( @@ -790,7 +801,7 @@ def create_workflow(self, def _compile(self, pipeline_func): """Compile the given pipeline function into workflow.""" - return self.create_workflow(pipeline_func, []) + return self.create_workflow(pipeline_func=pipeline_func) def compile(self, pipeline_func, package_path, type_check=True): """Compile the given pipeline function into workflow yaml. diff --git a/sdk/python/tests/compiler/testdata/basic_no_decorator.py b/sdk/python/tests/compiler/testdata/basic_no_decorator.py index 7463cc9a1a9..7e4cad22bcd 100644 --- a/sdk/python/tests/compiler/testdata/basic_no_decorator.py +++ b/sdk/python/tests/compiler/testdata/basic_no_decorator.py @@ -18,7 +18,7 @@ message_param = dsl.PipelineParam(name='message') -output_path_param = dsl.PipelineParam(name='outputpath') +output_path_param = dsl.PipelineParam(name='outputpath', value='default_output') class GetFrequentWordOp(dsl.ContainerOp): """A get frequent word class representing a component in ML Pipelines. diff --git a/sdk/python/tests/compiler/testdata/basic_no_decorator.yaml b/sdk/python/tests/compiler/testdata/basic_no_decorator.yaml index 32cda24dcab..d4e257030a4 100644 --- a/sdk/python/tests/compiler/testdata/basic_no_decorator.yaml +++ b/sdk/python/tests/compiler/testdata/basic_no_decorator.yaml @@ -15,13 +15,14 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: - pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}, {"name": "outputpath"}], "name": "Save Most Frequent"}' + pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}, {"default": "default_output", "name": "outputpath"}], "name": "Save Most Frequent"}' generateName: save-most-frequent- spec: arguments: parameters: - name: message - name: outputpath + value: default_output entrypoint: save-most-frequent serviceAccountName: pipeline-runner onExit: exiting