Skip to content

Commit

Permalink
🐛 FIX: Standardise transport task interrupt handling (#4692)
Browse files Browse the repository at this point in the history
For all transport tasks (upload, submit, update, retrieve),
both `plumpy.futures.CancelledError` and `plumpy.process_states.Interruption` exceptions
should be ignored by the exponential backoff mechanism (i.e. the task should not be retried)
and raised directly (rather than being wrapped in a `TransportTaskException`),
so that they can be correctly caught by the `Waiting.execute` method.

As an example, this fixes a known bug whereby the upload task could not be
cancelled via `CTRL-C` in an IPython shell.
  • Loading branch information
chrisjsewell authored Feb 9, 2021
1 parent b5cc416 commit e99227b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 25 deletions.
35 changes: 13 additions & 22 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ async def do_upload():

try:
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException)
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
skip_submit = await exponential_backoff_retry(
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except PreSubmitException:
raise
except plumpy.futures.CancelledError:
pass
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception:
logger.warning(f'uploading CalcJob<{node.pk}> failed')
raise TransportTaskException(f'upload_calculation failed {max_attempts} times consecutively')
Expand Down Expand Up @@ -139,15 +139,12 @@ async def do_submit():

try:
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_submit,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
pass
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'submitting CalcJob<{node.pk}> failed')
raise TransportTaskException(f'submit_calculation failed {max_attempts} times consecutively')
Expand Down Expand Up @@ -201,14 +198,11 @@ async def do_update():

try:
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
job_done = await exponential_backoff_retry(
do_update,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'updating CalcJob<{node.pk}> failed')
Expand Down Expand Up @@ -270,14 +264,11 @@ async def do_retrieve():

try:
logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_retrieve,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'retrieving CalcJob<{node.pk}> failed')
Expand Down
6 changes: 3 additions & 3 deletions aiida/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import contextlib
from datetime import datetime
import logging
from typing import Any, Awaitable, Callable, Iterator, List, Optional, Type, Union, TYPE_CHECKING
from typing import Any, Awaitable, Callable, Iterator, List, Optional, Tuple, Type, Union, TYPE_CHECKING

if TYPE_CHECKING:
from .processes import Process, ProcessBuilder
Expand Down Expand Up @@ -160,7 +160,7 @@ async def exponential_backoff_retry(
initial_interval: Union[int, float] = 10.0,
max_attempts: int = 5,
logger: Optional[logging.Logger] = None,
ignore_exceptions=None
ignore_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None
) -> Any:
"""
Coroutine to call a function, recalling it with an exponential backoff in the case of an exception
Expand All @@ -173,7 +173,7 @@ async def exponential_backoff_retry(
:param fct: the function to call, which will be turned into a coroutine first if it is not already
:param initial_interval: the time to wait after the first caught exception before calling the coroutine again
:param max_attempts: the maximum number of times to call the coroutine before re-raising the exception
:param ignore_exceptions: list or tuple of exceptions to ignore, i.e. when caught do nothing and simply re-raise
:param ignore_exceptions: exceptions to ignore, i.e. when caught do nothing and simply re-raise
:return: result if the ``coro`` call completes within ``max_attempts`` retries without raising
"""
if logger is None:
Expand Down

0 comments on commit e99227b

Please sign in to comment.