# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Transport tasks for calculation jobs."""
import functools
import logging
import tempfile

from tornado.gen import coroutine, Return
import plumpy

from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState

from ..process import ProcessState

UPLOAD_COMMAND = 'upload'
SUBMIT_COMMAND = 'submit'
UPDATE_COMMAND = 'update'
RETRIEVE_COMMAND = 'retrieve'
KILL_COMMAND = 'kill'

TRANSPORT_TASK_RETRY_INITIAL_INTERVAL = 20
TRANSPORT_TASK_MAXIMUM_ATTEMTPS = 5

logger = logging.getLogger(__name__)
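
# Note on the retry defaults above: a failing transport task is attempted at most 5 times, with a first wait of
# 20 seconds between attempts. Assuming `exponential_backoff_retry` doubles the interval on each retry (as its
# name suggests), the waits are roughly 20, 40, 80 and 160 seconds before the task is finally given up on.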

class PreSubmitException(Exception):
    """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`."""

@coroutine
def task_upload_job(process, transport_queue, cancellable):
    """Transport task that will attempt to upload the files of a job calculation to the remote.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException.

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
    :raises: Return if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    node = process.node

    if node.get_state() == CalcJobState.SUBMITTING:
        logger.warning('CalcJob<{}> already marked as SUBMITTING, skipping task_upload_job'.format(node.pk))
        raise Return

    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

    authinfo = node.computer.get_authinfo(node.user)

    @coroutine
    def do_upload():
        with transport_queue.request_transport(authinfo) as request:
            transport = yield cancellable.with_interrupt(request)

            with SandboxFolder() as folder:
                # Any exception thrown in `presubmit` call is not transient so we circumvent the exponential backoff
                try:
                    calc_info = process.presubmit(folder)
                except Exception as exception:  # pylint: disable=broad-except
                    raise PreSubmitException('exception occurred in presubmit call') from exception
                else:
                    execmanager.upload_calculation(node, transport, calc_info, folder)

            raise Return

    try:
        logger.info('scheduled request to upload CalcJob<{}>'.format(node.pk))
        ignore_exceptions = (plumpy.CancelledError, PreSubmitException)
        result = yield exponential_backoff_retry(
            do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
        )
    except PreSubmitException:
        raise
    except plumpy.CancelledError:
        pass
    except Exception:
        logger.warning('uploading CalcJob<{}> failed'.format(node.pk))
        raise TransportTaskException('upload_calculation failed {} times consecutively'.format(max_attempts))
    else:
        logger.info('uploading CalcJob<{}> successful'.format(node.pk))
        node.set_state(CalcJobState.SUBMITTING)
        raise Return(result)
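
# The remaining `task_*` coroutines follow the same pattern as `task_upload_job`: an inner `do_*` coroutine
# requests a transport from the `TransportQueue`, performs the actual `execmanager` call, and is wrapped in
# `exponential_backoff_retry`; only after `max_attempts` consecutive failures is a `TransportTaskException`
# raised, which the `Waiting` state further below converts into a pause of the process.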

@coroutine
def task_submit_job(node, transport_queue, cancellable):
    """Transport task that will attempt to submit a job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException.

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
    :raises: Return if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    if node.get_state() == CalcJobState.WITHSCHEDULER:
        assert node.get_job_id() is not None, 'job is WITHSCHEDULER, however, it does not have a job id'
        logger.warning('CalcJob<{}> already marked as WITHSCHEDULER, skipping task_submit_job'.format(node.pk))
        raise Return(node.get_job_id())

    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

    authinfo = node.computer.get_authinfo(node.user)

    @coroutine
    def do_submit():
        with transport_queue.request_transport(authinfo) as request:
            transport = yield cancellable.with_interrupt(request)
            raise Return(execmanager.submit_calculation(node, transport))

    try:
        logger.info('scheduled request to submit CalcJob<{}>'.format(node.pk))
        result = yield exponential_backoff_retry(
            do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption
        )
    except plumpy.Interruption:
        pass
    except Exception:
        logger.warning('submitting CalcJob<{}> failed'.format(node.pk))
        raise TransportTaskException('submit_calculation failed {} times consecutively'.format(max_attempts))
    else:
        logger.info('submitting CalcJob<{}> successful'.format(node.pk))
        node.set_state(CalcJobState.WITHSCHEDULER)
        raise Return(result)
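
# Unlike the upload task, submission passes `plumpy.Interruption` as the ignored exception to the backoff loop
# rather than `plumpy.CancelledError`/`PreSubmitException` (there is no presubmit step here); when an
# interruption is caught the coroutine simply returns without marking the node as WITHSCHEDULER.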

@coroutine
def task_update_job(node, job_manager, cancellable):
    """Transport task that will attempt to update the scheduler status of the job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException.

    :param node: the node that represents the job calculation
    :type node: :class:`aiida.orm.nodes.process.calculation.calcjob.CalcJobNode`
    :param job_manager: The job manager
    :type job_manager: :class:`aiida.engine.processes.calcjobs.manager.JobManager`
    :param cancellable: A cancel flag
    :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
    :raises: Return containing True if the task was successfully completed, False otherwise
    """
    if node.get_state() == CalcJobState.RETRIEVING:
        logger.warning('CalcJob<{}> already marked as RETRIEVING, skipping task_update_job'.format(node.pk))
        raise Return(True)

    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

    authinfo = node.computer.get_authinfo(node.user)
    job_id = node.get_job_id()

    @coroutine
    def do_update():
        # Get the update request
        with job_manager.request_job_info_update(authinfo, job_id) as update_request:
            job_info = yield cancellable.with_interrupt(update_request)

        if job_info is None:
            # If the job is computed or not found assume it's done
            node.set_scheduler_state(JobState.DONE)
            job_done = True
        else:
            node.set_last_job_info(job_info)
            node.set_scheduler_state(job_info.job_state)
            job_done = job_info.job_state == JobState.DONE

        raise Return(job_done)

    try:
        logger.info('scheduled request to update CalcJob<{}>'.format(node.pk))
        job_done = yield exponential_backoff_retry(
            do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption
        )
    except plumpy.Interruption:
        raise
    except Exception:
        logger.warning('updating CalcJob<{}> failed'.format(node.pk))
        raise TransportTaskException('update_calculation failed {} times consecutively'.format(max_attempts))
    else:
        logger.info('updating CalcJob<{}> successful'.format(node.pk))
        if job_done:
            node.set_state(CalcJobState.RETRIEVING)

        raise Return(job_done)
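
# A `None` job info from the job manager means the scheduler no longer lists the job, so it is assumed to be
# done. `Waiting.execute` keeps re-launching this task in a loop until it returns True, at which point the node
# is moved to the RETRIEVING state and the retrieve task is scheduled.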

@coroutine
def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancellable):
    """Transport task that will attempt to retrieve all files of a completed job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException.

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
    :raises: Return if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    if node.get_state() == CalcJobState.PARSING:
        logger.warning('CalcJob<{}> already marked as PARSING, skipping task_retrieve_job'.format(node.pk))
        raise Return

    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

    authinfo = node.computer.get_authinfo(node.user)

    @coroutine
    def do_retrieve():
        with transport_queue.request_transport(authinfo) as request:
            transport = yield cancellable.with_interrupt(request)

            # Perform the job accounting and set it on the node if successful. If the scheduler does not implement this
            # still set the attribute but set it to `None`. This way we can distinguish calculation jobs for which the
            # accounting was called but could not be set.
            scheduler = node.computer.get_scheduler()
            scheduler.set_transport(transport)

            try:
                detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
            except FeatureNotAvailable:
                logger.info('detailed job info not available for scheduler of CalcJob<{}>'.format(node.pk))
                node.set_detailed_job_info(None)
            else:
                node.set_detailed_job_info(detailed_job_info)

            raise Return(execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder))

    try:
        logger.info('scheduled request to retrieve CalcJob<{}>'.format(node.pk))
        yield exponential_backoff_retry(
            do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=plumpy.Interruption
        )
    except plumpy.Interruption:
        raise
    except Exception:
        logger.warning('retrieving CalcJob<{}> failed'.format(node.pk))
        raise TransportTaskException('retrieve_calculation failed {} times consecutively'.format(max_attempts))
    else:
        node.set_state(CalcJobState.PARSING)
        logger.info('retrieving CalcJob<{}> successful'.format(node.pk))
        raise Return

@coroutine
def task_kill_job(node, transport_queue, cancellable):
    """Transport task that will attempt to kill a job calculation.

    The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
    function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
    retry after an interval that increases exponentially with the number of retries, for a maximum number of retries.
    If all retries fail, the task will raise a TransportTaskException.

    :param node: the node that represents the job calculation
    :param transport_queue: the TransportQueue from which to request a Transport
    :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
    :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
    :raises: Return if the task was successfully completed
    :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
    """
    initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
    max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

    if node.get_state() in [CalcJobState.UPLOADING, CalcJobState.SUBMITTING]:
        logger.warning('CalcJob<{}> killed, it was in the {} state'.format(node.pk, node.get_state()))
        raise Return(True)

    authinfo = node.computer.get_authinfo(node.user)

    @coroutine
    def do_kill():
        with transport_queue.request_transport(authinfo) as request:
            transport = yield cancellable.with_interrupt(request)
            raise Return(execmanager.kill_calculation(node, transport))

    try:
        logger.info('scheduled request to kill CalcJob<{}>'.format(node.pk))
        result = yield exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
    except plumpy.Interruption:
        raise
    except Exception:
        logger.warning('killing CalcJob<{}> failed'.format(node.pk))
        raise TransportTaskException('kill_calculation failed {} times consecutively'.format(max_attempts))
    else:
        logger.info('killing CalcJob<{}> successful'.format(node.pk))
        node.set_scheduler_state(JobState.DONE)
        raise Return(result)
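
# Killing is special-cased: if the job never made it past the UPLOADING or SUBMITTING state there is nothing to
# cancel on the scheduler, so the task returns True immediately without even requesting a transport.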

class Waiting(plumpy.Waiting):
    """The waiting state for the `CalcJob` process."""

    def __init__(self, process, done_callback, msg=None, data=None):
        """
        :param :class:`~plumpy.base.state_machine.StateMachine` process: The process this state belongs to
        """
        super().__init__(process, done_callback, msg, data)
        self._task = None
        self._killing = None

    def load_instance_state(self, saved_state, load_context):
        super().load_instance_state(saved_state, load_context)
        self._task = None
        self._killing = None
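
    # `_task` holds the currently running interruptable transport task so that `interrupt` can cancel it, and
    # `_killing` is a future that is resolved once a kill request has (or has not) been honoured. Both are
    # transient and are therefore reset to `None` when the state is reloaded from a saved instance state.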

    @coroutine
    def execute(self):
        """Override the execute coroutine of the base `Waiting` state."""
        # pylint: disable=too-many-branches
        node = self.process.node
        transport_queue = self.process.runner.transport

        command = self.data
        process_status = 'Waiting for transport task: {}'.format(command)

        try:

            if command == UPLOAD_COMMAND:
                node.set_process_status(process_status)
                yield self._launch_task(task_upload_job, self.process, transport_queue)
                raise Return(self.submit())

            elif command == SUBMIT_COMMAND:
                node.set_process_status(process_status)
                yield self._launch_task(task_submit_job, node, transport_queue)
                raise Return(self.update())

            elif self.data == UPDATE_COMMAND:
                job_done = False

                while not job_done:
                    scheduler_state = node.get_scheduler_state()
                    scheduler_state_string = scheduler_state.name if scheduler_state else 'UNKNOWN'
                    process_status = 'Monitoring scheduler: job state {}'.format(scheduler_state_string)
                    node.set_process_status(process_status)
                    job_done = yield self._launch_task(task_update_job, node, self.process.runner.job_manager)

                raise Return(self.retrieve())

            elif self.data == RETRIEVE_COMMAND:
                node.set_process_status(process_status)
                # Create a temporary folder that has to be deleted by JobProcess.retrieved after successful parsing
                temp_folder = tempfile.mkdtemp()
                yield self._launch_task(task_retrieve_job, node, transport_queue, temp_folder)
                raise Return(self.parse(temp_folder))

            else:
                raise RuntimeError('Unknown waiting command')

        except TransportTaskException as exception:
            raise plumpy.PauseInterruption('Pausing after failed transport task: {}'.format(exception))
        except plumpy.KillInterruption:
            yield self._launch_task(task_kill_job, node, transport_queue)
            self._killing.set_result(True)
            raise
        except Return:
            node.set_process_status(None)
            raise
        except (plumpy.Interruption, plumpy.CancelledError):
            node.set_process_status('Transport task {} was interrupted'.format(command))
            raise
        finally:
            # If we were trying to kill but we didn't deal with it, make sure it's set here
            if self._killing and not self._killing.done():
                self._killing.set_result(False)
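
    # The waiting commands chain the job through its lifecycle: UPLOAD -> SUBMIT -> UPDATE (polled in a loop until
    # the scheduler reports the job as done) -> RETRIEVE, after which `parse` hands control to a RUNNING state that
    # invokes `self.process.parse` on the retrieved temporary folder.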

    @coroutine
    def _launch_task(self, coro, *args, **kwargs):
        """Launch a coroutine as a task, making sure to make it interruptable."""
        task_fn = functools.partial(coro, *args, **kwargs)

        try:
            self._task = interruptable_task(task_fn)
            result = yield self._task
            raise Return(result)
        finally:
            self._task = None
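
    # Wrapping the coroutine with `interruptable_task` is what allows `interrupt` below to abort a transport task
    # mid-flight, for example when the process is killed or cancelled while waiting for a transport.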

    def upload(self):
        """Return the `Waiting` state that will `upload` the `CalcJob`."""
        msg = 'Waiting for calculation folder upload'
        return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPLOAD_COMMAND)

    def submit(self):
        """Return the `Waiting` state that will `submit` the `CalcJob`."""
        msg = 'Waiting for scheduler submission'
        return self.create_state(ProcessState.WAITING, None, msg=msg, data=SUBMIT_COMMAND)

    def update(self):
        """Return the `Waiting` state that will `update` the `CalcJob`."""
        msg = 'Waiting for scheduler update'
        return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPDATE_COMMAND)

    def retrieve(self):
        """Return the `Waiting` state that will `retrieve` the `CalcJob`."""
        msg = 'Waiting to retrieve'
        return self.create_state(ProcessState.WAITING, None, msg=msg, data=RETRIEVE_COMMAND)

    def parse(self, retrieved_temporary_folder):
        """Return the `Running` state that will parse the `CalcJob`.

        :param retrieved_temporary_folder: temporary folder used in retrieving that can be used during parsing.
        """
        return self.create_state(ProcessState.RUNNING, self.process.parse, retrieved_temporary_folder)

    def interrupt(self, reason):
        """Interrupt the `Waiting` state by calling interrupt on the transport task `InterruptableFuture`."""
        if self._task is not None:
            self._task.interrupt(reason)

        if isinstance(reason, plumpy.KillInterruption):
            if self._killing is None:
                self._killing = plumpy.Future()

            return self._killing