From 1d840bcac1f86f934186af5a27c0d2bcd83c45bb Mon Sep 17 00:00:00 2001
From: Sebastiaan Huber
Date: Sat, 22 Oct 2022 09:54:41 +0200
Subject: [PATCH] Engine: Do not let `DuplicateSubscriberIdentifier` except a
 `Process`

It is possible that when a daemon worker tries to continue a process, a
``kiwipy.DuplicateSubscriberIdentifier`` is raised. This happens when the
current worker has already subscribed itself with this process identifier.
The call to ``_continue`` will call ``Process.init``, which adds the RPC
and broadcast subscribers. ``kiwipy`` and ``aiormq`` further down the line
keep track of processes that are already subscribed and, if a process is
subscribed again, a ``DuplicateSubscriberIdentifier`` is raised. Possible
reasons for the worker receiving a process task that it already has
include:

 1. The user mistakenly recreates the corresponding task, thinking the
    original task was lost.
 2. RabbitMQ requeues the task because the daemon worker lost its
    connection or did not respond to the heartbeat in time, and the task
    is sent to the same worker once it regains connection.

Here we assume that the existence of another subscriber indicates that the
process is still being run by this worker. We thus ignore the request to
have the worker take it on again and acknowledge the current task. If our
assumption was wrong and the original task was no longer being worked on,
the user can resubmit the task once the list of subscribers of the process
has been cleared.

Note: in the second case we are deleting the *original* task, and once the
worker finishes running the process there won't be a task left in RabbitMQ
to acknowledge. This, however, is silently ignored.

Note: the exception is raised by ``kiwipy`` based on an internal cache
that it and ``aiormq`` keep of the current subscribers. This means that it
will only occur when the task is resent to the *same* daemon worker. If
another worker were to receive it, no exception would be raised, as the
check is client-based and not server-based.
---
 aiida/manage/external/rmq.py      | 29 ++++++++++++++++-
 tests/manage/external/test_rmq.py | 52 +++++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/aiida/manage/external/rmq.py b/aiida/manage/external/rmq.py
index 28dc771a2e..b2f6c45e18 100644
--- a/aiida/manage/external/rmq.py
+++ b/aiida/manage/external/rmq.py
@@ -14,7 +14,7 @@
 import logging
 import traceback
 
-from kiwipy import Future, communications
+from kiwipy import DuplicateSubscriberIdentifier, Future, communications
 import pamqp.encode
 import plumpy
 
@@ -212,6 +212,33 @@ async def _continue(self, communicator, pid, nowait, tag=None):
             message = 'the class of the process could not be imported.'
             self.handle_continue_exception(node, exception, message)
             raise
+        except DuplicateSubscriberIdentifier:
+            # This happens when the current worker has already subscribed itself with this process identifier. The
+            # call to ``_continue`` will call ``Process.init``, which adds the RPC and broadcast subscribers.
+            # ``kiwipy`` and ``aiormq`` further down the line keep track of processes that are already subscribed and,
+            # if a process is subscribed again, a ``DuplicateSubscriberIdentifier`` is raised. Possible reasons for
+            # the worker receiving a process task that it already has include:
+            #
+            # 1. The user mistakenly recreates the corresponding task, thinking the original task was lost.
+            # 2. RabbitMQ requeues the task because the daemon worker lost its connection or did not respond to the
+            #    heartbeat in time, and the task is sent to the same worker once it regains connection.
+            #
+            # Here we assume that the existence of another subscriber indicates that the process is still being run
+            # by this worker. We thus ignore the request to have the worker take it on again and acknowledge the
+            # current task (``return False``). If our assumption was wrong and the original task was no longer being
+            # worked on, the user can resubmit the task once the list of subscribers of the process has been cleared.
+            # Note: in the second case we are deleting the *original* task, and once the worker finishes running the
+            # process there won't be a task left in RabbitMQ to acknowledge. This, however, is silently ignored.
+            #
+            # Note: the exception is raised by ``kiwipy`` based on an internal cache that it and ``aiormq`` keep of
+            # the current subscribers. This means that it will only occur when the task is resent to the *same*
+            # daemon worker. If another worker were to receive it, no exception would be raised, as the check is
+            # client-based and not server-based.
+            LOGGER.exception(
+                'A subscriber with the process id<%d> already exists, which most likely means this worker is already '
+                'working on it and this task was sent as a duplicate by mistake. Deleting the task now.', pid
+            )
+            return False
         except asyncio.CancelledError:  # pylint: disable=try-except-raise
             # note this is only required in python<=3.7,
             # where asyncio.CancelledError inherits from Exception
diff --git a/tests/manage/external/test_rmq.py b/tests/manage/external/test_rmq.py
index 940f4f5681..7c9d1ef4fe 100644
--- a/tests/manage/external/test_rmq.py
+++ b/tests/manage/external/test_rmq.py
@@ -8,10 +8,14 @@
 # For further information please visit http://www.aiida.net            #
 ###########################################################################
 """Tests for the `aiida.manage.external.rmq` module."""
+import pathlib
+
 from kiwipy.rmq import RmqThreadCommunicator
 import pytest
 
+from aiida.engine.processes import ProcessState, control
 from aiida.manage.external import rmq
+from aiida.orm import Int
 
 
 @pytest.mark.parametrize(('args', 'kwargs', 'expected'), (
@@ -57,3 +61,51 @@ def test_add_rpc_subscriber(communicator):
 def test_add_broadcast_subscriber(communicator):
     """Test ``add_broadcast_subscriber``."""
     communicator.add_broadcast_subscriber(None)
+
+
+@pytest.mark.requires_rmq
+def test_duplicate_subscriber_identifier(aiida_local_code_factory, started_daemon_client, submit_and_await):
+    """Test that a ``DuplicateSubscriberIdentifier`` in ``ProcessLauncher._continue`` does not except the process.
+
+    It is possible that when a daemon worker tries to continue a process, a ``kiwipy.DuplicateSubscriberIdentifier``
+    is raised, which means the worker has already subscribed itself to run that process.
+    This can occur for at least two reasons:
+
+    * The user manually recreated the process task, mistakenly thinking it had been lost.
+    * RabbitMQ requeued the task because the daemon worker lost its connection or did not respond to the
+      heartbeat in time, and the task is sent to the same worker once it regains connection.
+
+    In both cases, the original task is still actually being run and we should not let this exception except the
+    entire process. Unfortunately, these duplicate tasks still happen quite a lot when the daemon workers are under
+    heavy load, and we don't want a bunch of processes to be excepted because of this.
+
+    In most cases, just ignoring the task will be the best solution. In the end, the original task is likely to complete.
+    If it turns out that the task actually got lost and the process is stuck, the user can find this error message
+    in the logs and manually recreate the task, using for example ``verdi devel revive``.
+
+    Note that this exception is only raised within a single worker, i.e., if another worker subscribes to the same
+    process, that should not incur this exception, and that is not what we are testing here. This test should
+    therefore be run with a single daemon worker.
+    """
+    code = aiida_local_code_factory(entry_point='core.arithmetic.add', executable='/bin/bash')
+
+    builder = code.get_builder()
+    builder.x = Int(1)
+    builder.y = Int(1)
+    builder.metadata.options.sleep = 2  # Add a sleep to leave time to send the duplicate task before it finishes
+
+    # Submit the process to the daemon and wait for it to be picked up (signalled by it going into the waiting state).
+    node = submit_and_await(builder, ProcessState.WAITING)
+
+    # Recreate the process task, causing the daemon to pick it up again and incur the ``DuplicateSubscriberIdentifier``
+    control.revive_processes([node], wait=True)
+
+    # Wait for the original node to be finished
+    submit_and_await(node, ProcessState.FINISHED)
+
+    # The original node should now have finished normally and not excepted
+    assert node.is_finished_ok, (node.process_state, node.exit_status)
+
+    # Verify that the receiving of the duplicate task was logged by the daemon
+    daemon_log = pathlib.Path(started_daemon_client.daemon_log_file).read_text(encoding='utf-8')
+    assert f'Error: A subscriber with the process id<{node.pk}> already exists' in daemon_log
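
For reference, a minimal sketch of the client-side check this patch relies on: registering two
subscribers under the same identifier on a single communicator trips ``kiwipy``'s local subscriber
cache. This is an illustration under assumptions, not part of the patch; it presumes a local
RabbitMQ broker, and the broker URL and identifier value are placeholders.

    import kiwipy
    from kiwipy.rmq import RmqThreadCommunicator

    # Connect to a local RabbitMQ broker (the URL is an assumption of this sketch).
    communicator = RmqThreadCommunicator.connect(connection_params={'url': 'amqp://guest:guest@127.0.0.1/'})


    def subscriber(_communicator, _message):
        """Dummy RPC subscriber; its body is irrelevant for the duplicate check."""


    try:
        communicator.add_rpc_subscriber(subscriber, identifier='1234')
        # A second registration under the same identifier trips the cache that ``kiwipy``
        # keeps locally; no server round-trip is involved, which is why only the *same*
        # worker can ever hit this exception.
        communicator.add_rpc_subscriber(subscriber, identifier='1234')
    except kiwipy.DuplicateSubscriberIdentifier:
        print('duplicate subscriber detected client-side')
    finally:
        communicator.close()

A second communicator (i.e. another daemon worker) registering the same identifier would not
raise, since each client only consults its own cache; this is why RabbitMQ resending the task to a
different worker does not trigger the exception.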