Skip to content

Commit

Permalink
Add volumemount support and fix parameter bugs (#40)
Browse files Browse the repository at this point in the history
* add volumemount support and fix parameter bugs

* remove sdk/samples folder and use real kfp example

* fix formatting

* update unittest and instructions

* update license header

* update license header

* update license
  • Loading branch information
Tomcli authored Mar 20, 2020
1 parent aef20e0 commit 8e944cf
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 128 deletions.
13 changes: 9 additions & 4 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas
- Kubeflow Pipelines: [`0.2.2`](https://github.com/kubeflow/pipelines/releases/tag/0.2.2)
- Tekton: [`0.11.0`](https://github.com/tektoncd/pipeline/releases/tag/v0.11.0-rc1)
- Tekton CLI: [`0.8.0`](https://github.com/tektoncd/cli/releases/tag/v0.8.0)


## Tested Pipelines
- [Execution Order](https://github.com/kubeflow/pipelines/blob/master/samples/core/execution_order/execution_order.py)
- [Parallel Join](https://github.com/kubeflow/pipelines/blob/master/samples/core/parallel_join/parallel_join.py)
- [Watson ML](https://github.com/kubeflow/pipelines/blob/master/samples/contrib/ibm-samples/watson/watson_train_serve_pipeline.py)
- The Watson ML pipeline requires the default service account to have list, write, and delete permissions on secrets.

## Steps

1. Clone the kfp-tekton repo:
Expand All @@ -40,9 +46,8 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas
- `./sdk/python/tests/run_tests.sh`

5. Compile the sample pipeline:

- `cd sdk/samples`
- `dsl-compile-tekton --py parallel_join.py --output pipeline.yaml`

- `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/parallel_join.py --output pipeline.yaml`

6. Run the sample pipeline on a Tekton cluster:

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def _op_to_template(op: BaseOp):
if outputs_dict:
template['spec']['results'] = []
for name, path in processed_op.file_outputs.items():
name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore
template['spec']['results'].append({
'name': name,
'description': path
Expand Down Expand Up @@ -190,9 +191,8 @@ def _op_to_template(op: BaseOp):

# volumes
if processed_op.volumes:
raise NotImplementedError("'volumes' are not (yet) implemented")
template['volumes'] = [convert_k8s_obj_to_json(volume) for volume in processed_op.volumes]
template['volumes'].sort(key=lambda x: x['name'])
template['spec']['volumes'] = [convert_k8s_obj_to_json(volume) for volume in processed_op.volumes]
template['spec']['volumes'].sort(key=lambda x: x['name'])

# Display name
if processed_op.display_name:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
for tp in task.get('params', []):
if tp['name'] in pipeline_param_names:
tp['value'] = '$(params.%s)' % tp['name']
break
else:
for pp in op.inputs:
if tp['name'] == pp.full_name:
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name)
# replace '_' to '-' since tekton results doesn't support underscore
tp['value'] = '$(tasks.%s.results.%s)' % (pp.op_name, pp.name.replace('_', '-'))
break

# generate the Tekton Pipeline document
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ def test_parallel_join_workflow(self):
from .testdata.parallel_join import download_and_join
self._test_pipeline_workflow(download_and_join, 'parallel_join.yaml')

def test_volume_workflow(self):
    """
    Test compiling a pipeline with volumes and volume mounts
    against the golden 'volume.yaml' output.
    """
    from .testdata.volume import volume_pipeline
    self._test_pipeline_workflow(volume_pipeline, 'volume.yaml')

def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
Expand Down
1 change: 0 additions & 1 deletion sdk/python/tests/compiler/testdata/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,3 @@ def sequential_pipeline(
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(sequential_pipeline, __file__.replace('.py', '.yaml'))

44 changes: 44 additions & 0 deletions sdk/python/tests/compiler/testdata/volume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl
from kubernetes import client as k8s_client


@dsl.pipeline(
    name='Volume',
    description='A pipeline with volume.'
)
def volume_pipeline():
    """Two-step test pipeline exercising volume, volume-mount and env-var support.

    NOTE: this is a golden-file fixture — the order of the chained
    add_volume/add_volume_mount/add_env_variable calls determines the
    order of entries in the compiled YAML, so it must not be changed.
    """
    # Step 1: list the working directory and capture the listing as a
    # file output; mounts a secret-backed volume and exposes its path
    # through GOOGLE_APPLICATION_CREDENTIALS.
    op1 = dsl.ContainerOp(
        name='download',
        image='google/cloud-sdk',
        command=['sh', '-c'],
        arguments=['ls | tee /tmp/results.txt'],
        file_outputs={'downloaded': '/tmp/results.txt'}) \
        .add_volume(k8s_client.V1Volume(name='gcp-credentials',
                                        secret=k8s_client.V1SecretVolumeSource(
                                            secret_name='user-gcp-sa'))) \
        .add_volume_mount(k8s_client.V1VolumeMount(
            mount_path='/secret/gcp-credentials', name='gcp-credentials')) \
        .add_env_variable(k8s_client.V1EnvVar(
            name='GOOGLE_APPLICATION_CREDENTIALS',
            value='/secret/gcp-credentials/user-gcp-sa.json')) \
        .add_env_variable(k8s_client.V1EnvVar(name='Foo', value='bar'))
    # Step 2: consume op1's file output, creating a task dependency that
    # the compiler must wire up as a Tekton task result reference.
    op2 = dsl.ContainerOp(
        name='echo',
        image='library/bash',
        command=['sh', '-c'],
        arguments=['echo %s' % op1.output])
79 changes: 79 additions & 0 deletions sdk/python/tests/compiler/testdata/volume.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: download
spec:
results:
- description: /tmp/results.txt
name: downloaded
steps:
- args:
- ls | tee $(results.downloaded.path)
command:
- sh
- -c
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /secret/gcp-credentials/user-gcp-sa.json
- name: Foo
value: bar
image: google/cloud-sdk
name: download
volumeMounts:
- mountPath: /secret/gcp-credentials
name: gcp-credentials
volumes:
- name: gcp-credentials
secret:
secretName: user-gcp-sa
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: echo
spec:
params:
- name: download-downloaded
steps:
- args:
- echo $(inputs.params.download-downloaded)
command:
- sh
- -c
image: library/bash
name: echo
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with volume.",
"name": "Volume"}'
name: volume
spec:
params: []
tasks:
- name: download
params: []
taskRef:
name: download
- name: echo
params:
- name: download-downloaded
value: $(tasks.download.results.downloaded)
taskRef:
name: echo
60 changes: 0 additions & 60 deletions sdk/samples/parallel_join.py

This file was deleted.

58 changes: 0 additions & 58 deletions sdk/samples/sequential.py

This file was deleted.

0 comments on commit 8e944cf

Please sign in to comment.