Skip to content

Commit

Permalink
Make process functions submittable (#4539)
Browse files Browse the repository at this point in the history
The limitation that process functions were not submittable, meaning they
could not be sent to a daemon worker but could only be run by the current
interpreter, was a historical one. Before the introduction of the system
of processes in v1.0, a `calcfunction` was nothing more than the
execution of a normal function. However, now, a process function creates
a `Process` instance in the background, just as any other process. This
means it can also be serialized and deserialized by a daemon worker.

Here we remove the limitation of process functions not being submittable
simply by removing the check. Note that there is no need to change the
implementation other than adding two attributes on the decorated function
that specify the corresponding process class and the method that allows
to recreate the instance from the serialized instance.

Co-authored-by: Sebastiaan Huber <mail@sphuber.net>
  • Loading branch information
unkcpz and sphuber authored Nov 8, 2020
1 parent e5c2d0e commit ac4c881
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 11 deletions.
71 changes: 68 additions & 3 deletions .ci/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from aiida.manage.caching import enable_caching
from aiida.orm import CalcJobNode, load_node, Int, Str, List, Dict, load_code
from aiida.plugins import CalculationFactory, WorkflowFactory
from aiida.workflows.arithmetic.add_multiply import add_multiply, add
from workchains import (
NestedWorkChain, DynamicNonDbInput, DynamicDbInput, DynamicMixedInput, ListEcho, CalcFunctionRunnerWorkChain,
WorkFunctionRunnerWorkChain, NestedInputNamespace, SerializeWorkChain, ArithmeticAddBaseWorkChain
Expand Down Expand Up @@ -72,6 +73,30 @@ def print_report(pk):
print(f'Note: the command failed, message: {exception}')


def validate_process_functions(expected_results):
    """Validate that the submitted calcfunctions and workfunctions finished ok with the expected results.

    :param expected_results: dictionary mapping process pk to the expected ``result`` output node.
    :return: True if every process finished ok and produced the expected result, False otherwise.
    """
    valid = True
    for pk, expected_result in expected_results.items():
        calc = load_node(pk)
        if not calc.is_finished_ok:
            print(f'Calc<{pk}> not finished ok: process_state<{calc.process_state}> exit_status<{calc.exit_status}>')
            print_report(pk)
            valid = False

        try:
            actual_result = calc.outputs.result
        except exceptions.NotExistent:
            print(f'Could not retrieve `result` output for process<{pk}>')
            print_report(pk)
            valid = False
            # Without a `result` output there is nothing to compare; skip to the next pk.
            # Falling through would read `actual_result` unbound (NameError on the first
            # iteration) or a stale value left over from a previous iteration.
            continue

        if actual_result != expected_result:
            print(f'* UNEXPECTED VALUE {actual_result} for calc pk={pk}: I expected {expected_result}')
            valid = False

    return valid


def validate_calculations(expected_results):
"""Validate the calculations."""
valid = True
Expand Down Expand Up @@ -194,6 +219,33 @@ def validate_cached(cached_calcs):
return valid


def launch_calcfunction(inputval):
    """Submit the `add` calcfunction to the daemon with both inputs set to `inputval`.

    :param inputval: plain integer used for both the `x` and `y` inputs.
    :return: tuple of the submitted process node and the expected `Int` result (x + y).
    """
    inputs = {
        'x': Int(inputval),
        'y': Int(inputval),
    }
    res = inputval + inputval
    expected_result = Int(res)
    process = submit(add, **inputs)
    print(f'launched calcfunction {process.uuid}, pk={process.pk}')
    return process, expected_result


def launch_workfunction(inputval):
    """Submit the `add_multiply` workfunction to the daemon.

    :param inputval: plain integer used for the `x`, `y` and `z` inputs.
    :return: tuple of the submitted process node and the expected `Int` result ((x + y) * z).
    """
    # Compute the value the workfunction should produce: (x + y) * z with all inputs equal.
    expected_result = Int((inputval + inputval) * inputval)
    # Each input gets its own `Int` node, as in the calcfunction launcher.
    inputs = {label: Int(inputval) for label in ('x', 'y', 'z')}
    process = submit(add_multiply, **inputs)
    print(f'launched workfunction {process.uuid}, pk={process.pk}')
    return process, expected_result


def launch_calculation(code, counter, inputval):
"""
Launch calculations to the daemon through the Process layer
Expand Down Expand Up @@ -339,7 +391,8 @@ def run_multiply_add_workchain():

def main():
"""Launch a bunch of calculation jobs and workchains."""
# pylint: disable=too-many-locals,too-many-statements
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
expected_results_process_functions = {}
expected_results_calculations = {}
expected_results_workchains = {}
code_doubler = load_code(CODENAME_DOUBLER)
Expand All @@ -356,6 +409,16 @@ def main():
print('Running the `MultiplyAddWorkChain`')
run_multiply_add_workchain()

# Submitting the calcfunction through the launchers
print('Submitting calcfunction to the daemon')
proc, expected_result = launch_calcfunction(inputval=1)
expected_results_process_functions[proc.pk] = expected_result

# Submitting the workfunction through the launchers
print('Submitting workfunction to the daemon')
proc, expected_result = launch_workfunction(inputval=1)
expected_results_process_functions[proc.pk] = expected_result

# Submitting the Calculations the new way directly through the launchers
print(f'Submitting {NUMBER_CALCULATIONS} calculations to the daemon')
for counter in range(1, NUMBER_CALCULATIONS + 1):
Expand Down Expand Up @@ -419,7 +482,8 @@ def main():

calculation_pks = sorted(expected_results_calculations.keys())
workchains_pks = sorted(expected_results_workchains.keys())
pks = calculation_pks + workchains_pks
process_functions_pks = sorted(expected_results_process_functions.keys())
pks = calculation_pks + workchains_pks + process_functions_pks

print('Wating for end of execution...')
start_time = time.time()
Expand Down Expand Up @@ -473,7 +537,8 @@ def main():

if (
validate_calculations(expected_results_calculations) and
validate_workchains(expected_results_workchains) and validate_cached(cached_calcs)
validate_workchains(expected_results_workchains) and validate_cached(cached_calcs) and
validate_process_functions(expected_results_process_functions)
):
print_daemon_log()
print('')
Expand Down
4 changes: 1 addition & 3 deletions aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aiida.manage import manager
from .processes.functions import FunctionProcess
from .processes.process import Process
from .utils import is_process_function, is_process_scoped, instantiate_process
from .utils import is_process_scoped, instantiate_process

__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit')

Expand Down Expand Up @@ -96,8 +96,6 @@ def submit(process, **inputs):
:return: the calculation node of the process
:rtype: :class:`aiida.orm.ProcessNode`
"""
assert not is_process_function(process), 'Cannot submit a process function'

# Submitting from within another process requires `self.submit` unless it is a work function, in which case the
# current process in the scope should be an instance of `FunctionProcess`
if is_process_scoped() and not isinstance(Process.current(), FunctionProcess):
Expand Down
2 changes: 2 additions & 0 deletions aiida/engine/processes/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def decorated_function(*args, **kwargs):
decorated_function.run_get_node = run_get_node
decorated_function.is_process_function = True
decorated_function.node_class = node_class
decorated_function.process_class = process_class
decorated_function.recreate_from = process_class.recreate_from
decorated_function.spec = process_class.spec

return decorated_function
Expand Down
2 changes: 2 additions & 0 deletions aiida/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ def instantiate_process(runner, process, *args, **inputs):
builder = process
process_class = builder.process_class
inputs.update(**builder._inputs(prune=True)) # pylint: disable=protected-access
elif is_process_function(process):
process_class = process.process_class
elif issubclass(process, Process):
process_class = process
else:
Expand Down
1 change: 1 addition & 0 deletions docs/source/howto/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ However, be careful, if you make these intervals too short, the daemon workers m
An additional note of importance is that each interval is guaranteed to be respected per daemon worker individually, but not as a collective.
That is to say, if the safe interval is set to 60 seconds, any single worker is guaranteed to open a connection to that machine at most once every minute, however, if you have multiple active daemon workers, the machine may be accessed more than once per minute.

.. _how-to:faq:process-not-importable-daemon:

Why would a process that runs fine locally raise an exception when submitted to the daemon?
===========================================================================================
Expand Down
1 change: 1 addition & 0 deletions docs/source/topics/processes/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ When a process is 'submitted', an instance of the ``Process`` is created, along
This is called a 'process checkpoint', more information on which :ref:`will follow later<topics:processes:concepts:checkpoints>`.
Subsequently, the process instance is shut down and a 'continuation task' is sent to the process queue of RabbitMQ.
This task is simply a small message that just contains an identifier for the process.
In order to reconstruct the process from a `checkpoint`, the process needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

All the daemon runners, when they are launched, subscribe to the process queue and RabbitMQ will distribute the continuation tasks to them as they come in, making sure that each task is only sent to one runner at a time.
The receiving daemon runner can restore the process instance in memory from the checkpoint that was stored in the database and continue the execution.
Expand Down
6 changes: 3 additions & 3 deletions docs/source/topics/processes/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ For example, when we want to run an instance of the :py:class:`~aiida.calculatio
The function will submit the calculation to the daemon and immediately return control to the interpreter, returning the node that is used to represent the process in the provenance graph.

.. warning::
Process functions, i.e. python functions decorated with the ``calcfunction`` or ``workfunction`` decorators, **cannot be submitted** but can only be run.
For a process to be submittable, the class or function needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

The ``run`` function is called identically:

Expand All @@ -356,8 +356,8 @@ The examples used above would look like the following:
.. include:: include/snippets/launch/launch_submit_dictionary.py
:code: python

Process functions, i.e. :ref:`calculation functions<topics:calculations:concepts:calcfunctions>` and :ref:`work functions<topics:workflows:concepts:workfunctions>`, can be launched like any other process as explained above, with the only exception that they **cannot be submitted**.
In addition to this limitation, process functions have two additional methods of being launched:
Process functions, i.e. :ref:`calculation functions<topics:calculations:concepts:calcfunctions>` and :ref:`work functions<topics:workflows:concepts:workfunctions>`, can be launched like any other process as explained above.
Process functions have two additional methods of being launched:

* Simply *calling* the function
* Using the internal run method attributes
Expand Down
7 changes: 5 additions & 2 deletions tests/engine/test_process_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from aiida.backends.testbase import AiidaTestCase
from aiida.engine import run, run_get_node, submit, calcfunction, workfunction, Process, ExitCode
from aiida.orm.nodes.data.bool import get_true_node
from aiida.workflows.arithmetic.add_multiply import add_multiply

DEFAULT_INT = 256
DEFAULT_LABEL = 'Default label'
Expand Down Expand Up @@ -350,8 +351,10 @@ def test_launchers(self):
self.assertEqual(result, get_true_node())
self.assertTrue(isinstance(node, orm.CalcFunctionNode))

with self.assertRaises(AssertionError):
submit(self.function_return_true)
# Process function can be submitted and will be run by a daemon worker as long as the function is importable
# Note that the actual running is not tested here but is done so in `.ci/test_daemon.py`.
node = submit(add_multiply, x=orm.Int(1), y=orm.Int(2), z=orm.Int(3))
assert isinstance(node, orm.WorkFunctionNode)

def test_return_exit_code(self):
"""
Expand Down

0 comments on commit ac4c881

Please sign in to comment.