Commit eaac53d

Update result name to be case sensitive (kubeflow#1013)
* update result name to be case sensitive

* add custom task test case
Tomcli authored Jul 26, 2022
1 parent e2aa560 commit eaac53d
Showing 22 changed files with 410 additions and 170 deletions.
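
In short: every call site that sanitizes a Tekton result name now passes a new allow_capital flag, so a component output declared as "Output" is no longer lowercased to "output" in the generated result references. A minimal sketch of the strings involved, reusing the flip-coin/Output names from the test data below (illustrative, not repo code):

# Tekton resolves result references case-sensitively, so the sanitized name
# must match the declared result name exactly.
task_name, result_name = "flip-coin", "Output"
print("$(results.%s.path)" % result_name)                   # $(results.Output.path)
print("$(tasks.%s.results.%s)" % (task_name, result_name))  # $(tasks.flip-coin.results.Output)
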
14 changes: 7 additions & 7 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
@@ -339,7 +339,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
renamed_results_in_pipeline_task = set()
for task_result in spec['results']:
task_result_old_name = task_result.get('name')
- task_result_new_name = sanitize_k8s_name(task_result_old_name)
+ task_result_new_name = sanitize_k8s_name(task_result_old_name, allow_capital=True)
if task_result_new_name != task_result_old_name:
task_result['name'] = task_result_new_name
renamed_results_in_pipeline_task.add(
@@ -378,7 +378,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
argument['value'] = '$(tasks.%s.%s.%s)' % (
argument_placeholder_parts[1],
argument_placeholder_parts[2],
- sanitize_k8s_name(argument_placeholder_parts[3]))
+ sanitize_k8s_name(argument_placeholder_parts[3], allow_capital=True))

workflow = jsonify_annotations(workflow)
# Need to confirm:
@@ -456,7 +456,7 @@ def big_data_passing_pipeline(name: str, template: dict, inputs_tasks: set(),
if artifact_output_list:
tmp_list = set()
for output in json.loads(artifact_output_list):
- tmp_list.add(sanitize_k8s_name(output))
+ tmp_list.add(sanitize_k8s_name(output, allow_capital=True))
for task_output in task.get('taskSpec', {}).get('results', []):
if task_output.get('name') in tmp_list:
if not task.setdefault('workspaces', []):
@@ -515,7 +515,7 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
temp_list = json.loads(artifact_output_list)
artifact_output_list = []
for output in temp_list:
- artifact_output_list.append(sanitize_k8s_name(output))
+ artifact_output_list.append(sanitize_k8s_name(output, allow_capital=True))
for task_output in task.get('taskSpec', {}).get('results', []):
if (task_name, task_output.get('name')) in outputs_tasks or \
(artifact_output_list and task_output.get('name') in artifact_output_list):
@@ -525,7 +525,7 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
# $(results.task_output.get('name').path) -->
# $(workspaces.task_name.path)/task_name-task_output.get('name')
placeholder = '$(results.%s.path)' % (sanitize_k8s_name(
- task_output.get('name')))
+ task_output.get('name'), allow_capital=True))
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_output.get('name'))
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
@@ -622,7 +622,7 @@ def append_taskrun_params(task_name_append: str, task_path_name: str):
if task_param_task_name:
workspaces_parameter = '$(workspaces.%s.path)/%s/$(params.%s-trname)/%s' % (
task_name, BIG_DATA_MIDPATH, task_param_task_name, task_param_param_name)
- task_path = sanitize_k8s_name(task_param_param_name) + OUTPUT_RESULT_PATH_SUFFIX
+ task_path = sanitize_k8s_name(task_param_param_name, allow_capital=True) + OUTPUT_RESULT_PATH_SUFFIX
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
workspaces_parameter = '$(workspaces.%s.path)/$(params.%s)' % (task_name, '-'.join([task_param_task_name, task_path]))
if task_param_task_name != task_name:
@@ -664,7 +664,7 @@ def append_taskrun_params(task_name_append: str, task_path_name: str):
for index, artifact_tuple in enumerate(artifact_i):
artifact_name, artifact = artifact_tuple
src = artifact
- dst = '$(results.%s.path)' % sanitize_k8s_name(result['name'])
+ dst = '$(results.%s.path)' % sanitize_k8s_name(result['name'], allow_capital=True)
if artifact_name == result['name'] and src != dst:
add_copy_results_artifacts_step = True
total_size_command = 'ARTIFACT_SIZE=`wc -c %s${SUFFIX} | awk \'{print $1}\'`\n' % src + \
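
The big-data-passing hunks above reroute case-preserved results through workspaces. A hedged sketch of that rewrite; BIG_DATA_MIDPATH's value is an assumption inferred from the big_data_passing_format annotation in the test data below, and the task/result names are illustrative:

# Sketch of the placeholder-to-workspace substitution performed above.
BIG_DATA_MIDPATH = "artifacts/$ORIG_PR_NAME"  # assumption, not verified against the module
task_name, result_name = "flip-coin", "Output"
placeholder = "$(results.%s.path)" % result_name
workspaces_parameter = "$(workspaces.%s.path)/%s/%s/%s" % (
    task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", result_name)
# Every occurrence of `placeholder` in the task spec is then replaced with
# `workspaces_parameter`, so the artifact lands in the shared workspace.
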
7 changes: 5 additions & 2 deletions sdk/python/kfp_tekton/compiler/_k8s_helper.py
@@ -23,7 +23,8 @@ def sanitize_k8s_name(name,
allow_slash=False,
max_length=57,
suffix_space=0,
- rev_truncate=False):
+ rev_truncate=False,
+ allow_capital=False):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
@@ -46,7 +47,9 @@
k8s_name = re.sub('[^-_./0-9A-Za-z]+', '-', name)

if not allow_capital_underscore:
- k8s_name = re.sub('_', '-', k8s_name.lower())
+ k8s_name = re.sub('_', '-', k8s_name)
+ if not allow_capital:
+     k8s_name = k8s_name.lower()

if not allow_dot:
k8s_name = re.sub('[.]', '-', k8s_name)
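
A runnable sketch of the updated sanitizer fragment shown in this hunk (the full function also handles dots, slashes, and truncation, omitted here):

import re

def sanitize_k8s_name_sketch(name, allow_capital_underscore=False, allow_capital=False):
    # Condensed from the hunk above; dot/slash handling and truncation omitted.
    k8s_name = re.sub('[^-_./0-9A-Za-z]+', '-', name)
    if not allow_capital_underscore:
        k8s_name = re.sub('_', '-', k8s_name)
        if not allow_capital:  # new flag: keep capitals for Tekton result names
            k8s_name = k8s_name.lower()
    return k8s_name

print(sanitize_k8s_name_sketch('My_Output'))                      # my-output
print(sanitize_k8s_name_sketch('My_Output', allow_capital=True))  # My-Output
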
18 changes: 9 additions & 9 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -71,11 +71,11 @@ def _get_copy_result_step_template(step_number: int, result_maps: list):
"""
args = [""]
for key in result_maps[step_number].keys():
- sanitize_key = sanitize_k8s_name(key)
+ sanitize_key = sanitize_k8s_name(key, allow_capital=True)
args[0] += "mv %s%s $(results.%s.path);\n" % (TEKTON_HOME_RESULT_PATH, sanitize_key, sanitize_key)
if step_number > 0:
for key in result_maps[step_number - 1].keys():
- sanitize_key = sanitize_k8s_name(key)
+ sanitize_key = sanitize_k8s_name(key, allow_capital=True)
args[0] += "mv $(results.%s.path) %s%s;\n" % (sanitize_key, TEKTON_HOME_RESULT_PATH, sanitize_key)
return {
"name": "copy-results-%s" % str(step_number),
@@ -288,23 +288,23 @@ def _process_parameters(processed_op: BaseOp,
commands = []
for c in s['command']:
if path in c:
- c = c.replace(path, '$(results.%s.path)' % sanitize_k8s_name(name))
+ c = c.replace(path, '$(results.%s.path)' % sanitize_k8s_name(name, allow_capital=True))
need_copy_step = False
commands.append(c)
s['command'] = commands
if 'args' in s:
args = []
for a in s['args']:
if path in a:
- a = a.replace(path, '$(results.%s.path)' % sanitize_k8s_name(name))
+ a = a.replace(path, '$(results.%s.path)' % sanitize_k8s_name(name, allow_capital=True))
need_copy_step = False
args.append(a)
s['args'] = args
- if path == '/tekton/results/' + sanitize_k8s_name(name):
+ if path == '/tekton/results/' + sanitize_k8s_name(name, allow_capital=True):
need_copy_step = False
# If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path
if need_copy_step:
- script = script + 'cp ' + path + ' $(results.%s.path);\n' % sanitize_k8s_name(name)
+ script = script + 'cp ' + path + ' $(results.%s.path);\n' % sanitize_k8s_name(name, allow_capital=True)
mount_path = path.rsplit("/", 1)[0]
if mount_path not in mounted_param_paths:
_add_mount_path(name, path, mount_path, volume_mount_step_template, volume_template, mounted_param_paths)
@@ -346,7 +346,7 @@ def _process_output_artifacts(outputs_dict: Dict[Text, Any],
parameter_name = sanitize_k8s_name(artifact['name'], allow_capital_underscore=True, max_length=float('Inf'))
artifact_name = artifact_to_result_mapping.get(parameter_name, parameter_name)
if parameter_name in replaced_param_list:
- artifact_items.append([artifact_name, "$(results.%s.path)" % sanitize_k8s_name(artifact_name)])
+ artifact_items.append([artifact_name, "$(results.%s.path)" % sanitize_k8s_name(artifact_name, allow_capital=True)])
else:
artifact_items.append([artifact_name, artifact['path']])
mount_path = artifact['path'].rsplit("/", 1)[0]
@@ -633,7 +633,7 @@ def _op_to_template(op: BaseOp,
for key in result_size_map.keys():
# Replace main step results that are not in the first bin to the Tekton home path
if key not in verified_result_size_map[0].keys():
- sanitize_key = sanitize_k8s_name(key)
+ sanitize_key = sanitize_k8s_name(key, allow_capital=True)
for i, a in enumerate(step['args']):
a = a.replace('$(results.%s.path)' % sanitize_key, '%s%s' % (TEKTON_HOME_RESULT_PATH, sanitize_key))
step['args'][i] = a
@@ -648,6 +648,6 @@ def _op_to_template(op: BaseOp,
# Update artifact item location to the latest stage in order to properly track and store all the artifacts.
for i, artifact in enumerate(artifact_items[op.name]):
if artifact[0] not in verified_result_size_map[step_counter].keys():
- artifact[1] = '%s%s' % (TEKTON_HOME_RESULT_PATH, sanitize_k8s_name(artifact[0]))
+ artifact[1] = '%s%s' % (TEKTON_HOME_RESULT_PATH, sanitize_k8s_name(artifact[0], allow_capital=True))
artifact_items[op.name][i] = artifact
return template
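
For intuition, here is roughly what the copy-results args built by _get_copy_result_step_template look like once keys keep their case. The constant's value and the result maps are assumptions for illustration only:

TEKTON_HOME_RESULT_PATH = "/tekton/home/tep-results/"  # assumed value, for illustration
result_maps = [{"Output": 3}, {"Status": 5}]           # hypothetical per-step result bins

args = [""]
for key in result_maps[1]:  # step_number = 1: surface this bin's results
    args[0] += "mv %s%s $(results.%s.path);\n" % (TEKTON_HOME_RESULT_PATH, key, key)
for key in result_maps[0]:  # stash the previous bin back under the Tekton home path
    args[0] += "mv $(results.%s.path) %s%s;\n" % (key, TEKTON_HOME_RESULT_PATH, key)
print(args[0])
# mv /tekton/home/tep-results/Status $(results.Status.path);
# mv $(results.Output.path) /tekton/home/tep-results/Output;
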
32 changes: 18 additions & 14 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -326,7 +326,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies, pipeline_
replace_str = param[1] + '-'
self.loops_pipeline[group_name]['spec']['params'].append({
'name': param[0], 'value': '$(tasks.%s.results.%s)' % (
- param[1], sanitize_k8s_name(param[0].replace(replace_str, '', 1))
+ param[1], sanitize_k8s_name(param[0].replace(replace_str, '', 1), allow_capital=True)
)
})
if not param[1]:
@@ -370,7 +370,7 @@ def input_helper(custom_task, sub_group, param_list):
replace_str = param[1] + '-'
custom_task['spec']['params'].append({
'name': param[0], 'value': '$(tasks.%s.results.%s)' % (
- param[1], sanitize_k8s_name(param[0].replace(replace_str, '', 1))
+ param[1], sanitize_k8s_name(param[0].replace(replace_str, '', 1), allow_capital=True)
)
})
if not param[1] and param[0] not in param_list:
@@ -385,7 +385,7 @@ def process_pipelineparam(s):
if pipe_param[0] == '':
s = s.replace("{{pipelineparam:op=%s;name=%s}}" % pipe_param, '$(params.%s)' % pipe_param[1])
else:
- param_name = sanitize_k8s_name(pipe_param[1])
+ param_name = sanitize_k8s_name(pipe_param[1], allow_capital=True)
s = s.replace("{{pipelineparam:op=%s;name=%s}}" % pipe_param, '$(tasks.%s.results.%s)' % (
sanitize_k8s_name(pipe_param[0]),
param_name))
@@ -398,7 +398,7 @@
if v.op_name is None:
v = '$(params.%s)' % v.name
else:
- param_name = sanitize_k8s_name(v.name)
+ param_name = sanitize_k8s_name(v.name, allow_capital=True)
v = '$(tasks.%s.results.%s)' % (
sanitize_k8s_name(v.op_name),
param_name)
@@ -467,7 +467,7 @@ def process_pipelineparam(s):
if pipeline_param.op_name is None:
withparam_value = '$(params.%s)' % pipeline_param.name
else:
- param_name = sanitize_k8s_name(pipeline_param.name)
+ param_name = sanitize_k8s_name(pipeline_param.name, allow_capital=True)
withparam_value = '$(tasks.%s.results.%s)' % (
sanitize_k8s_name(pipeline_param.op_name),
param_name)
@@ -495,7 +495,7 @@ def process_pipelineparam(s):
if v.op_name is None:
v = '$(params.%s)' % v.name
else:
- param_name = sanitize_k8s_name(v.name)
+ param_name = sanitize_k8s_name(v.name, allow_capital=True)
v = '$(tasks.%s.results.%s)' % (
sanitize_k8s_name(v.op_name),
param_name)
@@ -522,7 +522,8 @@ def process_parameter(parameter):
parameter_value = str(parameter)
if isinstance(parameter, dsl.PipelineParam):
if parameter.op_name:
- parameter_value = '$(tasks.' + parameter.op_name + '.results.' + sanitize_k8s_name(parameter.name) + ')'
+ parameter_value = '$(tasks.' + parameter.op_name + '.results.' + \
+     sanitize_k8s_name(parameter.name, allow_capital=True) + ')'
else:
parameter_value = '$(params.' + parameter.name + ')'
return parameter_value
@@ -956,7 +957,7 @@ def is_custom_task_output(operand) -> bool:
def map_cel_vars(a):
if a.get('type', '') == dsl.PipelineParam:
op_name = sanitize_k8s_name(a['op_name'])
- output_name = sanitize_k8s_name(a['output_name'])
+ output_name = sanitize_k8s_name(a['output_name'], allow_capital=True)
return '$(tasks.%s.results.%s)' % (op_name, output_name)
else:
return a.get('value', '')
@@ -1083,13 +1084,15 @@ def map_cel_vars(a):
# Process input parameters if needed
if isinstance(condition.operand1, dsl.PipelineParam):
if condition.operand1.op_name:
- operand_value = '$(tasks.' + condition.operand1.op_name + '.results.' + sanitize_k8s_name(condition.operand1.name) + ')'
+ operand_value = '$(tasks.' + condition.operand1.op_name + '.results.' + \
+     sanitize_k8s_name(condition.operand1.name, allow_capital=True) + ')'
else:
operand_value = '$(params.' + condition.operand1.name + ')'
input_params.append(operand_value)
if isinstance(condition.operand2, dsl.PipelineParam):
if condition.operand2.op_name:
- operand_value = '$(tasks.' + condition.operand2.op_name + '.results.' + sanitize_k8s_name(condition.operand2.name) + ')'
+ operand_value = '$(tasks.' + condition.operand2.op_name + '.results.' + \
+     sanitize_k8s_name(condition.operand2.name, allow_capital=True) + ')'
else:
operand_value = '$(params.' + condition.operand2.name + ')'
input_params.append(operand_value)
@@ -1226,7 +1229,8 @@ def get_when_task(input_task_when, depended_conditions):
for pp in op.inputs:
if pipeline_param == pp.full_name:
# Parameters from Tekton results need to be sanitized
- substitute_param = '$(tasks.%s.results.%s)' % (sanitize_k8s_name(pp.op_name), sanitize_k8s_name(pp.name))
+ substitute_param = '$(tasks.%s.results.%s)' % (sanitize_k8s_name(pp.op_name),
+     sanitize_k8s_name(pp.name, allow_capital=True))
tp['value'] = re.sub('\$\(inputs.params.%s\)' % pipeline_param, substitute_param, tp.get('value', ''))
break
# Not necessary for Tekton execution
@@ -1406,15 +1410,15 @@ def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=No
if len(param.op_name) > 128:
raise ValueError('Input parameter cannot be longer than 128 characters. \
\nInput name: %s. \nOp name: %s' % (param.op_name, op.name))
- param.op_name = sanitize_k8s_name(param.op_name, max_length=float('inf'))
+ param.op_name = sanitize_k8s_name(param.op_name, max_length=float('inf'), allow_capital=True)
# sanitized output params
for param in op.outputs.values():
param.name = sanitize_k8s_name(param.name, True)
if param.op_name:
- param.op_name = sanitize_k8s_name(param.op_name)
+ param.op_name = sanitize_k8s_name(param.op_name, allow_capital=True)
if op.output is not None and not isinstance(op.output, dsl._container_op._MultipleOutputsError):
op.output.name = sanitize_k8s_name(op.output.name, True)
- op.output.op_name = sanitize_k8s_name(op.output.op_name)
+ op.output.op_name = sanitize_k8s_name(op.output.op_name, allow_capital=True)
if op.dependent_names:
op.dependent_names = [sanitize_k8s_name(name) for name in op.dependent_names]
if isinstance(op, dsl.ContainerOp) and op.file_outputs is not None:
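
The recurring pattern in this file: a PipelineParam backed by an upstream op renders as a task-results reference, and only the result segment opts into capital letters. A condensed sketch of that substitution (the helper import is real; render_param and the names are illustrative):

from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name

def render_param(op_name, param_name):
    # Mirrors the substitutions above: task names stay lowercase,
    # result names keep their case.
    if op_name is None:
        return "$(params.%s)" % param_name
    return "$(tasks.%s.results.%s)" % (
        sanitize_k8s_name(op_name),
        sanitize_k8s_name(param_name, allow_capital=True))

print(render_param("flip-coin", "Output"))  # $(tasks.flip-coin.results.Output)
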
3 changes: 2 additions & 1 deletion sdk/python/kfp_tekton/tekton.py
@@ -106,7 +106,8 @@ def AnySequencer(any: Iterable[Union[dsl.ContainerOp, ConditionOperator]],

def processOperand(operand) -> (str, str):
if isinstance(operand, dsl.PipelineParam):
return "results_" + sanitize_k8s_name(operand.op_name) + "_" + sanitize_k8s_name(operand.name), operand.op_name
return "results_" + sanitize_k8s_name(operand.op_name) + "_" + \
sanitize_k8s_name(operand.name, allow_capital=True), operand.op_name
else:
# Do the same as in _get_super_condition_template to check whether it's int
try:
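
The CEL variable AnySequencer builds embeds both the task and the result name, which is why the condition in any_sequencer.yaml below changes to results_flip-coin_Output == 'heads'. A sketch (names taken from that file):

from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name

op_name, result_name = "flip-coin", "Output"
cel_var = "results_" + sanitize_k8s_name(op_name) + "_" + \
    sanitize_k8s_name(result_name, allow_capital=True)
print(cel_var)  # results_flip-coin_Output
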
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -113,6 +113,13 @@ def test_sequential_workflow(self):
from .testdata.sequential import sequential_pipeline
self._test_pipeline_workflow(sequential_pipeline, 'sequential.yaml', skip_noninlined=True)

+ def test_custom_task_output_workflow(self):
+     """
+     Test compiling a custom task output workflow.
+     """
+     from .testdata.custom_task_output import uppercase_vs_lowercase
+     self._test_pipeline_workflow(uppercase_vs_lowercase, 'custom_task_output.yaml', skip_noninlined=True)

def test_parallel_join_workflow(self):
"""
Test compiling a parallel join workflow.
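
The testdata module itself is not part of this excerpt; purely as a hypothetical sketch, a fixture exercising case-sensitive results could look like the following (NOT the actual custom_task_output.py):

# Hypothetical fixture, not the real testdata/custom_task_output.py.
from kfp import components, dsl

echo_op = components.load_component_from_text("""
name: echo
outputs:
- {name: Output}
implementation:
  container:
    image: alpine
    command: [sh, -c, 'echo -n hello > "$0"', {outputPath: Output}]
""")

@dsl.pipeline(name='uppercase-vs-lowercase')
def uppercase_vs_lowercase():
    echo_op()
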
10 changes: 5 additions & 5 deletions sdk/python/tests/compiler/testdata/any_sequencer.yaml
@@ -26,7 +26,7 @@ metadata:
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"any-test": [["status", "$(results.status.path)"]],
"flip-coin": [["Output", "$(results.output.path)"]], "sleepcomponent": [], "sleepcomponent-2":
"flip-coin": [["Output", "$(results.Output.path)"]], "sleepcomponent": [], "sleepcomponent-2":
[], "sleepcomponent-3": [], "sleepcomponent-4": []}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
@@ -98,7 +98,7 @@ spec:
- name: main
args:
- '----output-paths'
- - $(results.output.path)
+ - $(results.Output.path)
command:
- sh
- -ec
@@ -145,7 +145,7 @@ spec:
f.write(_output_serializers[idx](_outputs[idx]))
image: python:alpine3.6
results:
- - name: output
+ - name: Output
type: string
description: /tmp/outputs/Output/data
metadata:
@@ -195,7 +195,7 @@ spec:
- --statusPath
- $(results.status.path)
- -c
- - results_flip-coin_output == 'heads'
+ - results_flip-coin_Output == 'heads'
command:
- any-task
image: dspipelines/any-sequencer:latest
@@ -211,7 +211,7 @@
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "any_test", "outputs":
[{"description": "The output file to create the status", "name": "status"}],
"version": "any_test@sha256=03b2b97be43cc77f2df70fd915a8fcd2e061341d7904af268cb4b2a4bbd86a76"}'
"version": "any_test@sha256=c4aaac826d09733534b96ec4a17c6646fa9d71e55d1e4bece1a5c0d20873e563"}'
tekton.dev/template: ''
params:
- name: pipelineRun-name