Engine: Do not let DuplicateSubscriberIdentifier except a Process
When a daemon worker tries to continue a process, a
``kiwipy.DuplicateSubscriberIdentifier`` may be raised. This happens when
the current worker has already subscribed itself with this process
identifier. The call to ``_continue`` will call ``Process.init``, which
will add RPC and broadcast subscribers. ``kiwipy`` and ``aiormq`` further
down keep track of processes that are already subscribed, and if a
process is subscribed again, a ``DuplicateSubscriberIdentifier`` is raised.

Possible reasons for the worker receiving a process task that it already
has include:

  1. The user mistakenly recreates the corresponding task, thinking the
     original task was lost.
  2. RabbitMQ requeues the task because the daemon worker lost its
     connection or did not respond to the heartbeat in time, and the
     task is sent to the same worker once it regains connection.

Here we assume that the existence of another subscriber indicates that
the process is still being run by this worker. We thus ignore the request
to have the worker take it on again and acknowledge the current task.

If our assumption was wrong and the original task was no longer being
worked on, the user can resubmit the task once the list of subscribers of
the process has been cleared. Note: In the second case we are deleting
the *original* task, and once the worker finishes running the process
there won't be a task in RabbitMQ to acknowledge anymore.
This, however, is silently ignored.
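
The handling described above can be sketched in isolation. This is a
minimal illustration, not the code of this commit: ``DuplicateSubscriber``,
``Worker`` and ``handle_task`` are hypothetical stand-ins for the ``kiwipy``
exception and the launcher's ``_continue`` method, and returning ``False``
models "acknowledge the duplicate task without taking on the process".

```python
import asyncio


class DuplicateSubscriber(Exception):
    """Hypothetical stand-in for ``kiwipy.DuplicateSubscriberIdentifier``."""


class Worker:
    """Hypothetical worker that remembers which process ids it already runs."""

    def __init__(self):
        self.running = set()

    def subscribe(self, pid):
        # Mimics the subscription step: subscribing twice with the same pid fails.
        if pid in self.running:
            raise DuplicateSubscriber(pid)
        self.running.add(pid)

    async def handle_task(self, pid):
        """Return True if the process was taken on, False for a duplicate task."""
        try:
            self.subscribe(pid)
        except DuplicateSubscriber:
            # Assume this worker is still running the process: drop the duplicate
            # task instead of letting the exception propagate.
            return False
        return True


async def main():
    worker = Worker()
    first = await worker.handle_task(42)   # original task
    second = await worker.handle_task(42)  # duplicate task for the same pid
    return first, second


results = asyncio.run(main())
print(results)
```

In this sketch the process with id 42 keeps running after the duplicate
arrives; only the second task is discarded.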

Note: the exception is raised by ``kiwipy`` based on an internal cache it
and ``aiormq`` keep of the current subscribers. This means that this will
only occur when the task is resent to the *same* daemon worker. If
another worker were to receive it, no exception would be raised, as the
check is performed on the client and not on the server.
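
To illustrate why the check only triggers within a single worker, here is
a small sketch with one subscriber cache per worker. ``WorkerCache`` and
``DuplicateSubscriber`` are hypothetical names chosen for illustration and
do not reflect the actual internals of ``kiwipy`` or ``aiormq``.

```python
class DuplicateSubscriber(Exception):
    """Hypothetical stand-in for ``kiwipy.DuplicateSubscriberIdentifier``."""


class WorkerCache:
    """Hypothetical per-worker cache of subscriber identifiers."""

    def __init__(self, name):
        self.name = name
        self._subscribers = {}

    def add_subscriber(self, identifier, callback):
        # The check only consults this worker's own cache, never a shared server.
        if identifier in self._subscribers:
            raise DuplicateSubscriber(f'{identifier} already subscribed on {self.name}')
        self._subscribers[identifier] = callback


worker_a = WorkerCache('worker-a')
worker_b = WorkerCache('worker-b')

worker_a.add_subscriber(42, print)
worker_b.add_subscriber(42, print)  # different worker, separate cache: no error

try:
    worker_a.add_subscriber(42, print)  # same worker, same identifier
    raised = False
except DuplicateSubscriber:
    raised = True

print(raised)
```

Because each cache is local, the duplicate is only detected when the same
worker sees the same identifier twice.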
sphuber committed Oct 27, 2022
1 parent dc1c0a5 commit 1d840bc
Showing 2 changed files with 80 additions and 1 deletion.
29 changes: 28 additions & 1 deletion aiida/manage/external/rmq.py
@@ -14,7 +14,7 @@
import logging
import traceback

-from kiwipy import Future, communications
+from kiwipy import DuplicateSubscriberIdentifier, Future, communications
import pamqp.encode
import plumpy

@@ -212,6 +212,33 @@ async def _continue(self, communicator, pid, nowait, tag=None):
            message = 'the class of the process could not be imported.'
            self.handle_continue_exception(node, exception, message)
            raise
        except DuplicateSubscriberIdentifier:
            # This happens when the current worker has already subscribed itself with this process identifier. The call
            # to ``_continue`` will call ``Process.init`` which will add RPC and broadcast subscribers. ``kiwipy`` and
            # ``aiormq`` further down keep track of processes that are already subscribed and if subscribed again, a
            # ``DuplicateSubscriberIdentifier`` is raised. Possible reasons for the worker receiving a process task that
            # it already has include:
            #
            #  1. The user mistakenly recreates the corresponding task, thinking the original task was lost.
            #  2. RabbitMQ requeues the task because the daemon worker lost its connection or did not respond to the
            #     heartbeat in time, and the task is sent to the same worker once it regains connection.
            #
            # Here we assume that the existence of another subscriber indicates that the process is still being run by
            # this worker. We thus ignore the request to have the worker take it on again and acknowledge the current
            # task (`return False`). If our assumption was wrong and the original task was no longer being worked on,
            # the user can resubmit the task once the list of subscribers of the process has been cleared. Note: In the
            # second case we are deleting the *original* task, and once the worker finishes running the process there
            # won't be a task in RabbitMQ to acknowledge anymore. This, however, is silently ignored.
            #
            # Note: the exception is raised by ``kiwipy`` based on an internal cache it and ``aiormq`` keep of the
            # current subscribers. This means that this will only occur when the task is resent to the *same* daemon
            # worker. If another worker were to receive it, no exception would be raised as the check is client and not
            # server based.
            LOGGER.exception(
                'A subscriber with the process id<%d> already exists, which most likely means this worker is already '
                'working on it and this task was sent as a duplicate by mistake. Deleting the task now.', pid
            )
            return False
        except asyncio.CancelledError:  # pylint: disable=try-except-raise
            # note this is only required in python<=3.7,
            # where asyncio.CancelledError inherits from Exception
52 changes: 52 additions & 0 deletions tests/manage/external/test_rmq.py
@@ -8,10 +8,14 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the `aiida.manage.external.rmq` module."""
import pathlib

from kiwipy.rmq import RmqThreadCommunicator
import pytest

from aiida.engine.processes import ProcessState, control
from aiida.manage.external import rmq
from aiida.orm import Int


@pytest.mark.parametrize(('args', 'kwargs', 'expected'), (
@@ -57,3 +61,51 @@ def test_add_rpc_subscriber(communicator):
def test_add_broadcast_subscriber(communicator):
    """Test ``add_broadcast_subscriber``."""
    communicator.add_broadcast_subscriber(None)


@pytest.mark.requires_rmq
def test_duplicate_subscriber_identifier(aiida_local_code_factory, started_daemon_client, submit_and_await):
    """Test that a ``DuplicateSubscriberIdentifier`` in ``ProcessLauncher._continue`` does not except the process.

    When a daemon worker tries to continue a process, a ``kiwipy.DuplicateSubscriberIdentifier`` may be raised,
    which means that it already subscribed itself to be running that process. This can occur for at least two
    reasons:

    * The user manually recreated the process task, mistakenly thinking it had been lost.
    * RabbitMQ requeues the task because the daemon worker lost its connection or did not respond to the
      heartbeat in time, and the task is sent to the same worker once it regains connection.

    In both cases, the original task is still being run and we should not let this exception except the entire
    process. Unfortunately, these duplicate tasks still happen quite a lot when the daemon workers are under heavy
    load and we don't want a bunch of processes to be excepted because of this.

    In most cases, just ignoring the task will be the best solution. In the end, the original task is likely to
    complete. If it turns out that the task actually got lost and the process is stuck, the user can find this error
    message in the logs and manually recreate the task, using for example ``verdi devel revive``.

    Note that this exception is only raised within a single worker, i.e., if another worker subscribes to the same
    process, that should not incur this exception and that is not what we are testing here. This test should
    therefore be run with a single daemon worker.
    """
    code = aiida_local_code_factory(entry_point='core.arithmetic.add', executable='/bin/bash')

    builder = code.get_builder()
    builder.x = Int(1)
    builder.y = Int(1)
    builder.metadata.options.sleep = 2  # Add a sleep to give time to send the duplicate task before it finishes

    # Submit the process to the daemon and wait for it to be picked up (signalled by it going in waiting state).
    node = submit_and_await(builder, ProcessState.WAITING)

    # Recreate process task causing the daemon to pick it up again and incurring the ``DuplicateSubscriberIdentifier``
    control.revive_processes([node], wait=True)

    # Wait for the original node to be finished
    submit_and_await(node, ProcessState.FINISHED)

    # The original node should now have finished normally and not excepted
    assert node.is_finished_ok, (node.process_state, node.exit_status)

    # Verify that the receiving of the duplicate task was logged by the daemon
    daemon_log = pathlib.Path(started_daemon_client.daemon_log_file).read_text(encoding='utf-8')
    assert f'Error: A subscriber with the process id<{node.pk}> already exists' in daemon_log
