Skip to content

Commit

Permalink
Task output parameters as input for subsequent tasks (#27)
Browse files Browse the repository at this point in the history
Closes #19
  • Loading branch information
ckadner authored Mar 18, 2020
1 parent ab5d433 commit 9ff9467
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 87 deletions.
6 changes: 3 additions & 3 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ a basic sequential pipeline.

2. Build the compiler:

- `pip install sdk/python`
- `pip install -e sdk/python`

3. Run the compiler tests (optional):

Expand All @@ -34,7 +34,7 @@ a basic sequential pipeline.
4. Compile the sample pipeline:

- `cd sdk/samples`
- `dsl-compile-tekton --py ./parallel_join.py --output pipeline.yaml`
- `dsl-compile-tekton --py parallel_join.py --output pipeline.yaml`

5. Run the sample pipeline on a Tekton cluster:

Expand All @@ -46,5 +46,5 @@ a basic sequential pipeline.

- Python: `3.7.5`
- Kubeflow Pipelines: [`0.2.2`](https://github.com/kubeflow/pipelines/releases/tag/0.2.2)
- Tekton: [`0.10.0`](https://github.com/tektoncd/pipeline/releases/tag/v0.10.0)
- Tekton: [`0.11.0-rc1`](https://github.com/tektoncd/pipeline/releases/tag/v0.11.0-rc1)

2 changes: 2 additions & 0 deletions sdk/python/kfp_tekton/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
# limitations under the License.

__version__ = '0.0.1'

tekton_api_version = 'tekton.dev/v1beta1'
8 changes: 6 additions & 2 deletions sdk/python/kfp_tekton/compiler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ def monkey_patch():


try:
# print("Applying monkey patch")
print("Applying KFP-Tekton compiler patch")
monkey_patch()
# uncomment the following lines to trace what triggers this code
# for line in traceback.format_stack():
# print(line.strip())
# raise ValueError("how did we get here")
except Exception as error:
traceback.print_exc()
print("Failed to apply monkey patch")
print("Failed to apply KFP-Tekton compiler patch")
sys.exit(1)
70 changes: 43 additions & 27 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict

from kfp.compiler._k8s_helper import convert_k8s_obj_to_json
from kfp.compiler._op_to_template import _process_obj, _inputs_to_json, _outputs_to_json
from kfp import dsl
from kfp.dsl import ArtifactLocation
from kfp.dsl._container_op import BaseOp

from .. import tekton_api_version


def _process_base_ops(op: BaseOp):
"""Recursively go through the attrs listed in `attrs_with_pipelineparams`
Expand Down Expand Up @@ -57,28 +61,28 @@ def _op_to_template(op: BaseOp):

if isinstance(op, dsl.ContainerOp):
# default output artifacts
# output_artifact_paths = OrderedDict(op.output_artifact_paths)
output_artifact_paths = OrderedDict(op.output_artifact_paths)
# print(op.output_artifact_paths)
# # This should have been as easy as output_artifact_paths.update(op.file_outputs), but the _outputs_to_json function changes the output names and we must do the same here, so that the names are the same
# output_artifact_paths.update(sorted(((param.full_name, processed_op.file_outputs[param.name]) for param in processed_op.outputs.values()), key=lambda x: x[0]))

# output_artifacts = [
# convert_k8s_obj_to_json(
# ArtifactLocation.create_artifact_for_s3(
# op.artifact_location,
# name=name,
# path=path,
# key='runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz'))
# for name, path in output_artifact_paths.items()
# ]
# This should have been as easy as output_artifact_paths.update(op.file_outputs), but the _outputs_to_json function changes the output names and we must do the same here, so that the names are the same
output_artifact_paths.update(sorted(((param.full_name, processed_op.file_outputs[param.name]) for param in processed_op.outputs.values()), key=lambda x: x[0]))

output_artifacts = [
# convert_k8s_obj_to_json(
# ArtifactLocation.create_artifact_for_s3(
# op.artifact_location,
# name=name,
# path=path,
# key='runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz'))
# for name, path in output_artifact_paths.items()
]

# workflow template
container = convert_k8s_obj_to_json(
processed_op.container
)

template = {
'apiVersion': 'tekton.dev/v1alpha1',
'apiVersion': tekton_api_version,
'kind': 'Task',
'metadata': {'name': processed_op.name},
'spec': {
Expand Down Expand Up @@ -113,19 +117,31 @@ def _op_to_template(op: BaseOp):
artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None
inputs = _inputs_to_json(processed_op.inputs, input_artifact_paths, artifact_arguments)
if inputs:
template['spec']['inputs'] = {'params': inputs['parameters']}

# NOTE: the following lines are commented out while we are working on supporting parallel tasks
# execution and parameter handling

# # outputs
# if isinstance(op, dsl.ContainerOp):
# param_outputs = processed_op.file_outputs
# elif isinstance(op, dsl.ResourceOp):
# param_outputs = processed_op.attribute_outputs
# outputs_dict = _outputs_to_json(op, processed_op.outputs, param_outputs, output_artifacts)
# if outputs_dict:
# template['outputs'] = outputs_dict
template['spec']['params'] = inputs['parameters']

# outputs
if isinstance(op, dsl.ContainerOp):
param_outputs = processed_op.file_outputs
elif isinstance(op, dsl.ResourceOp):
param_outputs = processed_op.attribute_outputs
outputs_dict = _outputs_to_json(op, processed_op.outputs, param_outputs, output_artifacts)
if outputs_dict:
template['spec']['results'] = []
for name, path in processed_op.file_outputs.items():
template['spec']['results'].append({
'name': name,
'description': path
})
# replace every occurrence of the output file path in the step's command and args
# with the Tekton result path variable, $(results.<name>.path)
for s in template['spec']['steps']:
s['command'] = [c.replace(path, '$(results.%s.path)' % name)
for c in s['command']]
s['args'] = [a.replace(path, '$(results.%s.path)' % name)
for a in s['args']]

# ****************************************************************************************************
# NOTE: the following lines are commented out while the corresponding features are being implemented
# ****************************************************************************************************

# # node selector
# if processed_op.node_selector:
Expand Down
22 changes: 19 additions & 3 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from kfp.components.structures import InputSpec
from kfp.dsl._metadata import _extract_pipeline_metadata

from .. import tekton_api_version


class TektonCompiler(Compiler) :
"""DSL Compiler to generate Tekton YAML.
Expand Down Expand Up @@ -93,8 +95,8 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
},
'params': [{
'name': p['name'],
'value': '$(params.%s)' % p['name']
} for p in t['spec'].get('inputs', {}).get('params', [])
'value': p.get('default', '')
} for p in t['spec'].get('params', [])
]
}
for t in tasks
Expand All @@ -106,9 +108,23 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
if op.dependent_names:
task['runAfter'] = op.dependent_names

# process input parameters from upstream tasks
pipeline_param_names = [p['name'] for p in params]
for task in task_refs:
op = pipeline.ops.get(task['name'])
for tp in task.get('params', []):
if tp['name'] in pipeline_param_names:
tp['value'] = '$(params.%s)' % tp['name']
break
else:
for pp in op.inputs:
if tp['name'] == pp.full_name:
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
break

# generate the Tekton Pipeline document
pipeline = {
'apiVersion': 'tekton.dev/v1alpha1',
'apiVersion': tekton_api_version,
'kind': 'Pipeline',
'metadata': {
'name': pipeline.name or 'Pipeline'
Expand Down
2 changes: 0 additions & 2 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ def test_sequential_workflow(self):
from .testdata.sequential import sequential_pipeline
self._test_pipeline_workflow(sequential_pipeline, 'sequential.yaml')


def test_parallel_join_workflow(self):
"""
Test compiling a parallel join workflow.
"""
from .testdata.parallel_join import download_and_join
self._test_pipeline_workflow(download_and_join, 'parallel_join.yaml')


def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
Expand Down
26 changes: 21 additions & 5 deletions sdk/python/tests/compiler/testdata/parallel_join.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
from kfp_tekton.compiler import TektonCompiler
#!/usr/bin/env python3

# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl


Expand Down Expand Up @@ -31,14 +46,15 @@ def download_and_join(
url1='gs://ml-pipeline-playground/shakespeare1.txt',
url2='gs://ml-pipeline-playground/shakespeare2.txt'
):
"""A three-step pipeline with first two running in parallel."""
"""A three-step pipeline with the first two steps running in parallel."""

download1_task = gcs_download_op(url1)
download2_task = gcs_download_op(url2)

# TODO: convert this task to pass parameters
echo_task = echo2_op(url1, url2).after(download1_task).after(download2_task)
echo_task = echo2_op(download1_task.output, download2_task.output)


if __name__ == '__main__':
TektonCompiler().compile(download_and_join, 'parallel_join.yaml')
# Import TektonCompiler here rather than at module level: importing kfp_tekton.compiler
# monkey-patches the KFP compiler, which must not happen when this file is compiled
# with KFP's own dsl-compile.
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(download_and_join, __file__.replace('.py', '.yaml'))
64 changes: 39 additions & 25 deletions sdk/python/tests/compiler/testdata/parallel_join.yaml
Original file line number Diff line number Diff line change
@@ -1,62 +1,79 @@
apiVersion: tekton.dev/v1alpha1
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: gcs-download
spec:
inputs:
params:
- name: url1
params:
- name: url1
results:
- description: /tmp/results.txt
name: data
steps:
- args:
- gsutil cat $0 | tee $1
- $(inputs.params.url1)
- /tmp/results.txt
- $(results.data.path)
command:
- sh
- -c
image: google/cloud-sdk:279.0.0
name: gcs-download
---
apiVersion: tekton.dev/v1alpha1
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: gcs-download-2
spec:
inputs:
params:
- name: url2
params:
- name: url2
results:
- description: /tmp/results.txt
name: data
steps:
- args:
- gsutil cat $0 | tee $1
- $(inputs.params.url2)
- /tmp/results.txt
- $(results.data.path)
command:
- sh
- -c
image: google/cloud-sdk:279.0.0
name: gcs-download-2
---
apiVersion: tekton.dev/v1alpha1
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: echo
spec:
inputs:
params:
- name: url1
- name: url2
params:
- name: gcs-download-2-data
- name: gcs-download-data
steps:
- args:
- 'echo "Text 1: $0"; echo "Text 2: $1"'
- $(inputs.params.url1)
- $(inputs.params.url2)
- $(inputs.params.gcs-download-data)
- $(inputs.params.gcs-download-2-data)
command:
- sh
- -c
image: library/bash:4.4.23
name: echo
---
apiVersion: tekton.dev/v1alpha1
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
Expand Down Expand Up @@ -86,12 +103,9 @@ spec:
name: gcs-download-2
- name: echo
params:
- name: url1
value: $(params.url1)
- name: url2
value: $(params.url2)
runAfter:
- gcs-download
- gcs-download-2
- name: gcs-download-2-data
value: $(tasks.gcs-download-2.results.data)
- name: gcs-download-data
value: $(tasks.gcs-download.results.data)
taskRef:
name: echo
11 changes: 8 additions & 3 deletions sdk/python/tests/compiler/testdata/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp_tekton.compiler import TektonCompiler
from kfp import dsl


Expand All @@ -40,7 +39,10 @@ def echo_op(text):
name='Sequential pipeline',
description='A pipeline with two sequential steps.'
)
def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt', path='/tmp/results.txt'):
def sequential_pipeline(
url='gs://ml-pipeline-playground/shakespeare1.txt',
path='/tmp/results.txt'
):
"""A pipeline with two sequential steps."""

download_task = gcs_download_op(url)
Expand All @@ -50,4 +52,7 @@ def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt', path


if __name__ == '__main__':
TektonCompiler().compile(sequential_pipeline, 'sequential.yaml')
# Import TektonCompiler here rather than at module level: importing kfp_tekton.compiler
# monkey-patches the KFP compiler, which must not happen when this file is compiled
# with KFP's own dsl-compile.
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(sequential_pipeline, __file__.replace('.py', '.yaml'))

Loading

0 comments on commit 9ff9467

Please sign in to comment.