Engine: Do not let DuplicateSubscriberError except a Process #5715

Merged 2 commits on Oct 27, 2022
29 changes: 28 additions & 1 deletion aiida/manage/external/rmq.py
@@ -14,7 +14,7 @@
import logging
import traceback

from kiwipy import Future, communications
from kiwipy import DuplicateSubscriberIdentifier, Future, communications
import pamqp.encode
import plumpy

@@ -212,6 +212,33 @@ async def _continue(self, communicator, pid, nowait, tag=None):
message = 'the class of the process could not be imported.'
self.handle_continue_exception(node, exception, message)
raise
except DuplicateSubscriberIdentifier:
# This happens when the current worker has already subscribed itself with this process identifier. The call
# to ``_continue`` will call ``Process.init`` which will add RPC and broadcast subscribers. ``kiwipy`` and
# ``aiormq`` further down keep track of processes that are already subscribed and if subscribed again, a
# ``DuplicateSubscriberIdentifier`` is raised. Possible reasons for the worker receiving a process task that
# it already has include:
#
# 1. The user mistakenly recreates the corresponding task, thinking the original task was lost.
# 2. RabbitMQ requeues the task because the daemon worker lost its connection or did not respond to the
# heartbeat in time, and the task is sent to the same worker once it regains connection.
#
# Here we assume that the existence of another subscriber indicates that the process is still being run by
# this worker. We thus ignore the request to have the worker take it on again and acknowledge the current
# task (`return False`). If our assumption was wrong and the original task was no longer being worked on,
# the user can resubmit the task once the list of subscribers of the process has been cleared. Note: In the
# second case we are deleting the *original* task, and once the worker finishes running the process there
# won't be a task in RabbitMQ to acknowledge anymore. This, however, is silently ignored.
#
# Note: the exception is raised by ``kiwipy`` based on an internal cache it and ``aiormq`` keep of the
# current subscribers. This means that this will only occur when the task is resent to the *same* daemon
# worker. If another worker were to receive it, no exception would be raised as the check is client and not
# server based.
LOGGER.exception(
'A subscriber with the process id<%d> already exists, which most likely means this worker is already '
'working on it and this task was sent as a duplicate by mistake. Deleting the task now.', pid
)
return False
except asyncio.CancelledError: # pylint: disable=try-except-raise
# note this is only required in python<=3.7,
# where asyncio.CancelledError inherits from Exception
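For context, a minimal sketch (not part of this diff) of the client-side behaviour the new ``except`` clause guards against. It assumes a RabbitMQ broker reachable at the default local URL, and the identifier ``'1234'`` stands in for a process pk. Registering the same identifier twice on the same communicator raises ``DuplicateSubscriberIdentifier``, which is why the error can only occur on the worker that already holds the subscriber:

import kiwipy
from kiwipy.rmq import RmqThreadCommunicator

# Connect a thread communicator (assumes a local RabbitMQ broker).
communicator = RmqThreadCommunicator.connect(connection_params={'url': 'amqp://127.0.0.1'})

def rpc_handler(_communicator, _message):
    """Dummy RPC subscriber."""
    return None

# The first registration with a given identifier succeeds.
communicator.add_rpc_subscriber(rpc_handler, identifier='1234')

try:
    # Registering the *same* identifier on the *same* communicator raises, because the
    # bookkeeping is kept client-side; a different worker would not hit this check.
    communicator.add_rpc_subscriber(rpc_handler, identifier='1234')
except kiwipy.DuplicateSubscriberIdentifier:
    print('this worker is already subscribed with this identifier')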
2 changes: 1 addition & 1 deletion aiida/manage/tests/pytest_fixtures.py
@@ -241,7 +241,7 @@ def get_code(entry_point, executable, computer=aiida_localhost, label=None, prep
return get_code


@pytest.fixture(scope='session')
@pytest.fixture()
def daemon_client(aiida_profile):
"""Return a daemon client for the configured test profile for the test session.

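A hypothetical usage sketch of the now function-scoped fixture (the test name and assertions below are illustrative, not part of this PR): each test receives its own ``DaemonClient`` instance instead of sharing one across the whole session, so daemon state started or stopped in one test cannot leak into another.

def test_daemon_lifecycle(daemon_client):
    """Hypothetical test: start and stop the daemon through the function-scoped fixture."""
    daemon_client.start_daemon()
    assert daemon_client.is_daemon_running

    daemon_client.stop_daemon(wait=True)
    assert not daemon_client.is_daemon_running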
52 changes: 52 additions & 0 deletions tests/manage/external/test_rmq.py
@@ -8,10 +8,14 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the `aiida.manage.external.rmq` module."""
import pathlib

from kiwipy.rmq import RmqThreadCommunicator
import pytest

from aiida.engine.processes import ProcessState, control
from aiida.manage.external import rmq
from aiida.orm import Int


@pytest.mark.parametrize(('args', 'kwargs', 'expected'), (
@@ -57,3 +61,51 @@ def test_add_rpc_subscriber(communicator):
def test_add_broadcast_subscriber(communicator):
"""Test ``add_broadcast_subscriber``."""
communicator.add_broadcast_subscriber(None)


@pytest.mark.requires_rmq
def test_duplicate_subscriber_identifier(aiida_local_code_factory, started_daemon_client, submit_and_await):
"""Test that a ``DuplicateSubscriberError`` in ``ProcessLauncher._continue`` does not except the process.

It is possible that, when a daemon worker tries to continue a process, a ``kiwipy.DuplicateSubscriberError`` is
raised, which means that the worker has already subscribed itself to run that process.
This can occur for at least two reasons:

* The user manually recreated the process task, mistakenly thinking it had been lost
* RabbitMQ requeues the task because the daemon worker lost its connection or did not respond to the
heartbeat in time, and the task is sent to the same worker once it regains connection.

In both cases, the task is in fact still being worked on and we should not let this exception except the entire
process. Unfortunately, these duplicate tasks still happen quite a lot when the daemon workers are under heavy load,
and we don't want a bunch of processes to be excepted because of this.

In most cases, simply ignoring the duplicate task will be the best solution. In the end, the original task is likely
to complete. If it turns out that the task actually got lost and the process is stuck, the user can find this error
message in the logs and manually recreate the task, using for example ``verdi devel revive``.

Note that this exception is only raised within a single worker, i.e., if another worker subscribes to the same
process, that will not trigger this exception and that is not what we are testing here. This test should therefore
be run with a single daemon worker.
"""
code = aiida_local_code_factory(entry_point='core.arithmetic.add', executable='/bin/bash')

builder = code.get_builder()
builder.x = Int(1)
builder.y = Int(1)
builder.metadata.options.sleep = 2  # Add a sleep to leave time to send the duplicate task before the process finishes

# Submit the process to the daemon and wait for it to be picked up (signalled by it going in waiting state).
node = submit_and_await(builder, ProcessState.WAITING)

# Recreate the process task, causing the daemon to pick it up again and trigger the ``DuplicateSubscriberError``
control.revive_processes([node], wait=True)

# Wait for the original node to be finished
submit_and_await(node, ProcessState.FINISHED)

# The original node should now have finished normally and not excepted
assert node.is_finished_ok, (node.process_state, node.exit_status)

# Verify that the receiving of the duplicate task was logged by the daemon
daemon_log = pathlib.Path(started_daemon_client.daemon_log_file).read_text(encoding='utf-8')
assert f'Error: A subscriber with the process id<{node.pk}> already exists' in daemon_log
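
For reference, a sketch of the manual recovery path mentioned in the test docstring, in case the assumption is wrong and the process really is stuck without a task (the pk ``1234`` is a placeholder):

from aiida.engine.processes import control
from aiida.orm import load_node

# Load the process node whose task appears to have been lost.
node = load_node(1234)

# Recreate the process task so that a daemon worker picks the process up again;
# this is the API equivalent of ``verdi devel revive 1234``.
control.revive_processes([node], wait=True)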