-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
_container_op.py
155 lines (122 loc) · 5.34 KB
/
_container_op.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import _pipeline
from . import _pipeline_param
import re
from typing import Dict, List
class ContainerOp(object):
    """Represents an op implemented by a docker container image.

    Constructing an instance registers it with the current default pipeline.
    ``human_name`` keeps the name supplied by the caller, while ``name`` holds
    whatever the pipeline's ``add_op`` returned for this op.
    """

    # Serialized PipelineParam placeholder embedded in argument strings.
    # Capture groups are, in order: op name, param name, value.
    _PARAM_PLACEHOLDER = re.compile(
        r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}')
    # Integer optionally followed by one of the accepted unit suffixes.
    _MEMORY_QUANTITY = re.compile(r'[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki)?')
    # Integer number of milli-cpus, e.g. "500m".
    _MILLI_CPU = re.compile(r'[0-9]+m')

    # The file_inputs annotation is quoted so that it is resolved lazily
    # instead of being evaluated when the class body is executed.
    def __init__(self, name: str, image: str, command: str=None, arguments: str=None,
                 file_inputs: 'Dict[_pipeline_param.PipelineParam, str]'=None,
                 file_outputs: Dict[str, str]=None, is_exit_handler=False):
        """Create a new instance of ContainerOp.

        Args:
          name: the name of the op. Has to be unique within a pipeline.
          image: the container image name, such as 'python:3.5-jessie'
          command: the command to run in the container.
              If None, uses default CMD in defined in container.
          arguments: the arguments of the command, given either as a single
              string or as an iterable of values. An argument can include "%s"
              and supply a PipelineParam as the string replacement. For
              example, ('echo %s' % input_param). At container run time the
              argument will be 'echo param_value'. Every PipelineParam
              placeholder found in the arguments is recorded in
              `argument_inputs`.
          file_inputs: Maps PipelineParams to local file paths. At pipeline
              run time, the value of a PipelineParam is saved to its
              corresponding local file. It is not implemented yet.
          file_outputs: Maps output labels to local file paths. Each label
              becomes a PipelineParam in `outputs`; it's one way for outside
              world to receive outputs of the container.
          is_exit_handler: Whether it is used as an exit handler.

        Raises:
          ValueError: if no default pipeline is defined.
        """
        pipeline = _pipeline.Pipeline.get_default_pipeline()
        if not pipeline:
            raise ValueError('Default pipeline not defined.')

        self.human_name = name
        # Registration happens before the remaining attributes are set, as in
        # the original implementation; add_op decides the final op name.
        self.name = pipeline.add_op(self, is_exit_handler)
        self.image = image
        self.command = command
        self.arguments = arguments
        self.is_exit_handler = is_exit_handler
        self.memory_limit = None
        self.memory_request = None
        self.cpu_limit = None
        self.cpu_request = None

        self.argument_inputs = [
            _pipeline_param.PipelineParam(param_name, op_name, value)
            for op_name, param_name, value in self._find_param_matches(arguments)
        ]
        self.file_inputs = file_inputs
        self.file_outputs = file_outputs
        self.dependent_op_names = []

        self.inputs = list(self.argument_inputs)
        if file_inputs:
            self.inputs += list(file_inputs)

        self.outputs = {}
        if file_outputs:
            self.outputs = {
                label: _pipeline_param.PipelineParam(label, op_name=self.name)
                for label in file_outputs
            }

        # Convenience handle for the common single-output case.
        self.output = None
        if len(self.outputs) == 1:
            self.output = next(iter(self.outputs.values()))

    @staticmethod
    def _find_param_matches(arguments):
        """Collect the PipelineParam placeholders embedded in `arguments`.

        Args:
          arguments: None, a single string, or an iterable of argument values.
              A string is treated as one argument rather than being iterated
              character by character, which could never match a placeholder.

        Returns:
          A list of unique (op_name, param_name, value) string tuples in the
          order they are first encountered, so the result is deterministic.
        """
        if not arguments:
            return []
        if isinstance(arguments, str):
            arguments = [arguments]

        seen = set()
        ordered = []
        for arg in arguments:
            for found in ContainerOp._PARAM_PLACEHOLDER.findall(str(arg)):
                if found not in seen:
                    seen.add(found)
                    ordered.append(found)
        return ordered

    def after(self, op):
        """Specify explicit dependency on another op.

        Args:
          op: the op that must come first; its `name` is appended to
              `dependent_op_names`.
        """
        self.dependent_op_names.append(op.name)

    def _validate_memory_string(self, memory_string):
        """Validate a given string is valid for memory request or limit.

        Raises:
          ValueError: if the string is not an integer optionally followed by
              one of the accepted suffixes.
        """
        # fullmatch (rather than match with "$") also rejects a trailing newline.
        if self._MEMORY_QUANTITY.fullmatch(memory_string) is None:
            raise ValueError('Invalid memory string. Should be an integer, or integer followed '
                             'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"')

    def _validate_cpu_string(self, cpu_string):
        """Validate a given string is valid for cpu request or limit.

        Accepts an integer followed by "m", or anything `float` can parse to a
        finite, non-negative number.

        Raises:
          ValueError: if the string is in neither form.
        """
        if self._MILLI_CPU.fullmatch(cpu_string) is not None:
            return

        try:
            cpu_value = float(cpu_string)
        except ValueError:
            cpu_value = None

        # float() also parses "nan", "inf" and negative numbers; none of them
        # is a usable cpu quantity. The chained comparison is False for NaN.
        if cpu_value is None or not (0 <= cpu_value < float('inf')):
            raise ValueError('Invalid cpu string. Should be float or integer, or integer followed '
                             'by "m".')

    def set_memory_request(self, memory):
        """Set memory request (minimum) for this operator.

        Args:
          memory: a string which can be a number or a number followed by one of
              "E", "P", "T", "G", "M", "K" or "Ei", "Pi", "Ti", "Gi", "Mi", "Ki".

        Raises:
          ValueError: if `memory` is not in the accepted format.
        """
        self._validate_memory_string(memory)
        self.memory_request = memory

    def set_memory_limit(self, memory):
        """Set memory limit (maximum) for this operator.

        Args:
          memory: a string which can be a number or a number followed by one of
              "E", "P", "T", "G", "M", "K" or "Ei", "Pi", "Ti", "Gi", "Mi", "Ki".

        Raises:
          ValueError: if `memory` is not in the accepted format.
        """
        self._validate_memory_string(memory)
        self.memory_limit = memory

    def set_cpu_request(self, cpu):
        """Set cpu request (minimum) for this operator.

        Args:
          cpu: A string which can be a number or a number followed by "m", which means 1/1000.

        Raises:
          ValueError: if `cpu` is not in the accepted format.
        """
        self._validate_cpu_string(cpu)
        self.cpu_request = cpu

    def set_cpu_limit(self, cpu):
        """Set cpu limit (maximum) for this operator.

        Args:
          cpu: A string which can be a number or a number followed by "m", which means 1/1000.

        Raises:
          ValueError: if `cpu` is not in the accepted format.
        """
        self._validate_cpu_string(cpu)
        self.cpu_limit = cpu

    def __repr__(self):
        """Return the class name mapped to the instance's attribute dict, as a string."""
        return str({self.__class__.__name__: self.__dict__})