Skip to content

Commit

Permalink
Support input artifact (#104)
Browse files Browse the repository at this point in the history
* support input artifacts

* fix conflicts

* fix merging bug
  • Loading branch information
Tomcli authored Apr 22, 2020
1 parent e8d973a commit 8830915
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 13 deletions.
45 changes: 34 additions & 11 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import yaml
import re
import os
import copy

from .. import tekton_api_version

Expand Down Expand Up @@ -213,6 +214,17 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
steps.extend(template['spec']['steps'])
template['spec']['steps'] = steps

# initial base_step and volume setup
base_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
volume_mount_step_template = []
volume_template = []
mounted_param_paths = []
replaced_param_list = []

# inputs
input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None
artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None
Expand All @@ -222,8 +234,27 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
template['spec']['params'] = inputs['parameters']
elif isinstance(op, dsl.ResourceOp):
template['spec']['params'].extend(inputs['parameters'])
elif 'artifacts' in inputs:
raise NotImplementedError("input artifacts are not yet implemented")
if 'artifacts' in inputs:
# The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input.
# Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation.
copy_inputs_step = copy.deepcopy(base_step)
copy_inputs_step['name'] = 'copy-inputs'
for artifact in inputs['artifacts']:
if 'raw' in artifact:
copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (artifact['raw']['data'], artifact['path'])
mountPath = artifact['path'].rsplit("/", 1)[0]
if mountPath not in mounted_param_paths:
volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]})
volume_template.append({'name': artifact['name'], 'emptyDir': {}})
mounted_param_paths.append(mountPath)
copy_inputs_step['script'] = literal_str(copy_inputs_step['script'])
with_inputs_step = [copy_inputs_step]
with_inputs_step.extend(template['spec']['steps'])
template['spec']['steps'] = with_inputs_step
if volume_mount_step_template:
template['spec']['stepTemplate'] = {}
template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template
template['spec']['volumes'] = volume_template

# outputs
if isinstance(op, dsl.ContainerOp):
Expand All @@ -234,10 +265,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
param_outputs = {}
outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts)
if outputs_dict:
volume_mount_step_template = []
volume_template = []
mounted_param_paths = []
replaced_param_list = []
if outputs_dict.get('parameters'):
"""
Since Tekton results need to be under /tekton/results. If file output paths cannot be
Expand All @@ -254,11 +281,7 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
cp $LOCALPATH $(results.data.path);
"""
template['spec']['results'] = []
copy_results_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
copy_results_step = copy.deepcopy(base_step)
for name, path in processed_op.file_outputs.items():
name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore
template['spec']['results'].append({
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,18 @@ def test_pipeline_transformers_workflow(self):

def test_artifact_location_workflow(self):
"""
Test compiling a artifact location workflow.
Test compiling an artifact location workflow.
"""
from .testdata.artifact_location import custom_artifact_location
self._test_pipeline_workflow(custom_artifact_location, 'artifact_location.yaml', enable_artifacts=True)

def test_input_artifact_raw_value_workflow(self):
"""
Test compiling an input artifact workflow.
"""
from .testdata.input_artifact_raw_value import input_artifact_pipeline
self._test_pipeline_workflow(input_artifact_pipeline, 'input_artifact_raw_value.yaml')

def test_katib_workflow(self):
"""
Test compiling a katib workflow.
Expand Down
70 changes: 70 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sys
from pathlib import Path

sys.path.insert(0, __file__ + '/../../../../')

import kfp
from kfp import dsl


def component_with_inline_input_artifact(text: str):
return dsl.ContainerOp(
name='component_with_inline_input_artifact',
image='alpine',
command=['cat', dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')], # path and input are optional
)


def component_with_input_artifact(text):
'''A component that passes text as input artifact'''

return dsl.ContainerOp(
name='component_with_input_artifact',
artifact_argument_paths=[
dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text'), # path and input are optional
],
image='alpine',
command=['cat', '/tmp/inputs/text/data'],
)

def component_with_hardcoded_input_artifact_value():
'''A component that passes hard-coded text as input artifact'''
return component_with_input_artifact('hard-coded artifact value')


def component_with_input_artifact_value_from_file(file_path):
'''A component that passes contents of a file as input artifact'''
return component_with_input_artifact(Path(file_path).read_text())


@dsl.pipeline(
name='Pipeline with artifact input raw argument value.',
description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.'
)
def input_artifact_pipeline():
component_with_inline_input_artifact('Constant artifact value')
component_with_input_artifact('Constant artifact value')
component_with_hardcoded_input_artifact_value()

file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt'))
component_with_input_artifact_value_from_file(file_path)

if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(input_artifact_pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Text from a file with hard-coded artifact value
142 changes: 142 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-inline-input-artifact
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Constant artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-inline-input-artifact
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Constant artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact-2
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "hard-coded artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact-2
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact-3
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Text from a file with hard-coded artifact value
" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact-3
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to
define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with
artifact input raw argument value."}'
name: pipeline-with-artifact-input-raw-argument-value
spec:
params: []
tasks:
- name: component-with-inline-input-artifact
params: []
taskRef:
name: component-with-inline-input-artifact
- name: component-with-input-artifact
params: []
taskRef:
name: component-with-input-artifact
- name: component-with-input-artifact-2
params: []
taskRef:
name: component-with-input-artifact-2
- name: component-with-input-artifact-3
params: []
taskRef:
name: component-with-input-artifact-3
2 changes: 1 addition & 1 deletion sdk/python/tests/test_kfp_samples_report.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SUCCESS: basic_no_decorator.py
SUCCESS: coin.py
SUCCESS: compose.py
SUCCESS: default_value.py
FAILURE: input_artifact_raw_value.py
SUCCESS: input_artifact_raw_value.py
FAILURE: loop_over_lightweight_output.py
SUCCESS: param_op_transform.py
SUCCESS: param_substitutions.py
Expand Down

0 comments on commit 8830915

Please sign in to comment.