SDK - Refactoring - Replaced the TypeMeta class (kubeflow#1930)
* SDK - Refactoring - Replaced the TypeMeta class
The PipelineParam no longer exposes the private TypeMeta class
Fixes kubeflow#1420

This refactoring PR is part of a series of PRs that unify the metadata and specification types.
Ark-kun authored Aug 22, 2019
1 parent 736c7a5 commit c01315a
Showing 11 changed files with 58 additions and 131 deletions.
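At a glance, the change replaces the private TypeMeta wrapper with the serialized form itself: a bare string for a plain type name, or a single-entry dict mapping the type name to its properties, with None standing in for "no type". A minimal before/after sketch, assuming the TypeMeta behavior shown in the _metadata.py diff below (values are illustrative):

# Before: types were wrapped in the private TypeMeta class (kfp.dsl._metadata).
t = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory'})
t.to_dict_or_str()  # -> {'GCSPath': {'bucket_type': 'directory'}}
t.serialize()       # -> "{'GCSPath': {'bucket_type': 'directory'}}"

# After: the serialized form is the type.
param_type = {'GCSPath': {'bucket_type': 'directory'}}  # a type with properties
param_type = 'Integer'                                  # a property-less type is just its name
param_type = None                                       # replaces the empty TypeMeta()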
4 changes: 2 additions & 2 deletions sdk/python/kfp/compiler/compiler.py
@@ -25,7 +25,7 @@
 from ._op_to_template import _op_to_template
 from ._default_transformers import add_pod_env
 
-from ..dsl._metadata import TypeMeta, _extract_pipeline_metadata
+from ..dsl._metadata import _extract_pipeline_metadata
 from ..dsl._ops_group import OpsGroup
 
 class Compiler(object):
@@ -596,7 +596,7 @@ def _compile(self, pipeline_func):
 
     args_list = []
     for arg_name in argspec.args:
-      arg_type = TypeMeta()
+      arg_type = None
       for input in pipeline_meta.inputs:
         if arg_name == input.name:
           arg_type = input.param_type
4 changes: 2 additions & 2 deletions sdk/python/kfp/components/_components.py
@@ -235,8 +235,8 @@ def create_task_from_component_and_arguments(pythonic_arguments):
       if kfp.TYPE_CHECK:
         for input_spec in component_spec.inputs:
           if input_spec.name == key:
-            if arguments[key].param_type is not None and not check_types(arguments[key].param_type.to_dict_or_str(), '' if input_spec.type is None else input_spec.type):
-              raise InconsistentTypeException('Component "' + name + '" is expecting ' + key + ' to be type(' + str(input_spec.type) + '), but the passed argument is type(' + arguments[key].param_type.serialize() + ')')
+            if arguments[key].param_type is not None and not check_types(arguments[key].param_type, '' if input_spec.type is None else input_spec.type):
+              raise InconsistentTypeException('Component "' + name + '" is expecting ' + key + ' to be type(' + str(input_spec.type) + '), but the passed argument is type(' + str(arguments[key].param_type) + ')')
       arguments[key] = str(arguments[key])
 
     task = TaskSpec(
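Because param_type now already holds the serialized str/dict form, the call site passes it to check_types directly instead of converting first. A rough sketch of the new comparison with illustrative values (check_types is the existing helper in kfp.dsl.types; the exact inputs here are assumptions):

from kfp.dsl.types import check_types

passed_type = {'GCSPath': {'file_type': 'csv'}}    # e.g. arguments[key].param_type
expected_type = {'GCSPath': {'file_type': 'csv'}}  # e.g. input_spec.type

# Raw str/dict values are compared directly; no .to_dict_or_str() conversion.
assert check_types(passed_type, '' if expected_type is None else expected_type)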
6 changes: 3 additions & 3 deletions sdk/python/kfp/components/_dsl_bridge.py
@@ -16,7 +16,7 @@
 from typing import Mapping
 from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
 from ._components import _generate_output_file_name, _default_component_name
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta
+from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 
 def create_container_op_from_task(task_spec: TaskSpec):
   argument_values = task_spec.arguments
@@ -143,10 +143,10 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
   # Inputs
   if component_spec.inputs is not None:
     for input in component_spec.inputs:
-      component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=_annotation_to_typemeta(input.type), default=input.default))
+      component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=input.type, default=input.default))
   if component_spec.outputs is not None:
     for output in component_spec.outputs:
-      component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=_annotation_to_typemeta(output.type)))
+      component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=output.type))
 
   task = dsl.ContainerOp(
     name=name,
12 changes: 6 additions & 6 deletions sdk/python/kfp/dsl/_component.py
@@ -71,19 +71,19 @@ def _component(*args, **kargs):
       if kfp.TYPE_CHECK:
         arg_index = 0
         for arg in args:
-          if isinstance(arg, PipelineParam) and not check_types(arg.param_type.to_dict_or_str(), component_meta.inputs[arg_index].param_type.to_dict_or_str()):
+          if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].param_type):
             raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + component_meta.inputs[arg_index].name +
-                                            ' to be type(' + component_meta.inputs[arg_index].param_type.serialize() +
-                                            '), but the passed argument is type(' + arg.param_type.serialize() + ')')
+                                            ' to be type(' + str(component_meta.inputs[arg_index].param_type) +
+                                            '), but the passed argument is type(' + str(arg.param_type) + ')')
           arg_index += 1
         if kargs is not None:
           for key in kargs:
             if isinstance(kargs[key], PipelineParam):
               for input_spec in component_meta.inputs:
-                if input_spec.name == key and not check_types(kargs[key].param_type.to_dict_or_str(), input_spec.param_type.to_dict_or_str()):
+                if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.param_type):
                   raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + input_spec.name +
-                                                  ' to be type(' + input_spec.param_type.serialize() +
-                                                  '), but the passed argument is type(' + kargs[key].param_type.serialize() + ')')
+                                                  ' to be type(' + str(input_spec.param_type) +
+                                                  '), but the passed argument is type(' + str(kargs[key].param_type) + ')')
 
       container_op = func(*args, **kargs)
       container_op._set_metadata(component_meta)
71 changes: 15 additions & 56 deletions sdk/python/kfp/dsl/_metadata.py
@@ -32,64 +32,22 @@ def serialize(self):
   def __eq__(self, other):
     return self.__dict__ == other.__dict__
 
-class TypeMeta(BaseMeta):
-  def __init__(self,
-               name: str = '',
-               properties: Dict = None):
-    self.name = name
-    self.properties = {} if properties is None else properties
-
-  def to_dict_or_str(self):
-    if self.properties is None or len(self.properties) == 0:
-      return self.name
-    else:
-      return {self.name: self.properties}
-
-  @staticmethod
-  def from_dict_or_str(payload):
-    '''from_dict_or_str accepts a payload object and returns a TypeMeta instance
-    Args:
-      payload (str/dict): the payload could be a str or a dict
-    '''
-    type_meta = TypeMeta()
-    if isinstance(payload, dict):
-      if not _check_valid_type_dict(payload):
-        raise ValueError(payload + ' is not a valid type string')
-      type_meta.name, type_meta.properties = list(payload.items())[0]
-      # Convert possible OrderedDict to dict
-      type_meta.properties = dict(type_meta.properties)
-    elif isinstance(payload, str):
-      type_meta.name = payload
-    else:
-      raise ValueError('from_dict_or_str is expecting either dict or str.')
-    return type_meta
-
-  def serialize(self):
-    return str(self.to_dict_or_str())
-
-  @staticmethod
-  def deserialize(payload):
-    '''deserialize expects two types of input: dict and str
-    1) If the payload is a string, the type is named as such with no properties.
-    2) If the payload is a dict, the type name and properties are extracted. '''
-    return TypeMeta.from_dict_or_str(payload)
-
 class ParameterMeta(BaseMeta):
   def __init__(self,
                name: str,
                description: str = '',
-               param_type: TypeMeta = None,
+               param_type = None,
                default = None):
     self.name = name
     self.description = description
-    self.param_type = TypeMeta() if param_type is None else param_type
+    self.param_type = param_type
     self.default = default
 
   def to_dict(self):
     return {'name': self.name,
             'description': self.description,
-            'type': self.param_type.to_dict_or_str(),
+            'type': self.param_type or '',
             'default': self.default}
 
 class ComponentMeta(BaseMeta):
@@ -132,25 +90,25 @@ def to_dict(self):
     }
 
 def _annotation_to_typemeta(annotation):
-  '''_annotation_to_type_meta converts an annotation to an instance of TypeMeta
+  '''_annotation_to_type_meta converts an annotation to a type structure
   Args:
     annotation(BaseType/str/dict): input/output annotations
       BaseType: registered in kfp.dsl.types
       str: either a string of a dict serialization or a string of the type name
      dict: type name and properties. note that the properties values can be dict.
   Returns:
-    TypeMeta
+    dict or string representing the type
   '''
   if isinstance(annotation, BaseType):
-    arg_type = TypeMeta.deserialize(_instance_to_dict(annotation))
+    arg_type = _instance_to_dict(annotation)
   elif isinstance(annotation, str):
-    arg_type = TypeMeta.deserialize(annotation)
+    arg_type = annotation
   elif isinstance(annotation, dict):
     if not _check_valid_type_dict(annotation):
       raise ValueError('Annotation ' + str(annotation) + ' is not a valid type dictionary.')
-    arg_type = TypeMeta.deserialize(annotation)
+    arg_type = annotation
   else:
-    return TypeMeta()
+    return None
   return arg_type


@@ -174,7 +132,7 @@ def _extract_component_metadata(func):
   # Inputs
   inputs = []
   for arg in fullargspec.args:
-    arg_type = TypeMeta()
+    arg_type = None
     arg_default = arg_defaults[arg] if arg in arg_defaults else None
     if isinstance(arg_default, PipelineParam):
       arg_default = arg_default.value
@@ -227,19 +185,20 @@ def _extract_pipeline_metadata(func):
   )
   # Inputs
   for arg in args:
-    arg_type = TypeMeta()
+    arg_type = None
     arg_default = arg_defaults[arg] if arg in arg_defaults else None
     if isinstance(arg_default, PipelineParam):
       arg_default = arg_default.value
     if arg in annotations:
       arg_type = _annotation_to_typemeta(annotations[arg])
-    if 'openapi_schema_validator' in arg_type.properties and arg_default is not None:
+    arg_type_properties = list(arg_type.values())[0] if isinstance(arg_type, dict) else {}
+    if 'openapi_schema_validator' in arg_type_properties and arg_default is not None:
       from jsonschema import validate
       import json
-      schema_object = arg_type.properties['openapi_schema_validator']
+      schema_object = arg_type_properties['openapi_schema_validator']
       if isinstance(schema_object, str):
         # In case the property value for the schema validator is a string instead of a dict.
-        schema_object = json.loads(arg_type.properties['openapi_schema_validator'])
+        schema_object = json.loads(schema_object)
       validate(instance=arg_default, schema=schema_object)
     pipeline_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))
 
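With TypeMeta gone, _annotation_to_typemeta reduces to a pass-through for str and dict annotations; only registered BaseType instances still get converted, and unrecognized annotations now yield None. A sketch of the resulting behavior (the Integer expansion matches the golden values in component_tests.py below):

from kfp.dsl.types import Integer
from kfp.dsl._metadata import _annotation_to_typemeta

_annotation_to_typemeta('GCSPath')                            # -> 'GCSPath' (unchanged)
_annotation_to_typemeta({'ArtifactA': {'file_type': 'csv'}})  # -> same dict (unchanged)
_annotation_to_typemeta(Integer())  # -> {'Integer': {'openapi_schema_validator': {'type': 'integer'}}}
_annotation_to_typemeta(42)         # -> None (previously: an empty TypeMeta())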
7 changes: 3 additions & 4 deletions sdk/python/kfp/dsl/_pipeline_param.py
@@ -15,8 +15,7 @@
 
 import re
 from collections import namedtuple
-from typing import List
-from ._metadata import TypeMeta
+from typing import Dict, List, Union
 
 
 # TODO: Move this to a separate class
@@ -136,7 +135,7 @@ class PipelineParam(object):
   value passed between components.
   """
 
-  def __init__(self, name: str, op_name: str=None, value: str=None, param_type: TypeMeta=TypeMeta(), pattern: str=None):
+  def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None):
     """Create a new instance of PipelineParam.
     Args:
       name: name of the pipeline parameter.
@@ -218,6 +217,6 @@ def __hash__(self):
 
   def ignore_type(self):
     """ignore_type ignores the type information such that type checking would also pass"""
-    self.param_type = TypeMeta()
+    self.param_type = None
     return self
 
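PipelineParam.param_type is now a plain Union[str, Dict] defaulting to None, and ignore_type() resets it to None instead of an empty TypeMeta. A short usage sketch (parameter names are illustrative):

from kfp.dsl import PipelineParam

rounds = PipelineParam(name='rounds', param_type='Integer')
model_dir = PipelineParam(name='model_dir',
                          param_type={'GCSPath': {'path_type': 'directory'}})

model_dir.ignore_type()  # clears the type so any downstream type check passes
assert model_dir.param_type is None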
3 changes: 3 additions & 0 deletions sdk/python/kfp/dsl/types.py
@@ -145,6 +145,9 @@ def _check_dict_types(checked_type, expected_type):
     checked_type (dict): A dict that describes a type from the upstream component output
     expected_type (dict): A dict that describes a type from the downstream component input
   '''
+  if not checked_type or not expected_type:
+    # If the type is empty, it matches any types
+    return True
   checked_type_name,_ = list(checked_type.items())[0]
   expected_type_name,_ = list(expected_type.items())[0]
   if checked_type_name == '' or expected_type_name == '':
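The new guard makes a missing or empty type act as a wildcard, which is what lets the None default behave like the old empty TypeMeta during type checking. A standalone sketch of just the matching rule from this hunk (simplified: the real helper goes on to compare properties when the names match):

def _matches(checked_type, expected_type):
    # New guard: an empty/None type matches any type.
    if not checked_type or not expected_type:
        return True
    checked_name, _ = list(checked_type.items())[0]
    expected_name, _ = list(expected_type.items())[0]
    # Pre-existing rule: an empty type name also matches anything.
    if checked_name == '' or expected_name == '':
        return True
    return checked_name == expected_name  # properties comparison elided

assert _matches(None, {'GCSPath': {}})                   # new wildcard behavior
assert not _matches({'ArtifactA': {}}, {'ArtifactB': {}})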
10 changes: 5 additions & 5 deletions sdk/python/tests/dsl/component_tests.py
@@ -15,7 +15,7 @@
 import kfp
 import kfp.dsl as dsl
 from kfp.dsl import component, graph_component
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta
+from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 from kfp.dsl.types import Integer, GCSPath, InconsistentTypeException
 from kfp.dsl import ContainerOp, Pipeline, PipelineParam
 import unittest
@@ -36,10 +36,10 @@ def componentA(a: {'ArtifactA': {'file_type': 'csv'}}, b: Integer() = 12, c: {'A
     containerOp = componentA(1,2,c=3)
 
     golden_meta = ComponentMeta(name='componentA', description='')
-    golden_meta.inputs.append(ParameterMeta(name='a', description='', param_type=TypeMeta(name='ArtifactA', properties={'file_type': 'csv'})))
-    golden_meta.inputs.append(ParameterMeta(name='b', description='', param_type=TypeMeta(name='Integer', properties={'openapi_schema_validator': {"type": "integer"}}), default=12))
-    golden_meta.inputs.append(ParameterMeta(name='c', description='', param_type=TypeMeta(name='ArtifactB', properties={'path_type':'file', 'file_type': 'tsv'}), default='gs://hello/world'))
-    golden_meta.outputs.append(ParameterMeta(name='model', description='', param_type=TypeMeta(name='Integer', properties={'openapi_schema_validator': {"type": "integer"}})))
+    golden_meta.inputs.append(ParameterMeta(name='a', description='', param_type={'ArtifactA': {'file_type': 'csv'}}))
+    golden_meta.inputs.append(ParameterMeta(name='b', description='', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
+    golden_meta.inputs.append(ParameterMeta(name='c', description='', param_type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
+    golden_meta.outputs.append(ParameterMeta(name='model', description='', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))
 
     self.assertEqual(containerOp._metadata, golden_meta)
 
59 changes: 13 additions & 46 deletions sdk/python/tests/dsl/metadata_tests.py
@@ -12,39 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta
+from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 import unittest
 
-class TestTypeMeta(unittest.TestCase):
-  def test_deserialize(self):
-    component_dict = {
-      'GCSPath': {
-        'bucket_type': 'directory',
-        'file_type': 'csv'
-      }
-    }
-    golden_type_meta = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
-                                                            'file_type': 'csv'})
-    self.assertEqual(TypeMeta.deserialize(component_dict), golden_type_meta)
-
-    component_str = 'GCSPath'
-    golden_type_meta = TypeMeta(name='GCSPath')
-    self.assertEqual(TypeMeta.deserialize(component_str), golden_type_meta)
-
-
-  def test_eq(self):
-    type_a = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
-                                                  'file_type': 'csv'})
-    type_b = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
-                                                  'file_type': 'tsv'})
-    type_c = TypeMeta(name='GCSPatha', properties={'bucket_type': 'directory',
-                                                   'file_type': 'csv'})
-    type_d = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
-                                                  'file_type': 'csv'})
-    self.assertNotEqual(type_a, type_b)
-    self.assertNotEqual(type_a, type_c)
-    self.assertEqual(type_a, type_d)
-
-
 class TestComponentMeta(unittest.TestCase):
 
@@ -53,34 +23,31 @@ def test_to_dict(self):
                description='foobar example',
                inputs=[ParameterMeta(name='input1',
                                      description='input1 desc',
-                                     param_type=TypeMeta(name='GCSPath',
-                                                         properties={'bucket_type': 'directory',
-                                                                     'file_type': 'csv'
-                                                                    }
-                                                        ),
+                                     param_type={'GCSPath': {
+                                         'bucket_type': 'directory',
+                                         'file_type': 'csv'
+                                     }},
                                      default='default1'
                                     ),
                        ParameterMeta(name='input2',
                                      description='input2 desc',
-                                     param_type=TypeMeta(name='TFModel',
-                                                         properties={'input_data': 'tensor',
-                                                                     'version': '1.8.0'
-                                                                    }
-                                                        ),
+                                     param_type={'TFModel': {
+                                         'input_data': 'tensor',
+                                         'version': '1.8.0'
+                                     }},
                                      default='default2'
                                     ),
                        ParameterMeta(name='input3',
                                      description='input3 desc',
-                                     param_type=TypeMeta(name='Integer'),
+                                     param_type='Integer',
                                      default='default3'
                                     ),
                       ],
                outputs=[ParameterMeta(name='output1',
                                       description='output1 desc',
-                                      param_type=TypeMeta(name='Schema',
-                                                          properties={'file_type': 'tsv'
-                                                                     }
-                                                         ),
+                                      param_type={'Schema': {
+                                          'file_type': 'tsv'
+                                      }},
                                       default='default_output1'
                                      )
                        ]