Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Relax k8s sanitization #2634

Merged
merged 8 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions samples/core/loop_static/loop_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@

@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param=10):
loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
with dsl.ParallelFor(loop_args) as item:
op1 = dsl.ContainerOp(
name="my-in-coop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s" % (item.a, my_pipe_param)],
arguments=["echo op1 %s %s" % (item.A_a, my_pipe_param)],
)

op2 = dsl.ContainerOp(
name="my-in-coop2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op2 %s" % item.b],
arguments=["echo op2 %s" % item.B_b],
)

op_out = dsl.ContainerOp(
Expand Down
2 changes: 1 addition & 1 deletion samples/core/volume_ops/volume_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
def volumeop_basic(size):
vop = dsl.VolumeOp(
name="create_pvc",
name="create-pvc",
resource_name="my-pvc",
modes=dsl.VOLUME_MODE_RWO,
size=size
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/kfp/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_server_api.ApiParameter(name=sanitize_k8s_name(k), value=str(v))
for k,v in params.items()]
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key, kfp_server_api.models.ApiRelationship.OWNER)
Expand Down
15 changes: 13 additions & 2 deletions sdk/python/kfp/compiler/_k8s_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@
from .. import dsl


def sanitize_k8s_name(name):
def sanitize_k8s_name(name, allow_capital_underscore=False):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.

Args:
name: original name,
allow_capital_underscore: whether to allow capital letter and underscore
in this name.

Returns:
sanitized name.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
if allow_capital_underscore:
return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z]+', '-', name)).lstrip('-').rstrip('-')
else:
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')


def convert_k8s_obj_to_json(k8s_obj):
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
for argument_set in loop_tasks:
c_dict = {}
for k, v in argument_set.items():
c_dict[sanitize_k8s_name(k)] = v
c_dict[sanitize_k8s_name(k, True)] = v
sanitized_tasks.append(c_dict)
else:
sanitized_tasks = loop_tasks
Expand Down Expand Up @@ -698,23 +698,23 @@ def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=No
sanitized_name = sanitize_k8s_name(op.name)
op.name = sanitized_name
for param in op.outputs.values():
param.name = sanitize_k8s_name(param.name)
param.name = sanitize_k8s_name(param.name, True)
if param.op_name:
param.op_name = sanitize_k8s_name(param.op_name)
if op.output is not None and not isinstance(op.output, dsl._container_op._MultipleOutputsError):
op.output.name = sanitize_k8s_name(op.output.name)
op.output.name = sanitize_k8s_name(op.output.name, True)
op.output.op_name = sanitize_k8s_name(op.output.op_name)
if op.dependent_names:
op.dependent_names = [sanitize_k8s_name(name) for name in op.dependent_names]
if isinstance(op, dsl.ContainerOp) and op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[sanitize_k8s_name(key)] = op.file_outputs[key]
sanitized_file_outputs[sanitize_k8s_name(key, True)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
elif isinstance(op, dsl.ResourceOp) and op.attribute_outputs is not None:
sanitized_attribute_outputs = {}
for key in op.attribute_outputs.keys():
sanitized_attribute_outputs[sanitize_k8s_name(key)] = \
sanitized_attribute_outputs[sanitize_k8s_name(key, True)] = \
op.attribute_outputs[key]
op.attribute_outputs = sanitized_attribute_outputs
sanitized_ops[sanitized_name] = op
Expand Down Expand Up @@ -757,7 +757,7 @@ def _create_workflow(self,
if arg_name == input.name:
arg_type = input.type
break
args_list.append(dsl.PipelineParam(sanitize_k8s_name(arg_name), param_type=arg_type))
args_list.append(dsl.PipelineParam(sanitize_k8s_name(arg_name, True), param_type=arg_type))

with dsl.Pipeline(pipeline_name) as dsl_pipeline:
pipeline_func(*args_list)
Expand All @@ -770,7 +770,7 @@ def _create_workflow(self,
# Fill in the default values.
args_list_with_defaults = []
if pipeline_meta.inputs:
args_list_with_defaults = [dsl.PipelineParam(sanitize_k8s_name(arg_name))
args_list_with_defaults = [dsl.PipelineParam(sanitize_k8s_name(arg_name, True))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
Expand Down
21 changes: 16 additions & 5 deletions sdk/python/kfp/dsl/_pipeline_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,21 @@
PipelineParamTuple = namedtuple('PipelineParamTuple', 'name op pattern')


def sanitize_k8s_name(name):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
"""
def sanitize_k8s_name(name, allow_capital_underscore=False):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.

Args:
name: original name,
allow_capital_underscore: whether to allow capital letter and underscore
in this name.

Returns:
sanitized name.
"""
if allow_capital_underscore:
return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z]+', '-', name)).lstrip('-').rstrip('-')
else:
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')


Expand All @@ -42,7 +53,7 @@ def match_serialized_pipelineparam(payload: str):
for match in matches:
pattern = '{{pipelineparam:op=%s;name=%s}}' % (match[0], match[1])
param_tuples.append(PipelineParamTuple(
name=sanitize_k8s_name(match[1]),
name=sanitize_k8s_name(match[1], True),
op=sanitize_k8s_name(match[0]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is a proper place for sanitization. The names should probably be converted in the compiler:

param_tuples = dsl.match_serialized_pipelineparam(obj)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I was curious why there are two sanitize functions as well.

numerology marked this conversation as resolved.
Show resolved Hide resolved
pattern=pattern))
return param_tuples
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/tests/compiler/testdata/pipelineparams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ spec:
parameters:
- name: tag
value: latest
- name: sleep-ms
- name: sleep_ms
value: '10'
templates:
- name: download
inputs:
parameters:
- name: sleep-ms
- name: sleep_ms
- name: tag
container:
image: busybox:{{inputs.parameters.tag}}
args:
- sleep {{inputs.parameters.sleep-ms}}; wget localhost:5678 -O /tmp/results.txt
- sleep {{inputs.parameters.sleep_ms}}; wget localhost:5678 -O /tmp/results.txt
command:
- sh
- "-c"
Expand Down Expand Up @@ -70,15 +70,15 @@ spec:
- name: pipelineparams
inputs:
parameters:
- name: sleep-ms
- name: sleep_ms
- name: tag
dag:
tasks:
- name: download
arguments:
parameters:
- name: sleep-ms
value: "{{inputs.parameters.sleep-ms}}"
- name: sleep_ms
value: "{{inputs.parameters.sleep_ms}}"
- name: tag
value: "{{inputs.parameters.tag}}"
template: download
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/recursive_while.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
spec:
arguments:
parameters:
- name: maxval
- name: maxVal
value: '12'
entrypoint: pipeline-flip-coin
serviceAccountName: pipeline-runner
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/tests/compiler/testdata/volume_snapshotop_rokurl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
spec:
arguments:
parameters:
- name: rok-url
- name: rok_url
entrypoint: volumesnapshotop-rokurl
serviceAccountName: pipeline-runner
templates:
Expand Down Expand Up @@ -56,7 +56,7 @@ spec:
successCondition: status.readyToUse == true
- inputs:
parameters:
- name: rok-url
- name: rok_url
name: create-volume-1
outputs:
parameters:
Expand All @@ -72,7 +72,7 @@ spec:
resource:
action: create
manifest: "apiVersion: v1\nkind: PersistentVolumeClaim\nmetadata:\n annotations:\n\
\ rok/origin: '{{inputs.parameters.rok-url}}'\n name: '{{workflow.name}}-vol1'\n\
\ rok/origin: '{{inputs.parameters.rok_url}}'\n name: '{{workflow.name}}-vol1'\n\
spec:\n accessModes:\n - ReadWriteMany\n resources:\n requests:\n \
\ storage: 1Gi\n"
- inputs:
Expand Down Expand Up @@ -192,8 +192,8 @@ spec:
template: create-snapshot-2
- arguments:
parameters:
- name: rok-url
value: '{{inputs.parameters.rok-url}}'
- name: rok_url
value: '{{inputs.parameters.rok_url}}'
name: create-volume-1
template: create-volume-1
- arguments:
Expand Down Expand Up @@ -242,5 +242,5 @@ spec:
template: step3-output
inputs:
parameters:
- name: rok-url
- name: rok_url
name: volumesnapshotop-rokurl
26 changes: 13 additions & 13 deletions sdk/python/tests/compiler/testdata/withitem_basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
spec:
arguments:
parameters:
- name: my-pipe-param
- name: my_pipe_param
value: '10'
entrypoint: my-pipeline
serviceAccountName: pipeline-runner
Expand All @@ -19,8 +19,8 @@ spec:
parameters:
- name: loop-item-param-00000001-subvar-a
value: '{{inputs.parameters.loop-item-param-00000001-subvar-a}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
- name: my_pipe_param
value: '{{inputs.parameters.my_pipe_param}}'
name: my-in-coop1
template: my-in-coop1
- arguments:
Expand All @@ -33,19 +33,19 @@ spec:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: loop-item-param-00000001-subvar-b
- name: my-pipe-param
- name: my_pipe_param
name: for-loop-for-loop-00000001-1
- container:
args:
- echo op1 {{inputs.parameters.loop-item-param-00000001-subvar-a}} {{inputs.parameters.my-pipe-param}}
- echo op1 {{inputs.parameters.loop-item-param-00000001-subvar-a}} {{inputs.parameters.my_pipe_param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loop-item-param-00000001-subvar-a
- name: my-pipe-param
- name: my_pipe_param
name: my-in-coop1
- container:
args:
Expand All @@ -60,14 +60,14 @@ spec:
name: my-in-coop2
- container:
args:
- echo {{inputs.parameters.my-pipe-param}}
- echo {{inputs.parameters.my_pipe_param}}
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: my-pipe-param
- name: my_pipe_param
name: my-out-cop
- dag:
tasks:
Expand All @@ -77,8 +77,8 @@ spec:
value: '{{item.a}}'
- name: loop-item-param-00000001-subvar-b
value: '{{item.b}}'
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
- name: my_pipe_param
value: '{{inputs.parameters.my_pipe_param}}'
name: for-loop-for-loop-00000001-1
template: for-loop-for-loop-00000001-1
withItems:
Expand All @@ -88,11 +88,11 @@ spec:
b: 20
- arguments:
parameters:
- name: my-pipe-param
value: '{{inputs.parameters.my-pipe-param}}'
- name: my_pipe_param
value: '{{inputs.parameters.my_pipe_param}}'
name: my-out-cop
template: my-out-cop
inputs:
parameters:
- name: my-pipe-param
- name: my_pipe_param
name: my-pipeline
Loading