Skip to content

Commit

Permalink
SDK - Unified the function signature parsing implementations (#2689)
Browse files Browse the repository at this point in the history
* Replaced `_instance_to_dict(obj)` with `obj.to_dict()`

* Fixed the capitalization in _python_function_name_to_component_name
It now only changes the case of the first letter.

* Replaced the _extract_component_metadata function with _extract_component_interface

* Stopped adding newline to the component description.

* Handling None inputs and outputs

* Not including empty inputs and outputs in component spec

* Renamed the private attributes that the @pipeline decorator sets

* Changed _extract_pipeline_metadata to use _extract_component_interface

* Fixed issues based on feedback
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Dec 27, 2019
1 parent ac21a3a commit 27f7e77
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 115 deletions.
45 changes: 36 additions & 9 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ def _extract_component_interface(func) -> ComponentSpec:
def annotation_to_type_struct(annotation):
if not annotation or annotation == inspect.Parameter.empty:
return None
if hasattr(annotation, 'to_dict'):
annotation = annotation.to_dict()
if isinstance(annotation, dict):
return annotation
if isinstance(annotation, type):
if annotation in type_to_type_name:
return type_to_type_name[annotation]
Expand Down Expand Up @@ -283,7 +287,8 @@ def annotation_to_type_struct(annotation):
if parameter.default is not inspect.Parameter.empty:
input_spec.optional = True
if parameter.default is not None:
input_spec.default = serialize_value(parameter.default, type_struct)
outer_type_name = list(type_struct.keys())[0] if isinstance(type_struct, dict) else type_struct
input_spec.default = serialize_value(parameter.default, outer_type_name)
input_spec._passing_style = passing_style
input_spec._parameter_name = parameter.name
inputs.append(input_spec)
Expand All @@ -305,6 +310,21 @@ def annotation_to_type_struct(annotation):
output_spec._passing_style = None
output_spec._return_tuple_field_name = field_name
outputs.append(output_spec)
# Deprecated dict-based way of declaring multiple outputs. Was only used by the @component decorator
elif isinstance(return_ann, dict):
import warnings
warnings.warn(
"The ability to specify multiple outputs using the dict syntax has been deprecated."
"It will be removed soon after release 0.1.32."
"Please use typing.NamedTuple to declare multiple outputs."
)
for output_name, output_type_annotation in return_ann.items():
output_type_struct = annotation_to_type_struct(output_type_annotation)
output_spec = OutputSpec(
name=output_name,
type=output_type_struct,
)
outputs.append(output_spec)
elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty:
output_name = _make_name_unique_by_adding_index(single_output_name_const, output_names, '_') # Fixes exotic, but possible collision: `def func(output_path: OutputPath()) -> str: ...`
output_names.add(output_name)
Expand All @@ -321,13 +341,17 @@ def annotation_to_type_struct(annotation):
component_name = getattr(func, '_component_human_name', None) or _python_function_name_to_component_name(func.__name__)
description = getattr(func, '_component_description', None) or func.__doc__
if description:
description = description.strip() + '\n' #Interesting: unlike ruamel.yaml, PyYaml cannot handle trailing spaces in the last line (' \n') and switches the style to double-quoted.
description = description.strip()

# TODO: Parse input/output descriptions from the function docstring. See:
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

component_spec = ComponentSpec(
name=component_name,
description=description,
inputs=inputs,
outputs=outputs,
inputs=inputs if inputs else None,
outputs=outputs if outputs else None,
)
return component_spec

Expand Down Expand Up @@ -360,9 +384,12 @@ def _func_to_component_spec(func, extra_code='', base_image : str = None, packag

component_spec = _extract_component_interface(func)

component_inputs = component_spec.inputs or []
component_outputs = component_spec.outputs or []

arguments = []
arguments.extend(InputValuePlaceholder(input.name) for input in component_spec.inputs)
arguments.extend(OutputPathPlaceholder(output.name) for output in component_spec.outputs)
arguments.extend(InputValuePlaceholder(input.name) for input in component_inputs)
arguments.extend(OutputPathPlaceholder(output.name) for output in component_outputs)

if use_code_pickling:
func_code = _capture_function_code_using_cloudpickle(func, modules_to_capture)
Expand Down Expand Up @@ -425,10 +452,10 @@ def get_serializer_and_register_definitions(type_name) -> str:
description_repr=repr(component_spec.description or ''),
),
]
outputs_passed_through_func_return_tuple = [output for output in (component_spec.outputs or []) if output._passing_style is None]
file_outputs_passed_using_func_parameters = [output for output in (component_spec.outputs or []) if output._passing_style is not None]
outputs_passed_through_func_return_tuple = [output for output in component_outputs if output._passing_style is None]
file_outputs_passed_using_func_parameters = [output for output in component_outputs if output._passing_style is not None]
arguments = []
for input in component_spec.inputs + file_outputs_passed_using_func_parameters:
for input in component_inputs + file_outputs_passed_using_func_parameters:
param_flag = "--" + input.name.replace("_", "-")
is_required = isinstance(input, OutputSpec) or not input.optional
line = '_parser.add_argument("{param_flag}", dest="{param_var}", type={param_type}, required={is_required}, default=argparse.SUPPRESS)'.format(
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/dsl/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._metadata import _extract_component_metadata
from ._pipeline_param import PipelineParam
from .types import check_types, InconsistentTypeException
from ._ops_group import Graph
Expand Down Expand Up @@ -67,7 +66,8 @@ def foobar(model: TFModel(), step: MLStep()):
from functools import wraps
@wraps(func)
def _component(*args, **kargs):
component_meta = _extract_component_metadata(func)
from ..components._python_op import _extract_component_interface
component_meta = _extract_component_interface(func)
if kfp.TYPE_CHECK:
arg_index = 0
for arg in args:
Expand Down
79 changes: 9 additions & 70 deletions sdk/python/kfp/dsl/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import warnings
from .types import BaseType, _check_valid_type_dict, _instance_to_dict
from .types import BaseType, _check_valid_type_dict
from ..components._data_passing import serialize_value
from ..components._structures import ComponentSpec, InputSpec, OutputSpec

Expand All @@ -29,7 +29,7 @@ def _annotation_to_typemeta(annotation):
dict or string representing the type
'''
if isinstance(annotation, BaseType):
arg_type = _instance_to_dict(annotation)
arg_type = annotation.to_dict()
elif isinstance(annotation, str):
arg_type = annotation
elif isinstance(annotation, dict):
Expand All @@ -41,61 +41,12 @@ def _annotation_to_typemeta(annotation):
return arg_type


def _extract_component_metadata(func):
    '''Creates component metadata structure instance based on the function signature.'''

    # Imported lazily to prevent circular import failures.
    #TODO: Change _pipeline_param to stop importing _metadata
    from ._pipeline_param import PipelineParam

    import inspect
    signature_info = inspect.getfullargspec(func)
    annotations = signature_info.annotations

    # Defaults align with the trailing portion of the argument list.
    arg_defaults = {}
    if signature_info.defaults:
        arg_defaults = dict(
            zip(reversed(signature_info.args), reversed(signature_info.defaults)))

    # Build one InputSpec per function parameter.
    inputs = []
    for arg in signature_info.args:
        arg_type = None
        arg_default = arg_defaults.get(arg)
        if isinstance(arg_default, PipelineParam):
            warnings.warn('Explicit creation of `kfp.dsl.PipelineParam`s by the users is deprecated. The users should define the parameter type and default values using standard pythonic constructs: def my_func(a: int = 1, b: str = "default"):')
            arg_default = arg_default.value
        if arg in annotations:
            arg_type = _annotation_to_typemeta(annotations[arg])
        if arg_default is not None:
            # Defaults are stored in serialized (string) form.
            arg_default = serialize_value(arg_default, type_name=str(arg_type) if arg_type else None) # TODO: Improve _annotation_to_typemeta or just replace the whole function with kfp.component._python_op._extract_component_interface
        inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))

    # Outputs come from the (dict-style) `return` annotation.
    outputs = []
    if 'return' in annotations:
        for output in annotations['return']:
            arg_type = _annotation_to_typemeta(annotations['return'][output])
            outputs.append(OutputSpec(name=output, type=arg_type))

    #TODO: add descriptions to the metadata
    #docstring parser:
    # https://github.com/rr-/docstring_parser
    # https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

    # Empty input/output lists are normalized to None in the spec.
    return ComponentSpec(
        name=func.__name__,
        inputs=inputs or None,
        outputs=outputs or None,
    )


def _extract_pipeline_metadata(func):
'''Creates pipeline metadata structure instance based on the function signature.'''

# Importing here to prevent circular import failures
#TODO: Change _pipeline_param to stop importing _metadata
# Most of this code is only needed for verifying the default values against "openapi_schema_validator" type properties.
# TODO: Move the value verification code to some other place

from ._pipeline_param import PipelineParam

import inspect
Expand All @@ -109,8 +60,6 @@ def _extract_pipeline_metadata(func):
for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
arg_defaults[arg] = default

# Inputs
inputs = []
for arg in args:
arg_type = None
arg_default = arg_defaults[arg] if arg in arg_defaults else None
Expand All @@ -129,19 +78,9 @@ def _extract_pipeline_metadata(func):
schema_object = json.loads(schema_object)
# Only validating non-serialized values
validate(instance=arg_default, schema=schema_object)
if arg_default is not None:
arg_default = serialize_value(arg_default, type_name=str(arg_type) if arg_type else None) # TODO: Improve _annotation_to_typemeta or just replace the whole function with kfp.component._python_op._extract_component_interface
inputs.append(InputSpec(name=arg, type=arg_type, default=arg_default))

#TODO: add descriptions to the metadata
#docstring parser:
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

# Construct the ComponentSpec
pipeline_meta = ComponentSpec(
name=getattr(func, '_pipeline_name', func.__name__),
description=getattr(func, '_pipeline_description', func.__doc__),
inputs=inputs if inputs else None,
)
return pipeline_meta

from kfp.components._python_op import _extract_component_interface
component_spec = _extract_component_interface(func)
return component_spec
4 changes: 2 additions & 2 deletions sdk/python/kfp/dsl/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ def my_pipeline(a: PipelineParam, b: PipelineParam):
"""
def _pipeline(func):
if name:
func._pipeline_name = name
func._component_human_name = name
if description:
func._pipeline_description = description
func._component_description = description

if _pipeline_decorator_handler:
return _pipeline_decorator_handler(func) or func
Expand Down
26 changes: 10 additions & 16 deletions sdk/python/kfp/dsl/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@


class BaseType:
    '''BaseType is a base type for all scalar and artifact types.'''
    # NOTE: the scraped diff interleaved the removed old body
    # ('''MetaType ...''' / pass) with the added lines below; this is the
    # coherent post-commit version of the class.

    def to_dict(self) -> Union[Dict, str]:
        '''to_dict serializes the type instance into a python dictionary or string.

        Returns:
            A ``{TypeName: properties}`` dict when the instance carries
            properties, otherwise just the bare type name string.
        '''
        if self.__dict__:
            return {type(self).__name__: self.__dict__}
        return type(self).__name__


# Primitive Types
class Integer(BaseType):
Expand Down Expand Up @@ -137,12 +140,12 @@ def check_types(checked_type, expected_type):
expected_type (BaseType/str/dict): it describes a type from the downstream component input
'''
if isinstance(checked_type, BaseType):
checked_type = _instance_to_dict(checked_type)
elif isinstance(checked_type, str):
checked_type = checked_type.to_dict()
if isinstance(checked_type, str):
checked_type = {checked_type: {}}
if isinstance(expected_type, BaseType):
expected_type = _instance_to_dict(expected_type)
elif isinstance(expected_type, str):
expected_type = expected_type.to_dict()
if isinstance(expected_type, str):
expected_type = {expected_type: {}}
return _check_dict_types(checked_type, expected_type)

Expand All @@ -163,15 +166,6 @@ def _check_valid_type_dict(payload):
return False
return True

def _instance_to_dict(instance):
'''_instance_to_dict serializes the type instance into a python dictionary
Args:
instance(BaseType): An instance that describes a type
Return:
dict
'''
return {type(instance).__name__: instance.__dict__}

def _check_dict_types(checked_type, expected_type):
'''_check_dict_types checks the type consistency.
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/artifact_location.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "hello world", "inputs": [{"name": "tag"}, {"default": "kubeflow", "name": "namespace"}, {"default": "foobar", "name": "bucket"}], "name": "artifact-location-pipeine"}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "hello world", "inputs": [{"name": "tag", "type": "String"}, {"default": "kubeflow", "name": "namespace", "type": "String"}, {"default": "foobar", "name": "bucket", "type": "String"}], "name": "artifact-location-pipeine"}'
generateName: artifact-location-pipeine-
spec:
arguments:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}, {"name": "outputpath"}], "name": "Save Most Frequent"}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message", "type": "String"}, {"name": "outputpath", "type": "String"}], "name": "Save Most Frequent"}'
generateName: save-most-frequent-
spec:
arguments:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/imagepullsecrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message"}], "name": "Save Most Frequent"}'
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word and Save to GCS", "inputs": [{"name": "message", "type": "String"}], "name": "Save Most Frequent"}'
generateName: save-most-frequent-
spec:
arguments:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/components/test_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def my_func( # noqa: F722
component_spec.to_dict(),
{
'name': 'My func',
'description': 'Function docstring\n',
'description': 'Function docstring',
'inputs': [
{'name': 'required_param'},
{'name': 'int_param', 'type': 'Integer', 'default': '42', 'optional': True},
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/tests/dsl/component_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def componentA(a: {'ArtifactA': {'file_type': 'csv'}}, b: Integer() = 12, c: {'A

containerOp = componentA(1,2,c=3)

golden_meta = ComponentSpec(name='componentA', inputs=[], outputs=[])
golden_meta = ComponentSpec(name='ComponentA', inputs=[], outputs=[])
golden_meta.inputs.append(InputSpec(name='a', type={'ArtifactA': {'file_type': 'csv'}}))
golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default="12"))
golden_meta.inputs.append(InputSpec(name='c', type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default="12", optional=True))
golden_meta.inputs.append(InputSpec(name='c', type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world', optional=True))
golden_meta.outputs.append(OutputSpec(name='model', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))

self.assertEqual(containerOp._metadata, golden_meta)
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/tests/dsl/pipeline_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ def my_pipeline1():
def my_pipeline2():
pass

self.assertEqual(my_pipeline1._pipeline_name, 'p1')
self.assertEqual(my_pipeline2._pipeline_name, 'p2')
self.assertEqual(my_pipeline1._pipeline_description, 'description1')
self.assertEqual(my_pipeline2._pipeline_description, 'description2')
self.assertEqual(my_pipeline1._component_human_name, 'p1')
self.assertEqual(my_pipeline2._component_human_name, 'p2')
self.assertEqual(my_pipeline1._component_description, 'description1')
self.assertEqual(my_pipeline2._component_description, 'description2')

def test_decorator_metadata(self):
"""Test @pipeline decorator with metadata."""
Expand All @@ -71,8 +71,8 @@ def my_pipeline1(a: {'Schema': {'file_type': 'csv'}}='good', b: Integer()=12):
pass

golden_meta = ComponentSpec(name='p1', description='description1', inputs=[])
golden_meta.inputs.append(InputSpec(name='a', type={'Schema': {'file_type': 'csv'}}, default='good'))
golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default="12"))
golden_meta.inputs.append(InputSpec(name='a', type={'Schema': {'file_type': 'csv'}}, default='good', optional=True))
golden_meta.inputs.append(InputSpec(name='b', type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default="12", optional=True))

pipeline_meta = _extract_pipeline_metadata(my_pipeline1)
self.assertEqual(pipeline_meta, golden_meta)
self.assertEqual(pipeline_meta, golden_meta)
4 changes: 2 additions & 2 deletions sdk/python/tests/dsl/type_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.dsl.types import _instance_to_dict, check_types, GCSPath
from kfp.dsl.types import check_types, GCSPath
import unittest

class TestTypes(unittest.TestCase):

def test_class_to_dict(self):
"""Test _class_to_dict function."""
gcspath_dict = _instance_to_dict(GCSPath())
gcspath_dict = GCSPath().to_dict()
golden_dict = {
'GCSPath': {
'openapi_schema_validator': {
Expand Down

0 comments on commit 27f7e77

Please sign in to comment.