-
-
Notifications
You must be signed in to change notification settings - Fork 143
/
htcondor.py
266 lines (222 loc) · 8.25 KB
/
htcondor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import logging
import re
import shlex
import dask
from dask.utils import parse_bytes
from .core import JobQueueCluster, Job, job_parameters, cluster_parameters
logger = logging.getLogger(__name__)
class HTCondorJob(Job):
    """Job implementation that launches a Dask worker through HTCondor.

    Unlike most batch systems, HTCondor takes a structured submit
    description (``key = value`` lines) rather than a shell script with
    directive comments, so this class builds ``job_header_dict`` in
    ``__init__`` and renders it via the template below in ``job_script``.
    """

    # Submit-description template; %(job_header)s expands to the
    # "key = value" lines built from job_header_dict in __init__.
    _script_template = """
%(shebang)s
%(job_header)s
Environment = "%(quoted_environment)s"
Arguments = "%(quoted_arguments)s"
Executable = %(executable)s
Queue
""".lstrip()
    submit_command = "condor_submit"
    cancel_command = "condor_rm"
    # condor job ids have the form "<cluster>.<proc>", e.g. "1234.0"
    job_id_regexp = r"(?P<job_id>\d+\.\d+)"
    # condor sets argv[0] of the executable to "condor_exec.exe", which confuses
    # Python (can't find its libs), so we have to go through the shell.
    executable = "/bin/sh"
    config_name = "htcondor"

    def __init__(
        self,
        scheduler=None,
        name=None,
        disk=None,
        job_extra=None,
        config_name=None,
        submit_command_extra=None,
        cancel_command_extra=None,
        **base_class_kwargs
    ):
        """Build the HTCondor submit attributes for one worker job.

        Parameters
        ----------
        disk : str, optional
            Total disk per job (e.g. ``"1 GB"``); falls back to the
            ``jobqueue.htcondor.disk`` config value. Required one way or
            the other — HTCondor jobs here always set RequestDisk.
        job_extra : dict, optional
            Extra submit-file attributes; override the defaults built here.
        submit_command_extra, cancel_command_extra : list of str, optional
            Extra command-line arguments appended (shell-quoted) to
            ``condor_submit`` / ``condor_rm``.
        **base_class_kwargs
            Forwarded to :class:`Job`; ``env_extra`` is also inspected here
            to build the Environment submit attribute.

        Raises
        ------
        ValueError
            If ``disk`` is given neither as an argument nor in the config.
        """
        super().__init__(
            scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
        )
        # Resolve disk from config; unlike cores/memory there is no sane
        # default, so a missing value is an error.
        if disk is None:
            disk = dask.config.get("jobqueue.%s.disk" % self.config_name)
        if disk is None:
            raise ValueError(
                "You must specify how much disk to use per job like ``disk='1 GB'``"
            )
        self.worker_disk = parse_bytes(disk)
        if job_extra is None:
            self.job_extra = dask.config.get(
                "jobqueue.%s.job-extra" % self.config_name, {}
            )
        else:
            self.job_extra = job_extra
        # env-extra lines (shell "export FOO=bar" statements) are turned into
        # the submit file's Environment attribute instead of script lines.
        env_extra = base_class_kwargs.get("env_extra", None)
        if env_extra is None:
            env_extra = dask.config.get(
                "jobqueue.%s.env-extra" % self.config_name, default=[]
            )
        self.env_dict = self.env_lines_to_dict(env_extra)
        # Core submit attributes. "MY.*" entries are custom ClassAd
        # attributes; the Request* expressions reference them so the unit
        # conversions (bytes -> MiB for memory, bytes -> KiB for disk)
        # happen in one place.
        self.job_header_dict = {
            "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"',
            "RequestCpus": "MY.DaskWorkerCores",
            "RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)",
            "RequestDisk": "floor(MY.DaskWorkerDisk / 1024)",
            "MY.JobId": '"$(ClusterId).$(ProcId)"',
            "MY.DaskWorkerCores": self.worker_cores,
            "MY.DaskWorkerMemory": self.worker_memory,
            "MY.DaskWorkerDisk": self.worker_disk,
        }
        if self.log_directory:
            self.job_header_dict.update(
                {
                    "LogDirectory": self.log_directory,
                    # $F(...) strips quotes
                    "Output": "$(LogDirectory)/worker-$F(MY.JobId).out",
                    "Error": "$(LogDirectory)/worker-$F(MY.JobId).err",
                    "Log": "$(LogDirectory)/worker-$(ClusterId).log",
                    # We kill all the workers to stop them so we need to stream their
                    # output+error if we ever want to see anything
                    "Stream_Output": True,
                    "Stream_Error": True,
                }
            )
        # User-supplied attributes take precedence over the defaults above.
        if self.job_extra:
            self.job_header_dict.update(self.job_extra)
        if submit_command_extra is None:
            submit_command_extra = dask.config.get(
                "jobqueue.%s.submit-command-extra" % self.config_name, []
            )
        # Extend the class-level commands with shell-quoted extra arguments;
        # note these become *instance* attributes shadowing the class ones.
        self.submit_command = (
            HTCondorJob.submit_command
            + " "
            + " ".join(shlex.quote(arg) for arg in submit_command_extra)
        )
        if cancel_command_extra is None:
            cancel_command_extra = dask.config.get(
                "jobqueue.%s.cancel-command-extra" % self.config_name, []
            )
        self.cancel_command = (
            HTCondorJob.cancel_command
            + " "
            + " ".join(shlex.quote(arg) for arg in cancel_command_extra)
        )

    def env_lines_to_dict(self, env_lines):
        """Convert an array of export statements (what we get from env-extra
        in the config) into a dict"""
        env_dict = {}
        for env_line in env_lines:
            split_env_line = shlex.split(env_line)
            # Tolerate an optional leading "export" keyword.
            if split_env_line[0] == "export":
                split_env_line = split_env_line[1:]
            for item in split_env_line:
                # Tokens without "=" are silently ignored.
                if "=" in item:
                    k, v = item.split("=", 1)
                    env_dict[k] = v
        return env_dict

    def job_script(self):
        """Construct a job submission script"""
        # The worker is started as: /bin/sh -c '<command template>'
        # (see the `executable` comment above for why we go through sh).
        quoted_arguments = quote_arguments(["-c", self._command_template])
        quoted_environment = quote_environment(self.env_dict)
        job_header_lines = "\n".join(
            "%s = %s" % (k, v) for k, v in self.job_header_dict.items()
        )
        return self._script_template % {
            "shebang": self.shebang,
            "job_header": job_header_lines,
            "quoted_environment": quoted_environment,
            "quoted_arguments": quoted_arguments,
            "executable": self.executable,
        }

    def _job_id_from_submit_output(self, out):
        """Extract "<cluster>.0" from condor_submit's stdout.

        condor_submit only reports the cluster id ("... submitted to
        cluster NNN"), so proc 0 is assumed, matching job_id_regexp.
        """
        cluster_id_regexp = r"submitted to cluster (\d+)"
        match = re.search(cluster_id_regexp, out)
        if match is None:
            msg = (
                "Could not parse cluster id from submission command output.\n"
                "Cluster id regexp is {!r}\n"
                "Submission command output is:\n{}".format(cluster_id_regexp, out)
            )
            raise ValueError(msg)
        return "%s.0" % match.group(1)
def _double_up_quotes(instr):
return instr.replace("'", "''").replace('"', '""')
def quote_arguments(args):
    r"""Quote a string or list of strings using the Condor submit file "new" argument quoting rules.

    Parameters
    ----------
    args : str or list of str
        Argument(s) to quote; a bare string is treated as one argument.

    Returns
    -------
    str
        The arguments in a quoted form.

    Warnings
    --------
    You will need to surround the result in double-quotes before using it in
    the Arguments attribute.

    Examples
    --------
    >>> quote_arguments(["3", "simple", "arguments"])
    '3 simple arguments'
    >>> quote_arguments(["one", "two with spaces", "three"])
    "one 'two with spaces' three"
    >>> quote_arguments(["one", "\"two\"", "spacey 'quoted' argument"])
    'one ""two"" \'spacey \'\'quoted\'\' argument\''
    """
    # NOTE: the docstring is raw and the expected doctest lines are exact
    # reprs; the previous version used a cooked string (so the backslash
    # escapes collapsed) and a "spacy"/"spacey" typo, which made every
    # quoted example fail under doctest.
    if isinstance(args, str):
        args_list = [args]
    else:
        args_list = args

    quoted_args = []
    for a in args_list:
        # Double embedded quotes first, then wrap the whole argument in
        # single quotes if it contains spaces or single quotes.
        qa = _double_up_quotes(a)
        if " " in qa or "'" in qa:
            qa = "'" + qa + "'"
        quoted_args.append(qa)
    return " ".join(quoted_args)
def quote_environment(env):
    """Quote a dict of strings using the Condor submit file "new" environment quoting rules.

    Returns
    -------
    str
        The environment in quoted form (space-separated ``key=value``
        pairs, values quoted as needed).

    Warnings
    --------
    You will need to surround the result in double-quotes before using it in
    the Environment attribute.
    """
    if not isinstance(env, dict):
        raise TypeError("env must be a dict")

    def _quote_value(value):
        # Double embedded quotes, then wrap in single quotes when the
        # value contains spaces or single quotes.
        quoted = _double_up_quotes(str(value))
        if " " in quoted or "'" in quoted:
            quoted = "'" + quoted + "'"
        return quoted

    return " ".join(
        "%s=%s" % (key, _quote_value(value)) for key, value in env.items()
    )
class HTCondorCluster(JobQueueCluster):
    # __doc__ is assigned explicitly (rather than written as a literal
    # docstring) so the shared {job}/{cluster} parameter documentation can
    # be interpolated with str.format below.
    __doc__ = """ Launch Dask on an HTCondor cluster with a shared file system
    Parameters
    ----------
    disk : str
        Total amount of disk per job
    job_extra : dict
        Extra submit file attributes for the job
    submit_command_extra : list of str
        Extra arguments to pass to condor_submit
    cancel_command_extra : list of str
        Extra arguments to pass to condor_rm
    {job}
    {cluster}
    Examples
    --------
    >>> from dask_jobqueue.htcondor import HTCondorCluster
    >>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB")
    >>> cluster.scale(jobs=10)  # ask for 10 jobs
    >>> from dask.distributed import Client
    >>> client = Client(cluster)
    This also works with adaptive clusters. This automatically launches and kill workers based on load.
    >>> cluster.adapt(maximum_jobs=20)
    """.format(
        job=job_parameters, cluster=cluster_parameters
    )
    # Job implementation this cluster instantiates for each worker job.
    job_cls = HTCondorJob