Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add volume, volumemount and env to container op #300

Merged
merged 19 commits into from
Nov 17, 2018
171 changes: 113 additions & 58 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
# limitations under the License.


from collections import defaultdict
import copy
import inspect
import re
import string
import tarfile
import tempfile
from collections import defaultdict

import yaml

from .. import dsl
Expand All @@ -42,7 +41,8 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam):
"""

def _sanitize_name(self, name):
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-') #from _make_kubernetes_name
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip(
'-').rstrip('-') # from _make_kubernetes_name

def _param_full_name(self, param):
if param.op_name:
Expand Down Expand Up @@ -80,7 +80,8 @@ def _op_to_template(self, op):
if op.argument_inputs:
for param in op.argument_inputs:
full_name = self._param_full_name(param)
processed_args[i] = re.sub(str(param), '{{inputs.parameters.%s}}' % full_name,
processed_args[i] = re.sub(str(param),
'{{inputs.parameters.%s}}' % full_name,
processed_args[i])
input_parameters = []
for param in op.inputs:
Expand All @@ -94,8 +95,8 @@ def _op_to_template(self, op):
output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._param_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
'name': self._param_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

Expand All @@ -121,8 +122,10 @@ def _op_to_template(self, op):
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics'))
output_artifacts.append(
self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(
self._build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts
if op.command:
template['container']['command'] = op.command
Expand All @@ -140,23 +143,21 @@ def _op_to_template(self, op):
if op.memory_request or op.cpu_request:
template['container']['resources']['requests'] = {}
if op.memory_request:
template['container']['resources']['requests']['memory'] = op.memory_request
template['container']['resources']['requests'][
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor issue: it would be best if these lines were broken at more natural places, for example after ['memory'].

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. thanks

'memory'] = op.memory_request
if op.cpu_request:
template['container']['resources']['requests']['cpu'] = op.cpu_request

if op.gcp_secret:
template['container']['env'] = [
{
'name': 'GOOGLE_APPLICATION_CREDENTIALS',
'value': ('/secret/gcp-credentials/%s.json' % op.gcp_secret),
},
]
template['container']['volumeMounts'] = [
{
'name': op.name + '-gcp-credentials',
'mountPath': '/secret/gcp-credentials',
},
]
if op.env_variables:
env_variables = []
for e in op.env_variables:
env_variables.append(self._convert_k8s_obj_to_dic(e))
template['container']['env'] = env_variables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done in one line (I didn't try it, so the syntax might be off):

template['container']['env'] = list(map(self._convert_k8s_obj_to_dic, op.env_variables))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for volume.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. it's much cleaner.

if op.volume_mounts:
volume_mounts = []
for vm in op.volume_mounts:
volume_mounts.append(self._convert_k8s_obj_to_dic(vm))
template['container']['volumeMounts'] = volume_mounts
return template

def _get_groups_for_ops(self, root_group):
Expand All @@ -170,6 +171,7 @@ def _get_groups_for_ops(self, root_group):
op itself. The list of a given operator is sorted in a way that the farthest
group is the first and operator itself is the last.
"""

def _get_op_groups_helper(current_groups, ops_to_groups):
root_group = current_groups[-1]
for g in root_group.groups:
Expand All @@ -194,15 +196,16 @@ def _get_groups_helper(group):
return groups

return _get_groups_helper(root_group)

def _get_uncommon_ancestors(self, op_groups, op1, op2):
"""Helper function to get unique ancestors between two ops.

For example, op1's ancestor groups are [root, G1, G2, G3, op1], op2's ancestor groups are
[root, G1, G4, op2], then it returns a tuple ([G2, G3, op1], [G4, op2]).
"""
both_groups = [op_groups[op1.name], op_groups[op2.name]]
common_groups_len = sum(1 for x in zip(*both_groups) if x==(x[0],)*len(x))
common_groups_len = sum(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please undo the auto-format changes so that the diff is smaller and changes are more clear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. thanks

1 for x in zip(*both_groups) if x == (x[0],) * len(x))
group1 = op_groups[op1.name][common_groups_len:]
group2 = op_groups[op2.name][common_groups_len:]
return (group1, group2)
Expand Down Expand Up @@ -248,13 +251,13 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups):
outputs[g].add((full_name, None))
else:
# If not last upstream group, output value comes from one of its child.
outputs[g].add((full_name, upstream_groups[i+1]))
outputs[g].add((full_name, upstream_groups[i + 1]))
else:
if not op.is_exit_handler:
for g in op_groups[op.name]:
inputs[g].add((full_name, None))
return inputs, outputs

def _get_condition_params_for_ops(self, root_group):
"""Get parameters referenced in conditions of ops."""

Expand All @@ -276,7 +279,7 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params):

_get_condition_params_for_ops_helper(root_group, [])
return conditions

def _get_dependencies(self, pipeline, root_group, op_groups):
"""Get dependent groups and ops for all ops and groups.

Expand Down Expand Up @@ -304,12 +307,14 @@ def _get_dependencies(self, pipeline, root_group, op_groups):
return dependencies

def _create_condition(self, condition):
  """Build the '<left> == <right>' expression string for a condition.

  An operand that is a dsl.PipelineParam is rendered as the placeholder
  '{{inputs.parameters.<full name>}}', where the full name comes from
  self._param_full_name. Any other operand is rendered with str().

  Args:
    condition: object whose `operand1` and `operand2` attributes are the two
      sides of the comparison.

  Returns:
    The string '<left> == <right>'.
  """

  def _render_operand(operand):
    # Both sides follow the same rule, so it is written once.
    if isinstance(operand, dsl.PipelineParam):
      return '{{inputs.parameters.%s}}' % self._param_full_name(operand)
    return str(operand)

  left = _render_operand(condition.operand1)
  right = _render_operand(condition.operand2)
  return '%s == %s' % (left, right)

def _group_to_template(self, group, inputs, outputs, dependencies):
Expand All @@ -334,7 +339,8 @@ def _group_to_template(self, group, inputs, outputs, dependencies):
template_outputs.append({
'name': param_name,
'valueFrom': {
'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (depentent_name, param_name)
'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (
depentent_name, param_name)
}
})
template_outputs.sort(key=lambda x: x['name'])
Expand All @@ -346,15 +352,15 @@ def _group_to_template(self, group, inputs, outputs, dependencies):
# operates in "step" mode where condition is supported.
only_child = group.groups[0]
step = {
'name': only_child.name,
'template': only_child.name,
'name': only_child.name,
'template': only_child.name,
}
if inputs.get(only_child.name, None):
arguments = []
for param_name, dependent_name in inputs[only_child.name]:
arguments.append({
'name': param_name,
'value': '{{inputs.parameters.%s}}' % param_name
'name': param_name,
'value': '{{inputs.parameters.%s}}' % param_name
})
arguments.sort(key=lambda x: x['name'])
step['arguments'] = {'parameters': arguments}
Expand Down Expand Up @@ -382,7 +388,8 @@ def _group_to_template(self, group, inputs, outputs, dependencies):
# The value comes from an upstream sibling.
arguments.append({
'name': param_name,
'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name)
'value': '{{tasks.%s.outputs.parameters.%s}}' % (
dependent_name, param_name)
})
else:
# The value comes from its parent.
Expand All @@ -395,17 +402,18 @@ def _group_to_template(self, group, inputs, outputs, dependencies):
tasks.append(task)
tasks.sort(key=lambda x: x['name'])
template['dag'] = {'tasks': tasks}
return template
return template

def _create_new_groups(self, root_group):
"""Create a copy of the input group, and insert extra groups for conditions."""

new_group = copy.deepcopy(root_group)

def _insert_group_for_condition_helper(group):
for i, g in enumerate(group.groups):
if g.type == 'condition':
child_condition_group = dsl.OpsGroup('condition-child', g.name + '-child')
child_condition_group = dsl.OpsGroup('condition-child',
g.name + '-child')
child_condition_group.ops = g.ops
child_condition_group.groups = g.groups
g.groups = [child_condition_group]
Expand All @@ -416,7 +424,7 @@ def _insert_group_for_condition_helper(group):

_insert_group_for_condition_helper(new_group)
return new_group

def _create_templates(self, pipeline):
"""Create all groups and ops templates in the pipeline."""

Expand All @@ -425,13 +433,15 @@ def _create_templates(self, pipeline):
new_root_group = self._create_new_groups(pipeline.groups[0])

op_groups = self._get_groups_for_ops(new_root_group)
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups)
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group,
op_groups)
dependencies = self._get_dependencies(pipeline, new_root_group, op_groups)
groups = self._get_groups(new_root_group)

templates = []
for g in groups:
templates.append(self._group_to_template(g, inputs, outputs, dependencies))
templates.append(
self._group_to_template(g, inputs, outputs, dependencies))

for op in pipeline.ops.values():
templates.append(self._op_to_template(op))
Expand All @@ -441,14 +451,9 @@ def _create_volumes(self, pipeline):
"""Create volumes required for the templates"""
volumes = []
for op in pipeline.ops.values():
if op.gcp_secret:
volume = {
'name': op.name + '-gcp-credentials',
'secret': {
'secretName': op.gcp_secret,
}
}
volumes.append(volume)
if op.volumes:
for v in op.volumes:
volumes.append(self._convert_k8s_obj_to_dic(v))
volumes.sort(key=lambda x: x['name'])
return volumes

Expand Down Expand Up @@ -505,7 +510,8 @@ def _validate_exit_handler(self, pipeline):
def _validate_exit_handler_helper(group, exiting_op_names, handler_exists):
if group.type == 'exit_handler':
if handler_exists or len(exiting_op_names) > 1:
raise ValueError('Only one global exit_handler is allowed and all ops need to be included.')
raise ValueError(
'Only one global exit_handler is allowed and all ops need to be included.')
handler_exists = True

if group.ops:
Expand Down Expand Up @@ -542,12 +548,61 @@ def _compile(self, pipeline_func):
args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
for arg, default in zip(reversed(args_list_with_defaults),
reversed(argspec.defaults)):
arg.value = default.value

workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow

def _convert_k8s_obj_to_dic(self, obj):
"""
Builds a JSON K8s object.

If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is swagger model, return the properties dict.

:param obj: The data to serialize.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

":param" and ":return": are these new Python style docstring? In Python it is usually:

Args:
arg1: ...
arg2: ...
Returns:
...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

:return: The serialized form of data.
"""

from six import text_type, integer_types
PRIMITIVE_TYPES = (float, bool, bytes, text_type) + integer_types
from datetime import date, datetime
if obj is None:
return None
elif isinstance(obj, PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self._convert_k8s_obj_to_dic(sub_obj)
for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self._convert_k8s_obj_to_dic(sub_obj)
for sub_obj in obj)
elif isinstance(obj, (datetime, date)):
return obj.isoformat()

if isinstance(obj, dict):
obj_dict = obj
else:
# Convert model obj to dict except
# attributes `swagger_types`, `attribute_map`
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
from six import iteritems
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
for attr, _ in iteritems(obj.swagger_types)
if getattr(obj, attr) is not None}

return {key: self._convert_k8s_obj_to_dic(val)
Ark-kun marked this conversation as resolved.
Show resolved Hide resolved
for key, val in iteritems(obj_dict)}

def compile(self, pipeline_func, package_path):
"""Compile the given pipeline function into workflow yaml.

Expand All @@ -556,7 +611,7 @@ def compile(self, pipeline_func, package_path):
package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
"""
workflow = self._compile(pipeline_func)
yaml.Dumper.ignore_aliases = lambda *args : True
yaml.Dumper.ignore_aliases = lambda *args: True
yaml_text = yaml.dump(workflow, default_flow_style=False)

from contextlib import closing
Expand All @@ -565,4 +620,4 @@ def compile(self, pipeline_func, package_path):
with closing(BytesIO(yaml_text.encode())) as yaml_file:
tarinfo = tarfile.TarInfo('pipeline.yaml')
tarinfo.size = len(yaml_file.getvalue())
tar.addfile(tarinfo, fileobj=yaml_file)
tar.addfile(tarinfo, fileobj=yaml_file)
Loading