From 98e4d2f8813fc2f5e6c9f971cb3f2a60325fe1fc Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 6 Nov 2018 09:26:41 -0800 Subject: [PATCH] SDK/DSL/Compiler - Fixed compilation of dsl.Condition (#28) * Fixed compilation of dsl.Conditional The compiler no longer produced intermediate steps. * Got rid of _create_new_groups * Changed the sub_group.type check * Fix tfx name bug in the tfma sample test (#67) * fix tfx name bug * update release build for the data publish --- sdk/python/kfp/compiler/compiler.py | 130 +++++++------------ sdk/python/tests/compiler/testdata/coin.yaml | 48 +------ 2 files changed, 56 insertions(+), 122 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 4d5fdcda783..06f0a33e27c 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -289,14 +289,17 @@ def _get_dependencies(self, pipeline, root_group, op_groups): dependencies[downstream_groups[0]].add(upstream_groups[0]) return dependencies - def _create_condition(self, condition): - left = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand1) - if isinstance(condition.operand1, dsl.PipelineParam) - else str(condition.operand1)) - right = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand2) - if isinstance(condition.operand2, dsl.PipelineParam) - else str(condition.operand2)) - return ('%s == %s' % (left, right)) + def _resolve_value_or_reference(self, value_or_reference, inputs): + if isinstance(value_or_reference, dsl.PipelineParam): + parameter_name = self._param_full_name(value_or_reference) + task_names = [task_name for param_name, task_name in inputs if param_name == parameter_name] + if task_names: + task_name = task_names[0] + return '{{tasks.%s.outputs.parameters.%s}}' % (task_name, parameter_name) + else: + return '{{inputs.parameters.%s}}' % parameter_name + else: + return str(value_or_reference) def _group_to_template(self, group, inputs, outputs, dependencies): """Generate template given an OpsGroup. @@ -326,89 +329,56 @@ def _group_to_template(self, group, inputs, outputs, dependencies): template_outputs.sort(key=lambda x: x['name']) template['outputs'] = {'parameters': template_outputs} - if group.type == 'condition': - # This is a workaround for the fact that argo does not support conditions in DAG mode. - # Basically, we insert an extra group that contains only the original group. The extra group - # operates in "step" mode where condition is supported. - only_child = group.groups[0] - step = { - 'name': only_child.name, - 'template': only_child.name, + # Generate tasks section. + tasks = [] + for sub_group in group.groups + group.ops: + task = { + 'name': sub_group.name, + 'template': sub_group.name, } - if inputs.get(only_child.name, None): + + if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': + subgroup_inputs = inputs.get(sub_group.name, []) + condition = sub_group.condition + condition_operation = '==' + operand1_value = self._resolve_value_or_reference(condition.operand1, subgroup_inputs) + operand2_value = self._resolve_value_or_reference(condition.operand2, subgroup_inputs) + task['when'] = '{} {} {}'.format(operand1_value, condition_operation, operand2_value) + + # Generate dependencies section for this task. + if dependencies.get(sub_group.name, None): + group_dependencies = list(dependencies[sub_group.name]) + group_dependencies.sort() + task['dependencies'] = group_dependencies + + # Generate arguments section for this task. + if inputs.get(sub_group.name, None): arguments = [] - for param_name, dependent_name in inputs[only_child.name]: - arguments.append({ + for param_name, dependent_name in inputs[sub_group.name]: + if dependent_name: + # The value comes from an upstream sibling. + arguments.append({ + 'name': param_name, + 'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) + }) + else: + # The value comes from its parent. + arguments.append({ 'name': param_name, 'value': '{{inputs.parameters.%s}}' % param_name - }) + }) arguments.sort(key=lambda x: x['name']) - step['arguments'] = {'parameters': arguments} - step['when'] = self._create_condition(group.condition) - template['steps'] = [[step]] - else: - # Generate tasks section. - tasks = [] - for sub_group in group.groups + group.ops: - task = { - 'name': sub_group.name, - 'template': sub_group.name, - } - # Generate dependencies section for this task. - if dependencies.get(sub_group.name, None): - group_dependencies = list(dependencies[sub_group.name]) - group_dependencies.sort() - task['dependencies'] = group_dependencies - - # Generate arguments section for this task. - if inputs.get(sub_group.name, None): - arguments = [] - for param_name, dependent_name in inputs[sub_group.name]: - if dependent_name: - # The value comes from an upstream sibling. - arguments.append({ - 'name': param_name, - 'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) - }) - else: - # The value comes from its parent. - arguments.append({ - 'name': param_name, - 'value': '{{inputs.parameters.%s}}' % param_name - }) - arguments.sort(key=lambda x: x['name']) - task['arguments'] = {'parameters': arguments} - tasks.append(task) - tasks.sort(key=lambda x: x['name']) - template['dag'] = {'tasks': tasks} + task['arguments'] = {'parameters': arguments} + tasks.append(task) + tasks.sort(key=lambda x: x['name']) + template['dag'] = {'tasks': tasks} return template - def _create_new_groups(self, root_group): - """Create a copy of the input group, and insert extra groups for conditions.""" - - new_group = copy.deepcopy(root_group) - - def _insert_group_for_condition_helper(group): - for i, g in enumerate(group.groups): - if g.type == 'condition': - child_condition_group = dsl.OpsGroup('condition-child', g.name + '-child') - child_condition_group.ops = g.ops - child_condition_group.groups = g.groups - g.groups = [child_condition_group] - g.ops = list() - _insert_group_for_condition_helper(child_condition_group) - else: - _insert_group_for_condition_helper(g) - - _insert_group_for_condition_helper(new_group) - return new_group def _create_templates(self, pipeline): """Create all groups and ops templates in the pipeline.""" - # This is needed only because Argo does not support condition in DAG mode. - # Revisit when https://github.com/argoproj/argo/issues/921 is fixed. - new_root_group = self._create_new_groups(pipeline.groups[0]) + new_root_group = pipeline.groups[0] op_groups = self._get_groups_for_ops(new_root_group) inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups) diff --git a/sdk/python/tests/compiler/testdata/coin.yaml b/sdk/python/tests/compiler/testdata/coin.yaml index 47d5a5e28d0..aa464591e2f 100644 --- a/sdk/python/tests/compiler/testdata/coin.yaml +++ b/sdk/python/tests/compiler/testdata/coin.yaml @@ -21,18 +21,6 @@ spec: entrypoint: pipeline-flip-coin serviceAccountName: pipeline-runner templates: - - inputs: - parameters: - - name: flip-output - name: condition-1 - steps: - - - arguments: - parameters: - - name: flip-output - value: '{{inputs.parameters.flip-output}}' - name: condition-1-child - template: condition-1-child - when: '{{inputs.parameters.flip-output}} == heads' - dag: tasks: - arguments: @@ -45,6 +33,7 @@ spec: - flip-again name: condition-2 template: condition-2 + when: '{{tasks.flip-again.outputs.parameters.flip-again-output}} == tails' - arguments: parameters: - name: flip-output @@ -54,22 +43,7 @@ spec: inputs: parameters: - name: flip-output - name: condition-1-child - - inputs: - parameters: - - name: flip-again-output - - name: flip-output - name: condition-2 - steps: - - - arguments: - parameters: - - name: flip-again-output - value: '{{inputs.parameters.flip-again-output}}' - - name: flip-output - value: '{{inputs.parameters.flip-output}}' - name: condition-2-child - template: condition-2-child - when: '{{inputs.parameters.flip-again-output}} == tails' + name: condition-1 - dag: tasks: - arguments: @@ -84,19 +58,7 @@ spec: parameters: - name: flip-again-output - name: flip-output - name: condition-2-child - - inputs: - parameters: - - name: flip-output - name: condition-3 - steps: - - - arguments: - parameters: - - name: flip-output - value: '{{inputs.parameters.flip-output}}' - name: condition-3-child - template: condition-3-child - when: '{{inputs.parameters.flip-output}} == tails' + name: condition-2 - dag: tasks: - arguments: @@ -108,7 +70,7 @@ spec: inputs: parameters: - name: flip-output - name: condition-3-child + name: condition-3 - container: args: - python -c "import random; result = 'heads' if random.randint(0,1) == 0 else @@ -201,6 +163,7 @@ spec: - flip name: condition-1 template: condition-1 + when: '{{tasks.flip.outputs.parameters.flip-output}} == heads' - arguments: parameters: - name: flip-output @@ -209,6 +172,7 @@ spec: - flip name: condition-3 template: condition-3 + when: '{{tasks.flip.outputs.parameters.flip-output}} == tails' - name: flip template: flip name: pipeline-flip-coin