From d1328bf82793c9547115b84d775a651fd75fd60a Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 5 Feb 2019 17:11:43 -0800 Subject: [PATCH 01/29] add a While in the ops group --- sdk/python/kfp/dsl/_ops_group.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 321fe4792af..86483370d2f 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -15,7 +15,7 @@ from . import _container_op from . import _pipeline - +from ._pipeline_param import ConditionOperator class OpsGroup(object): """Represents a logical group of ops and group of OpsGroups. @@ -92,10 +92,30 @@ class Condition(OpsGroup): def __init__(self, condition): """Create a new instance of ExitHandler. Args: - exit_op: an operator invoked at exiting a group of ops. + condition (ConditionOperator): the condition. Raises: ValueError is the exit_op is invalid. """ super(Condition, self).__init__('condition') self.condition = condition + +class While(OpsGroup): + """While loop with conditions. + TODO: Example usage: + """ + def __init__(self, condition): + """Create a while loop structure + Args: + condition (ConditionOperator): the condition. + """ + super(While, self).__init__('while') + if not isinstance(condition, (ConditionOperator)): + raise ValueError + self.condition = condition + + def __exit__(self, *args): + #TODO: while needs special handling of the condition because + # the pipelineparam during the exit contains the dynamic information + # that are needed to resolve the condition. 
+ _pipeline.Pipeline.get_default_pipeline().pop_ops_group() \ No newline at end of file From d740b44cbc3dcd54bfa269cf0e227be31c7b0a0f Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 10:41:07 -0700 Subject: [PATCH 02/29] deepcopy the while conditions when entering and exiting --- sdk/python/kfp/dsl/_ops_group.py | 6 +++++- sdk/python/kfp/dsl/_pipeline_param.py | 12 ++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 86483370d2f..4a6d2c5f870 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -112,10 +112,14 @@ def __init__(self, condition): super(While, self).__init__('while') if not isinstance(condition, (ConditionOperator)): raise ValueError + import copy + self.condition_when_entering = copy.deepcopy(condition) self.condition = condition def __exit__(self, *args): - #TODO: while needs special handling of the condition because + # While needs special handling of the condition because # the pipelineparam during the exit contains the dynamic information # that are needed to resolve the condition. + import copy + self.condition_when_exiting = copy.deepcopy(self.condition) _pipeline.Pipeline.get_default_pipeline().pop_ops_group() \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 77295cd8732..a1ea7b62b0e 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -111,6 +111,18 @@ def __str__(self): return '{{pipelineparam:op=%s;name=%s;value=%s}}' % (op_name, self.name, value) else: return '{{pipelineparam:op=%s;name=%s;value=%s;type=%s;}}' % (op_name, self.name, value, self.param_type.serialize()) + + def assign(self, other): + """assign copies the content of another pipelineparam to this instance. 
+ Args: + other (PipelineParam) + """ + if not isinstance(other, PipelineParam): + raise ValueError('assign is expecting a PipelineParam instance.') + self.op_name = other.op_name + self.name = other.op_name + self.value = other.value + self.param_type = other.param_type def __repr__(self): return str({self.__class__.__name__: self.__dict__}) From f309d85689b34b88f1d19ffa7949e318bed2e02b Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 16:47:52 -0700 Subject: [PATCH 03/29] add while condition resolution in the compiler --- sdk/python/kfp/compiler/compiler.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 21a9401c9e7..bf758ee5470 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -160,6 +160,12 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): new_current_conditions_params.append(group.condition.operand1) if isinstance(group.condition.operand2, dsl.PipelineParam): new_current_conditions_params.append(group.condition.operand2) + elif group.type == 'while': + new_current_conditions_params = list(current_conditions_params) + if isinstance(group.condition_when_entering.operand1, dsl.PipelineParam): + new_current_conditions_params.append(group.condition_when_entering.operand1) + if isinstance(group.condition_when_entering.operand2, dsl.PipelineParam): + new_current_conditions_params.append(group.condition_when_entering.operand2) for op in group.ops: for param in new_current_conditions_params: conditions[op.name].add(param) @@ -167,6 +173,12 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): _get_condition_params_for_ops_helper(g, new_current_conditions_params) _get_condition_params_for_ops_helper(root_group, []) + # Sanitize the pipelineparams in the conditions. 
+ for op in conditions: + for param in conditions[op]: + param.name = K8sHelper.sanitize_k8s_name(param.name) + if param.op_name: + param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) return conditions def _get_dependencies(self, pipeline, root_group, op_groups): From 9afc4bb6e3860d24ab34b40faeed54c4d70e8a4d Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 17:38:49 -0700 Subject: [PATCH 04/29] define graph component decorator --- sdk/python/kfp/dsl/_component.py | 35 ++++++++++++++++- sdk/python/kfp/dsl/_ops_group.py | 54 ++++++++++++++++----------- sdk/python/kfp/dsl/_pipeline_param.py | 22 +++++------ 3 files changed, 77 insertions(+), 34 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index ef5e1fce0ff..be4146cba06 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -15,6 +15,8 @@ from ._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta from ._pipeline_param import PipelineParam from ._types import check_types, InconsistentTypeException +from ._ops_group import Graph +from . import _pipeline import kfp def python_component(name, description=None, base_image=None, target_component_file: str = None): @@ -54,7 +56,7 @@ def _python_component(func): return _python_component def component(func): - """Decorator for component functions that use ContainerOp. + """Decorator for component functions that returns a ContainerOp. This is useful to enable type checking in the DSL compiler Usage: @@ -118,3 +120,34 @@ def _component(*args, **kargs): return container_op return _component + +def graph_component(func): + """Decorator for graph component functions. 
+ This decorator returns an ops_group """ + #TODO: add usage + from functools import wraps + @wraps(func) + def _graph_component(*args, **kargs): + # Entering Graph Context + if not _pipeline.Pipeline.get_default_pipeline(): + raise ValueError('Default pipeline not defined.') + # If this is a recursive call + for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: + if ops_group.name.endswith(func.__name__): + # Store the current input pipelineparam and return + ops_group.recursive_inputs = args + kargs.values() + return + graph_ops_group = Graph(func.__name__) + _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) + + # Process + graph_ops_group.inputs = args + kargs.values() + #TODO: check if the inputs is a list of pipelineparams + graph_ops_group.outputs = func(*args, **kargs) + #TODO: check if the outputs is a dictionary of str to pipelineparams + + # Exiting Graph Context + _pipeline.Pipeline.get_default_pipeline().pop_ops_group() + + return graph_ops_group + return _graph_component \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 4a6d2c5f870..619a6502af0 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -28,7 +28,7 @@ class OpsGroup(object): def __init__(self, group_type: str, name: str=None): """Create a new instance of OpsGroup. Args: - group_type: one of 'pipeline', 'exit_handler', 'condition', and 'loop'. + group_type: one of 'pipeline', 'exit_handler', 'condition', and 'graph-'. """ self.type = group_type self.ops = list() @@ -100,26 +100,36 @@ def __init__(self, condition): super(Condition, self).__init__('condition') self.condition = condition -class While(OpsGroup): - """While loop with conditions. +# class While(OpsGroup): +# """While loop with conditions. +# TODO: Example usage: +# """ +# def __init__(self, condition): +# """Create a while loop structure +# Args: +# condition (ConditionOperator): the condition. 
+# """ +# super(While, self).__init__('while') +# if not isinstance(condition, (ConditionOperator)): +# raise ValueError +# import copy +# self.condition_when_entering = copy.deepcopy(condition) +# self.condition = condition +# +# def __exit__(self, *args): +# # While needs special handling of the condition because +# # the pipelineparam during the exit contains the dynamic information +# # that are needed to resolve the condition. +# import copy +# self.condition_when_exiting = copy.deepcopy(self.condition) +# _pipeline.Pipeline.get_default_pipeline().pop_ops_group() + +class Graph(OpsGroup): + """Graph DAG with inputs, recursive_inputs, and outputs. TODO: Example usage: """ - def __init__(self, condition): - """Create a while loop structure - Args: - condition (ConditionOperator): the condition. - """ - super(While, self).__init__('while') - if not isinstance(condition, (ConditionOperator)): - raise ValueError - import copy - self.condition_when_entering = copy.deepcopy(condition) - self.condition = condition - - def __exit__(self, *args): - # While needs special handling of the condition because - # the pipelineparam during the exit contains the dynamic information - # that are needed to resolve the condition. 
- import copy - self.condition_when_exiting = copy.deepcopy(self.condition) - _pipeline.Pipeline.get_default_pipeline().pop_ops_group() \ No newline at end of file + def __init__(self, name): + super(Graph, self).__init__('graph-' + name) + self.inputs = [] + self.recursive_inputs = [] + self.outputs = {} \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index a1ea7b62b0e..5a76da1f7c3 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -112,17 +112,17 @@ def __str__(self): else: return '{{pipelineparam:op=%s;name=%s;value=%s;type=%s;}}' % (op_name, self.name, value, self.param_type.serialize()) - def assign(self, other): - """assign copies the content of another pipelineparam to this instance. - Args: - other (PipelineParam) - """ - if not isinstance(other, PipelineParam): - raise ValueError('assign is expecting a PipelineParam instance.') - self.op_name = other.op_name - self.name = other.op_name - self.value = other.value - self.param_type = other.param_type + # def assign(self, other): + # """assign copies the content of another pipelineparam to this instance. 
+ # Args: + # other (PipelineParam) + # """ + # if not isinstance(other, PipelineParam): + # raise ValueError('assign is expecting a PipelineParam instance.') + # self.op_name = other.op_name + # self.name = other.op_name + # self.value = other.value + # self.param_type = other.param_type def __repr__(self): return str({self.__class__.__name__: self.__dict__}) From c61022fe247c13815b5275a5de365b247f52940c Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 18:37:28 -0700 Subject: [PATCH 05/29] remove while loop related codes --- sdk/python/kfp/dsl/_ops_group.py | 25 +------------------------ sdk/python/kfp/dsl/_pipeline_param.py | 12 ------------ 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 619a6502af0..7c9e99b41e1 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -100,32 +100,9 @@ def __init__(self, condition): super(Condition, self).__init__('condition') self.condition = condition -# class While(OpsGroup): -# """While loop with conditions. -# TODO: Example usage: -# """ -# def __init__(self, condition): -# """Create a while loop structure -# Args: -# condition (ConditionOperator): the condition. -# """ -# super(While, self).__init__('while') -# if not isinstance(condition, (ConditionOperator)): -# raise ValueError -# import copy -# self.condition_when_entering = copy.deepcopy(condition) -# self.condition = condition -# -# def __exit__(self, *args): -# # While needs special handling of the condition because -# # the pipelineparam during the exit contains the dynamic information -# # that are needed to resolve the condition. -# import copy -# self.condition_when_exiting = copy.deepcopy(self.condition) -# _pipeline.Pipeline.get_default_pipeline().pop_ops_group() - class Graph(OpsGroup): """Graph DAG with inputs, recursive_inputs, and outputs. 
+ This is not used directly by the users but auto generated when the graph_component decoration exists TODO: Example usage: """ def __init__(self, name): diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index 5a76da1f7c3..77295cd8732 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -111,18 +111,6 @@ def __str__(self): return '{{pipelineparam:op=%s;name=%s;value=%s}}' % (op_name, self.name, value) else: return '{{pipelineparam:op=%s;name=%s;value=%s;type=%s;}}' % (op_name, self.name, value, self.param_type.serialize()) - - # def assign(self, other): - # """assign copies the content of another pipelineparam to this instance. - # Args: - # other (PipelineParam) - # """ - # if not isinstance(other, PipelineParam): - # raise ValueError('assign is expecting a PipelineParam instance.') - # self.op_name = other.op_name - # self.name = other.op_name - # self.value = other.value - # self.param_type = other.param_type def __repr__(self): return str({self.__class__.__name__: self.__dict__}) From 054a988af329ac30c66eb75a0974f924f12f1848 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 18:44:09 -0700 Subject: [PATCH 06/29] fixes --- sdk/python/kfp/dsl/_component.py | 2 +- sdk/python/kfp/dsl/_ops_group.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index be4146cba06..d2c6b6933fc 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -133,7 +133,7 @@ def _graph_component(*args, **kargs): raise ValueError('Default pipeline not defined.') # If this is a recursive call for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: - if ops_group.name.endswith(func.__name__): + if ops_group.type == 'graph' and ops_group.name == func.__name__: # Store the current input pipelineparam and return ops_group.recursive_inputs = args + kargs.values() return 
diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 7c9e99b41e1..9140be3e2e8 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -28,7 +28,7 @@ class OpsGroup(object): def __init__(self, group_type: str, name: str=None): """Create a new instance of OpsGroup. Args: - group_type: one of 'pipeline', 'exit_handler', 'condition', and 'graph-'. + group_type: one of 'pipeline', 'exit_handler', 'condition', and 'graph'. """ self.type = group_type self.ops = list() @@ -106,7 +106,7 @@ class Graph(OpsGroup): TODO: Example usage: """ def __init__(self, name): - super(Graph, self).__init__('graph-' + name) + super(Graph, self).__init__(group_type='graph', name=name) self.inputs = [] self.recursive_inputs = [] self.outputs = {} \ No newline at end of file From 0bec6b4551722f39ca6eb88b8ea0417dbe79d25e Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 19 Mar 2019 18:46:41 -0700 Subject: [PATCH 07/29] remove while loop related code --- sdk/python/kfp/compiler/compiler.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index bf758ee5470..21a9401c9e7 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -160,12 +160,6 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): new_current_conditions_params.append(group.condition.operand1) if isinstance(group.condition.operand2, dsl.PipelineParam): new_current_conditions_params.append(group.condition.operand2) - elif group.type == 'while': - new_current_conditions_params = list(current_conditions_params) - if isinstance(group.condition_when_entering.operand1, dsl.PipelineParam): - new_current_conditions_params.append(group.condition_when_entering.operand1) - if isinstance(group.condition_when_entering.operand2, dsl.PipelineParam): - new_current_conditions_params.append(group.condition_when_entering.operand2) for op in 
group.ops: for param in new_current_conditions_params: conditions[op.name].add(param) @@ -173,12 +167,6 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): _get_condition_params_for_ops_helper(g, new_current_conditions_params) _get_condition_params_for_ops_helper(root_group, []) - # Sanitize the pipelineparams in the conditions. - for op in conditions: - for param in conditions[op]: - param.name = K8sHelper.sanitize_k8s_name(param.name) - if param.op_name: - param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) return conditions def _get_dependencies(self, pipeline, root_group, op_groups): From 56d71da1fb04c0c149ee263e22d09f27eb1e2c54 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 20 Mar 2019 09:59:27 -0700 Subject: [PATCH 08/29] fix bugs --- sdk/python/kfp/dsl/_component.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index d2c6b6933fc..0853fcc87fa 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -135,13 +135,13 @@ def _graph_component(*args, **kargs): for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: if ops_group.type == 'graph' and ops_group.name == func.__name__: # Store the current input pipelineparam and return - ops_group.recursive_inputs = args + kargs.values() + ops_group.recursive_inputs = list(args) + list(kargs.values()) return graph_ops_group = Graph(func.__name__) _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) # Process - graph_ops_group.inputs = args + kargs.values() + graph_ops_group.inputs = list(args) + list(kargs.values()) #TODO: check if the inputs is a list of pipelineparams graph_ops_group.outputs = func(*args, **kargs) #TODO: check if the outputs is a dictionary of str to pipelineparams From 5215206e240f404beb0b68e806d287c5cb9f615f Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 20 Mar 2019 11:18:27 -0700 Subject: [PATCH 09/29] 
generate a unique ops group name and being able to retrieve by name --- sdk/python/kfp/dsl/_component.py | 16 +++++++--------- sdk/python/kfp/dsl/_ops_group.py | 31 ++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 0853fcc87fa..f54e9523a4f 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -15,7 +15,7 @@ from ._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta from ._pipeline_param import PipelineParam from ._types import check_types, InconsistentTypeException -from ._ops_group import Graph +from ._ops_group import OpsGroup, Graph from . import _pipeline import kfp @@ -129,15 +129,13 @@ def graph_component(func): @wraps(func) def _graph_component(*args, **kargs): # Entering Graph Context - if not _pipeline.Pipeline.get_default_pipeline(): - raise ValueError('Default pipeline not defined.') - # If this is a recursive call - for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: - if ops_group.type == 'graph' and ops_group.name == func.__name__: - # Store the current input pipelineparam and return - ops_group.recursive_inputs = list(args) + list(kargs.values()) - return + graph_ops_group = OpsGroup._get_opsgroup_pipeline('graph', func.__name__) + # If the ops group already exists, record the inputs and exit. 
+ if graph_ops_group is not None: + graph_ops_group.recursive_inputs = list(args) + list(kargs.values()) + return graph_ops_group = Graph(func.__name__) + graph_ops_group._make_name_unique() _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) # Process diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 9140be3e2e8..6ffadaa4da5 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -35,22 +35,43 @@ def __init__(self, group_type: str, name: str=None): self.groups = list() self.name = name - def __enter__(self): + @staticmethod + def _get_opsgroup_pipeline(group_type, name): + """retrieves the opsgroup when the pipeline already contains it. + the opsgroup might be already in the pipeline in case of recursive calls. + Args: + group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'. + name (str): the name before conversion. """ + if not _pipeline.Pipeline.get_default_pipeline(): + raise ValueError('Default pipeline not defined.') + if name is None: + raise ValueError('name must be a string.') + name_prefix = (group_type + '-' + name + '-').replace('_', '-') + for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: + if ops_group.type == group_type and ops_group.name.startswith(name_prefix): + return ops_group + return None + + def _make_name_unique(self): + """Generate a unique opsgroup name in the pipeline""" if not _pipeline.Pipeline.get_default_pipeline(): raise ValueError('Default pipeline not defined.') - if not self.name: - self.name = (self.type + '-' + - str(_pipeline.Pipeline.get_default_pipeline().get_next_group_id())) + self.name = (self.type + '-' + ('' if self.name is None else self.name + '-') + + str(_pipeline.Pipeline.get_default_pipeline().get_next_group_id())) self.name = self.name.replace('_', '-') + def __enter__(self): + if not _pipeline.Pipeline.get_default_pipeline(): + raise ValueError('Default pipeline not defined.') + + 
self._make_name_unique() _pipeline.Pipeline.get_default_pipeline().push_ops_group(self) return self def __exit__(self, *args): _pipeline.Pipeline.get_default_pipeline().pop_ops_group() - class ExitHandler(OpsGroup): """Represents an exit handler that is invoked upon exiting a group of ops. From 98d6ba6ebb53d8fbc95fb3bee436b17c9308521d Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 20 Mar 2019 14:33:15 -0700 Subject: [PATCH 10/29] resolve the opsgroups inputs and dependencies based on the pipelineparam in the condition --- sdk/python/kfp/compiler/compiler.py | 47 +++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index f5155194837..7be9a57eef5 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -113,8 +113,10 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): inputs = defaultdict(set) outputs = defaultdict(set) for op in pipeline.ops.values(): + #print('op name: '+ op.name) # op's inputs and all params used in conditions for that op are both considered. for param in op.inputs + list(condition_params[op.name]): + #print('\tparam: ' + str(param)) # if the value is already provided (immediate value), then no need to expose # it as input for its parent groups. if param.value: @@ -125,11 +127,16 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): upstream_op = pipeline.ops[param.op_name] upstream_groups, downstream_groups = self._get_uncommon_ancestors( op_groups, upstream_op, op) + #print('\t\tupstream:') + #print(upstream_groups) + #print('\t\tdownstream:') + #print(downstream_groups) for i, g in enumerate(downstream_groups): if i == 0: # If it is the first uncommon downstream group, then the input comes from # the first uncommon upstream group. 
inputs[g].add((full_name, upstream_groups[0])) + #print('\t\t\tadding ' + full_name + ' to ' + str(g) + ' from op name: ' + str(upstream_groups[0])) else: # If not the first downstream group, then the input is passed down from # its ancestor groups so the upstream group is None. @@ -145,6 +152,24 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): if not op.is_exit_handler: for g in op_groups[op.name]: inputs[g].add((full_name, None)) + + # Generate the input/output for ops groups + def _get_inputs_outputs_opsgroup(group, inputs): + if group.type == 'condition': + #print('group: ' + group.name) + if isinstance(group.condition.operand1, dsl.PipelineParam): + full_name = self._pipelineparam_full_name(group.condition.operand1) + inputs[group.name].add((full_name, group.condition.operand1.op_name)) + #print('\tadd ' + full_name + " from op name: "+ group.condition.operand1.op_name) + if isinstance(group.condition.operand2, dsl.PipelineParam): + full_name = self._pipelineparam_full_name(group.condition.operand2) + inputs[group.name].add((full_name, group.condition.operand2.op_name)) + #print('\tadd ' + full_name) + for subgroup in group.groups: + _get_inputs_outputs_opsgroup(subgroup, inputs) + + _get_inputs_outputs_opsgroup(root_group, inputs) + return inputs, outputs def _get_condition_params_for_ops(self, root_group): @@ -193,6 +218,22 @@ def _get_dependencies(self, pipeline, root_group, op_groups): upstream_groups, downstream_groups = self._get_uncommon_ancestors( op_groups, upstream_op, op) dependencies[downstream_groups[0]].add(upstream_groups[0]) + + # Generate dependencies for ops groups + def _get_dependency_opsgroup(group, dependencies): + if group.type == 'condition': + #print('group: ' + group.name) + if isinstance(group.condition.operand1, dsl.PipelineParam): + dependencies[group.name].add((group.condition.operand1.op_name)) + #print('\tadd dependency on op name: '+ group.condition.operand1.op_name) + if isinstance(group.condition.operand2, 
dsl.PipelineParam): + dependencies[group.name].add((group.condition.operand2.op_name)) + #print('\tadd dependency on op name: '+ group.condition.operand2.op_name) + for subgroup in group.groups: + _get_dependency_opsgroup(subgroup, dependencies) + + _get_dependency_opsgroup(root_group, dependencies) + return dependencies def _resolve_value_or_reference(self, value_or_reference, potential_references): @@ -340,6 +381,7 @@ def _group_to_template(self, group, inputs, outputs, dependencies): """ template = {'name': group.name} + #print('group_name: ' + group.name) # Generate inputs section. if inputs.get(group.name, None): template_inputs = [{'name': x[0]} for x in inputs[group.name]] @@ -347,6 +389,8 @@ def _group_to_template(self, group, inputs, outputs, dependencies): template['inputs'] = { 'parameters': template_inputs } + # for input in inputs[group.name]: + # print(input[0]) # Generate outputs section. if outputs.get(group.name, None): @@ -370,10 +414,13 @@ def _group_to_template(self, group, inputs, outputs, dependencies): } if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': + #print('\tresolving condition for ' + sub_group.name) subgroup_inputs = inputs.get(sub_group.name, []) condition = sub_group.condition operand1_value = self._resolve_value_or_reference(condition.operand1, subgroup_inputs) operand2_value = self._resolve_value_or_reference(condition.operand2, subgroup_inputs) + #print('\t\t' + operand1_value) + #print('\t\t' + operand2_value) task['when'] = '{} {} {}'.format(operand1_value, condition.operator, operand2_value) # Generate dependencies section for this task. 
From b9bf07a5a2af202a3707ef23814b75978180bbab Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 20 Mar 2019 18:11:27 -0700 Subject: [PATCH 11/29] add a recursive ops_groups --- sdk/python/kfp/compiler/compiler.py | 97 +++++++++++++++++------------ sdk/python/kfp/dsl/_component.py | 23 ++++--- sdk/python/kfp/dsl/_ops_group.py | 21 ++++++- 3 files changed, 87 insertions(+), 54 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 7be9a57eef5..481bbd68eab 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -65,6 +65,10 @@ def _get_groups_for_ops(self, root_group): def _get_op_groups_helper(current_groups, ops_to_groups): root_group = current_groups[-1] for g in root_group.groups: + # Add recursive opsgroup in the ops_to_groups + if g.is_recursive: + ops_to_groups[g.name] = [x.name for x in current_groups] + [g.name] + continue current_groups.append(g) _get_op_groups_helper(current_groups, ops_to_groups) del current_groups[-1] @@ -82,7 +86,8 @@ def _get_groups(self, root_group): def _get_groups_helper(group): groups = [group] for g in group.groups: - groups += _get_groups_helper(g) + if not g.is_recursive: + groups += _get_groups_helper(g) return groups return _get_groups_helper(root_group) @@ -113,10 +118,8 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): inputs = defaultdict(set) outputs = defaultdict(set) for op in pipeline.ops.values(): - #print('op name: '+ op.name) # op's inputs and all params used in conditions for that op are both considered. for param in op.inputs + list(condition_params[op.name]): - #print('\tparam: ' + str(param)) # if the value is already provided (immediate value), then no need to expose # it as input for its parent groups. 
if param.value: @@ -127,16 +130,11 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): upstream_op = pipeline.ops[param.op_name] upstream_groups, downstream_groups = self._get_uncommon_ancestors( op_groups, upstream_op, op) - #print('\t\tupstream:') - #print(upstream_groups) - #print('\t\tdownstream:') - #print(downstream_groups) for i, g in enumerate(downstream_groups): if i == 0: # If it is the first uncommon downstream group, then the input comes from # the first uncommon upstream group. inputs[g].add((full_name, upstream_groups[0])) - #print('\t\t\tadding ' + full_name + ' to ' + str(g) + ' from op name: ' + str(upstream_groups[0])) else: # If not the first downstream group, then the input is passed down from # its ancestor groups so the upstream group is None. @@ -153,23 +151,36 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): for g in op_groups[op.name]: inputs[g].add((full_name, None)) - # Generate the input/output for ops groups - def _get_inputs_outputs_opsgroup(group, inputs): - if group.type == 'condition': - #print('group: ' + group.name) - if isinstance(group.condition.operand1, dsl.PipelineParam): - full_name = self._pipelineparam_full_name(group.condition.operand1) - inputs[group.name].add((full_name, group.condition.operand1.op_name)) - #print('\tadd ' + full_name + " from op name: "+ group.condition.operand1.op_name) - if isinstance(group.condition.operand2, dsl.PipelineParam): - full_name = self._pipelineparam_full_name(group.condition.operand2) - inputs[group.name].add((full_name, group.condition.operand2.op_name)) - #print('\tadd ' + full_name) + # Generate the input/output for recursive opsgroups + def _get_inputs_outputs_recursive_opsgroup(group): + #TODO: refactor the following codes with the above + if group.is_recursive: + for param in list(condition_params[group.name]): + if param.value: + continue + full_name = self._pipelineparam_full_name(param) + if param.op_name: + upstream_op = 
pipeline.ops[param.op_name] + upstream_groups, downstream_groups = self._get_uncommon_ancestors( + op_groups, upstream_op, group) + for i, g in enumerate(downstream_groups): + if i == 0: + inputs[g].add((full_name, upstream_groups[0])) + else: + inputs[g].add((full_name, None)) + for i, g in enumerate(upstream_groups): + if i == len(upstream_groups) - 1: + outputs[g].add((full_name, None)) + else: + outputs[g].add((full_name, upstream_groups[i+1])) + else: + if not op.is_exit_handler: + for g in op_groups[op.name]: + inputs[g].add((full_name, None)) for subgroup in group.groups: - _get_inputs_outputs_opsgroup(subgroup, inputs) - - _get_inputs_outputs_opsgroup(root_group, inputs) + _get_inputs_outputs_recursive_opsgroup(subgroup) + _get_inputs_outputs_recursive_opsgroup(root_group) return inputs, outputs def _get_condition_params_for_ops(self, root_group): @@ -189,8 +200,13 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): for param in new_current_conditions_params: conditions[op.name].add(param) for g in group.groups: - _get_condition_params_for_ops_helper(g, new_current_conditions_params) - + if not g.is_recursive: + _get_condition_params_for_ops_helper(g, new_current_conditions_params) + else: + # If the subgroup is a recursive opsgroup, propagate the pipelineparams + # in the condition, similar to the ops. 
+ for param in new_current_conditions_params: + conditions[g.name].add(param) _get_condition_params_for_ops_helper(root_group, []) return conditions @@ -219,16 +235,23 @@ def _get_dependencies(self, pipeline, root_group, op_groups): op_groups, upstream_op, op) dependencies[downstream_groups[0]].add(upstream_groups[0]) - # Generate dependencies for ops groups + # Generate dependencies based on the recursive opsgroups + #TODO: refactor the following codes with the above def _get_dependency_opsgroup(group, dependencies): - if group.type == 'condition': - #print('group: ' + group.name) - if isinstance(group.condition.operand1, dsl.PipelineParam): - dependencies[group.name].add((group.condition.operand1.op_name)) - #print('\tadd dependency on op name: '+ group.condition.operand1.op_name) - if isinstance(group.condition.operand2, dsl.PipelineParam): - dependencies[group.name].add((group.condition.operand2.op_name)) - #print('\tadd dependency on op name: '+ group.condition.operand2.op_name) + if group.is_recursive: + unstream_op_names = set() + for param in group.inputs + list(condition_params[group.name]): + if param.op_name: + unstream_op_names.add(param.op_name) + unstream_op_names |= set(group.dependencies) + + for op_name in unstream_op_names: + upstream_op = pipeline.ops[op_name] + upstream_groups, downstream_groups = self._get_uncommon_ancestors( + op_groups, upstream_op, group) + dependencies[downstream_groups[0]].add(upstream_groups[0]) + + for subgroup in group.groups: _get_dependency_opsgroup(subgroup, dependencies) @@ -381,7 +404,6 @@ def _group_to_template(self, group, inputs, outputs, dependencies): """ template = {'name': group.name} - #print('group_name: ' + group.name) # Generate inputs section. 
if inputs.get(group.name, None): template_inputs = [{'name': x[0]} for x in inputs[group.name]] @@ -389,8 +411,6 @@ def _group_to_template(self, group, inputs, outputs, dependencies): template['inputs'] = { 'parameters': template_inputs } - # for input in inputs[group.name]: - # print(input[0]) # Generate outputs section. if outputs.get(group.name, None): @@ -414,13 +434,10 @@ def _group_to_template(self, group, inputs, outputs, dependencies): } if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': - #print('\tresolving condition for ' + sub_group.name) subgroup_inputs = inputs.get(sub_group.name, []) condition = sub_group.condition operand1_value = self._resolve_value_or_reference(condition.operand1, subgroup_inputs) operand2_value = self._resolve_value_or_reference(condition.operand2, subgroup_inputs) - #print('\t\t' + operand1_value) - #print('\t\t' + operand2_value) task['when'] = '{} {} {}'.format(operand1_value, condition.operator, operand2_value) # Generate dependencies section for this task. diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index f54e9523a4f..d15839e1ce4 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -128,21 +128,20 @@ def graph_component(func): from functools import wraps @wraps(func) def _graph_component(*args, **kargs): - # Entering Graph Context - graph_ops_group = OpsGroup._get_opsgroup_pipeline('graph', func.__name__) - # If the ops group already exists, record the inputs and exit. 
- if graph_ops_group is not None: - graph_ops_group.recursive_inputs = list(args) + list(kargs.values()) - return graph_ops_group = Graph(func.__name__) - graph_ops_group._make_name_unique() - _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) - - # Process graph_ops_group.inputs = list(args) + list(kargs.values()) #TODO: check if the inputs is a list of pipelineparams - graph_ops_group.outputs = func(*args, **kargs) - #TODO: check if the outputs is a dictionary of str to pipelineparams + + graph_ops_group.resolve_recursion() + + # Entering Graph Context + _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) + + # Call the function + if not graph_ops_group.is_recursive: + graph_ops_group._make_name_unique() + graph_ops_group.outputs = func(*args, **kargs) + #TODO: check if the outputs is a dictionary of str to pipelineparams # Exiting Graph Context _pipeline.Pipeline.get_default_pipeline().pop_ops_group() diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 6ffadaa4da5..428ec1b2185 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -34,6 +34,18 @@ def __init__(self, group_type: str, name: str=None): self.ops = list() self.groups = list() self.name = name + # The following two fields are references to another OpsGroup that is already created. 
+ self.is_recursive = False + self.recursive_ref = None + + def resolve_recursion(self): + """resolve_recursion resolves the recursion scenario where the OpsGroup is already created.""" + old_opsgroup = self._get_opsgroup_pipeline(self.type, self.name) + if old_opsgroup is None: + return + self.is_recursive = True + self.recursive_ref = old_opsgroup + @staticmethod def _get_opsgroup_pipeline(group_type, name): @@ -129,5 +141,10 @@ class Graph(OpsGroup): def __init__(self, name): super(Graph, self).__init__(group_type='graph', name=name) self.inputs = [] - self.recursive_inputs = [] - self.outputs = {} \ No newline at end of file + self.outputs = {} + self.dependencies = [] + + def after(self, dependency): + """Specify explicit dependency on another op.""" + self.dependencies.append(dependency) + return self \ No newline at end of file From 4107e36635315bf8c3623985d84d7d6365436926 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 09:24:42 -0700 Subject: [PATCH 12/29] fix bugs of the recursive opsgroup template name --- sdk/python/kfp/compiler/compiler.py | 6 ++++++ sdk/python/kfp/dsl/_ops_group.py | 1 + 2 files changed, 7 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 481bbd68eab..d456b51dca4 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -23,6 +23,7 @@ from ._k8s_helper import K8sHelper from ..dsl._pipeline_param import _match_serialized_pipelineparam from ..dsl._metadata import TypeMeta +from ..dsl._ops_group import OpsGroup class Compiler(object): """DSL Compiler. 
@@ -432,6 +433,11 @@ def _group_to_template(self, group, inputs, outputs, dependencies): 'name': sub_group.name, 'template': sub_group.name, } + if isinstance(sub_group, OpsGroup) and sub_group.is_recursive: + task = { + 'name': sub_group.recursive_ref.name, + 'template': sub_group.recursive_ref.name, + } if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': subgroup_inputs = inputs.get(sub_group.name, []) diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 428ec1b2185..93bfdd4e751 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -30,6 +30,7 @@ def __init__(self, group_type: str, name: str=None): Args: group_type: one of 'pipeline', 'exit_handler', 'condition', and 'graph'. """ + #TODO: declare the group_type to be strongly typed self.type = group_type self.ops = list() self.groups = list() From a4ecc9fc1a60a37805ea411bd28235d22437a8a4 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 10:35:27 -0700 Subject: [PATCH 13/29] resolve the recursive template name and arguments --- sdk/python/kfp/compiler/compiler.py | 40 +++++++++++++++++++---------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index d456b51dca4..953384d346d 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -156,7 +156,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): def _get_inputs_outputs_recursive_opsgroup(group): #TODO: refactor the following codes with the above if group.is_recursive: - for param in list(condition_params[group.name]): + for param in group.inputs + list(condition_params[group.name]): if param.value: continue full_name = self._pipelineparam_full_name(param) @@ -201,13 +201,13 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): for param in new_current_conditions_params: conditions[op.name].add(param) 
for g in group.groups: - if not g.is_recursive: - _get_condition_params_for_ops_helper(g, new_current_conditions_params) - else: + if g.is_recursive: # If the subgroup is a recursive opsgroup, propagate the pipelineparams # in the condition, similar to the ops. for param in new_current_conditions_params: conditions[g.name].add(param) + else: + _get_condition_params_for_ops_helper(g, new_current_conditions_params) _get_condition_params_for_ops_helper(root_group, []) return conditions @@ -429,16 +429,17 @@ def _group_to_template(self, group, inputs, outputs, dependencies): # Generate tasks section. tasks = [] for sub_group in group.groups + group.ops: - task = { - 'name': sub_group.name, - 'template': sub_group.name, - } - if isinstance(sub_group, OpsGroup) and sub_group.is_recursive: + is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.is_recursive) + if is_recursive_subgroup: task = { 'name': sub_group.recursive_ref.name, 'template': sub_group.recursive_ref.name, } - + else: + task = { + 'name': sub_group.name, + 'template': sub_group.name, + } if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': subgroup_inputs = inputs.get(sub_group.name, []) condition = sub_group.condition @@ -464,10 +465,21 @@ def _group_to_template(self, group, inputs, outputs, dependencies): }) else: # The value comes from its parent. 
- arguments.append({ - 'name': param_name, - 'value': '{{inputs.parameters.%s}}' % param_name - }) + if is_recursive_subgroup: + for index, input in enumerate(sub_group.inputs): + if param_name == input.name: + break + referenced_input = sub_group.recursive_ref.inputs[index-1] + full_name = self._pipelineparam_full_name(referenced_input) + arguments.append({ + 'name': full_name, + 'value': '{{inputs.parameters.%s}}' % param_name + }) + else: + arguments.append({ + 'name': param_name, + 'value': '{{inputs.parameters.%s}}' % param_name + }) arguments.sort(key=lambda x: x['name']) task['arguments'] = {'parameters': arguments} tasks.append(task) From b76c803d0acf95baa7b1367cf8ad989b2b712cea Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 16:48:31 -0700 Subject: [PATCH 14/29] add validity checks --- sdk/python/kfp/dsl/_component.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index d15839e1ce4..5608033eac9 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -130,7 +130,9 @@ def graph_component(func): def _graph_component(*args, **kargs): graph_ops_group = Graph(func.__name__) graph_ops_group.inputs = list(args) + list(kargs.values()) - #TODO: check if the inputs is a list of pipelineparams + for input in graph_ops_group.inputs: + if not isinstance(input, PipelineParam): + raise ValueError('arguments to ' + func.__name__ + ' should be PipelineParams.') graph_ops_group.resolve_recursion() @@ -141,7 +143,11 @@ def _graph_component(*args, **kargs): if not graph_ops_group.is_recursive: graph_ops_group._make_name_unique() graph_ops_group.outputs = func(*args, **kargs) - #TODO: check if the outputs is a dictionary of str to pipelineparams + if not isinstance(graph_ops_group.outputs, dict): + raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') + for output in graph_ops_group.outputs: + if 
not (isinstance(output, str) and isinstance(graph_ops_group.outputs[output], PipelineParam)): + raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') # Exiting Graph Context _pipeline.Pipeline.get_default_pipeline().pop_ops_group() From 45d7b90c91b5e1d7e5fa2f3acd869281d8fb65f7 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 17:11:39 -0700 Subject: [PATCH 15/29] add more comments --- sdk/python/kfp/compiler/compiler.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 953384d346d..85565eb093e 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -67,6 +67,7 @@ def _get_op_groups_helper(current_groups, ops_to_groups): root_group = current_groups[-1] for g in root_group.groups: # Add recursive opsgroup in the ops_to_groups + # such that the i/o dependency can be propagated to the ancester opsgroups if g.is_recursive: ops_to_groups[g.name] = [x.name for x in current_groups] + [g.name] continue @@ -87,6 +88,8 @@ def _get_groups(self, root_group): def _get_groups_helper(group): groups = [group] for g in group.groups: + # Skip the recursive opsgroup because no templates + # need to be generated for the recursive opsgroups. 
if not g.is_recursive: groups += _get_groups_helper(g) return groups @@ -153,6 +156,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): inputs[g].add((full_name, None)) # Generate the input/output for recursive opsgroups + # It propagates the recursive opsgroups IO to their ancester opsgroups def _get_inputs_outputs_recursive_opsgroup(group): #TODO: refactor the following codes with the above if group.is_recursive: @@ -201,9 +205,9 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): for param in new_current_conditions_params: conditions[op.name].add(param) for g in group.groups: + # If the subgroup is a recursive opsgroup, propagate the pipelineparams + # in the condition expression, similar to the ops. if g.is_recursive: - # If the subgroup is a recursive opsgroup, propagate the pipelineparams - # in the condition, similar to the ops. for param in new_current_conditions_params: conditions[g.name].add(param) else: @@ -221,6 +225,8 @@ def _get_dependencies(self, pipeline, root_group, op_groups): then G3 is dependent on G2. Basically dependency only exists in the first uncommon ancesters in their ancesters chain. Only sibling groups/ops can have dependencies. """ + #TODO: move the condition_params out because both the _get_inputs_outputs + # and _get_dependencies depend on it. condition_params = self._get_condition_params_for_ops(root_group) dependencies = defaultdict(set) for op in pipeline.ops.values(): @@ -430,6 +436,7 @@ def _group_to_template(self, group, inputs, outputs, dependencies): tasks = [] for sub_group in group.groups + group.ops: is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.is_recursive) + # Special handling for recursive subgroup: use the existing opsgroup name if is_recursive_subgroup: task = { 'name': sub_group.recursive_ref.name, @@ -465,6 +472,7 @@ def _group_to_template(self, group, inputs, outputs, dependencies): }) else: # The value comes from its parent. 
+ # Special handling for recursive subgroup: argument name comes from the existing opsgroup if is_recursive_subgroup: for index, input in enumerate(sub_group.inputs): if param_name == input.name: @@ -492,6 +500,15 @@ def _create_templates(self, pipeline): new_root_group = pipeline.groups[0] + # Generate core data structures to prepare for argo yaml generation + # op_groups: op name -> list of ancestor groups including the current op + # inputs, outputs: group/op names -> list of tuples (param_name, producing_op_name) + # dependencies: group/op name -> list of dependent groups/ops. + # groups: opsgroups + # Special Handling for the recursive opsgroup + # op_groups also contains the recursive opsgroups + # condition_params from _get_condition_params_for_ops also contains the recursive opsgroups + # groups does not include the recursive opsgroups op_groups = self._get_groups_for_ops(new_root_group) inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups) dependencies = self._get_dependencies(pipeline, new_root_group, op_groups) From 6e94785c2d68e7865cb36e76497775595fe146be Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 17:14:15 -0700 Subject: [PATCH 16/29] add usage comment in graph_component --- sdk/python/kfp/dsl/_component.py | 14 ++++++++++++-- sdk/python/kfp/dsl/_ops_group.py | 1 - 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 5608033eac9..18fad5a0148 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -123,8 +123,18 @@ def _component(*args, **kargs): def graph_component(func): """Decorator for graph component functions. - This decorator returns an ops_group """ - #TODO: add usage + This decorator returns an ops_group. 
+ + Usage: + ```python + @dsl._component.graph_component + def flip_component(flip_result): + print_flip = PrintOp(flip_result) + flipA = FlipCoinOp().after(print_flip) + with dsl.Condition(flipA.output == 'heads'): + flip_component(flipA.output) + return {'flip_result': flipA.output} + """ from functools import wraps @wraps(func) def _graph_component(*args, **kargs): diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 93bfdd4e751..06a0cd3002e 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -137,7 +137,6 @@ def __init__(self, condition): class Graph(OpsGroup): """Graph DAG with inputs, recursive_inputs, and outputs. This is not used directly by the users but auto generated when the graph_component decoration exists - TODO: Example usage: """ def __init__(self, name): super(Graph, self).__init__(group_type='graph', name=name) From 7bcf800c762e7d08698976a6741ee22c24354d37 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 17:28:06 -0700 Subject: [PATCH 17/29] add a sample --- samples/basic/recursion.py | 59 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 samples/basic/recursion.py diff --git a/samples/basic/recursion.py b/samples/basic/recursion.py new file mode 100644 index 00000000000..7c091c21a0e --- /dev/null +++ b/samples/basic/recursion.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl + +class FlipCoinOp(dsl.ContainerOp): + """Flip a coin and output heads or tails randomly.""" + + def __init__(self): + super(FlipCoinOp, self).__init__( + name='Flip', + image='python:alpine3.6', + command=['sh', '-c'], + arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 ' + 'else \'tails\'; print(result)" | tee /tmp/output'], + file_outputs={'output': '/tmp/output'}) + +class PrintOp(dsl.ContainerOp): + """Print a message.""" + + def __init__(self, msg): + super(PrintOp, self).__init__( + name='Print', + image='alpine:3.6', + command=['echo', msg], + ) + +@dsl._component.graph_component +def flip_component(flip_result): + print_flip = PrintOp(flip_result) + flipA = FlipCoinOp().after(print_flip) + with dsl.Condition(flipA.output == 'heads'): + flip_component(flipA.output) + return {'flip_result': flipA.output} + +@dsl.pipeline( + name='pipeline flip coin', + description='shows how to use dsl.Condition.' +) +def flipcoin(): + flipA = FlipCoinOp() + flip_loop = flip_component(flipA.output) + PrintOp('cool, it is over. 
%s' % flip_loop.outputs['flip_result']) + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(flipcoin, __file__ + '.tar.gz') From 8f67da2e4111434564464cd3a844528c4441b8f4 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Thu, 21 Mar 2019 17:49:28 -0700 Subject: [PATCH 18/29] add unit test for the graph opsgraph --- sdk/python/tests/dsl/ops_group_tests.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/dsl/ops_group_tests.py b/sdk/python/tests/dsl/ops_group_tests.py index e59469fb536..e3240093c08 100644 --- a/sdk/python/tests/dsl/ops_group_tests.py +++ b/sdk/python/tests/dsl/ops_group_tests.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import kfp.dsl as dsl from kfp.dsl import Pipeline, PipelineParam, ContainerOp, ExitHandler, OpsGroup import unittest @@ -47,6 +47,26 @@ def test_basic(self): self.assertFalse(loop_group.groups) self.assertCountEqual([x.name for x in loop_group.ops], ['op4']) + def test_recursive_opsgroups(self): + """Test recursive opsgroups.""" + with Pipeline('somename') as p: + self.assertEqual(1, len(p.groups)) + + # When a graph opsgraph is called. + graph_ops_group_one = dsl._ops_group.Graph('hello') + graph_ops_group_one.resolve_recursion() + self.assertFalse(graph_ops_group_one.is_recursive) + p.push_ops_group(graph_ops_group_one) + graph_ops_group_one._make_name_unique() + self.assertEqual('graph-hello-1', graph_ops_group_one.name) + + # Another graph opsgraph is called with the same name + # when the previous graph opsgraphs is not finished. 
+ graph_ops_group_two = dsl._ops_group.Graph('hello') + graph_ops_group_two.resolve_recursion() + self.assertFalse(graph_ops_group_one.is_recursive) + self.assertTrue(graph_ops_group_two.is_recursive) + self.assertEqual(graph_ops_group_one, graph_ops_group_two.recursive_ref) class TestExitHandler(unittest.TestCase): From d99ddb27c324bf1fa9e08f3816789411e873b8df Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Fri, 22 Mar 2019 09:45:37 -0700 Subject: [PATCH 19/29] refactor the opsgroup --- sdk/python/kfp/compiler/compiler.py | 12 ++++++------ sdk/python/kfp/dsl/_component.py | 13 +++++-------- sdk/python/kfp/dsl/_ops_group.py | 22 ++++++++-------------- sdk/python/kfp/dsl/_pipeline.py | 1 + sdk/python/tests/dsl/ops_group_tests.py | 11 ++++------- 5 files changed, 24 insertions(+), 35 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 85565eb093e..3540d95cc1e 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -68,7 +68,7 @@ def _get_op_groups_helper(current_groups, ops_to_groups): for g in root_group.groups: # Add recursive opsgroup in the ops_to_groups # such that the i/o dependency can be propagated to the ancester opsgroups - if g.is_recursive: + if g.recursive_ref: ops_to_groups[g.name] = [x.name for x in current_groups] + [g.name] continue current_groups.append(g) @@ -90,7 +90,7 @@ def _get_groups_helper(group): for g in group.groups: # Skip the recursive opsgroup because no templates # need to be generated for the recursive opsgroups. 
- if not g.is_recursive: + if not g.recursive_ref: groups += _get_groups_helper(g) return groups @@ -159,7 +159,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): # It propagates the recursive opsgroups IO to their ancester opsgroups def _get_inputs_outputs_recursive_opsgroup(group): #TODO: refactor the following codes with the above - if group.is_recursive: + if group.recursive_ref: for param in group.inputs + list(condition_params[group.name]): if param.value: continue @@ -207,7 +207,7 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): for g in group.groups: # If the subgroup is a recursive opsgroup, propagate the pipelineparams # in the condition expression, similar to the ops. - if g.is_recursive: + if g.recursive_ref: for param in new_current_conditions_params: conditions[g.name].add(param) else: @@ -245,7 +245,7 @@ def _get_dependencies(self, pipeline, root_group, op_groups): # Generate dependencies based on the recursive opsgroups #TODO: refactor the following codes with the above def _get_dependency_opsgroup(group, dependencies): - if group.is_recursive: + if group.recursive_ref: unstream_op_names = set() for param in group.inputs + list(condition_params[group.name]): if param.op_name: @@ -435,7 +435,7 @@ def _group_to_template(self, group, inputs, outputs, dependencies): # Generate tasks section. 
tasks = [] for sub_group in group.groups + group.ops: - is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.is_recursive) + is_recursive_subgroup = (isinstance(sub_group, OpsGroup) and sub_group.recursive_ref) # Special handling for recursive subgroup: use the existing opsgroup name if is_recursive_subgroup: task = { diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 18fad5a0148..04cf2e46723 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -144,14 +144,11 @@ def _graph_component(*args, **kargs): if not isinstance(input, PipelineParam): raise ValueError('arguments to ' + func.__name__ + ' should be PipelineParams.') - graph_ops_group.resolve_recursion() - - # Entering Graph Context - _pipeline.Pipeline.get_default_pipeline().push_ops_group(graph_ops_group) + # Entering the Graph Context + graph_ops_group.__enter__() # Call the function - if not graph_ops_group.is_recursive: - graph_ops_group._make_name_unique() + if not graph_ops_group.recursive_ref: graph_ops_group.outputs = func(*args, **kargs) if not isinstance(graph_ops_group.outputs, dict): raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') @@ -159,8 +156,8 @@ def _graph_component(*args, **kargs): if not (isinstance(output, str) and isinstance(graph_ops_group.outputs[output], PipelineParam)): raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') - # Exiting Graph Context - _pipeline.Pipeline.get_default_pipeline().pop_ops_group() + # Exiting the Graph Context + graph_ops_group.__exit__() return graph_ops_group return _graph_component \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 06a0cd3002e..91647aa839b 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -28,26 +28,17 @@ class OpsGroup(object): def __init__(self, group_type: str, 
name: str=None): """Create a new instance of OpsGroup. Args: - group_type: one of 'pipeline', 'exit_handler', 'condition', and 'graph'. + group_type (str): one of 'pipeline', 'exit_handler', 'condition', and 'graph'. + name (str): name of the opsgroup """ #TODO: declare the group_type to be strongly typed self.type = group_type self.ops = list() self.groups = list() self.name = name - # The following two fields are references to another OpsGroup that is already created. - self.is_recursive = False + # recursive_ref points to the opsgroups with the same name if exists. self.recursive_ref = None - def resolve_recursion(self): - """resolve_recursion resolves the recursion scenario where the OpsGroup is already created.""" - old_opsgroup = self._get_opsgroup_pipeline(self.type, self.name) - if old_opsgroup is None: - return - self.is_recursive = True - self.recursive_ref = old_opsgroup - - @staticmethod def _get_opsgroup_pipeline(group_type, name): """retrieves the opsgroup when the pipeline already contains it. 
@@ -58,7 +49,7 @@ def _get_opsgroup_pipeline(group_type, name): if not _pipeline.Pipeline.get_default_pipeline(): raise ValueError('Default pipeline not defined.') if name is None: - raise ValueError('name must be a string.') + return None name_prefix = (group_type + '-' + name + '-').replace('_', '-') for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: if ops_group.type == group_type and ops_group.name.startswith(name_prefix): @@ -78,7 +69,10 @@ def __enter__(self): if not _pipeline.Pipeline.get_default_pipeline(): raise ValueError('Default pipeline not defined.') - self._make_name_unique() + self.recursive_ref = self._get_opsgroup_pipeline(self.type, self.name) + if not self.recursive_ref: + self._make_name_unique() + _pipeline.Pipeline.get_default_pipeline().push_ops_group(self) return self diff --git a/sdk/python/kfp/dsl/_pipeline.py b/sdk/python/kfp/dsl/_pipeline.py index c1da3c7de3e..8b2f50efd53 100644 --- a/sdk/python/kfp/dsl/_pipeline.py +++ b/sdk/python/kfp/dsl/_pipeline.py @@ -86,6 +86,7 @@ def get_pipeline_conf(): """ return Pipeline.get_default_pipeline().conf +#TODO: Pipeline is in fact an opsgroup, refactor the code. class Pipeline(): """A pipeline contains a list of operators. diff --git a/sdk/python/tests/dsl/ops_group_tests.py b/sdk/python/tests/dsl/ops_group_tests.py index e3240093c08..df28ae939fd 100644 --- a/sdk/python/tests/dsl/ops_group_tests.py +++ b/sdk/python/tests/dsl/ops_group_tests.py @@ -54,18 +54,15 @@ def test_recursive_opsgroups(self): # When a graph opsgraph is called. 
graph_ops_group_one = dsl._ops_group.Graph('hello') - graph_ops_group_one.resolve_recursion() - self.assertFalse(graph_ops_group_one.is_recursive) - p.push_ops_group(graph_ops_group_one) - graph_ops_group_one._make_name_unique() + graph_ops_group_one.__enter__() + self.assertFalse(graph_ops_group_one.recursive_ref) self.assertEqual('graph-hello-1', graph_ops_group_one.name) # Another graph opsgraph is called with the same name # when the previous graph opsgraphs is not finished. graph_ops_group_two = dsl._ops_group.Graph('hello') - graph_ops_group_two.resolve_recursion() - self.assertFalse(graph_ops_group_one.is_recursive) - self.assertTrue(graph_ops_group_two.is_recursive) + graph_ops_group_two.__enter__() + self.assertTrue(graph_ops_group_two.recursive_ref) self.assertEqual(graph_ops_group_one, graph_ops_group_two.recursive_ref) class TestExitHandler(unittest.TestCase): From d1ce1e2fe314ad0838d9da4d8fa2d36eeeb0d7f8 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Fri, 22 Mar 2019 10:09:35 -0700 Subject: [PATCH 20/29] add unit test for the graph_component decorator --- sdk/python/tests/dsl/component_tests.py | 31 +++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/dsl/component_tests.py b/sdk/python/tests/dsl/component_tests.py index b6b9d244a5a..c69c888e437 100644 --- a/sdk/python/tests/dsl/component_tests.py +++ b/sdk/python/tests/dsl/component_tests.py @@ -13,10 +13,11 @@ # limitations under the License. 
import kfp +import kfp.dsl as dsl from kfp.dsl._component import component from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta from kfp.dsl._types import GCSPath, Integer, InconsistentTypeException -from kfp.dsl import ContainerOp, Pipeline +from kfp.dsl import ContainerOp, Pipeline, PipelineParam import unittest class TestPythonComponent(unittest.TestCase): @@ -422,4 +423,30 @@ def b_op(field_x: {'customized_type': {'openapi_schema_validator': '{"type": "st a = a_op(field_l=12) with self.assertRaises(InconsistentTypeException): b = b_op(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m']) - b = b_op(field_x=a.outputs['field_n'].ignore_type(), field_y=a.outputs['field_o'], field_z=a.outputs['field_m']) \ No newline at end of file + b = b_op(field_x=a.outputs['field_n'].ignore_type(), field_y=a.outputs['field_o'], field_z=a.outputs['field_m']) + +class TestGraphComponent(unittest.TestCase): + + def test_graphcomponent_basic(self): + """Test graph_component decorator metadata.""" + @dsl._component.graph_component + def flip_component(flip_result): + with dsl.Condition(flip_result == 'heads'): + flip_component(flip_result) + return {'flip_result': flip_result} + + with Pipeline('pipeline') as p: + param = PipelineParam(name='param') + flip_component(param) + self.assertEqual(1, len(p.groups)) + self.assertEqual(1, len(p.groups[0].groups)) # pipeline + self.assertEqual(1, len(p.groups[0].groups[0].groups)) # flip_component + self.assertEqual(1, len(p.groups[0].groups[0].groups[0].groups)) # condition + self.assertEqual(0, len(p.groups[0].groups[0].groups[0].groups[0].groups)) # recursive flip_component + recursive_group = p.groups[0].groups[0].groups[0].groups[0] + self.assertTrue(recursive_group.recursive_ref is not None) + self.assertEqual(1, len(recursive_group.inputs)) + self.assertEqual('param', recursive_group.inputs[0].name) + original_group = p.groups[0].groups[0] + self.assertTrue('flip_result' in 
original_group.outputs) + self.assertEqual('param', original_group.outputs['flip_result']) From 7e96a30f45c2b7edfbfb67eb5d45b0b9136405b7 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Fri, 22 Mar 2019 10:12:43 -0700 Subject: [PATCH 21/29] exposing graph_component decorator --- sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/tests/dsl/component_tests.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 06fae50887b..c91424350d2 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -17,5 +17,5 @@ from ._pipeline import Pipeline, pipeline, get_pipeline_conf from ._container_op import ContainerOp from ._ops_group import OpsGroup, ExitHandler, Condition -from ._component import python_component +from ._component import python_component, graph_component #TODO: expose the component decorator when ready \ No newline at end of file diff --git a/sdk/python/tests/dsl/component_tests.py b/sdk/python/tests/dsl/component_tests.py index c69c888e437..b947f0c2fd1 100644 --- a/sdk/python/tests/dsl/component_tests.py +++ b/sdk/python/tests/dsl/component_tests.py @@ -14,7 +14,7 @@ import kfp import kfp.dsl as dsl -from kfp.dsl._component import component +from kfp.dsl._component import component, graph_component from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta from kfp.dsl._types import GCSPath, Integer, InconsistentTypeException from kfp.dsl import ContainerOp, Pipeline, PipelineParam @@ -429,7 +429,7 @@ class TestGraphComponent(unittest.TestCase): def test_graphcomponent_basic(self): """Test graph_component decorator metadata.""" - @dsl._component.graph_component + @graph_component def flip_component(flip_result): with dsl.Condition(flip_result == 'heads'): flip_component(flip_result) From f840cf5b54724ff04a814dc2445886438d3879ad Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Fri, 22 Mar 2019 10:35:46 -0700 Subject: [PATCH 22/29] add recursive 
compiler unit tests --- sdk/python/tests/compiler/compiler_tests.py | 4 + .../tests/compiler/testdata/recursive.py | 60 +++++ .../tests/compiler/testdata/recursive.yaml | 229 ++++++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 sdk/python/tests/compiler/testdata/recursive.py create mode 100644 sdk/python/tests/compiler/testdata/recursive.yaml diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 705c0cbe608..47460efc703 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -241,6 +241,10 @@ def test_py_image_pull_secret(self): """Test pipeline imagepullsecret.""" self._test_py_compile('imagepullsecret') + def test_py_recursive(self): + """Test pipeline recursive.""" + self._test_py_compile('recursive') + def test_type_checking_with_consistent_types(self): """Test type check pipeline parameters against component metadata.""" @component diff --git a/sdk/python/tests/compiler/testdata/recursive.py b/sdk/python/tests/compiler/testdata/recursive.py new file mode 100644 index 00000000000..c38ccfe8769 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/recursive.py @@ -0,0 +1,60 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import kfp.dsl as dsl +from kfp.dsl import graph_component + +class FlipCoinOp(dsl.ContainerOp): + """Flip a coin and output heads or tails randomly.""" + + def __init__(self): + super(FlipCoinOp, self).__init__( + name='Flip', + image='python:alpine3.6', + command=['sh', '-c'], + arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 ' + 'else \'tails\'; print(result)" | tee /tmp/output'], + file_outputs={'output': '/tmp/output'}) + +class PrintOp(dsl.ContainerOp): + """Print a message.""" + + def __init__(self, msg): + super(PrintOp, self).__init__( + name='Print', + image='alpine:3.6', + command=['echo', msg], + ) + +@graph_component +def flip_component(flip_result): + print_flip = PrintOp(flip_result) + flipA = FlipCoinOp().after(print_flip) + with dsl.Condition(flipA.output == 'heads'): + flip_component(flipA.output) + return {'flip_result': flipA.output} + +@dsl.pipeline( + name='pipeline flip coin', + description='shows how to use graph_component.' +) +def recursive(): + flipA = FlipCoinOp() + flip_loop = flip_component(flipA.output) + PrintOp('cool, it is over. 
%s' % flip_loop.outputs['flip_result']) + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(recursive, __file__ + '.tar.gz') \ No newline at end of file diff --git a/sdk/python/tests/compiler/testdata/recursive.yaml b/sdk/python/tests/compiler/testdata/recursive.yaml new file mode 100644 index 00000000000..89afcc2aa27 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/recursive.yaml @@ -0,0 +1,229 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pipeline-flip-coin- +spec: + arguments: + parameters: [] + entrypoint: pipeline-flip-coin + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - arguments: + parameters: + - name: flip-output + value: '{{inputs.parameters.flip-2-output}}' + name: graph-flip-component-1 + template: graph-flip-component-1 + inputs: + parameters: + - name: flip-2-output + name: condition-2 + - container: + args: + - python -c "import random; result = 'heads' if random.randint(0,1) == 0 else + 'tails'; print(result)" | tee /tmp/output + command: + - sh + - -c + image: python:alpine3.6 + name: flip + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + parameters: + - name: flip-output + valueFrom: + path: /tmp/output + - container: + args: + - python -c "import random; result = 
'heads' if random.randint(0,1) == 0 else + 'tails'; print(result)" | tee /tmp/output + command: + - sh + - -c + image: python:alpine3.6 + name: flip-2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + parameters: + - name: flip-2-output + valueFrom: + path: /tmp/output + - dag: + tasks: + - arguments: + parameters: + - name: flip-2-output + value: '{{tasks.flip-2.outputs.parameters.flip-2-output}}' + dependencies: + - flip-2 + name: condition-2 + template: condition-2 + when: '{{tasks.flip-2.outputs.parameters.flip-2-output}} == heads' + - dependencies: + - print + name: flip-2 + template: flip-2 + - arguments: + parameters: + - name: flip-output + value: '{{inputs.parameters.flip-output}}' + name: print + template: print + inputs: + parameters: + - name: flip-output + name: graph-flip-component-1 + outputs: + parameters: + - name: flip-2-output + valueFrom: + parameter: '{{tasks.flip-2.outputs.parameters.flip-2-output}}' + - dag: + tasks: + - name: flip + template: flip + - arguments: + parameters: + - name: flip-output + value: '{{tasks.flip.outputs.parameters.flip-output}}' + dependencies: + - flip + name: graph-flip-component-1 + template: graph-flip-component-1 + - arguments: + parameters: + - name: flip-2-output + value: 
'{{tasks.graph-flip-component-1.outputs.parameters.flip-2-output}}' + dependencies: + - graph-flip-component-1 + name: print-2 + template: print-2 + name: pipeline-flip-coin + - container: + command: + - echo + - '{{inputs.parameters.flip-output}}' + image: alpine:3.6 + inputs: + parameters: + - name: flip-output + name: print + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - container: + command: + - echo + - cool, it is over. 
{{inputs.parameters.flip-2-output}} + image: alpine:3.6 + inputs: + parameters: + - name: flip-2-output + name: print-2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact From bb7e425cbc4ff50a2b6480910a744a77b41f1b22 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Fri, 22 Mar 2019 11:34:56 -0700 Subject: [PATCH 23/29] add the sample test --- test/e2e_test_gke_v2.yaml | 12 ++++++++++++ test/sample-test/run_test.sh | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/test/e2e_test_gke_v2.yaml b/test/e2e_test_gke_v2.yaml index 48edc315981..47ba67329a6 100644 --- a/test/e2e_test_gke_v2.yaml +++ b/test/e2e_test_gke_v2.yaml @@ -155,6 +155,18 @@ spec: value: "{{inputs.parameters.namespace}}" - name: test-name value: "paralleljoin" + - name: run-recursive-tests + template: run-basic-e2e-tests + arguments: + parameters: + - name: test-results-gcs-dir + value: "{{inputs.parameters.test-results-gcs-dir}}" + - name: sample-tests-image + value: "{{inputs.parameters.target-image-prefix}}{{inputs.parameters.basic-e2e-tests-image-suffix}}" + - name: namespace + value: "{{inputs.parameters.namespace}}" + - name: test-name + value: "recursion" # Build and push image - name: build-image diff --git a/test/sample-test/run_test.sh b/test/sample-test/run_test.sh index 509d8f4ec69..494a3d2ebbb 100755 --- 
a/test/sample-test/run_test.sh +++ b/test/sample-test/run_test.sh @@ -248,6 +248,19 @@ elif [ "$TEST_NAME" == "paralleljoin" ]; then echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_PARALLEL_JOIN_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_PARALLEL_JOIN_TEST_RESULT} +elif [ "$TEST_NAME" == "recursion" ]; then + SAMPLE_RECURSION_TEST_RESULT=junit_SampleRecursionOutput.xml + SAMPLE_RECURSION_TEST_OUTPUT=${RESULTS_GCS_DIR} + + # Compile samples + cd ${BASE_DIR}/samples/basic + dsl-compile --py recursion.py --output recursion.tar.gz + + cd "${TEST_DIR}" + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/recursion.tar.gz --result SAMPLE_RECURSION_TEST_RESULT --output $SAMPLE_RECURSION_TEST_OUTPUT --testname recursion --namespace ${NAMESPACE} + + echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" + gsutil cp ${SAMPLE_RECURSION_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_RECURSION_TEST_RESULT} elif [ "$TEST_NAME" == "xgboost" ]; then SAMPLE_XGBOOST_TEST_RESULT=junit_SampleXGBoostOutput.xml SAMPLE_XGBOOST_TEST_OUTPUT=${RESULTS_GCS_DIR} From 035078e31c76eeec3e2fe3e23b12c41fba5d16b0 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Mar 2019 15:04:12 -0700 Subject: [PATCH 24/29] fix the bug of opsgroup name adjust the graph_component usage example fix index bugs use with statement in the graph_component instead of directly calling the enter/exit functions --- sdk/python/kfp/compiler/compiler.py | 2 +- sdk/python/kfp/dsl/_component.py | 23 ++++++++++------------- sdk/python/kfp/dsl/_ops_group.py | 5 +++-- sdk/python/tests/dsl/ops_group_tests.py | 19 ++++++++++++++++++- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 3540d95cc1e..89c4ef0b4a5 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -477,7 +477,7 @@ def _group_to_template(self, group, inputs, outputs, dependencies): for index, input in 
enumerate(sub_group.inputs): if param_name == input.name: break - referenced_input = sub_group.recursive_ref.inputs[index-1] + referenced_input = sub_group.recursive_ref.inputs[index] full_name = self._pipelineparam_full_name(referenced_input) arguments.append({ 'name': full_name, diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 04cf2e46723..9d40adf8290 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -127,7 +127,8 @@ def graph_component(func): Usage: ```python - @dsl._component.graph_component + import kfp.dsl as dsl + @dsl.graph_component def flip_component(flip_result): print_flip = PrintOp(flip_result) flipA = FlipCoinOp().after(print_flip) @@ -145,19 +146,15 @@ def _graph_component(*args, **kargs): raise ValueError('arguments to ' + func.__name__ + ' should be PipelineParams.') # Entering the Graph Context - graph_ops_group.__enter__() - - # Call the function - if not graph_ops_group.recursive_ref: - graph_ops_group.outputs = func(*args, **kargs) - if not isinstance(graph_ops_group.outputs, dict): - raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') - for output in graph_ops_group.outputs: - if not (isinstance(output, str) and isinstance(graph_ops_group.outputs[output], PipelineParam)): + with graph_ops_group: + # Call the function + if not graph_ops_group.recursive_ref: + graph_ops_group.outputs = func(*args, **kargs) + if not isinstance(graph_ops_group.outputs, dict): raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') - - # Exiting the Graph Context - graph_ops_group.__exit__() + for output in graph_ops_group.outputs: + if not (isinstance(output, str) and isinstance(graph_ops_group.outputs[output], PipelineParam)): + raise ValueError(func.__name__ + ' needs to return a dictionary of string to PipelineParam.') return graph_ops_group return _graph_component \ No newline at end of file diff --git 
a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index 91647aa839b..72374405623 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -50,9 +50,10 @@ def _get_opsgroup_pipeline(group_type, name): raise ValueError('Default pipeline not defined.') if name is None: return None - name_prefix = (group_type + '-' + name + '-').replace('_', '-') + name_pattern = '^' + (group_type + '-' + name + '-').replace('_', '-') + '[\d]+$' for ops_group in _pipeline.Pipeline.get_default_pipeline().groups: - if ops_group.type == group_type and ops_group.name.startswith(name_prefix): + import re + if ops_group.type == group_type and re.match(name_pattern ,ops_group.name): return ops_group return None diff --git a/sdk/python/tests/dsl/ops_group_tests.py b/sdk/python/tests/dsl/ops_group_tests.py index df28ae939fd..cb27098944c 100644 --- a/sdk/python/tests/dsl/ops_group_tests.py +++ b/sdk/python/tests/dsl/ops_group_tests.py @@ -47,7 +47,7 @@ def test_basic(self): self.assertFalse(loop_group.groups) self.assertCountEqual([x.name for x in loop_group.ops], ['op4']) - def test_recursive_opsgroups(self): + def test_basic_recursive_opsgroups(self): """Test recursive opsgroups.""" with Pipeline('somename') as p: self.assertEqual(1, len(p.groups)) @@ -65,6 +65,23 @@ def test_recursive_opsgroups(self): self.assertTrue(graph_ops_group_two.recursive_ref) self.assertEqual(graph_ops_group_one, graph_ops_group_two.recursive_ref) + def test_recursive_opsgroups_with_prefix_names(self): + """Test recursive opsgroups.""" + with Pipeline('somename') as p: + self.assertEqual(1, len(p.groups)) + + # When a graph opsgraph is called. 
+ graph_ops_group_one = dsl._ops_group.Graph('foo_bar') + graph_ops_group_one.__enter__() + self.assertFalse(graph_ops_group_one.recursive_ref) + self.assertEqual('graph-foo-bar-1', graph_ops_group_one.name) + + # Another graph opsgroup is called with a name that is a prefix of the name of ops_group_one + # while the previous graph opsgroup is not finished. + graph_ops_group_two = dsl._ops_group.Graph('foo') + graph_ops_group_two.__enter__() + self.assertFalse(graph_ops_group_two.recursive_ref) + class TestExitHandler(unittest.TestCase): def test_basic(self): From b824b3cf5360ee11fb8481589419e40366ef36a6 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Mar 2019 16:30:28 -0700 Subject: [PATCH 25/29] add a todo to combine the graph_component and component decorators --- sdk/python/kfp/dsl/_component.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/kfp/dsl/_component.py b/sdk/python/kfp/dsl/_component.py index 9d40adf8290..19aa9ad8757 100644 --- a/sdk/python/kfp/dsl/_component.py +++ b/sdk/python/kfp/dsl/_component.py @@ -121,6 +121,7 @@ def _component(*args, **kargs): return _component +#TODO: combine the component and graph_component decorators into one def graph_component(func): """Decorator for graph component functions. This decorator returns an ops_group.
From 9dd86fa27cea82cb05f6918740b19983a6cd891d Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 26 Mar 2019 17:34:56 -0700 Subject: [PATCH 26/29] fix some merging bug --- sdk/python/tests/compiler/compiler_tests.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 300187b762e..31b13501c7e 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -287,10 +287,6 @@ def test_py_recursive(self): """Test pipeline recursive.""" self._test_py_compile_yaml('recursive') - def test_py_recursive(self): - """Test pipeline recursive.""" - self._test_py_compile('recursive') - def test_type_checking_with_consistent_types(self): """Test type check pipeline parameters against component metadata.""" @component From 9169b0193e912bd381bac863063c5587d09255d7 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 27 Mar 2019 09:24:49 -0700 Subject: [PATCH 27/29] fix typo --- test/sample-test/run_test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/sample-test/run_test.sh b/test/sample-test/run_test.sh index 509ea5c3caf..a1e84778cc3 100755 --- a/test/sample-test/run_test.sh +++ b/test/sample-test/run_test.sh @@ -257,7 +257,7 @@ elif [ "$TEST_NAME" == "recursion" ]; then dsl-compile --py recursion.py --output recursion.tar.gz cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/recursion.tar.gz --result SAMPLE_RECURSION_TEST_RESULT --output $SAMPLE_RECURSION_TEST_OUTPUT --testname recursion --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/recursion.tar.gz --result $SAMPLE_RECURSION_TEST_RESULT --output $SAMPLE_RECURSION_TEST_OUTPUT --testname recursion --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_RECURSION_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_RECURSION_TEST_RESULT} From 
bc12f98f25b25c111a0556dfbc909197fd613417 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 27 Mar 2019 11:13:58 -0700 Subject: [PATCH 28/29] add more comments in the sample --- samples/basic/recursion.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/samples/basic/recursion.py b/samples/basic/recursion.py index 7c091c21a0e..0768085433c 100644 --- a/samples/basic/recursion.py +++ b/samples/basic/recursion.py @@ -37,12 +37,21 @@ def __init__(self, msg): command=['echo', msg], ) -@dsl._component.graph_component +# Use the dsl.graph_component to decorate functions that are +# recursively called. +@dsl.graph_component def flip_component(flip_result): print_flip = PrintOp(flip_result) flipA = FlipCoinOp().after(print_flip) with dsl.Condition(flipA.output == 'heads'): + # When the flip_component is called recursively, the flipA.output + # from inside the component will be passed to the next flip_component + # as the input, compared to the flipA.output in the flipcoin + # function. flip_component(flipA.output) + # Return a dictionary of string to arguments + # such that the downstream components that depend + # on this graph component can access the output. return {'flip_result': flipA.output} @dsl.pipeline( @@ -52,6 +61,8 @@ def flip_component(flip_result): def flipcoin(): flipA = FlipCoinOp() flip_loop = flip_component(flipA.output) + # flip_loop is a graph_component with the outputs field + # filled with the returned dictionary. PrintOp('cool, it is over. 
%s' % flip_loop.outputs['flip_result']) if __name__ == '__main__': From cd77e427e77e947ef358b7c202f0b737e6293364 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Wed, 27 Mar 2019 11:17:44 -0700 Subject: [PATCH 29/29] update comments --- samples/basic/recursion.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/basic/recursion.py b/samples/basic/recursion.py index 0768085433c..b9057a97d5f 100644 --- a/samples/basic/recursion.py +++ b/samples/basic/recursion.py @@ -45,9 +45,9 @@ def flip_component(flip_result): flipA = FlipCoinOp().after(print_flip) with dsl.Condition(flipA.output == 'heads'): # When the flip_component is called recursively, the flipA.output - # from inside the component will be passed to the next flip_component - # as the input, compared to the flipA.output in the flipcoin - # function. + # from inside the graph component will be passed to the next flip_component + # as the input whereas the flip_result in the current graph component + # comes from the flipA.output in the flipcoin function. flip_component(flipA.output) # Return a dictionary of string to arguments # such that the downstream components that depend