-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add volume, volumemount and env to container op #300
Changes from 16 commits
2dd451f
d7d35bf
f33cb44
38b6cdc
ead9828
cfdaa36
9785fda
e48143c
2310f21
3053dcc
acc5eff
1beddd3
adb9a74
1035814
1bc9543
bc77782
15fcbac
2e5cd7a
6520f3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,13 +13,12 @@ | |
# limitations under the License. | ||
|
||
|
||
from collections import defaultdict | ||
import copy | ||
import inspect | ||
import re | ||
import string | ||
import tarfile | ||
import tempfile | ||
from collections import defaultdict | ||
|
||
import yaml | ||
|
||
from .. import dsl | ||
|
@@ -42,7 +41,8 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam): | |
""" | ||
|
||
def _sanitize_name(self, name): | ||
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-') #from _make_kubernetes_name | ||
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip( | ||
'-').rstrip('-') # from _make_kubernetes_name | ||
|
||
def _param_full_name(self, param): | ||
if param.op_name: | ||
|
@@ -80,7 +80,8 @@ def _op_to_template(self, op): | |
if op.argument_inputs: | ||
for param in op.argument_inputs: | ||
full_name = self._param_full_name(param) | ||
processed_args[i] = re.sub(str(param), '{{inputs.parameters.%s}}' % full_name, | ||
processed_args[i] = re.sub(str(param), | ||
'{{inputs.parameters.%s}}' % full_name, | ||
processed_args[i]) | ||
input_parameters = [] | ||
for param in op.inputs: | ||
|
@@ -94,8 +95,8 @@ def _op_to_template(self, op): | |
output_parameters = [] | ||
for param in op.outputs.values(): | ||
output_parameters.append({ | ||
'name': self._param_full_name(param), | ||
'valueFrom': {'path': op.file_outputs[param.name]} | ||
'name': self._param_full_name(param), | ||
'valueFrom': {'path': op.file_outputs[param.name]} | ||
}) | ||
output_parameters.sort(key=lambda x: x['name']) | ||
|
||
|
@@ -121,8 +122,10 @@ def _op_to_template(self, op): | |
# for the artifact output when default artifact repository is configured, | ||
# this part needs to be updated to use the default artifact repository. | ||
output_artifacts = [] | ||
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata')) | ||
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics')) | ||
output_artifacts.append( | ||
self._build_conventional_artifact('mlpipeline-ui-metadata')) | ||
output_artifacts.append( | ||
self._build_conventional_artifact('mlpipeline-metrics')) | ||
template['outputs']['artifacts'] = output_artifacts | ||
if op.command: | ||
template['container']['command'] = op.command | ||
|
@@ -140,23 +143,21 @@ def _op_to_template(self, op): | |
if op.memory_request or op.cpu_request: | ||
template['container']['resources']['requests'] = {} | ||
if op.memory_request: | ||
template['container']['resources']['requests']['memory'] = op.memory_request | ||
template['container']['resources']['requests'][ | ||
'memory'] = op.memory_request | ||
if op.cpu_request: | ||
template['container']['resources']['requests']['cpu'] = op.cpu_request | ||
|
||
if op.gcp_secret: | ||
template['container']['env'] = [ | ||
{ | ||
'name': 'GOOGLE_APPLICATION_CREDENTIALS', | ||
'value': ('/secret/gcp-credentials/%s.json' % op.gcp_secret), | ||
}, | ||
] | ||
template['container']['volumeMounts'] = [ | ||
{ | ||
'name': op.name + '-gcp-credentials', | ||
'mountPath': '/secret/gcp-credentials', | ||
}, | ||
] | ||
if op.env_variables: | ||
env_variables = [] | ||
for e in op.env_variables: | ||
env_variables.append(self._convert_k8s_obj_to_dic(e)) | ||
template['container']['env'] = env_variables | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can be done in one line (didn't try it so syntax might be off): template['container']['env'] = map(self._convert_k8s_obj_to_dic, op.env_variables) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same for volume. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. it's much cleaner. |
||
if op.volume_mounts: | ||
volume_mounts = [] | ||
for vm in op.volume_mounts: | ||
volume_mounts.append(self._convert_k8s_obj_to_dic(vm)) | ||
template['container']['volumeMounts'] = volume_mounts | ||
return template | ||
|
||
def _get_groups_for_ops(self, root_group): | ||
|
@@ -170,6 +171,7 @@ def _get_groups_for_ops(self, root_group): | |
op itself. The list of a given operator is sorted in a way that the farthest | ||
group is the first and operator itself is the last. | ||
""" | ||
|
||
def _get_op_groups_helper(current_groups, ops_to_groups): | ||
root_group = current_groups[-1] | ||
for g in root_group.groups: | ||
|
@@ -194,15 +196,16 @@ def _get_groups_helper(group): | |
return groups | ||
|
||
return _get_groups_helper(root_group) | ||
|
||
def _get_uncommon_ancestors(self, op_groups, op1, op2): | ||
"""Helper function to get unique ancestors between two ops. | ||
|
||
For example, op1's ancestor groups are [root, G1, G2, G3, op1], op2's ancestor groups are | ||
[root, G1, G4, op2], then it returns a tuple ([G2, G3, op1], [G4, op2]). | ||
""" | ||
both_groups = [op_groups[op1.name], op_groups[op2.name]] | ||
common_groups_len = sum(1 for x in zip(*both_groups) if x==(x[0],)*len(x)) | ||
common_groups_len = sum( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please undo the auto-format changes so that the diff is smaller and changes are more clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack. thanks |
||
1 for x in zip(*both_groups) if x == (x[0],) * len(x)) | ||
group1 = op_groups[op1.name][common_groups_len:] | ||
group2 = op_groups[op2.name][common_groups_len:] | ||
return (group1, group2) | ||
|
@@ -248,13 +251,13 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): | |
outputs[g].add((full_name, None)) | ||
else: | ||
# If not last upstream group, output value comes from one of its child. | ||
outputs[g].add((full_name, upstream_groups[i+1])) | ||
outputs[g].add((full_name, upstream_groups[i + 1])) | ||
else: | ||
if not op.is_exit_handler: | ||
for g in op_groups[op.name]: | ||
inputs[g].add((full_name, None)) | ||
return inputs, outputs | ||
|
||
def _get_condition_params_for_ops(self, root_group): | ||
"""Get parameters referenced in conditions of ops.""" | ||
|
||
|
@@ -276,7 +279,7 @@ def _get_condition_params_for_ops_helper(group, current_conditions_params): | |
|
||
_get_condition_params_for_ops_helper(root_group, []) | ||
return conditions | ||
|
||
def _get_dependencies(self, pipeline, root_group, op_groups): | ||
"""Get dependent groups and ops for all ops and groups. | ||
|
||
|
@@ -304,12 +307,14 @@ def _get_dependencies(self, pipeline, root_group, op_groups): | |
return dependencies | ||
|
||
def _create_condition(self, condition): | ||
left = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand1) | ||
if isinstance(condition.operand1, dsl.PipelineParam) | ||
else str(condition.operand1)) | ||
right = ('{{inputs.parameters.%s}}' % self._param_full_name(condition.operand2) | ||
if isinstance(condition.operand2, dsl.PipelineParam) | ||
else str(condition.operand2)) | ||
left = ( | ||
'{{inputs.parameters.%s}}' % self._param_full_name(condition.operand1) | ||
if isinstance(condition.operand1, dsl.PipelineParam) | ||
else str(condition.operand1)) | ||
right = ( | ||
'{{inputs.parameters.%s}}' % self._param_full_name(condition.operand2) | ||
if isinstance(condition.operand2, dsl.PipelineParam) | ||
else str(condition.operand2)) | ||
return ('%s == %s' % (left, right)) | ||
|
||
def _group_to_template(self, group, inputs, outputs, dependencies): | ||
|
@@ -334,7 +339,8 @@ def _group_to_template(self, group, inputs, outputs, dependencies): | |
template_outputs.append({ | ||
'name': param_name, | ||
'valueFrom': { | ||
'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (depentent_name, param_name) | ||
'parameter': '{{tasks.%s.outputs.parameters.%s}}' % ( | ||
depentent_name, param_name) | ||
} | ||
}) | ||
template_outputs.sort(key=lambda x: x['name']) | ||
|
@@ -346,15 +352,15 @@ def _group_to_template(self, group, inputs, outputs, dependencies): | |
# operates in "step" mode where condition is supported. | ||
only_child = group.groups[0] | ||
step = { | ||
'name': only_child.name, | ||
'template': only_child.name, | ||
'name': only_child.name, | ||
'template': only_child.name, | ||
} | ||
if inputs.get(only_child.name, None): | ||
arguments = [] | ||
for param_name, dependent_name in inputs[only_child.name]: | ||
arguments.append({ | ||
'name': param_name, | ||
'value': '{{inputs.parameters.%s}}' % param_name | ||
'name': param_name, | ||
'value': '{{inputs.parameters.%s}}' % param_name | ||
}) | ||
arguments.sort(key=lambda x: x['name']) | ||
step['arguments'] = {'parameters': arguments} | ||
|
@@ -382,7 +388,8 @@ def _group_to_template(self, group, inputs, outputs, dependencies): | |
# The value comes from an upstream sibling. | ||
arguments.append({ | ||
'name': param_name, | ||
'value': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) | ||
'value': '{{tasks.%s.outputs.parameters.%s}}' % ( | ||
dependent_name, param_name) | ||
}) | ||
else: | ||
# The value comes from its parent. | ||
|
@@ -395,17 +402,18 @@ def _group_to_template(self, group, inputs, outputs, dependencies): | |
tasks.append(task) | ||
tasks.sort(key=lambda x: x['name']) | ||
template['dag'] = {'tasks': tasks} | ||
return template | ||
return template | ||
|
||
def _create_new_groups(self, root_group): | ||
"""Create a copy of the input group, and insert extra groups for conditions.""" | ||
|
||
new_group = copy.deepcopy(root_group) | ||
|
||
def _insert_group_for_condition_helper(group): | ||
for i, g in enumerate(group.groups): | ||
if g.type == 'condition': | ||
child_condition_group = dsl.OpsGroup('condition-child', g.name + '-child') | ||
child_condition_group = dsl.OpsGroup('condition-child', | ||
g.name + '-child') | ||
child_condition_group.ops = g.ops | ||
child_condition_group.groups = g.groups | ||
g.groups = [child_condition_group] | ||
|
@@ -416,7 +424,7 @@ def _insert_group_for_condition_helper(group): | |
|
||
_insert_group_for_condition_helper(new_group) | ||
return new_group | ||
|
||
def _create_templates(self, pipeline): | ||
"""Create all groups and ops templates in the pipeline.""" | ||
|
||
|
@@ -425,13 +433,15 @@ def _create_templates(self, pipeline): | |
new_root_group = self._create_new_groups(pipeline.groups[0]) | ||
|
||
op_groups = self._get_groups_for_ops(new_root_group) | ||
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, op_groups) | ||
inputs, outputs = self._get_inputs_outputs(pipeline, new_root_group, | ||
op_groups) | ||
dependencies = self._get_dependencies(pipeline, new_root_group, op_groups) | ||
groups = self._get_groups(new_root_group) | ||
|
||
templates = [] | ||
for g in groups: | ||
templates.append(self._group_to_template(g, inputs, outputs, dependencies)) | ||
templates.append( | ||
self._group_to_template(g, inputs, outputs, dependencies)) | ||
|
||
for op in pipeline.ops.values(): | ||
templates.append(self._op_to_template(op)) | ||
|
@@ -441,14 +451,9 @@ def _create_volumes(self, pipeline): | |
"""Create volumes required for the templates""" | ||
volumes = [] | ||
for op in pipeline.ops.values(): | ||
if op.gcp_secret: | ||
volume = { | ||
'name': op.name + '-gcp-credentials', | ||
'secret': { | ||
'secretName': op.gcp_secret, | ||
} | ||
} | ||
volumes.append(volume) | ||
if op.volumes: | ||
for v in op.volumes: | ||
volumes.append(self._convert_k8s_obj_to_dic(v)) | ||
volumes.sort(key=lambda x: x['name']) | ||
return volumes | ||
|
||
|
@@ -505,7 +510,8 @@ def _validate_exit_handler(self, pipeline): | |
def _validate_exit_handler_helper(group, exiting_op_names, handler_exists): | ||
if group.type == 'exit_handler': | ||
if handler_exists or len(exiting_op_names) > 1: | ||
raise ValueError('Only one global exit_handler is allowed and all ops need to be included.') | ||
raise ValueError( | ||
'Only one global exit_handler is allowed and all ops need to be included.') | ||
handler_exists = True | ||
|
||
if group.ops: | ||
|
@@ -542,12 +548,61 @@ def _compile(self, pipeline_func): | |
args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name)) | ||
for arg_name in argspec.args] | ||
if argspec.defaults: | ||
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)): | ||
for arg, default in zip(reversed(args_list_with_defaults), | ||
reversed(argspec.defaults)): | ||
arg.value = default.value | ||
|
||
workflow = self._create_pipeline_workflow(args_list_with_defaults, p) | ||
return workflow | ||
|
||
def _convert_k8s_obj_to_dic(self, obj): | ||
""" | ||
Builds a JSON K8s object. | ||
|
||
If obj is None, return None. | ||
If obj is str, int, long, float, bool, return directly. | ||
If obj is datetime.datetime, datetime.date | ||
convert to string in iso8601 format. | ||
If obj is list, sanitize each element in the list. | ||
If obj is dict, return the dict. | ||
If obj is swagger model, return the properties dict. | ||
|
||
:param obj: The data to serialize. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ":param" and ":return": are these new Python style docstring? In Python it is usually: Args: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
:return: The serialized form of data. | ||
""" | ||
|
||
from six import text_type, integer_types | ||
PRIMITIVE_TYPES = (float, bool, bytes, text_type) + integer_types | ||
from datetime import date, datetime | ||
if obj is None: | ||
return None | ||
elif isinstance(obj, PRIMITIVE_TYPES): | ||
return obj | ||
elif isinstance(obj, list): | ||
return [self._convert_k8s_obj_to_dic(sub_obj) | ||
for sub_obj in obj] | ||
elif isinstance(obj, tuple): | ||
return tuple(self._convert_k8s_obj_to_dic(sub_obj) | ||
for sub_obj in obj) | ||
elif isinstance(obj, (datetime, date)): | ||
return obj.isoformat() | ||
|
||
if isinstance(obj, dict): | ||
obj_dict = obj | ||
else: | ||
# Convert model obj to dict except | ||
# attributes `swagger_types`, `attribute_map` | ||
# and attributes which value is not None. | ||
# Convert attribute name to json key in | ||
# model definition for request. | ||
from six import iteritems | ||
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) | ||
for attr, _ in iteritems(obj.swagger_types) | ||
if getattr(obj, attr) is not None} | ||
|
||
return {key: self._convert_k8s_obj_to_dic(val) | ||
Ark-kun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for key, val in iteritems(obj_dict)} | ||
|
||
def compile(self, pipeline_func, package_path): | ||
"""Compile the given pipeline function into workflow yaml. | ||
|
||
|
@@ -556,7 +611,7 @@ def compile(self, pipeline_func, package_path): | |
package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz" | ||
""" | ||
workflow = self._compile(pipeline_func) | ||
yaml.Dumper.ignore_aliases = lambda *args : True | ||
yaml.Dumper.ignore_aliases = lambda *args: True | ||
yaml_text = yaml.dump(workflow, default_flow_style=False) | ||
|
||
from contextlib import closing | ||
|
@@ -565,4 +620,4 @@ def compile(self, pipeline_func, package_path): | |
with closing(BytesIO(yaml_text.encode())) as yaml_file: | ||
tarinfo = tarfile.TarInfo('pipeline.yaml') | ||
tarinfo.size = len(yaml_file.getvalue()) | ||
tar.addfile(tarinfo, fileobj=yaml_file) | ||
tar.addfile(tarinfo, fileobj=yaml_file) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor issue: It would be best if these lines are cut in the more natural places, for example after ['memory']?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack. thanks