From 383a4f5a38857066470e898a9c6f69b0c672ae3c Mon Sep 17 00:00:00 2001 From: eterna2 Date: Fri, 8 Mar 2019 20:45:38 +0800 Subject: [PATCH] ContainerOp now can be updated with any k8s V1Container attributes as well as sidecars with Sidecar class. ContainerOp accepts PipelineParam in any valid k8 properties. --- sdk/python/kfp/compiler/_k8s_helper.py | 9 +- sdk/python/kfp/compiler/_op_to_template.py | 277 +++++ sdk/python/kfp/compiler/compiler.py | 171 +-- sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 1105 ++++++++++++----- sdk/python/kfp/dsl/_pipeline_param.py | 69 +- sdk/python/tests/compiler/compiler_tests.py | 5 +- .../with_sidecars_and_pipelineparams.py | 111 ++ .../with_sidecars_and_pipelineparams.yaml | 227 ++++ sdk/python/tests/dsl/container_op_tests.py | 9 +- sdk/python/tests/dsl/pipeline_param_tests.py | 25 +- 11 files changed, 1514 insertions(+), 496 deletions(-) create mode 100644 sdk/python/kfp/compiler/_op_to_template.py create mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py create mode 100644 sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml diff --git a/sdk/python/kfp/compiler/_k8s_helper.py b/sdk/python/kfp/compiler/_k8s_helper.py index bc54c307b0c..14ffe8024a9 100644 --- a/sdk/python/kfp/compiler/_k8s_helper.py +++ b/sdk/python/kfp/compiler/_k8s_helper.py @@ -19,6 +19,9 @@ import logging import re +from .. import dsl + + class K8sHelper(object): """ Kubernetes Helper """ @@ -159,7 +162,11 @@ def convert_k8s_obj_to_json(k8s_obj): for sub_obj in obj) elif isinstance(k8s_obj, (datetime, date)): return k8s_obj.isoformat() - + elif isinstance(k8s_obj, dsl.PipelineParam): + if isinstance(k8s_obj.value, str): + return k8s_obj.value + return '{{inputs.parameters.%s}}' % k8s_obj.full_name + if isinstance(k8s_obj, dict): obj_dict = k8s_obj else: diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py new file mode 100644 index 00000000000..dfbb002c7da --- /dev/null +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -0,0 +1,277 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +from typing import Union, List, Any, Callable, TypeVar, Dict + +from ._k8s_helper import K8sHelper +from .. import dsl + +# generics +T = TypeVar('T') + + +def _get_pipelineparam(payload: str) -> List[str]: + """Get a list of `PipelineParam` from a string. + + Args: + payload {str}: string + """ + + matches = re.findall( + r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', + payload) + return [ + dsl.PipelineParam(x[1], x[0], x[2]) for x in list(set(matches)) + ] + + +def _sanitize_pipelineparam(param: dsl.PipelineParam, in_place=True): + """Sanitize the name and op_name of a PipelineParam. + + Args: + params: a PipelineParam to sanitize + in_place: if set, do an in-place update to PipelineParm, otherwise return a + new instance of PipelineParam. 
+ """ + if in_place: + param.name = K8sHelper.sanitize_k8s_name(param.name) + param.op_name = K8sHelper.sanitize_k8s_name( + param.op_name) if param.op_name else param.op_name + return param + + return dsl.PipelineParam( + K8sHelper.sanitize_k8s_name(param.name), + K8sHelper.sanitize_k8s_name(param.op_name), param.value) + + +def _sanitize_pipelineparams( + params: Union[dsl.PipelineParam, List[dsl.PipelineParam]], + in_place=True): + """Sanitize the name(s) of a PipelineParam (or a list of PipelineParam) and + return a list of sanitized PipelineParam. + + Args: + params: a PipelineParam or a list of PipelineParam to sanitize + in_place: if set, do an in-place update to the PipelineParm, otherwise return + new instances of PipelineParam. + """ + params = params if isinstance(params, list) else [params] + return [_sanitize_pipelineparam(param, in_place) for param in params] + + +def _process_obj(obj: Any, map_to_tmpl_var: dict): + """Recursively sanitize and replace any PipelineParam (instances and serialized strings) + in the object with the corresponding template variables + (i.e. '{{inputs.parameters.}}'). + + Args: + obj: any obj that may have PipelineParam + map_to_tmpl_var: a dict that maps an unsanitized pipeline + params signature into a template var + """ + # serialized str might be unsanitized + if isinstance(obj, str): + # get signature + pipeline_params = _get_pipelineparam(obj) + if not pipeline_params: + return obj + # replace all unsanitized signature with template var + for param in pipeline_params: + pattern = str(param) + sanitized = str(_sanitize_pipelineparam(param)) + obj = re.sub(pattern, map_to_tmpl_var[sanitized], obj) + + # list + if isinstance(obj, list): + return [_process_obj(item, map_to_tmpl_var) for item in obj] + + # tuple + if isinstance(obj, tuple): + return tuple((_process_obj(item, map_to_tmpl_var) for item in obj)) + + # dict + if isinstance(obj, dict): + return { + key: _process_obj(value, map_to_tmpl_var) + for key, value in obj.items() + } + + # pipelineparam + if isinstance(obj, dsl.PipelineParam): + # if not found in unsanitized map, then likely to be sanitized + return map_to_tmpl_var.get( + str(obj), '{{inputs.parameters.%s}}' % obj.full_name) + + # k8s_obj + if hasattr(obj, 'swagger_types') and isinstance(obj.swagger_types, dict): + # process everything inside recursively + for key in obj.swagger_types.keys(): + setattr(obj, key, _process_obj(getattr(obj, key), map_to_tmpl_var)) + # return json representation of the k8s obj + return K8sHelper.convert_k8s_obj_to_json(obj) + + # do nothing + return obj + + +def _process_container_ops(op: dsl.ContainerOp): + """Recursively go through the attrs listed in `attrs_with_pipelineparams` + and sanitize and replace pipeline params with template var string. + + Returns a processed `ContainerOp`. + + NOTE this is an in-place update to `ContainerOp`'s attributes (i.e. other than + `file_outputs`, and `outputs`, all `PipelineParam` are replaced with the + corresponding template variable strings). 
+ + Args: + op {dsl.ContainerOp}: class that inherits from ds.ContainerOp + + Returns: + dsl.ContainerOp + """ + + # tmp map: unsanitized rpr -> sanitized PipelineParam + # in-place sanitize of all PipelineParam (except outputs and file_outputs) + _map = { + str(param): _sanitize_pipelineparam(param, in_place=True) + for param in op.inputs + } + + # map: unsanitized pipeline param rpr -> template var string + # used to replace unsanitized pipeline param strings with the corresponding + # template var strings + map_to_tmpl_var = { + key: '{{inputs.parameters.%s}}' % param.full_name + for key, param in _map.items() + } + + # process all attr with pipelineParams except inputs and outputs parameters + for key in op.attrs_with_pipelineparams: + setattr(op, key, _process_obj(getattr(op, key), map_to_tmpl_var)) + + return op + + +def _parameters_to_json(params: List[dsl.PipelineParam]): + """Converts a list of PipelineParam into an argo `parameter` JSON obj.""" + _to_json = (lambda param: dict(name=param.full_name, value=param.value) + if param.value else dict(name=param.full_name)) + params = [_to_json(param) for param in params] + # Sort to make the results deterministic. + params.sort(key=lambda x: x['name']) + return params + + +# TODO: artifacts? +def _inputs_to_json(inputs_params: List[dsl.PipelineParam], _artifacts=None): + """Converts a list of PipelineParam into an argo `inputs` JSON obj.""" + parameters = _parameters_to_json(inputs_params) + return {'parameters': parameters } if parameters else None + + +def _outputs_to_json(outputs: Dict[str, dsl.PipelineParam], + file_outputs: Dict[str, str], + output_artifacts: List[dict]): + """Creates an argo `outputs` JSON obj.""" + output_parameters = [] + for param in outputs.values(): + output_parameters.append({ + 'name': param.full_name, + 'valueFrom': { + 'path': file_outputs[param.name] + } + }) + output_parameters.sort(key=lambda x: x['name']) + ret = {} + if output_parameters: + ret['parameters'] = output_parameters + if output_artifacts: + ret['artifacts'] = output_artifacts + + return ret + + +def _build_conventional_artifact(name): + return { + 'name': name, + 'path': '/' + name + '.json', + 's3': { + # TODO: parameterize namespace for minio service + 'endpoint': 'minio-service.kubeflow:9000', + 'bucket': 'mlpipeline', + 'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz', + 'insecure': True, + 'accessKeySecret': { + 'name': 'mlpipeline-minio-artifact', + 'key': 'accesskey', + }, + 'secretKeySecret': { + 'name': 'mlpipeline-minio-artifact', + 'key': 'secretkey' + } + }, + } + +# TODO: generate argo python classes from swagger and use convert_k8s_obj_to_json?? 
+def _op_to_template(op: dsl.ContainerOp): + """Generate template given an operator inherited from dsl.ContainerOp.""" + + # NOTE in-place update to ContainerOp + # replace all PipelineParams (except in `file_outputs`, `outputs`, `inputs`) + # with template var strings + processed_op = _process_container_ops(op) + + # default output artifacts + output_artifacts = [ + _build_conventional_artifact(name) + for name in ['mlpipeline-ui-metadata', 'mlpipeline-metrics'] + ] + + # workflow template + template = { + 'name': op.name, + 'container': K8sHelper.convert_k8s_obj_to_json(op.container) + } + + # inputs + inputs = _inputs_to_json(op.inputs) + if inputs: + template['inputs'] = inputs + + # outputs + template['outputs'] = _outputs_to_json(op.outputs, op.file_outputs, + output_artifacts) + + # node selector + if processed_op.node_selector: + template['nodeSelector'] = processed_op.node_selector + + # metadata + if processed_op.pod_annotations or processed_op.pod_labels: + template['metadata'] = {} + if processed_op.pod_annotations: + template['metadata']['annotations'] = processed_op.pod_annotations + if processed_op.pod_labels: + template['metadata']['labels'] = processed_op.pod_labels + # retries + if processed_op.num_retries: + template['retryStrategy'] = {'limit': processed_op.num_retries} + + # sidecars + if processed_op.sidecars: + template['sidecars'] = processed_op.sidecars + + return template diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 7647f14bf81..600cb433bce 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -21,6 +21,8 @@ from .. import dsl from ._k8s_helper import K8sHelper +from ._op_to_template import _op_to_template + class Compiler(object): """DSL Compiler. @@ -117,7 +119,6 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups): # it as input for its parent groups. 
if param.value: continue - full_name = self._pipelineparam_full_name(param) if param.op_name: upstream_op = pipeline.ops[param.op_name] @@ -211,158 +212,8 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references): else: return str(value_or_reference) - def _process_args(self, raw_args, argument_inputs): - if not raw_args: - return [] - processed_args = list(map(str, raw_args)) - for i, _ in enumerate(processed_args): - # unsanitized_argument_inputs stores a dict: string of sanitized param -> string of unsanitized param - matches = [] - match = re.findall(r'{{pipelineparam:op=([\w\s\_-]*);name=([\w\s\_-]+);value=(.*?)}}', str(processed_args[i])) - matches += match - unsanitized_argument_inputs = {} - for x in list(set(matches)): - sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2])) - unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2])) - - if argument_inputs: - for param in argument_inputs: - if str(param) in unsanitized_argument_inputs: - full_name = self._pipelineparam_full_name(param) - processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name, - processed_args[i]) - return processed_args - def _op_to_template(self, op): - """Generate template given an operator inherited from dsl.ContainerOp.""" - - def _build_conventional_artifact(name, path): - return { - 'name': name, - 'path': path, - 's3': { - # TODO: parameterize namespace for minio service - 'endpoint': 'minio-service.kubeflow:9000', - 'bucket': 'mlpipeline', - 'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz', - 'insecure': True, - 'accessKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'accesskey', - }, - 'secretKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'secretkey' - } - }, - } - - processed_arguments = self._process_args(op.arguments, op.argument_inputs) - processed_command = self._process_args(op.command, op.argument_inputs) - - input_parameters = [] - for param in op.inputs: - one_parameter = {'name': self._pipelineparam_full_name(param)} - if param.value: - one_parameter['value'] = str(param.value) - input_parameters.append(one_parameter) - # Sort to make the results deterministic. - input_parameters.sort(key=lambda x: x['name']) - - output_parameters = [] - for param in op.outputs.values(): - output_parameters.append({ - 'name': self._pipelineparam_full_name(param), - 'valueFrom': {'path': op.file_outputs[param.name]} - }) - output_parameters.sort(key=lambda x: x['name']) - - template = { - 'name': op.name, - 'container': { - 'image': op.image, - } - } - if processed_arguments: - template['container']['args'] = processed_arguments - if processed_command: - template['container']['command'] = processed_command - if input_parameters: - template['inputs'] = {'parameters': input_parameters} - - template['outputs'] = {} - if output_parameters: - template['outputs'] = {'parameters': output_parameters} - - # Generate artifact for metadata output - # The motivation of appending the minio info in the yaml - # is to specify a unique path for the metadata. - # TODO: after argo addresses the issue that configures a unique path - # for the artifact output when default artifact repository is configured, - # this part needs to be updated to use the default artifact repository. 
- output_artifacts = [] - output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata', '/mlpipeline-ui-metadata.json')) - output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics', '/mlpipeline-metrics.json')) - template['outputs']['artifacts'] = output_artifacts - - # Set resources. - if op.resource_limits or op.resource_requests: - template['container']['resources'] = {} - if op.resource_limits: - template['container']['resources']['limits'] = op.resource_limits - if op.resource_requests: - template['container']['resources']['requests'] = op.resource_requests - - # Set nodeSelector. - if op.node_selector: - template['nodeSelector'] = op.node_selector - - if op.env_variables: - template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables)) - if op.volume_mounts: - template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts)) - - if op.pod_annotations or op.pod_labels: - template['metadata'] = {} - if op.pod_annotations: - template['metadata']['annotations'] = op.pod_annotations - if op.pod_labels: - template['metadata']['labels'] = op.pod_labels - - if op.num_retries: - template['retryStrategy'] = {'limit': op.num_retries} - - if op.sidecars: - def _sidecar_to_template(tsidecar): - index, sidecar = tsidecar - sidecar_template = { - 'name': '{}-{}-{}'.format(op.name, sidecar.name, index), - 'image': sidecar.image - } - sidecar_processed_arguments = self._process_args(op.arguments, op.argument_inputs) - sidecar_processed_command = self._process_args(op.command, op.argument_inputs) - if sidecar_processed_arguments: - sidecar_template['args'] = sidecar_processed_arguments - if sidecar_processed_command: - sidecar_template['command'] = sidecar_processed_command - # Set resources. - if sidecar.resource_limits or sidecar.resource_requests: - sidecar_template['resources'] = {} - if sidecar.resource_limits: - sidecar_template['resources']['limits'] = sidecar.resource_limits - if sidecar.resource_requests: - sidecar_template['resources']['requests'] = sidecar.resource_requests - # env variables - if sidecar.env_variables: - sidecar_template['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.env_variables)) - # volume mounts - if sidecar.volume_mounts: - sidecar_template['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, sidecar.volume_mounts)) - return sidecar_template - - template['sidecars'] = list(map(_sidecar_to_template, enumerate(op.sidecars))) - - return template + return _op_to_template(op) def _group_to_template(self, group, inputs, outputs, dependencies): """Generate template given an OpsGroup. @@ -378,15 +229,14 @@ def _group_to_template(self, group, inputs, outputs, dependencies): template['inputs'] = { 'parameters': template_inputs } - # Generate outputs section. 
if outputs.get(group.name, None): template_outputs = [] - for param_name, depentent_name in outputs[group.name]: + for param_name, dependent_name in outputs[group.name]: template_outputs.append({ 'name': param_name, 'valueFrom': { - 'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (depentent_name, param_name) + 'parameter': '{{tasks.%s.outputs.parameters.%s}}' % (dependent_name, param_name) } }) template_outputs.sort(key=lambda x: x['name']) @@ -451,7 +301,7 @@ def _create_templates(self, pipeline): templates.append(self._group_to_template(g, inputs, outputs, dependencies)) for op in pipeline.ops.values(): - templates.append(self._op_to_template(op)) + templates.append(_op_to_template(op)) return templates def _create_volumes(self, pipeline): @@ -463,9 +313,9 @@ def _create_volumes(self, pipeline): for v in op.volumes: # Remove volume duplicates which have the same name #TODO: check for duplicity based on the serialized volumes instead of just name. - if v.name not in volume_name_set: - volume_name_set.add(v.name) - volumes.append(K8sHelper.convert_k8s_obj_to_json(v)) + if v['name'] not in volume_name_set: + volume_name_set.add(v['name']) + volumes.append(v) volumes.sort(key=lambda x: x['name']) return volumes @@ -570,7 +420,7 @@ def _compile(self, pipeline_func): for op in p.ops.values(): sanitized_name = K8sHelper.sanitize_k8s_name(op.name) op.name = sanitized_name - for param in op.inputs + op.argument_inputs: + for param in op.inputs: # + op.argument_inputs: param.name = K8sHelper.sanitize_k8s_name(param.name) if param.op_name: param.op_name = K8sHelper.sanitize_k8s_name(param.op_name) @@ -590,7 +440,6 @@ def _compile(self, pipeline_func): op.file_outputs = sanitized_file_outputs sanitized_ops[sanitized_name] = op p.ops = sanitized_ops - workflow = self._create_pipeline_workflow(args_list_with_defaults, p) return workflow diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index fd4953642f7..413faab5c69 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -15,7 +15,7 @@ from ._pipeline_param import PipelineParam from ._pipeline import Pipeline, pipeline, get_pipeline_conf -from ._container_op import ContainerOp, SideCar +from ._container_op import ContainerOp, Sidecar from ._ops_group import OpsGroup, ExitHandler, Condition from ._component import python_component #TODO: expose the component decorator when ready \ No newline at end of file diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 7fecbb0babc..88c59eca8b1 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -12,352 +12,819 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re +from typing import Any, Dict, List, TypeVar, Union +from kubernetes.client.models import ( + V1Container, V1EnvVar, V1EnvFromSource, V1SecurityContext, V1Probe, + V1ResourceRequirements, V1VolumeDevice, V1VolumeMount, V1ContainerPort, + V1Lifecycle) from . import _pipeline from . import _pipeline_param -from ._pipeline_param import _extract_pipelineparams -from ._metadata import ComponentMeta -import re -from typing import Dict - - -class ContainerBase(object): - """Represents a container template in a pod""" - - def __init__(self, name: str, image: str): - """ - Args: - name: the name of the container specified as a DNS_LABEL. - image: the sidecar container image name, such as 'rabbitmq:latest'. 
- """ - self.name = name - self.image = image - self.resource_limits = {} - self.resource_requests = {} - self.volume_mounts = [] - self.env_variables = [] - - def _validate_memory_string(self, memory_string): - """Validate a given string is valid for memory request or limit.""" - - if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', memory_string) is None: - raise ValueError('Invalid memory string. Should be an integer, or integer followed ' - 'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"') - - def _validate_cpu_string(self, cpu_string): - "Validate a given string is valid for cpu request or limit." - - if re.match(r'^[0-9]+m$', cpu_string) is not None: - return - - try: - float(cpu_string) - except ValueError: - raise ValueError('Invalid cpu string. Should be float or integer, or integer followed ' - 'by "m".') - - def _validate_positive_number(self, str_value, param_name): - "Validate a given string is in positive integer format." - - try: - int_value = int(str_value) - except ValueError: - raise ValueError('Invalid {}. Should be integer.'.format(param_name)) - - if int_value <= 0: - raise ValueError('{} must be positive integer.'.format(param_name)) - - def add_resource_limit(self, resource_name, value): - """Add the resource limit of the container. - - Args: - resource_name: The name of the resource. It can be cpu, memory, etc. - value: The string value of the limit. - """ - - self.resource_limits[resource_name] = value - return self - - def add_resource_request(self, resource_name, value): - """Add the resource request of the container. - - Args: - resource_name: The name of the resource. It can be cpu, memory, etc. - value: The string value of the request. - """ - - self.resource_requests[resource_name] = value - return self - - def set_memory_request(self, memory): - """Set memory request (minimum) for this operator. - - Args: - memory: a string which can be a number or a number followed by one of - "E", "P", "T", "G", "M", "K". - """ - - self._validate_memory_string(memory) - return self.add_resource_request("memory", memory) - - def set_memory_limit(self, memory): - """Set memory limit (maximum) for this operator. - - Args: - memory: a string which can be a number or a number followed by one of - "E", "P", "T", "G", "M", "K". - """ - self._validate_memory_string(memory) - return self.add_resource_limit("memory", memory) - - def set_cpu_request(self, cpu): - """Set cpu request (minimum) for this operator. - Args: - cpu: A string which can be a number or a number followed by "m", which means 1/1000. - """ +# generics +T = TypeVar('T') +# type alias: either a string or a list of string +StringOrStringList = Union[str, List[str]] - self._validate_cpu_string(cpu) - return self.add_resource_request("cpu", cpu) - def set_cpu_limit(self, cpu): - """Set cpu limit (maximum) for this operator. +# util functions +def as_list(value: Any, if_none: Union[None, List] = None) -> List: + """Convert any value except None to a list if not already a list.""" + if value is None: + return if_none + return value if isinstance(value, list) else [value] - Args: - cpu: A string which can be a number or a number followed by "m", which means 1/1000. 
- """ - self._validate_cpu_string(cpu) - return self.add_resource_limit("cpu", cpu) +def create_and_append(current_list: Union[List[T], None], item: T) -> List[T]: + """Create a list (if needed) and appends an item to it.""" + current_list = current_list or [] + current_list.append(item) + return current_list - def set_gpu_limit(self, gpu, vendor = "nvidia"): - """Set gpu limit for the operator. This function add '.com/gpu' into resource limit. - Note that there is no need to add GPU request. GPUs are only supposed to be specified in - the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/. - Args: - gpu: A string which must be a positive number. - vendor: Optional. A string which is the vendor of the requested gpu. The supported values - are: 'nvidia' (default), and 'amd'. - """ - - self._validate_positive_number(gpu, 'gpu') - if vendor != 'nvidia' and vendor != 'amd': - raise ValueError('vendor can only be nvidia or amd.') - - return self.add_resource_limit("%s.com/gpu" % vendor, gpu) - - def add_volume_mount(self, volume_mount): - """Add volume to the container - - Args: - volume_mount: Kubernetes volume mount - For detailed spec, check volume mount definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py - """ - - self.volume_mounts.append(volume_mount) - return self - - def add_env_variable(self, env_variable): - """Add environment variable to the container. - - Args: - env_variable: Kubernetes environment variable - For detailed spec, check environment variable definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py +class Container(V1Container): """ + A wrapper over k8s container definition object (io.k8s.api.core.v1.Container), + which is used to represent the `container` property in argo's workflow + template (io.argoproj.workflow.v1alpha1.Template). + + `Container` class also comes with utility functions to set and update the + the various properties for a k8s container definition. + + NOTE: A notable difference is that `name` is not required and will not be + processed for `Container` (in contrast to `V1Container` where `name` is a + required property). + + See: + - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container.py + - https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json + + + Example: - self.env_variables.append(env_variable) - return self + from kfp.dsl import ContainerOp + from kubernetes.client.models import V1EnvVar + + # creates a operation + op = ContainerOp(name='bash-ops', i + mage='busybox:latest', + command=['echo'], + arguments=['$MSG']) -class SideCar(ContainerBase): - """Represents a sidecar container to be provisioned together with a docker container image for ContainerOp.""" + # returns a `Container` object from `ContainerOp` + # and add an environment variable to `Container` + op.container.add_env_variable(V1EnvVar(name='MSG', value='hello world')) - def __init__(self, name: str, image: str, command: str=None, arguments: str=None): """ - Args: - name: the name of the sidecar. - image: the sidecar container image name, such as 'rabbitmq:latest' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). 
- At container run time the argument will be 'echo param_value'. """ - # name will be generated when attached to ContainerOps - super().__init__(name, image) - self.command = command - self.arguments = arguments - - matches = [] - for arg in (command or []) + (arguments or []): - match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) - matches += match - - self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) - for x in list(set(matches))] - - -class ContainerOp(ContainerBase): - """Represents an op implemented by a docker container image.""" - - def __init__(self, name: str, image: str, command: str=None, arguments: str=None, - file_outputs : Dict[str, str]=None, is_exit_handler=False): - """Create a new instance of ContainerOp. - - Args: - name: the name of the op. It does not have to be unique within a pipeline - because the pipeline will generates a unique new name in case of conflicts. - image: the container image name, such as 'python:3.5-jessie' - command: the command to run in the container. - If None, uses default CMD in defined in container. - arguments: the arguments of the command. The command can include "%s" and supply - a PipelineParam as the string replacement. For example, ('echo %s' % input_param). - At container run time the argument will be 'echo param_value'. - file_outputs: Maps output labels to local file paths. At pipeline run time, - the value of a PipelineParam is saved to its corresponding local file. It's - one way for outside world to receive outputs of the container. - is_exit_handler: Whether it is used as an exit handler. + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. """ - if not _pipeline.Pipeline.get_default_pipeline(): - raise ValueError('Default pipeline not defined.') - - valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' - if not re.match(valid_name_regex, name): - raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name)) - - # human_name must exist to construct containerOps name - self.human_name = name - super().__init__(_pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler), image) - - self.command = command - self.arguments = arguments - self.is_exit_handler = is_exit_handler - self.node_selector = {} - self.volumes = [] - self.pod_annotations = {} - self.pod_labels = {} - self.num_retries = 0 - self.sidecars = [] - - matches = [] - for arg in (command or []) + (arguments or []): - match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg)) - matches += match - - self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2]) - for x in list(set(matches))] - self.file_outputs = file_outputs - self.dependent_op_names = [] - - self.inputs = [] - if self.argument_inputs: - self.inputs += self.argument_inputs - - self.outputs = {} - if file_outputs: - self.outputs = {name: _pipeline_param.PipelineParam(name, op_name=self.name) - for name in file_outputs.keys()} - - self.output=None - if len(self.outputs) == 1: - self.output = list(self.outputs.values())[0] - - def apply(self, mod_func): - """Applies a modifier function to self. The function should return the passed object. - This is needed to chain "extention methods" to this class. 
+ # remove `name` from swagger_types so `name` is not generated in the JSON + swagger_types = { + key: value + for key, value in V1Container.swagger_types.items() if key != 'name' + } + attribute_map = { + key: value + for key, value in V1Container.attribute_map.items() if key != 'name' + } + + def __init__(self, image: str, command: List[str], args: List[str], + **kwargs): + """Creates a new instance of `Container`. + + Args: + image {str}: image to use, e.g. busybox:latest + command {List[str]}: entrypoint array. Not executed within a shell. + args {List[str]}: arguments to entrypoint. + **kwargs: keyword arguments for `V1Container` + """ + # set name to '' if name is not provided + # k8s container MUST have a name + # argo workflow template does not need a name for container def + if not kwargs.get('name'): + kwargs['name'] = '' + + super(Container, self).__init__( + image=image, command=command, args=args, **kwargs) + + def _validate_memory_string(self, memory_string): + """Validate a given string is valid for memory request or limit.""" + + if isinstance(memory_string, _pipeline_param.PipelineParam): + if memory_string.value: + memory_string = memory_string.value + else: + return + + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory_string) is None: + raise ValueError( + 'Invalid memory string. Should be an integer, or integer followed ' + 'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"') + + def _validate_cpu_string(self, cpu_string): + "Validate a given string is valid for cpu request or limit." + + if isinstance(cpu_string, _pipeline_param.PipelineParam): + if cpu_string.value: + cpu_string = cpu_string.value + else: + return + + if re.match(r'^[0-9]+m$', cpu_string) is not None: + return + + try: + float(cpu_string) + except ValueError: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer followed ' + 'by "m".') + + def _validate_positive_number(self, str_value, param_name): + "Validate a given string is in positive integer format." + + if isinstance(str_value, _pipeline_param.PipelineParam): + if str_value.value: + str_value = str_value.value + else: + return + + try: + int_value = int(str_value) + except ValueError: + raise ValueError( + 'Invalid {}. Should be integer.'.format(param_name)) + + if int_value <= 0: + raise ValueError('{} must be positive integer.'.format(param_name)) + + def add_resource_limit(self, resource_name, value): + """Add the resource limit of the container. + + Args: + resource_name: The name of the resource. It can be cpu, memory, etc. + value: The string value of the limit. + """ + + self.resources = self.resources or V1ResourceRequirements() + self.resources.limits = self.resources.limits or {} + self.resources.limits.update({resource_name: value}) + return self + + def add_resource_request(self, resource_name, value): + """Add the resource request of the container. + + Args: + resource_name: The name of the resource. It can be cpu, memory, etc. + value: The string value of the request. + """ + + self.resources = self.resources or V1ResourceRequirements() + self.resources.requests = self.resources.requests or {} + self.resources.requests.update({resource_name: value}) + return self + + def set_memory_request(self, memory): + """Set memory request (minimum) for this operator. + + Args: + memory: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". 
+ """ + + self._validate_memory_string(memory) + return self.add_resource_request("memory", memory) + + def set_memory_limit(self, memory): + """Set memory limit (maximum) for this operator. + + Args: + memory: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". + """ + self._validate_memory_string(memory) + return self.add_resource_limit("memory", memory) + + def set_cpu_request(self, cpu): + """Set cpu request (minimum) for this operator. + + Args: + cpu: A string which can be a number or a number followed by "m", which means 1/1000. + """ + + self._validate_cpu_string(cpu) + return self.add_resource_request("cpu", cpu) + + def set_cpu_limit(self, cpu): + """Set cpu limit (maximum) for this operator. + + Args: + cpu: A string which can be a number or a number followed by "m", which means 1/1000. + """ + + self._validate_cpu_string(cpu) + return self.add_resource_limit("cpu", cpu) + + def set_gpu_limit(self, gpu, vendor="nvidia"): + """Set gpu limit for the operator. This function add '.com/gpu' into resource limit. + Note that there is no need to add GPU request. GPUs are only supposed to be specified in + the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/. + + Args: + gpu: A string which must be a positive number. + vendor: Optional. A string which is the vendor of the requested gpu. The supported values + are: 'nvidia' (default), and 'amd'. + """ + + self._validate_positive_number(gpu, 'gpu') + if vendor != 'nvidia' and vendor != 'amd': + raise ValueError('vendor can only be nvidia or amd.') + + return self.add_resource_limit("%s.com/gpu" % vendor, gpu) + + def add_volume_mount(self, volume_mount): + """Add volume to the container + + Args: + volume_mount: Kubernetes volume mount + For detailed spec, check volume mount definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py + """ + + if not isinstance(volume_mount, V1VolumeMount): + raise ValueError( + 'invalid argument. Must be of instance `V1VolumeMount`.') - Example: - from kfp.gcp import use_gcp_secret - task = ( - train_op(...) - .set_memory_request('1GB') - .apply(use_gcp_secret('user-gcp-sa')) - .set_memory_limit('2GB') - ) + self.volume_mounts = create_and_append(self.volume_mounts, + volume_mount) + return self + + def add_volume_devices(self, volume_device): + """ + Add a block device to be used by the container. + + Args: + volume_device: Kubernetes volume device + For detailed spec, volume device definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_device.py + """ + + if not isinstance(volume_device, V1VolumeDevice): + raise ValueError( + 'invalid argument. Must be of instance `V1VolumeDevice`.') + + self.volume_devices = create_and_append(self.volume_devices, + volume_device) + return self + + def add_env_variable(self, env_variable): + """Add environment variable to the container. + + Args: + env_variable: Kubernetes environment variable + For detailed spec, check environment variable definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py + """ + + if not isinstance(env_variable, V1EnvVar): + raise ValueError( + 'invalid argument. Must be of instance `V1EnvVar`.') + + self.env = create_and_append(self.env, env_variable) + return self + + def add_env_from(self, env_from): + """Add a source to populate environment variables int the container. 
+ + Args: + env_from: Kubernetes environment from source + For detailed spec, check environment from source definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var_source.py + """ + + if not isinstance(env_from, V1EnvFromSource): + raise ValueError( + 'invalid argument. Must be of instance `V1EnvFromSource`.') + + self.env_from = create_and_append(self.env_from, env_from) + return self + + def set_image_pull_policy(self, image_pull_policy): + """Set image pull policy for the container. + + Args: + image_pull_policy: One of `Always`, `Never`, `IfNotPresent`. + """ + if image_pull_policy not in ['Always', 'Never', 'IfNotPresent']: + raise ValueError( + 'Invalid imagePullPolicy. Must be one of `Always`, `Never`, `IfNotPresent`.' + ) + + self.image_pull_policy = image_pull_policy + return self + + def add_port(self, container_port): + """Add a container port to the container. + + Args: + container_port: Kubernetes container port + For detailed spec, check container port definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container_port.py + """ + + if not isinstance(container_port, V1ContainerPort): + raise ValueError( + 'invalid argument. Must be of instance `V1ContainerPort`.') + + self.ports = create_and_append(self.ports, container_port) + return self + + def set_security_context(self, security_context): + """Set security configuration to be applied on the container. + + Args: + security_context: Kubernetes security context + For detailed spec, check security context definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_security_context.py + """ + + if not isinstance(security_context, V1SecurityContext): + raise ValueError( + 'invalid argument. Must be of instance `V1SecurityContext`.') + + self.security_context = security_context + return self + + def set_stdin(self, stdin=True): + """ + Whether this container should allocate a buffer for stdin in the container + runtime. If this is not set, reads from stdin in the container will always + result in EOF. + + Args: + stdin: boolean flag + """ + + self.stdin = stdin + return self + + def set_stdin_once(self, stdin_once=True): + """ + Whether the container runtime should close the stdin channel after it has + been opened by a single attach. When stdin is true the stdin stream will + remain open across multiple attach sessions. If stdinOnce is set to true, + stdin is opened on container start, is empty until the first client attaches + to stdin, and then remains open and accepts data until the client + disconnects, at which time stdin is closed and remains closed until the + container is restarted. If this flag is false, a container processes that + reads from stdin will never receive an EOF. + + Args: + stdin_once: boolean flag + """ + + self.stdin_once = stdin_once + return self + + def set_termination_message_path(self, termination_message_path): + """ + Path at which the file to which the container's termination message will be + written is mounted into the container's filesystem. Message written is + intended to be brief final status, such as an assertion failure message. + Will be truncated by the node if greater than 4096 bytes. The total message + length across all containers will be limited to 12kb. 
+ + Args: + termination_message_path: path for the termination message + """ + self.termination_message_path = termination_message_path + return self + + def set_termination_message_policy(self, termination_message_policy): + """ + Indicate how the termination message should be populated. File will use the + contents of terminationMessagePath to populate the container status message + on both success and failure. FallbackToLogsOnError will use the last chunk + of container log output if the termination message file is empty and the + container exited with an error. The log output is limited to 2048 bytes or + 80 lines, whichever is smaller. + + Args: + termination_message_policy: `File` or `FallbackToLogsOnError` + """ + if termination_message_policy not in ['File', 'FallbackToLogsOnError']: + raise ValueError( + 'terminationMessagePolicy must be `File` or `FallbackToLogsOnError`' + ) + self.termination_message_policy = termination_message_policy + return self + + def set_tty(self, tty=True): + """ + Whether this container should allocate a TTY for itself, also requires + 'stdin' to be true. + + Args: + tty: boolean flag + """ + + self.tty = tty + return self + + def set_readiness_probe(self, readiness_probe): + """ + Set a readiness probe for the container. + + Args: + readiness_probe: Kubernetes readiness probe + For detailed spec, check probe definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py + """ + + if not isinstance(readiness_probe, V1Probe): + raise ValueError( + 'invalid argument. Must be of instance `V1Probe`.') + + self.readiness_probe = readiness_probe + return self + + def set_liveness_probe(self, liveness_probe): + """ + Set a liveness probe for the container. + + Args: + liveness_probe: Kubernetes liveness probe + For detailed spec, check probe definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py + """ + + if not isinstance(liveness_probe, V1Probe): + raise ValueError( + 'invalid argument. Must be of instance `V1Probe`.') + + self.liveness_probe = liveness_probe + return self + + def set_lifecycle(self, lifecycle): + """ + Setup a lifecycle config for the container. + + Args: + lifecycle: Kubernetes lifecycle + For detailed spec, lifecycle definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_lifecycle.py + """ + + if not isinstance(lifecycle, V1Lifecycle): + raise ValueError( + 'invalid argument. Must be of instance `V1Lifecycle`.') + + self.lifecycle = lifecycle + return self + + +class Sidecar(Container): """ - return mod_func(self) + Represents an argo workflow sidecar (io.argoproj.workflow.v1alpha1.Sidecar) + to be used in `sidecars` property in argo's workflow template + (io.argoproj.workflow.v1alpha1.Template). - def after(self, op): - """Specify explicit dependency on another op.""" - self.dependent_op_names.append(op.name) - return self + `Sidecar` inherits from `Container` class with an addition of `mirror_volume_mounts` + attribute (`mirrorVolumeMounts` property). 
- def add_volume(self, volume): - """Add K8s volume to the container + See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json - Args: - volume: Kubernetes volumes - For detailed spec, check volume definition - https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py - """ - - self.volumes.append(volume) - return self - - def add_node_selector_constraint(self, label_name, value): - """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the - container to be eligible to run on a node, the node must have each of the constraints appeared - as labels. - - Args: - label_name: The name of the constraint label. - value: The value of the constraint label. - """ - - self.node_selector[label_name] = value - return self - - def add_pod_annotation(self, name: str, value: str): - """Adds a pod's metadata annotation. + Example - Args: - name: The name of the annotation. - value: The value of the annotation. - """ + from kfp.dsl import ContainerOp, Sidecar - self.pod_annotations[name] = value - return self - def add_pod_label(self, name: str, value: str): - """Adds a pod's metadata label. + # creates a `ContainerOp` and adds a redis `Sidecar` + op = (ContainerOp(name='foo-op', image='busybox:latest') + .add_sidecar( + Sidecar(name='redis', image='redis:alpine'))) - Args: - name: The name of the label. - value: The value of the label. """ - - self.pod_labels[name] = value - return self - - def set_retry(self, num_retries: int): - """Sets the number of times the task is retried until it's declared failed. - - Args: - num_retries: Number of times to retry on failures. """ - - self.num_retries = num_retries - return self - - def add_sidecar(self, sidecar: SideCar): - """Add a sidecar to the ContainerOps. - - Args: - sidecar: SideCar object. + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. """ - self.sidecars.append(sidecar) - - def __repr__(self): - return str({self.__class__.__name__: self.__dict__}) - - def _set_metadata(self, metadata): - '''_set_metadata passes the containerop the metadata information - Args: - metadata (ComponentMeta): component metadata - ''' - if not isinstance(metadata, ComponentMeta): - raise ValueError('_set_medata is expecting ComponentMeta.') - self._metadata = metadata \ No newline at end of file + # adds `mirror_volume_mounts` to `Sidecar` swagger definition + # NOTE inherits definition from `V1Container` rather than `Container` + # because `Container` has no `name` property. + swagger_types = dict( + **V1Container.swagger_types, mirror_volume_mounts='bool') + + attribute_map = dict( + **V1Container.attribute_map, mirror_volume_mounts='mirrorVolumeMounts') + + def __init__(self, + name: str, + image: str, + command: StringOrStringList = None, + args: StringOrStringList = None, + mirror_volume_mounts: bool = None, + **kwargs): + """Creates a new instance of `Sidecar`. + + Args: + name {str}: unique name for the sidecar container + image {str}: image to use for the sidecar container, e.g. redis:alpine + command {StringOrStringList}: entrypoint array. Not executed within a shell. + args {StringOrStringList}: arguments to the entrypoint. + mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same + volumes specified in the main container to the sidecar (including artifacts), + at the same mountPaths. 
This enables dind daemon to partially see the same + filesystem as the main container in order to use features such as docker + volume binding + **kwargs: keyword arguments available for `Container` + + """ + super().__init__( + name=name, + image=image, + command=as_list(command), + args=as_list(args), + **kwargs) + + self.mirror_volume_mounts = mirror_volume_mounts + + def set_mirror_volume_mounts(self, mirror_volume_mounts=True): + """ + Setting mirrorVolumeMounts to true will mount the same volumes specified + in the main container to the sidecar (including artifacts), at the same + mountPaths. This enables dind daemon to partially see the same filesystem + as the main container in order to use features such as docker volume + binding. + + Args: + mirror_volume_mounts: boolean flag + """ + + self.mirror_volume_mounts = mirror_volume_mounts + return self + + @property + def inputs(self): + """A list of PipelineParam found in the Sidecar object.""" + return _pipeline_param.extract_pipelineparams_from_any(self) + + +class ContainerOp(object): + """Represents an op implemented by a container image.""" + + # list of attributes that might have pipeline params - used to generate + # the input parameters during compilation. + # Excludes `file_outputs` and `outputs` as they are handled separately + # in the compilation process to generate the DAGs and task io parameters. + attrs_with_pipelineparams = [ + '_container', 'node_selector', 'volumes', 'pod_annotations', + 'pod_labels', 'num_retries', 'sidecars' + ] + + def __init__(self, + name: str, + image: str, + command: StringOrStringList = None, + arguments: StringOrStringList = None, + file_outputs: Dict[str, str] = None, + is_exit_handler=False): + """Create a new instance of ContainerOp. + + Args: + name: the name of the op. It does not have to be unique within a pipeline + because the pipeline will generates a unique new name in case of conflicts. + image: the container image name, such as 'python:3.5-jessie' + command: the command to run in the container. + If None, uses default CMD in defined in container. + arguments: the arguments of the command. The command can include "%s" and supply + a PipelineParam as the string replacement. For example, ('echo %s' % input_param). + At container run time the argument will be 'echo param_value'. + file_outputs: Maps output labels to local file paths. At pipeline run time, + the value of a PipelineParam is saved to its corresponding local file. It's + one way for outside world to receive outputs of the container. + is_exit_handler: Whether it is used as an exit handler. + """ + if not _pipeline.Pipeline.get_default_pipeline(): + raise ValueError('Default pipeline not defined.') + + valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' + if not re.match(valid_name_regex, name): + raise ValueError( + 'Only letters, numbers, spaces, "_", and "-" are allowed in name. 
Must begin with letter: %s' + % (name)) + + # convert to list if not a list + command = as_list(command) + arguments = as_list(arguments) + + # human_name must exist to construct containerOps name + self.human_name = name + # actual name for argo workflow + self.name = _pipeline.Pipeline.get_default_pipeline().add_op( + self, is_exit_handler) + + # `container` prop in `io.argoproj.workflow.v1alpha1.Template` + self._container = Container( + image=image, args=arguments, command=command) + # for chaining, and returning back to `ContainerOp` when updating `Container` + # i.e + # op._container.set_image_policy('Always').parent == op # True + setattr(self._container, "parent", self) + + # NOTE for backward compatibility (remove in future?) + # proxy old ContainerOp callables to Container + + # attributes to NOT proxy + ignore_set = frozenset(['to_dict', 'to_str']) + + # decorator func to proxy a method in `Container` into `ContainerOp` + def _proxy(proxy_attr): + """Decorator func to proxy to ContainerOp.container""" + + def _decorated(*args, **kwargs): + ret = getattr(self._container, proxy_attr)(*args, **kwargs) + if ret == self._container: + return self + return ret + + return _decorated + + # iter thru container and attach a proxy func to the container method + for attr_to_proxy in dir(self._container): + func = getattr(self._container, attr_to_proxy) + # only proxy public callables + if hasattr(func, + '__call__') and (attr_to_proxy[0] != '_' + ) and attr_to_proxy not in ignore_set: + setattr(self, attr_to_proxy, _proxy(attr_to_proxy)) + + # TODO: proper k8s definitions so that `convert_k8s_obj_to_json` can be used? + # `io.argoproj.workflow.v1alpha1.Template` properties + self.node_selector = {} + self.volumes = [] + self.pod_annotations = {} + self.pod_labels = {} + self.num_retries = 0 + self.sidecars = [] + + # attributes specific to `ContainerOp` + self._inputs = [] + self.file_outputs = file_outputs + self.dependent_op_names = [] + self.is_exit_handler = is_exit_handler + + self.outputs = {} + if file_outputs: + self.outputs = { + name: _pipeline_param.PipelineParam(name, op_name=self.name) + for name in file_outputs.keys() + } + + self.output = None + if len(self.outputs) == 1: + self.output = list(self.outputs.values())[0] + + @property + def inputs(self): + """List of PipelineParams that will be converted into input parameters + (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow. + """ + # iterate thru and extract all the `PipelineParam` in `ContainerOp` when + # called the 1st time (because there are in-place updates to `PipelineParam` + # during compilation - remove in-place updates for easier debugging?) + if not self._inputs: + self._inputs = [] + # TODO replace with proper k8s obj? + for key in self.attrs_with_pipelineparams: + self._inputs += [ + param for param in _pipeline_param. + extract_pipelineparams_from_any(getattr(self, key)) + ] + return self._inputs + + @inputs.setter + def inputs(self, value): + # to support in-place updates + self._inputs = value + + @property + def container(self): + """`Container` object that represents the `container` property in + `io.argoproj.workflow.v1alpha1.Template`. Can be used to update the + container configurations. 
+ + Example: + import kfp.dsl as dsl + from kubernetes.client.models import V1EnvVar + + @dsl.pipeline(name='example_pipeline') + def immediate_value_pipeline(): + op1 = (dsl.ContainerOp(name='example', image='nginx:alpine') + .container + .add_env_variable(V1EnvVar(name='HOST', value='foo.bar')) + .add_env_variable(V1EnvVar(name='PORT', value='80')) + .parent # return the parent `ContainerOp` + ) + """ + return self._container + + def apply(self, mod_func): + """Applies a modifier function to self. The function should return the passed object. + This is needed to chain "extention methods" to this class. + + Example: + from kfp.gcp import use_gcp_secret + task = ( + train_op(...) + .set_memory_request('1GB') + .apply(use_gcp_secret('user-gcp-sa')) + .set_memory_limit('2GB') + ) + """ + return mod_func(self) + + def after(self, op): + """Specify explicit dependency on another op.""" + self.dependent_op_names.append(op.name) + return self + + def add_volume(self, volume): + """Add K8s volume to the container + + Args: + volume: Kubernetes volumes + For detailed spec, check volume definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py + """ + self.volumes.append(volume) + return self + + def add_node_selector_constraint(self, label_name, value): + """Add a constraint for nodeSelector. Each constraint is a key-value pair label. For the + container to be eligible to run on a node, the node must have each of the constraints appeared + as labels. + + Args: + label_name: The name of the constraint label. + value: The value of the constraint label. + """ + + self.node_selector[label_name] = value + return self + + def add_pod_annotation(self, name: str, value: str): + """Adds a pod's metadata annotation. + + Args: + name: The name of the annotation. + value: The value of the annotation. + """ + + self.pod_annotations[name] = value + return self + + def add_pod_label(self, name: str, value: str): + """Adds a pod's metadata label. + + Args: + name: The name of the label. + value: The value of the label. + """ + + self.pod_labels[name] = value + return self + + def set_retry(self, num_retries: int): + """Sets the number of times the task is retried until it's declared failed. + + Args: + num_retries: Number of times to retry on failures. + """ + + self.num_retries = num_retries + return self + + def add_sidecar(self, sidecar: Sidecar): + """Add a sidecar to the ContainerOps. + + Args: + sidecar: SideCar object. + """ + + self.sidecars.append(sidecar) + return self + + def __repr__(self): + return str({self.__class__.__name__: self.__dict__}) diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py index a2c0d67714c..27cfd831d57 100644 --- a/sdk/python/kfp/dsl/_pipeline_param.py +++ b/sdk/python/kfp/dsl/_pipeline_param.py @@ -14,14 +14,13 @@ import re -from collections import namedtuple - - +from collections import namedtuple, Iterable +from typing import List # TODO: Move this to a separate class # For now, this identifies a condition with only "==" operator supported. ConditionOperator = namedtuple('ConditionOperator', 'operator operand1 operand2') -def _extract_pipelineparams(payloads: str or list[str]): +def _extract_pipelineparams(payloads: str or List[str]): """_extract_pipelineparam extract a list of PipelineParam instances from the payload string. Note: this function removes all duplicate matches. 
@@ -37,6 +36,54 @@ def _extract_pipelineparams(payloads: str or list[str]): matches += re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', payload) return [PipelineParam(x[1], x[0], x[2]) for x in list(set(matches))] + +def extract_pipelineparams_from_any(payload) -> List['PipelineParam']: + """Recursively extract PipelineParam instances or serialized string from any object or list of objects. + + Args: + payload (str or k8_obj or list[str or k8_obj]): a string/a list + of strings that contains serialized pipelineparams or a k8 definition + object. + Return: + List[PipelineParam] + """ + if not payload: + return [] + + # PipelineParam + if isinstance(payload, PipelineParam): + return [payload] + + # str + if isinstance(payload, str): + return _extract_pipelineparams(payload) + + # list or tuple or iterable + if isinstance(payload, list) or isinstance(payload, tuple) or isinstance(payload, Iterable): + pipeline_params = [] + for item in payload: + pipeline_params += extract_pipelineparams_from_any(item) + return list(set(pipeline_params)) + + # dict + if isinstance(payload, dict): + pipeline_params = [] + for item in payload.values(): + pipeline_params += extract_pipelineparams_from_any(item) + return list(set(pipeline_params)) + + # k8s object + if hasattr(payload, 'swagger_types') and isinstance(payload.swagger_types, dict): + pipeline_params = [] + for key in payload.swagger_types.keys(): + pipeline_params += extract_pipelineparams_from_any(getattr(payload, key)) + + return list(set(pipeline_params)) + + # return empty list + return [] + + class PipelineParam(object): """Representing a future value that is passed between pipeline components. @@ -66,9 +113,19 @@ def __init__(self, name: str, op_name: str=None, value: str=None): if op_name and value: raise ValueError('op_name and value cannot be both set.') - self.op_name = op_name self.name = name - self.value = value + # ensure value is None even if empty string or empty list + # so that serialization and unserialization remain consistent + # (i.e. None => '' => None) + self.op_name = op_name if op_name else None + self.value = value if value else None + + @property + def full_name(self): + """Unique name in the argo yaml for the PipelineParam""" + if self.op_name: + return self.op_name + '-' + self.name + return self.name def __str__(self): """String representation. 
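To illustrate the two additions above (`extract_pipelineparams_from_any` and `full_name`), the sketch below extracts PipelineParams that are nested inside a k8s model object and prints the `full_name` that later becomes the `{{inputs.parameters.<full_name>}}` template variable. It assumes this patch plus the `kubernetes` client package; the `build`/`tag` names and the `app` image are illustrative.

    from kubernetes.client.models import V1Container, V1EnvVar
    from kfp.dsl import PipelineParam
    from kfp.dsl._pipeline_param import extract_pipelineparams_from_any

    # PipelineParams may appear as instances or as serialized placeholders
    # anywhere inside a k8s model object.
    tag = PipelineParam(name='tag', op_name='build')
    container = V1Container(name='main',
                            image='app:%s' % tag,
                            env=[V1EnvVar(name='TAG', value=str(tag))])

    # Recursively walks the object's swagger_types attributes and collects the params.
    params = extract_pipelineparams_from_any(container)
    print(sorted({p.full_name for p in params}))  # ['build-tag']
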
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 97218e0fd2a..de7e2534457 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -203,7 +203,6 @@ def _test_py_compile(self, file_base_name): with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: golden = yaml.load(f) compiled = self._get_yaml_from_tar(target_tar) - self.maxDiff = None self.assertEqual(golden, compiled) finally: @@ -213,6 +212,10 @@ def test_py_compile_basic(self): """Test basic sequential pipeline.""" self._test_py_compile('basic') + def test_py_compile_with_sidecars_and_pipelineparams(self): + """Test pipeline with_sidecars and pipelineparams in any k8s attributes.""" + self._test_py_compile('with_sidecars_and_pipelineparams') + def test_py_compile_condition(self): """Test a pipeline with conditions.""" self._test_py_compile('coin') diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py new file mode 100644 index 00000000000..0c06ae22a26 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.py @@ -0,0 +1,111 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +import kfp.gcp as gcp + + +class GetFrequentWordOp(dsl.ContainerOp): + """A get frequent word class representing a component in ML Pipelines. + + The class provides a nice interface to users by hiding details such as container, + command, arguments. + """ + + def __init__(self, name, message): + """Args: + name: An identifier of the step which needs to be unique within a pipeline. + message: a dsl.PipelineParam object representing an input message. + """ + super(GetFrequentWordOp, self).__init__( + name=name, + image='python:3.5-jessie', + command=['sh', '-c'], + arguments=[ + 'python -c "from collections import Counter; ' + 'words = Counter(\'%s\'.split()); print(max(words, key=words.get))" ' + '| tee /tmp/message.txt' % message + ], + file_outputs={'word': '/tmp/message.txt'}) + + +class SaveMessageOp(dsl.ContainerOp): + """A class representing a component in ML Pipelines. + + It saves a message to a given output_path. + """ + + def __init__(self, name, message, output_path): + """Args: + name: An identifier of the step which needs to be unique within a pipeline. + message: a dsl.PipelineParam object representing the message to be saved. + output_path: a dsl.PipelineParam object representing the GCS path for output file. + """ + super(SaveMessageOp, self).__init__( + name=name, + image='google/cloud-sdk', + command=['sh', '-c'], + arguments=[ + 'echo %s | tee /tmp/results.txt | gsutil cp /tmp/results.txt %s' + % (message, output_path) + ]) + + +class ExitHandlerOp(dsl.ContainerOp): + """A class representing a component in ML Pipelines. 
+ """ + + def __init__(self, name): + super(ExitHandlerOp, self).__init__( + name=name, + image='python:3.5-jessie', + command=['sh', '-c'], + arguments=['echo exit!']) + + +@dsl.pipeline( + name='Save Most Frequent', + description='Get Most Frequent Word and Save to GCS') +def save_most_frequent_word(message: str, + outputpath: str, + cpu_limit='0.5', + gpu_limit='2', + mirror=True, + sidecar_image_tag='latest'): + """A pipeline function describing the orchestration of the workflow.""" + + exit_op = ExitHandlerOp('exiting') + with dsl.ExitHandler(exit_op): + counter = GetFrequentWordOp(name='get-Frequent', message=message) + counter.set_memory_request('200M') + + saver = SaveMessageOp( + name='save', message=counter.output, output_path=outputpath) + + # update k8s container definition with pipeline params + (saver.container + .set_cpu_limit(cpu_limit) + .set_gpu_limit(gpu_limit) + .set_image_pull_policy("Always")) + + saver.add_node_selector_constraint('cloud.google.com/gke-accelerator', + 'nvidia-tesla-k80') + saver.apply( + gcp.use_tpu(tpu_cores=8, tpu_resource='v2', tf_version='1.12')) + + # add sidecar with str-based PipelineParam, as well as PipelineParam to k8s properties + saver.add_sidecar( + dsl.Sidecar('busybox', + 'busybox:%s' % sidecar_image_tag) + .set_mirror_volume_mounts(mirror)) diff --git a/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml new file mode 100644 index 00000000000..34c85b2ace4 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/with_sidecars_and_pipelineparams.yaml @@ -0,0 +1,227 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+apiVersion: argoproj.io/v1alpha1 +spec: + entrypoint: save-most-frequent + onExit: exiting + arguments: + parameters: + - name: message + - name: outputpath + - name: cpu-limit + value: '0.5' + - name: gpu-limit + value: '2' + - name: mirror + value: 'True' + - name: sidecar-image-tag + value: latest + serviceAccountName: pipeline-runner + templates: + - name: exit-handler-1 + inputs: + parameters: + - name: message + - name: mirror + - name: outputpath + - name: sidecar-image-tag + dag: + tasks: + - arguments: + parameters: + - name: message + value: "{{inputs.parameters.message}}" + template: get-frequent + name: get-frequent + - dependencies: + - get-frequent + arguments: + parameters: + - name: get-frequent-word + value: "{{tasks.get-frequent.outputs.parameters.get-frequent-word}}" + - name: mirror + value: "{{inputs.parameters.mirror}}" + - name: outputpath + value: "{{inputs.parameters.outputpath}}" + - name: sidecar-image-tag + value: "{{inputs.parameters.sidecar-image-tag}}" + template: save + name: save + - outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + container: + command: + - sh + - "-c" + image: python:3.5-jessie + args: + - echo exit! 
+ name: exiting + - outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + parameters: + - valueFrom: + path: "/tmp/message.txt" + name: get-frequent-word + container: + command: + - sh + - "-c" + image: python:3.5-jessie + resources: + requests: + memory: 200M + args: + - python -c "from collections import Counter; words = Counter('{{inputs.parameters.message}}'.split()); + print(max(words, key=words.get))" | tee /tmp/message.txt + inputs: + parameters: + - name: message + name: get-frequent + - sidecars: + - mirrorVolumeMounts: "{{inputs.parameters.mirror}}" + image: busybox:{{inputs.parameters.sidecar-image-tag}} + name: busybox + metadata: + annotations: + tf-version.cloud-tpus.google.com: '1.12' + inputs: + parameters: + - name: get-frequent-word + - name: mirror + - name: outputpath + - name: sidecar-image-tag + container: + imagePullPolicy: Always + command: + - sh + - "-c" + resources: + limits: + cloud-tpus.google.com/v2: '8' + nvidia.com/gpu: "{{inputs.parameters.gpu-limit}}" + cpu: "{{inputs.parameters.cpu-limit}}" + image: google/cloud-sdk + args: + - echo {{inputs.parameters.get-frequent-word}} | tee /tmp/results.txt | gsutil + cp /tmp/results.txt {{inputs.parameters.outputpath}} + nodeSelector: + cloud.google.com/gke-accelerator: nvidia-tesla-k80 + outputs: + artifacts: + - path: "/mlpipeline-ui-metadata.json" + name: mlpipeline-ui-metadata + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + - path: "/mlpipeline-metrics.json" + name: mlpipeline-metrics + s3: + insecure: true + endpoint: minio-service.kubeflow:9000 + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + bucket: mlpipeline + name: save + - name: save-most-frequent + inputs: + parameters: + - name: message + - name: mirror + - name: outputpath + - name: sidecar-image-tag + dag: + tasks: + - arguments: + parameters: + - name: message + value: "{{inputs.parameters.message}}" + - name: mirror + value: "{{inputs.parameters.mirror}}" + - name: outputpath + value: "{{inputs.parameters.outputpath}}" + - name: sidecar-image-tag + value: "{{inputs.parameters.sidecar-image-tag}}" + template: exit-handler-1 + name: exit-handler-1 + - name: exiting + template: exiting +kind: Workflow +metadata: + generateName: save-most-frequent- diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index 957694d59c4..8f37df3ac21 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py 
@@ -13,7 +13,7 @@ # limitations under the License. -from kfp.dsl import Pipeline, PipelineParam, ContainerOp +from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar import unittest class TestContainerOp(unittest.TestCase): @@ -23,14 +23,19 @@ def test_basic(self): with Pipeline('somename') as p: param1 = PipelineParam('param1') param2 = PipelineParam('param2') - op1 = ContainerOp(name='op1', image='image', + op1 = (ContainerOp(name='op1', image='image', arguments=['%s hello %s %s' % (param1, param2, param1)], file_outputs={'out1': '/tmp/b'}) + .add_sidecar(Sidecar(name='sidecar1', image='image1')) + .add_sidecar(Sidecar(name='sidecar2', image='image2'))) self.assertCountEqual([x.name for x in op1.inputs], ['param1', 'param2']) self.assertCountEqual(list(op1.outputs.keys()), ['out1']) self.assertCountEqual([x.op_name for x in op1.outputs.values()], ['op1']) self.assertEqual(op1.output.name, 'out1') + self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar1', 'sidecar2']) + self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image1', 'image2']) + def test_after_op(self): """Test duplicate ops.""" diff --git a/sdk/python/tests/dsl/pipeline_param_tests.py b/sdk/python/tests/dsl/pipeline_param_tests.py index ed403aa6516..489c08e880b 100644 --- a/sdk/python/tests/dsl/pipeline_param_tests.py +++ b/sdk/python/tests/dsl/pipeline_param_tests.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from kubernetes.client.models import V1Container, V1EnvVar from kfp.dsl import PipelineParam -from kfp.dsl._pipeline_param import _extract_pipelineparams +from kfp.dsl._pipeline_param import _extract_pipelineparams, extract_pipelineparams_from_any import unittest @@ -37,8 +37,8 @@ def test_str_repr(self): p = PipelineParam(name='param3', value='value3') self.assertEqual('{{pipelineparam:op=;name=param3;value=value3}}', str(p)) - def test_extract_pipelineparam(self): - """Test _extract_pipeleineparam.""" + def test_extract_pipelineparams(self): + """Test _extract_pipeleineparams.""" p1 = PipelineParam(name='param1', op_name='op1') p2 = PipelineParam(name='param2') @@ -49,4 +49,19 @@ def test_extract_pipelineparam(self): self.assertListEqual([p1, p2, p3], params) payload = [str(p1) + stuff_chars + str(p2), str(p2) + stuff_chars + str(p3)] params = _extract_pipelineparams(payload) - self.assertListEqual([p1, p2, p3], params) \ No newline at end of file + self.assertListEqual([p1, p2, p3], params) + + def test_extract_pipelineparams_from_any(self): + """Test extract_pipeleineparams.""" + p1 = PipelineParam(name='param1', op_name='op1') + p2 = PipelineParam(name='param2') + p3 = PipelineParam(name='param3', value='value3') + stuff_chars = ' between ' + payload = str(p1) + stuff_chars + str(p2) + stuff_chars + str(p3) + + container = V1Container(name=p1, + image=p2, + env=[V1EnvVar(name="foo", value=payload)]) + + params = extract_pipelineparams_from_any(container) + self.assertListEqual(sorted([p1, p2, p3]), sorted(params))
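
Taken together, the changes in this patch enable the following style of pipeline definition. The sketch below is illustrative (the pipeline name, images and the `MSG` env var are made up) and assumes an SDK build with this patch applied; it mirrors the `with_sidecars_and_pipelineparams` testdata pipeline above in a compact form.

    import kfp.dsl as dsl
    from kubernetes.client.models import V1EnvVar


    @dsl.pipeline(
        name='echo-pipeline',
        description='Container and Sidecar updates with PipelineParams')
    def echo_pipeline(message: str, sidecar_image_tag='latest', cpu_limit='0.5'):
        op = dsl.ContainerOp(
            name='echo',
            image='busybox',
            command=['sh', '-c'],
            arguments=['echo %s' % message])

        # `op.container` exposes the underlying k8s container spec; its setters
        # chain, accept PipelineParams, and `.parent` returns the owning ContainerOp.
        (op.container
            .set_cpu_limit(cpu_limit)
            .set_image_pull_policy('Always')
            .add_env_variable(V1EnvVar(name='MSG', value=message))
            .parent)

        # Sidecars accept PipelineParams in any k8s property as well, e.g. the image tag.
        op.add_sidecar(dsl.Sidecar(name='echo-sidecar',
                                   image='busybox:%s' % sidecar_image_tag))

As with the testdata pipeline, the PipelineParams referenced in the container and sidecar fields are collected via `ContainerOp.inputs` and compiled into `{{inputs.parameters.*}}` template variables, as the `with_sidecars_and_pipelineparams.yaml` golden file shows.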