Skip to content

Commit

Permalink
Engine: Add the await_processes utility function
Browse files Browse the repository at this point in the history
The recent addition of the `wait` argument to the `submit` function
allows a user to submit a process to the daemon, while still having the
function block until the process is terminated, as a call to `run` would
do. This can be useful in interactive tutorials and demos where the code
should not advance until the process is done, but one still wants the
benefits of having the daemon run the process.

The downside of this approach is that it only allows submitting and waiting
for a single process at a time. Here the `await_processes` function is
added. It takes a list of process nodes and will wait in a loop for all
of them to reach a terminal state. The time between iterations can be
controlled by the `wait_interval` argument.
  • Loading branch information
sphuber committed Oct 27, 2023
1 parent 829b78e commit f91aa09
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
1 change: 1 addition & 0 deletions aiida/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
'WorkChain',
'append_',
'assign_',
'await_processes',
'calcfunction',
'construct_awaitable',
'get_daemon_client',
Expand Down
25 changes: 24 additions & 1 deletion aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import typing as t

from aiida.common import InvalidOperation
from aiida.common.lang import type_check
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import manager
from aiida.orm import ProcessNode
Expand All @@ -24,7 +25,7 @@
from .runners import ResultAndPk
from .utils import instantiate_process, is_process_scoped # pylint: disable=no-name-in-module

__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit')
__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit', 'await_processes')

TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder] # pylint: disable=invalid-name
# run can also be process function, but it is not clear what type this should be
Expand Down Expand Up @@ -130,6 +131,28 @@ def submit(process: TYPE_SUBMIT_PROCESS, wait: bool = False, wait_interval: int
return node


def await_processes(nodes: t.Sequence[ProcessNode], wait_interval: int = 1) -> None:
    """Run a loop until all processes are terminated.

    The termination status of every node is checked once per iteration and a progress report is logged each time. The
    function returns as soon as an iteration finds all processes terminated, without sleeping another interval.

    :param nodes: Sequence of nodes that represent the processes to await.
    :param wait_interval: The interval in seconds between each iteration of checking the status of all processes.
    :raises TypeError: If ``nodes`` is not a list or tuple, or if any of its elements is not a ``ProcessNode``.
    """
    type_check(nodes, (list, tuple))

    if any(not isinstance(node, ProcessNode) for node in nodes):
        raise TypeError(f'`nodes` should be a list of `ProcessNode`s but got: {nodes}')

    start_time = time.time()

    while True:
        # One boolean per node: ``True`` while the process is still running, ``False`` once it has terminated.
        running = [not node.is_terminated for node in nodes]
        seconds_passed = time.time() - start_time
        LOGGER.report(f'{running.count(False)} out of {len(nodes)} processes terminated. [{round(seconds_passed)} s]')

        if not any(running):
            # Everything has terminated: return right away instead of sleeping one more interval for nothing.
            return

        time.sleep(wait_interval)


# Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves
# The shortcuts are set as attributes on the `run` function object itself; the `type: ignore[attr-defined]` markers
# suppress the type checker error for these dynamically assigned attributes.
run.get_node = run_get_node # type: ignore[attr-defined]
run.get_pk = run_get_pk # type: ignore[attr-defined]
17 changes: 17 additions & 0 deletions docs/source/topics/processes/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,23 @@ The function will submit the calculation to the daemon and immediately return co
This can be useful for tutorials and demos in interactive notebooks where the user should not continue before the process is done.
One could of course also use ``run`` (see below), but then the process would be lost if the interpreter gets accidentally shut down.
By using ``submit``, the process is run by the daemon which takes care of saving checkpoints so it can always be restarted in case of problems.
If you need to launch multiple processes in parallel and want to wait for all of them to be finished, simply use ``submit`` with the default ``wait=False`` and collect the returned nodes in a list.
You can then pass them to :func:`aiida.engine.launch.await_processes` which will return once all processes have terminated:

.. code:: python

    from aiida.engine import submit, await_processes

    nodes = []

    for i in range(5):
        node = submit(...)
        nodes.append(node)

    await_processes(nodes, wait_interval=10)

The ``await_processes`` function will loop every ``wait_interval`` seconds and check whether all processes (represented by the ``ProcessNode`` in the ``nodes`` list) have terminated.


The ``run`` function is called identically:

Expand Down
29 changes: 29 additions & 0 deletions tests/engine/test_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,35 @@ def test_submit_wait(aiida_local_code_factory):
assert node.is_finished_ok, node.exit_code


def test_await_processes_invalid():
    """Check that :func:`aiida.engine.launch.await_processes` raises ``TypeError`` for invalid ``nodes`` values."""
    invalid_inputs = (
        None,  # not a sequence at all
        [orm.Data()],  # a list, but its element is not a ``ProcessNode``
        orm.ProcessNode(),  # a bare node instead of a list or tuple of nodes
    )

    for invalid in invalid_inputs:
        with pytest.raises(TypeError):
            launch.await_processes(invalid)


@pytest.mark.usefixtures('started_daemon_client')
def test_await_processes(aiida_local_code_factory, caplog):
    """Submit a single calculation to the daemon and wait for it with :func:`aiida.engine.launch.await_processes`."""
    builder = ArithmeticAddCalculation.get_builder()
    builder.code = aiida_local_code_factory('core.arithmetic.add', '/bin/bash')
    builder.x = orm.Int(1)
    builder.y = orm.Int(2)
    builder.metadata = {'options': {'resources': {'num_machines': 1}}}
    submitted = launch.submit(builder)

    # Directly after submission the process should not have terminated yet.
    assert not submitted.is_terminated

    launch.await_processes([submitted])

    # Once ``await_processes`` returns, the process must have terminated.
    assert submitted.is_terminated

    # At least one progress report must have been logged, the first one being the status line.
    records = caplog.records
    assert records
    first_message = records[0].message
    assert 'out of 1 processes terminated.' in first_message


@pytest.mark.requires_rmq
class TestLaunchers:
"""Class to test process launchers."""
Expand Down

0 comments on commit f91aa09

Please sign in to comment.