Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in process function interrupt handlers causing verdi daemon stop to timeout #2966

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aiida/engine/daemon/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def start_daemon():
configure_logging(daemon=True, daemon_log_file=daemon_client.daemon_log_file)

try:
runner = get_manager().create_daemon_runner()
manager = get_manager()
runner = manager.create_daemon_runner()
manager.set_runner(runner)
except Exception as exception:
LOGGER.exception('daemon runner failed to start')
raise
Expand Down
37 changes: 26 additions & 11 deletions aiida/engine/processes/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,16 @@ def run_get_node(*args, **kwargs):
Run the FunctionProcess with the supplied inputs in a local runner.

The function will have to create a new runner for the FunctionProcess instead of using the global runner,
because otherwise if this workfunction were to call another one from within its scope, that would use
because otherwise if this process function were to call another one from within its scope, that would use
the same runner and it would be blocking the event loop from continuing.

:param args: input arguments to construct the FunctionProcess
:param kwargs: input keyword arguments to construct the FunctionProcess
:return: tuple of the outputs of the process and the process node pk
:rtype: (dict, int)
"""
runner = get_manager().create_runner(with_persistence=False)
manager = get_manager()
runner = manager.create_runner(with_persistence=False)
inputs = process_class.create_inputs(*args, **kwargs)

# Remove all the known inputs from the kwargs
Expand All @@ -140,18 +141,32 @@ def run_get_node(*args, **kwargs):

process = process_class(inputs=inputs, runner=runner)

def kill_process(_num, _frame):
"""Send the kill signal to the process in the current scope."""
LOGGER.critical('runner received interrupt, killing process %s', process.pid)
process.kill(msg='Process was killed because the runner received an interrupt')
# Only add handlers for the interrupt signal to kill the process if we are in a local runner and not a daemon runner.
# Without this check, process functions running in a daemon worker would be killed if the daemon were shut down
current_runner = manager.get_runner()
original_handler = None
kill_signal = signal.SIGINT

signal.signal(signal.SIGINT, kill_process)
signal.signal(signal.SIGTERM, kill_process)
if not current_runner.is_daemon_runner:

result = process.execute()
def kill_process(_num, _frame):
"""Send the kill signal to the process in the current scope."""
from tornado import gen
LOGGER.critical('runner received interrupt, killing process %s', process.pid)
result = process.kill(msg='Process was killed because the runner received an interrupt')
raise gen.Return(result)

# Close the runner properly
runner.close()
# Store the current handler on the signal such that it can be restored after process has terminated
original_handler = signal.getsignal(kill_signal)
signal.signal(kill_signal, kill_process)

try:
result = process.execute()
finally:
# If `original_handler` is set, the `kill_process` handler was bound and needs to be restored
if original_handler:
signal.signal(signal.SIGINT, original_handler)
runner.close()

store_provenance = inputs.get('metadata', {}).get('store_provenance', True)
if not store_provenance:
Expand Down
2 changes: 1 addition & 1 deletion aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def kill(self, msg=None):

if killing:
# We are waiting for things to be killed, so return the 'gathered' future
result = plumpy.gather(result)
result = plumpy.gather(killing)

return result

Expand Down
9 changes: 9 additions & 0 deletions aiida/engine/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ def job_manager(self):
def controller(self):
return self._controller

@property
def is_daemon_runner(self):
    """Whether this runner acts as a daemon runner.

    A daemon runner is one that submits its processes over RabbitMQ
    instead of running them in the local interpreter.

    :return: True if this is a daemon runner, False otherwise
    :rtype: bool
    """
    daemonic = self._rmq_submit
    return daemonic

def is_closed(self):
    """Return whether this runner has already been closed.

    :return: the closed flag of this runner
    """
    closed_state = self._closed
    return closed_state

Expand Down