-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SDK - Components - Creating graph components from python pipeline fun…
…ction (#2273) * SDK/Components - Creating graph components from python pipeline function `create_graph_component_from_pipeline_func` converts python pipeline function to a graph component object that can be saved, shared, composed or submitted for execution. Example: producer_op = load_component(component_with_0_inputs_and_2_outputs) processor_op = load_component(component_with_2_inputs_and_2_outputs) def pipeline1(pipeline_param_1: int): producer_task = producer_op() processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) return OrderedDict([ ('Pipeline output 1', producer_task.outputs['Output 1']), ('Pipeline output 2', processor_task.outputs['Output 2']), ]) graph_component = create_graph_component_from_pipeline_func(pipeline1) * Changed the signatures of exported functions Non-public create_graph_component_spec_from_pipeline_func creates ComponentSpec Public create_graph_component_from_pipeline_func creates component and writes it to file. * Switched to using _extract_component_interface to analyze function signature Stopped humanizing the input names for now. I think it's benefitial to extract interface from function signature the same way for both container and graph python components. * Support outputs declared using pipeline function's return annotation * Cleaned up the test * Stop including the whole parent tasks in task output references * By default, do not include task component specs in the graph component Remove the component spec from component reference unless it will make the reference empty or unless explicitly asked by the user * Exported the create_graph_component_from_pipeline_func function * Fixed imports * Updated the copyright year.
- Loading branch information
1 parent
88864fc
commit be4f585
Showing
6 changed files
with
251 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
147 changes: 147 additions & 0 deletions
147
sdk/python/kfp/components/_python_to_graph_component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
__all__ = [ | ||
'create_graph_component_from_pipeline_func', | ||
] | ||
|
||
|
||
import inspect | ||
from collections import OrderedDict | ||
from typing import Callable | ||
|
||
from . import _components | ||
from ._structures import TaskSpec, ComponentSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec | ||
from ._naming import _make_name_unique_by_adding_index | ||
from ._python_op import _extract_component_interface | ||
|
||
|
||
def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str, embed_component_specs: bool = False) -> None: | ||
'''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing. | ||
Pipeline function is a function that only calls component functions and passes outputs to inputs. | ||
This feature is experimental and lacks support for some of the DSL features like conditions and loops. | ||
Only pipelines consisting of loaded components or python components are currently supported (no manually created ContainerOps or ResourceOps). | ||
Args: | ||
pipeline_func: Python function to convert | ||
output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing. | ||
embed_component_specs: Whether to embed component definitions or just reference them. Embedding makes the graph component self-contained. Default is False. | ||
Example: | ||
producer_op = load_component_from_file('producer/component.yaml') | ||
processor_op = load_component_from_file('processor/component.yaml') | ||
def pipeline1(pipeline_param_1: int): | ||
producer_task = producer_op() | ||
processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) | ||
return OrderedDict([ | ||
('Pipeline output 1', producer_task.outputs['Output 1']), | ||
('Pipeline output 2', processor_task.outputs['Output 2']), | ||
]) | ||
create_graph_component_from_pipeline_func(pipeline1, output_component_file='pipeline.component.yaml') | ||
''' | ||
component_spec = create_graph_component_spec_from_pipeline_func(pipeline_func, embed_component_specs) | ||
if output_component_file: | ||
from pathlib import Path | ||
from ._yaml_utils import dump_yaml | ||
component_dict = component_spec.to_dict() | ||
component_yaml = dump_yaml(component_dict) | ||
Path(output_component_file).write_text(component_yaml) | ||
|
||
|
||
def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable, embed_component_specs: bool = False) -> ComponentSpec: | ||
|
||
component_spec = _extract_component_interface(pipeline_func) | ||
# Checking the function parameters - they should not have file passing annotations. | ||
input_specs = component_spec.inputs or [] | ||
for input in input_specs: | ||
if input._passing_style: | ||
raise TypeError('Graph component function parameter "{}" cannot have file-passing annotation "{}".'.format(input.name, input._passing_style)) | ||
|
||
task_map = OrderedDict() #Preserving task order | ||
|
||
def task_construction_handler(task: TaskSpec): | ||
#Rewriting task ids so that they're same every time | ||
task_id = task.component_ref.spec.name or "Task" | ||
task_id = _make_name_unique_by_adding_index(task_id, task_map.keys(), ' ') | ||
for output_ref in task.outputs.values(): | ||
output_ref.task_output.task_id = task_id | ||
output_ref.task_output.task = None | ||
task_map[task_id] = task | ||
# Remove the component spec from component reference unless it will make the reference empty or unless explicitly asked by the user | ||
if not embed_component_specs and any([task.component_ref.name, task.component_ref.url, task.component_ref.digest]): | ||
task.component_ref.spec = None | ||
|
||
return task #The handler is a transformation function, so it must pass the task through. | ||
|
||
# Preparing the pipeline_func arguments | ||
# TODO: The key should be original parameter name if different | ||
pipeline_func_args = {input.name: GraphInputArgument(input_name=input.name) for input in input_specs} | ||
|
||
try: | ||
#Setting the handler to fix and catch the tasks. | ||
_components._created_task_transformation_handler.append(task_construction_handler) | ||
|
||
#Calling the pipeline_func with GraphInputArgument instances as arguments | ||
pipeline_func_result = pipeline_func(**pipeline_func_args) | ||
finally: | ||
_components._created_task_transformation_handler.pop() | ||
|
||
|
||
# Getting graph outputs | ||
output_names = [output.name for output in (component_spec.outputs or [])] | ||
|
||
if len(output_names) == 1 and output_names[0] == 'Output': # TODO: Check whether the NamedTuple syntax was used | ||
pipeline_func_result = [pipeline_func_result] | ||
|
||
if isinstance(pipeline_func_result, tuple) and hasattr(pipeline_func_result, '_asdict'): # collections.namedtuple and typing.NamedTuple | ||
pipeline_func_result = pipeline_func_result._asdict() | ||
|
||
if isinstance(pipeline_func_result, dict): | ||
if output_names: | ||
if set(output_names) != set(pipeline_func_result.keys()): | ||
raise ValueError('Returned outputs do not match outputs specified in the function signature: {} = {}'.format(str(set(pipeline_func_result.keys())), str(set(output_names)))) | ||
|
||
if pipeline_func_result is None: | ||
graph_output_value_map = {} | ||
elif isinstance(pipeline_func_result, dict): | ||
graph_output_value_map = OrderedDict(pipeline_func_result) | ||
elif isinstance(pipeline_func_result, (list, tuple)): | ||
if output_names: | ||
if len(pipeline_func_result) != len(output_names): | ||
raise ValueError('Expected {} values from pipeline function, but got {}.'.format(len(output_names), len(pipeline_func_result))) | ||
graph_output_value_map = OrderedDict((name_value[0], name_value[1]) for name_value in zip(output_names, pipeline_func_result)) | ||
else: | ||
graph_output_value_map = OrderedDict((output_value.task_output.output_name, output_value) for output_value in pipeline_func_result) # TODO: Fix possible name non-uniqueness (e.g. use task id as prefix or add index to non-unique names) | ||
else: | ||
raise TypeError('Pipeline must return outputs as tuple or OrderedDict.') | ||
|
||
#Checking the pipeline_func output object types | ||
for output_name, output_value in graph_output_value_map.items(): | ||
if not isinstance(output_value, TaskOutputArgument): | ||
raise TypeError('Only TaskOutputArgument instances should be returned from graph component, but got "{output_name}" = "{}".'.format(output_name, str(output_value))) | ||
|
||
if not component_spec.outputs and graph_output_value_map: | ||
component_spec.outputs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()] | ||
|
||
component_spec.implementation = GraphImplementation( | ||
graph=GraphSpec( | ||
tasks=task_map, | ||
output_values=graph_output_value_map, | ||
) | ||
) | ||
return component_spec |
15 changes: 15 additions & 0 deletions
15
sdk/python/tests/components/test_data/component_with_0_inputs_and_2_outputs.component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
name: Component with 0 inputs and 2 outputs | ||
outputs: | ||
- {name: Output 1} | ||
- {name: Output 2} | ||
implementation: | ||
container: | ||
image: busybox | ||
command: [sh, -c, ' | ||
echo "Data 1" > $0 | ||
echo "Data 2" > $1 | ||
' | ||
] | ||
args: | ||
- {outputPath: Output 1} | ||
- {outputPath: Output 2} |
15 changes: 15 additions & 0 deletions
15
sdk/python/tests/components/test_data/component_with_2_inputs_and_0_outputs.component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
name: Component with 2 inputs and 0 outputs | ||
inputs: | ||
- {name: Input parameter} | ||
- {name: Input artifact} | ||
implementation: | ||
container: | ||
image: busybox | ||
command: [sh, -c, ' | ||
echo "Input parameter = $0" | ||
echo "Input artifact = $(< $1)" | ||
' | ||
] | ||
args: | ||
- {inputValue: Input parameter} | ||
- {inputPath: Input artifact} |
22 changes: 22 additions & 0 deletions
22
sdk/python/tests/components/test_data/component_with_2_inputs_and_2_outputs.component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
name: Component with 2 inputs and 2 outputs | ||
inputs: | ||
- {name: Input parameter} | ||
- {name: Input artifact} | ||
outputs: | ||
- {name: Output 1} | ||
- {name: Output 2} | ||
implementation: | ||
container: | ||
image: busybox | ||
command: [sh, -c, ' | ||
mkdir -p $(dirname "$2") | ||
mkdir -p $(dirname "$3") | ||
echo "$0" > "$2" | ||
cp "$1" "$3" | ||
' | ||
] | ||
args: | ||
- {inputValue: Input parameter} | ||
- {inputPath: Input artifact} | ||
- {outputPath: Output 1} | ||
- {outputPath: Output 2} |
51 changes: 51 additions & 0 deletions
51
sdk/python/tests/components/test_python_pipeline_to_graph_component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import os | ||
import sys | ||
import unittest | ||
from collections import OrderedDict | ||
from pathlib import Path | ||
|
||
import kfp.components as comp | ||
from kfp.components._python_to_graph_component import create_graph_component_spec_from_pipeline_func | ||
|
||
|
||
class PythonPipelineToGraphComponentTestCase(unittest.TestCase): | ||
def test_handle_creating_graph_component_from_pipeline_that_uses_container_components(self): | ||
test_data_dir = Path(__file__).parent / 'test_data' | ||
producer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_0_inputs_and_2_outputs.component.yaml')) | ||
processor_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_2_outputs.component.yaml')) | ||
consumer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_0_outputs.component.yaml')) | ||
|
||
def pipeline1(pipeline_param_1: int): | ||
producer_task = producer_op() | ||
processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2']) | ||
consumer_task = consumer_op(processor_task.outputs['Output 1'], processor_task.outputs['Output 2']) | ||
|
||
return OrderedDict([ # You can safely return normal dict in python 3.6+ | ||
('Pipeline output 1', producer_task.outputs['Output 1']), | ||
('Pipeline output 2', processor_task.outputs['Output 2']), | ||
]) | ||
|
||
graph_component = create_graph_component_spec_from_pipeline_func(pipeline1) | ||
|
||
self.assertEqual(len(graph_component.inputs), 1) | ||
self.assertListEqual([input.name for input in graph_component.inputs], ['pipeline_param_1']) #Relies on human name conversion function stability | ||
self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2']) | ||
self.assertEqual(len(graph_component.implementation.graph.tasks), 3) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |