Skip to content

Commit

Permalink
feat(sdk): Add labels for artifact outputs when tasks are not referred (
Browse files Browse the repository at this point in the history
kubeflow#967)

* add labels for artifact outputs when tasks are not referred

* do not generate noninline test

* add option to also do individual artifactoutput

* add option to also do individual artifactoutput

* remove all output to artifact label

* fix sorting for test

* fix upstream conflicts
  • Loading branch information
Tomcli authored Jun 20, 2022
1 parent b899035 commit 52f06b6
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 7 deletions.
11 changes: 10 additions & 1 deletion sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

BIG_DATA_MIDPATH = "artifacts/$ORIG_PR_NAME"
BIG_DATA_PATH_FORMAT = "/".join(["$(workspaces.$TASK_NAME.path)", BIG_DATA_MIDPATH, "$TASKRUN_NAME", "$TASK_PARAM_NAME"])
ARTIFACT_OUTPUTLIST_ANNOTATION_KEY = 'artifact_outputs'


def fix_big_data_passing(workflow: dict, loops_pipeline: dict, loop_name_prefix: str) -> dict:
Expand Down Expand Up @@ -331,6 +332,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
spec['results'] = [
output_parameter for output_parameter in spec.get('results', [])
]
spec['results'] = sorted(spec['results'], key=lambda k: k['name'])
# tekton results doesn't support underscore
renamed_results_in_pipeline_task = set()
for task_result in spec['results']:
Expand Down Expand Up @@ -488,8 +490,15 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
task_spec = task.get('taskSpec', {})
# Data passing for the task outputs
appended_taskrun_name = False
artifact_output_list = task_spec.get('metadata', {}).get('annotations', {}).get(ARTIFACT_OUTPUTLIST_ANNOTATION_KEY, '')
if artifact_output_list:
temp_list = json.loads(artifact_output_list)
artifact_output_list = []
for output in temp_list:
artifact_output_list.append(sanitize_k8s_name(output))
for task_output in task.get('taskSpec', {}).get('results', []):
if (task_name, task_output.get('name')) in outputs_tasks:
if (task_name, task_output.get('name')) in outputs_tasks or \
(artifact_output_list and task_output.get('name') in artifact_output_list):
if not task.get('taskSpec', {}).setdefault('workspaces', []):
task.get('taskSpec', {})['workspaces'].append({"name": task_name})
# Replace the args for the outputs in the task_spec
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ def test_recur_cond_workflow(self):
from .testdata.recur_cond import recur_and_condition
self._test_pipeline_workflow(recur_and_condition, 'recur_cond.yaml')

def test_artifact_outputs_workflow(self):
"""
Test compiling an artifact output workflow.
"""
from .testdata.artifact_outputs import artifact_outputs
self._test_pipeline_workflow(artifact_outputs, 'artifact_outputs.yaml', skip_noninlined=True)

def test_recur_cond_workflow(self):
"""
Test compiling a loop workflow using tekton loop dsl extension.
Expand Down
57 changes: 57 additions & 0 deletions sdk/python/tests/compiler/testdata/artifact_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2022 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl, components
import json


def gcs_download_op(url: str):
    # Factory for a "gcs-download" task built from an inline component spec.
    #
    # The component runs `gsutil cat` on *url* and tees the content into two
    # output files (`data` and `data2`), so the same payload is exposed as two
    # distinct task outputs. Returns the instantiated task (the loaded
    # component factory is immediately called with url=url).
    #
    # NOTE(review): the YAML below is a runtime string consumed by
    # components.load_component_from_text — its exact whitespace is part of
    # the spec; do not reformat it.
    return components.load_component_from_text("""
    name: gcs-download
    description: GCS - Download
    inputs:
    - {name: url, type: String}
    outputs:
    - {name: data, type: String}
    - {name: data2, type: String}
    implementation:
    container:
    image: google/cloud-sdk:279.0.0
    command:
    - sh
    - -c
    args:
    - |
    gsutil cat $0 | tee $1 | tee $2
    - {inputValue: url}
    - {outputPath: data}
    - {outputPath: data2}
    """)(url=url)


@dsl.pipeline(
    name='artifact-out-pipeline',
    description='Add labels to identify outputs as artifacts.'
)
def artifact_outputs(
    url1: str = 'gs://ml-pipeline-playground/shakespeare1.txt'
):
    """Pipeline marking selected task outputs as artifacts.

    A single gcs-download task is annotated with `artifact_outputs` so the
    compiler treats only the listed output (`data`) as an artifact output.
    """
    download_task = gcs_download_op(url1)
    download_task.add_pod_annotation(
        name='artifact_outputs', value=json.dumps(['data']))


if __name__ == '__main__':
    # Compile this pipeline to a Tekton YAML file next to this script.
    from kfp_tekton.compiler import TektonCompiler
    compiler = TektonCompiler()
    output_path = __file__.replace('.py', '.yaml')
    compiler.compile(artifact_outputs, output_path)
114 changes: 114 additions & 0 deletions sdk/python/tests/compiler/testdata/artifact_outputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: artifact-out-pipeline
annotations:
tekton.dev/output_artifacts: '{"gcs-download": [{"key": "artifacts/$PIPELINERUN/gcs-download/data.tgz",
"name": "gcs-download-data", "path": "/tmp/outputs/data/data"}, {"key": "artifacts/$PIPELINERUN/gcs-download/data2.tgz",
"name": "gcs-download-data2", "path": "/tmp/outputs/data2/data"}]}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"gcs-download": [["data", "$(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data"],
["data2", "$(results.data2.path)"]]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"description": "Add labels to identify
outputs as artifacts.", "inputs": [{"default": "gs://ml-pipeline-playground/shakespeare1.txt",
"name": "url1", "optional": true, "type": "String"}], "name": "artifact-out-pipeline"}'
spec:
params:
- name: url1
value: gs://ml-pipeline-playground/shakespeare1.txt
pipelineSpec:
params:
- name: url1
default: gs://ml-pipeline-playground/shakespeare1.txt
tasks:
- name: gcs-download
params:
- name: url1
value: $(params.url1)
taskSpec:
steps:
- name: main
args:
- |
gsutil cat $0 | tee $1 | tee $2
- $(inputs.params.url1)
- $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data
- $(results.data2.path)
command:
- sh
- -c
image: google/cloud-sdk:279.0.0
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
- image: busybox
name: output-taskrun-name
script: |
#!/bin/sh
set -exo pipefail
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
- image: busybox
name: copy-results-artifacts
script: |
#!/bin/sh
set -exo pipefail
TOTAL_SIZE=0
ARTIFACT_SIZE=`wc -c $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data | awk '{print $1}'`
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
touch $(results.data.path)
if [[ $TOTAL_SIZE -lt 3072 ]]; then
cp $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data $(results.data.path)
fi
onError: continue
env:
- name: ORIG_PR_NAME
valueFrom:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
params:
- name: url1
results:
- name: data
type: string
description: /tmp/outputs/data/data
- name: data2
type: string
description: /tmp/outputs/data2/data
- name: taskrun-name
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
artifact_outputs: '["data"]'
pipelines.kubeflow.org/component_spec_digest: '{"name": "gcs-download",
"outputs": [{"name": "data", "type": "String"}, {"name": "data2", "type":
"String"}], "version": "gcs-download@sha256=e61aa54d889d66b15db1bd0a8959fda75d5aa799e468d8985610af4958612dfe"}'
tekton.dev/template: ''
workspaces:
- name: gcs-download
timeout: 525600m
timeout: 525600m
6 changes: 3 additions & 3 deletions sdk/python/tests/compiler/testdata/big_data_passing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ spec:
fieldRef:
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
results:
- name: odd-lines
type: string
description: /tmp/outputs/odd_lines/data
- name: even-lines
type: string
description: /tmp/outputs/even_lines/data
- name: odd-lines
type: string
description: /tmp/outputs/odd_lines/data
- name: taskrun-name
type: string
metadata:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/tests/compiler/testdata/hidden_output_file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ spec:
- name: data
type: string
description: /tmp/outputs/data/data
- name: underscore-test
type: string
description: /tmp/outputs/underscore_test/data
- name: multiple-underscore-test
type: string
description: /tmp/outputs/multiple_underscore_test/data
- name: underscore-test
type: string
description: /tmp/outputs/underscore_test/data
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
Expand Down

0 comments on commit 52f06b6

Please sign in to comment.