From 3c50f8c237576207b59cc90a455d90593cacca15 Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Mon, 15 Jan 2018 11:03:54 +0100 Subject: [PATCH 1/8] [WIP] towards a blocking launch Problem is, this is inherently not supported in plum and cannot be achieved using a runner because it does not give us a future. Will require some more work. --- aiida/daemon/new.py | 1 - aiida/work/events.py | 7 ++++++ aiida/work/launch.py | 16 ++++++++++++- aiida/work/rmq.py | 53 ++++++++++++++++++++++++++++++++++++++++++- aiida/work/runners.py | 8 +++++-- examples/work/eos.py | 2 +- 6 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 aiida/work/events.py diff --git a/aiida/daemon/new.py b/aiida/daemon/new.py index 26d779b2e9..440945f3a3 100644 --- a/aiida/daemon/new.py +++ b/aiida/daemon/new.py @@ -31,7 +31,6 @@ def tick_legacy_workflows(runner): logging.getLogger('').addHandler(console) logger = logging.getLogger(__name__) - logger.warning("TEST") config = get_profile_config(backends.settings.AIIDADB_PROFILE) diff --git a/aiida/work/events.py b/aiida/work/events.py new file mode 100644 index 0000000000..954ec188ec --- /dev/null +++ b/aiida/work/events.py @@ -0,0 +1,7 @@ +import plum + +new_event_loop = plum.new_event_loop + + +def run_until_complete(future, loop): + plum.run_until_complete(future, loop) diff --git a/aiida/work/launch.py b/aiida/work/launch.py index a07f3c1003..4fa2a2df30 100644 --- a/aiida/work/launch.py +++ b/aiida/work/launch.py @@ -10,6 +10,7 @@ from collections import namedtuple +from . import rmq from . import runners from . import utils @@ -17,10 +18,23 @@ RunningInfo = namedtuple("RunningInfo", ["type", "pid"]) +_rmq_control_panel = None + + +def get_rmq_control_panel(): + global _rmq_control_panel + if _rmq_control_panel is None: + _rmq_control_panel = rmq.BlockingProcessControlPanel('aiida') + return _rmq_control_panel + def submit(process_class, **inputs): assert not utils.is_workfunction(process_class), "Cannot submit a workfunction" - runner = runners.get_runner() + + submit_runner = runners.new_runner(rmq_submit=True, enable_persistence=True) + future = submit_runner.submit(process_class, **inputs) + return submit_runner.run_until_complete(future) + return runner.submit(process_class, **inputs) diff --git a/aiida/work/rmq.py b/aiida/work/rmq.py index 03407dcdc8..796e1b1b9a 100644 --- a/aiida/work/rmq.py +++ b/aiida/work/rmq.py @@ -1,10 +1,11 @@ import json import plum -import uuid from aiida.utils.serialize import serialize_data, deserialize_data from plum import rmq +from . import events + _MESSAGE_EXCHANGE = 'messages' _LAUNCH_QUEUE = 'process.queue' @@ -76,3 +77,53 @@ def request_status(self, pid): @property def launch(self): return self._launch + + +class BlockingProcessControlPanel(object): + """ + A blocking adapter for the ProcessControlPanel. + """ + class _Launcher(object): + def __init__(self, parent): + self._parent = parent + + def launch_process(self, process_class, init_args=None, init_kwargs=None): + future = self._parent._control_panel.launch.launch_process(process_class, init_args, init_kwargs) + return self._parent._run(future) + + def continue_process(self, pid, tag=None): + future = self._parent._control_panel.launch.continue_process(pid, tag) + return self._parent._run(future) + + def __init__(self, prefix, rmq_config=None, testing_mode=False): + if rmq_config is None: + rmq_config = { + 'url': 'amqp://localhost', + 'prefix': 'aiida', + } + + self._loop = events.new_event_loop() + self._connector = plum.rmq.RmqConnector(amqp_url=rmq_config['url'], loop=self._loop) + self._control_panel = ProcessControlPanel(prefix, self._connector, testing_mode) + self.launch = self._Launcher(self) + + self._connector.connect() + + def pause_process(self, pid): + future = self._control_panel.pause_process(pid) + return self._run(future) + + def play_process(self, pid): + future = self._control_panel.pause_process(pid) + return self._run(future) + + def kill_process(self, pid, msg=None): + future = self._control_panel.kill_process(pid, msg) + return self._run(future) + + def request_status(self, pid): + future = self._control_panel.request_status(pid) + return self._run(future) + + def _run(self, future): + return events.run_until_complete(future, self._loop) diff --git a/aiida/work/runners.py b/aiida/work/runners.py index 85d85af95a..d2da0c62a5 100644 --- a/aiida/work/runners.py +++ b/aiida/work/runners.py @@ -6,6 +6,7 @@ import logging import aiida.orm +from . import events from . import persistence from . import rmq from . import transports @@ -21,7 +22,6 @@ _runner = None - def get_runner(): global _runner if _runner is None: @@ -51,6 +51,10 @@ def new_daemon_runner(rmq_prefix='aiida', rmq_create_connection=None): return runner +def create_connector(rmq_config): + return plum.rmq.RmqConnector(amqp_url=rmq_config['url'], loop=self._loop) + + def convert_to_inputs(workfunction, *args, **kwargs): """ """ @@ -105,7 +109,7 @@ class Runner(object): def __init__(self, rmq_config=None, loop=None, poll_interval=5., rmq_submit=False, enable_persistence=True, persister=None): - self._loop = loop if loop is not None else plum.new_event_loop() + self._loop = loop if loop is not None else events.new_event_loop() self._poll_interval = poll_interval self._transport = transports.TransportQueue(self._loop) diff --git a/examples/work/eos.py b/examples/work/eos.py index 981a05ec69..c6a51ba596 100644 --- a/examples/work/eos.py +++ b/examples/work/eos.py @@ -99,7 +99,7 @@ def print_result(self): codename = 'pw.x@localhost' pseudo_family_name = 'SSSP_eff_PBE_0.7' -result = work.run(EquationOfState, +result = work.submit(EquationOfState, structure=structure, codename=Str(codename), pseudo_family=Str(pseudo_family_name) From 63388baca0002d6def9e0caa558fb99273fe457b Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Mon, 15 Jan 2018 14:14:25 +0100 Subject: [PATCH 2/8] Changed return link to [return] and added called_by --- aiida/orm/implementation/general/calculation/__init__.py | 9 ++++++++- aiida/work/processes.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/aiida/orm/implementation/general/calculation/__init__.py b/aiida/orm/implementation/general/calculation/__init__.py index b90b2cee65..f9013c933e 100644 --- a/aiida/orm/implementation/general/calculation/__init__.py +++ b/aiida/orm/implementation/general/calculation/__init__.py @@ -14,7 +14,6 @@ from aiida.orm.mixins import SealableWithUpdatableAttributes - def _parse_single_arg(function_name, additional_parameter, args, kwargs): """ @@ -219,6 +218,14 @@ def __call__(self, parent_node, *args, **kwargs): def called(self): return self.get_outputs(link_type=LinkType.CALL) + @property + def called_by(self): + called_by = self.get_inputs(link_type=LinkType.CALL) + if called_by: + return called_by[0] + else: + return None + def get_linkname(self, link, *args, **kwargs): """ Return the linkname used for a given input link diff --git a/aiida/work/processes.py b/aiida/work/processes.py index bf8aa12469..8f9f081c2d 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -148,7 +148,7 @@ class Process(plum.process.Process): """ __metaclass__ = abc.ABCMeta - SINGLE_RETURN_LINKNAME = '[result]' + SINGLE_RETURN_LINKNAME = '[return]' # This is used for saving node pks in the saved instance state NODE_TYPE = uuid.UUID('5cac9bab-6f46-485b-9e81-d6a666cfdc1b') From efdd9e7f6733ed406988faf75cf2186d523ded8c Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Mon, 15 Jan 2018 15:01:03 +0100 Subject: [PATCH 3/8] Further work to get launch working --- aiida/work/launch.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/aiida/work/launch.py b/aiida/work/launch.py index 4fa2a2df30..f33b567490 100644 --- a/aiida/work/launch.py +++ b/aiida/work/launch.py @@ -10,6 +10,7 @@ from collections import namedtuple +from . import persistence from . import rmq from . import runners from . import utils @@ -18,6 +19,7 @@ RunningInfo = namedtuple("RunningInfo", ["type", "pid"]) +_persister = None _rmq_control_panel = None @@ -28,14 +30,20 @@ def get_rmq_control_panel(): return _rmq_control_panel +def _get_persister(): + global _persister + if _persister is None: + _persister = persistence.AiiDAPersister() + return _persister + + def submit(process_class, **inputs): assert not utils.is_workfunction(process_class), "Cannot submit a workfunction" - submit_runner = runners.new_runner(rmq_submit=True, enable_persistence=True) - future = submit_runner.submit(process_class, **inputs) - return submit_runner.run_until_complete(future) - - return runner.submit(process_class, **inputs) + process = runners._create_process(process_class, None, input_kwargs=inputs) + _get_persister().save_checkpoint(process) + get_rmq_control_panel().launch.continue_process(process.pid) + return process.calc def run(process, *args, **inputs): From 551e8ff35a53132bd4ea48b431afb7efb3970f92 Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Tue, 16 Jan 2018 11:36:54 +0100 Subject: [PATCH 4/8] Reorganised and generalised print of tree The ASCII visualisation of trees and call graphs has now been abstracted and placed in a util function. The work command calls through. --- aiida/cmdline/commands/work.py | 51 ++++++++++++++++--- aiida/utils/ascii_vis.py | 89 ++++++++++++++++++++++++++++++++-- aiida/work/runners.py | 2 +- 3 files changed, 132 insertions(+), 10 deletions(-) diff --git a/aiida/cmdline/commands/work.py b/aiida/cmdline/commands/work.py index d24176aaec..4c1f388ce1 100644 --- a/aiida/cmdline/commands/work.py +++ b/aiida/cmdline/commands/work.py @@ -7,13 +7,15 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### -import click -from functools import partial import logging +from functools import partial + +import click from tabulate import tabulate -from aiida.cmdline.commands import work, verdi from aiida.cmdline.baseclass import VerdiCommandWithSubcommands +from aiida.cmdline.commands import work, verdi +from aiida.utils.ascii_vis import print_tree_descending CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) LIST_CMDLINE_PROJECT_CHOICES = ['id', 'ctime', 'label', 'uuid', 'descr', 'mtime', 'state', 'sealed'] @@ -179,7 +181,6 @@ def report(pk, levelname, order_by, indent_size, max_depth): import itertools from aiida.orm.backend import construct - from aiida.orm.log import OrderSpecifier, ASCENDING, DESCENDING from aiida.orm.querybuilder import QueryBuilder from aiida.orm.calculation.work import WorkCalculation @@ -341,7 +342,7 @@ def kill(pks): futures = [] for pk in pks: future = runner.rmq.kill_process(pk) - future.add_done_callback(partial(_action_done, "pause", pk)) + future.add_done_callback(partial(_action_done, "kill", pk)) futures.append(future) runner.run_until_complete(plum.gather(*futures)) @@ -392,10 +393,48 @@ def _action_done(intent, pk, future): click.echo("{} {} OK".format(intent, pk)) +@work.command('status', context_settings=CONTEXT_SETTINGS) +@click.argument('pks', nargs=-1, type=int) +def status(pks): + from aiida import try_load_dbenv + try_load_dbenv() + import aiida.orm + from aiida.utils.ascii_vis import print_call_graph + + for pk in pks: + calc_node = aiida.orm.load_node(pk) + print_call_graph(calc_node) + # status_info = _create_status_info(calc_node) + # print_tree_descending(status_info) + + +def _create_status_info(calc_node): + status_line = _format_status_line(calc_node) + called = calc_node.called + if called: + return status_line, [_create_status_info(child) for child in called] + else: + return status_line + + +def _format_status_line(calc_node): + from aiida.orm.calculation.work import WorkCalculation + from aiida.orm.calculation.job import JobCalculation + + if isinstance(calc_node, WorkCalculation): + label = calc_node.get_attr('_process_label') + state = calc_node.get_attr('process_state') + elif isinstance(calc_node, JobCalculation): + label = type(calc_node).__name__ + state = str(calc_node.get_state()) + else: + raise TypeError("Unknown type") + return "{} [{}]".format(label, calc_node.pk, state) + + def _build_query(projections=None, order_by=None, limit=None, past_days=None): import datetime from aiida.utils import timezone - from aiida.orm.mixins import Sealable from aiida.orm.querybuilder import QueryBuilder from aiida.orm.calculation.work import WorkCalculation diff --git a/aiida/utils/ascii_vis.py b/aiida/utils/ascii_vis.py index ee8740e592..89baf52dcc 100644 --- a/aiida/utils/ascii_vis.py +++ b/aiida/utils/ascii_vis.py @@ -7,13 +7,15 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### - from aiida.common.links import LinkType from ete3 import Tree +__all__ = ['draw_children', 'draw_parents', 'print_call_graph'] + + def draw_parents(node, node_label=None, show_pk=True, dist=2, - follow_links_of_type=None): + follow_links_of_type=None): """ Print an ASCII tree of the parents of the given node. @@ -77,7 +79,7 @@ def get_ascii_tree(node, node_label=None, show_pk=True, max_depth=1, :rtype: str """ tree_string = build_tree( - node, node_label, show_pk, max_depth, follow_links_of_type, descend + node, node_label, show_pk, max_depth, follow_links_of_type, descend ) t = Tree("({});".format(tree_string), format=1) return t.get_ascii(show_internal=True) @@ -157,3 +159,84 @@ def _generate_node_label(node, node_attr, show_pk): def _ctime(node): return node.ctime + + +def calc_info(calc_node): + from aiida.orm.calculation.work import WorkCalculation + from aiida.orm.calculation.job import JobCalculation + + if isinstance(calc_node, WorkCalculation): + label = calc_node.get_attr('_process_label') + state = calc_node.get_attr('process_state') + elif isinstance(calc_node, JobCalculation): + label = type(calc_node).__name__ + state = str(calc_node.get_state()) + else: + raise TypeError("Unknown type") + return "{} [{}]".format(label, calc_node.pk, state) + + +def print_call_graph(calc_node, info_fn=calc_info): + """ + Print a tree like the POSIX tree command for the calculation call graph + :param calc_node: The calculation node + :param info_fn: An optional function that takes the node and returns a string + of information to be displayed for each node. + """ + call_tree = build_call_graph(calc_node, info_fn=info_fn) + print_tree_descending(call_tree) + + +def build_call_graph(calc_node, info_fn=calc_info): + info_string = info_fn(calc_node) + called = calc_node.called + if called: + return info_string, [build_call_graph(child, info_fn) for child in called] + else: + return info_string + + +def format_tree_descending(tree, prefix=u"", pos=-1): + text = [] + + if isinstance(tree, tuple): + info = tree[0] + else: + info = tree + + if pos == -1: + pre = "" + elif pos == 0: + pre = u"{}{}".format(prefix, TREE_FIRST_ENTRY) + elif pos == 1: + pre = u"{}{}".format(prefix, TREE_MIDDLE_ENTRY) + else: + pre = u"{}{}".format(prefix, TREE_LAST_ENTRY) + text.append(u"{}{}".format(pre, info)) + + if isinstance(tree, tuple): + key, value = tree + num_entries = len(value) + if pos == -1 or pos == 2: + new_prefix = u"{} ".format(prefix) + else: + new_prefix = u"{}\u2502 ".format(prefix) + for i, entry in enumerate(value): + if i == num_entries - 1: + pos = 2 + elif i == 0: + pos = 0 + else: + pos = 1 + text.append(format_tree_descending(entry, new_prefix, pos)) + + return "\n".join(text) + + +def print_tree_descending(tree, prefix=u"", pos=-1): + print(format_tree_descending(tree, prefix, pos)) + + +TREE_MIDDLE_ENTRY = u"\u251C\u2500\u2500 " +TREE_FIRST_ENTRY = TREE_MIDDLE_ENTRY +TREE_LAST_ENTRY = u"\u2514\u2500\u2500 " diff --git a/aiida/work/runners.py b/aiida/work/runners.py index 85d85af95a..81f9df6089 100644 --- a/aiida/work/runners.py +++ b/aiida/work/runners.py @@ -103,7 +103,7 @@ class Runner(object): _rmq_connector = None _communicator = None - def __init__(self, rmq_config=None, loop=None, poll_interval=5., + def __init__(self, rmq_config=None, loop=None, poll_interval=0., rmq_submit=False, enable_persistence=True, persister=None): self._loop = loop if loop is not None else plum.new_event_loop() self._poll_interval = poll_interval From 314c4df1bd13061931d89f55baa3e3dfb0cb45e9 Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Wed, 17 Jan 2018 10:44:19 +0100 Subject: [PATCH 5/8] Tidied up work commands --- aiida/cmdline/commands/work.py | 2 -- aiida/utils/ascii_vis.py | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/aiida/cmdline/commands/work.py b/aiida/cmdline/commands/work.py index 4c1f388ce1..e53da630a4 100644 --- a/aiida/cmdline/commands/work.py +++ b/aiida/cmdline/commands/work.py @@ -404,8 +404,6 @@ def status(pks): for pk in pks: calc_node = aiida.orm.load_node(pk) print_call_graph(calc_node) - # status_info = _create_status_info(calc_node) - # print_tree_descending(status_info) def _create_status_info(calc_node): diff --git a/aiida/utils/ascii_vis.py b/aiida/utils/ascii_vis.py index 89baf52dcc..a9ca6fca12 100644 --- a/aiida/utils/ascii_vis.py +++ b/aiida/utils/ascii_vis.py @@ -173,7 +173,7 @@ def calc_info(calc_node): state = str(calc_node.get_state()) else: raise TypeError("Unknown type") - return "{} [{}]".format(label, calc_node.pk, state) + return u"{} [{}]".format(label, calc_node.pk, state) def print_call_graph(calc_node, info_fn=calc_info): @@ -205,7 +205,7 @@ def format_tree_descending(tree, prefix=u"", pos=-1): info = tree if pos == -1: - pre = "" + pre = u"" elif pos == 0: pre = u"{}{}".format(prefix, TREE_FIRST_ENTRY) elif pos == 1: @@ -230,7 +230,7 @@ def format_tree_descending(tree, prefix=u"", pos=-1): pos = 1 text.append(format_tree_descending(entry, new_prefix, pos)) - return "\n".join(text) + return u"\n".join(text) def print_tree_descending(tree, prefix=u"", pos=-1): From 51d36ace1443437064381dc194e90ff0fd1ac01d Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Thu, 18 Jan 2018 18:03:50 +0100 Subject: [PATCH 6/8] Tore out old daemon and fixed changed verdi work play/pause/kill Got rid of 'old new' daemon code. Changed play/pause/kill to not need a runner - doesn't make sense. Now these use the blocking control panel. --- aiida/backends/tests/__init__.py | 2 +- aiida/backends/tests/work/daemon.py | 185 +------------------------- aiida/backends/tests/work/test_rmq.py | 15 +-- aiida/cmdline/commands/work.py | 55 ++++---- aiida/daemon/new.py | 4 +- aiida/daemon/tasks.py | 27 +--- aiida/orm/calculation/job/sum.py | 99 ++++++++++++++ aiida/parsers/sum.py | 62 +++++++++ aiida/work/__init__.py | 4 +- aiida/work/daemon.py | 112 ---------------- aiida/work/events.py | 2 +- aiida/work/globals.py | 58 -------- aiida/work/launch.py | 26 +--- aiida/work/rmq.py | 88 ++++++------ aiida/work/runners.py | 9 +- bin/daemon.py | 33 ----- 16 files changed, 258 insertions(+), 523 deletions(-) create mode 100644 aiida/orm/calculation/job/sum.py create mode 100644 aiida/parsers/sum.py delete mode 100644 aiida/work/daemon.py delete mode 100644 aiida/work/globals.py delete mode 100644 bin/daemon.py diff --git a/aiida/backends/tests/__init__.py b/aiida/backends/tests/__init__.py index 8981aac8fa..039e6ff22b 100644 --- a/aiida/backends/tests/__init__.py +++ b/aiida/backends/tests/__init__.py @@ -52,7 +52,7 @@ 'orm.data.frozendict': ['aiida.backends.tests.orm.data.frozendict'], 'orm.log': ['aiida.backends.tests.orm.log'], 'work.class_loader': ['aiida.backends.tests.work.class_loader'], - # 'work.daemon': ['aiida.backends.tests.work.daemon'], + 'work.daemon': ['aiida.backends.tests.work.daemon'], 'work.persistence': ['aiida.backends.tests.work.persistence'], 'work.process': ['aiida.backends.tests.work.process'], 'work.processSpec': ['aiida.backends.tests.work.processSpec'], diff --git a/aiida/backends/tests/work/daemon.py b/aiida/backends/tests/work/daemon.py index 2649e5a097..958e6b34a4 100644 --- a/aiida/backends/tests/work/daemon.py +++ b/aiida/backends/tests/work/daemon.py @@ -8,190 +8,7 @@ # For further information please visit http://www.aiida.net # ########################################################################### from aiida.backends.testbase import AiidaTestCase -import tempfile -from shutil import rmtree -import unittest -import plum -from aiida.work.persistence import Persistence -from aiida.orm.calculation.job import JobCalculation -from aiida.orm.data.base import get_true_node -import aiida.work.daemon as daemon -from aiida.work.processes import Process -from aiida.work.launch import submit -from aiida.common.lang import override -from aiida.orm import load_node -import aiida.work.utils as util -from aiida.work.test_utils import DummyProcess, ExceptionProcess -import aiida.work.daemon as work_daemon -from aiida.work.utils import CalculationHeartbeat - -#@unittest.skip("Rewriting daemon") -class ProcessEventsTester(Process): - EVENTS = ["create", "start", "run", "wait", "resume", "finish", "emitted", - "stop", "failed", ] - - @classmethod - def define(cls, spec): - super(ProcessEventsTester, cls).define(spec) - for label in ["create", "start", "run", "wait", "resume", - "finish", "emitted", "stop"]: - spec.optional_output(label) - - def __init__(self, inputs, pid, logger=None): - super(ProcessEventsTester, self).__init__(inputs, pid, logger) - self._emitted = False - - @override - def on_create(self): - super(ProcessEventsTester, self).on_create() - self.out("create", get_true_node()) - - @override - def on_start(self): - super(ProcessEventsTester, self).on_start() - self.out("start", get_true_node()) - - @override - def on_run(self): - super(ProcessEventsTester, self).on_run() - self.out("run", get_true_node()) - - @override - def on_output_emitted(self, output_port, value, dynamic): - super(ProcessEventsTester, self).on_output_emitted( - output_port, value, dynamic) - if not self._emitted: - self._emitted = True - self.out("emitted", get_true_node()) - - @override - def on_wait(self, awaiting_uuid): - super(ProcessEventsTester, self).on_wait(awaiting_uuid) - self.out("wait", get_true_node()) - - @override - def on_resume(self): - super(ProcessEventsTester, self).on_resume() - self.out("resume", get_true_node()) - - @override - def on_finish(self): - super(ProcessEventsTester, self).on_finish() - self.out("finish", get_true_node()) - - @override - def on_stop(self): - super(ProcessEventsTester, self).on_stop() - self.out("stop", get_true_node()) - - @override - def _run(self): - return plum.Continue(self.finish) - - def finish(self, wait_on): - pass - -@unittest.skip("Rewriting daemon") -class FailCreateFromSavedStateProcess(DummyProcess): - """ - This class emulates a failure that occurs when loading the process from - a saved state. - """ - - @override - def load_instance_state(self, saved_state, logger): - super(FailCreateFromSavedStateProcess, self).load_instance_state(saved_state) - raise RuntimeError() - - -@unittest.skip("Moving to new daemon") class TestDaemon(AiidaTestCase): - def setUp(self): - self.assertEquals(len(util.ProcessStack.stack()), 0) - - self.storedir = tempfile.mkdtemp() - self.storage = Persistence.create_from_basedir(self.storedir) - - def tearDown(self): - self.assertEquals(len(util.ProcessStack.stack()), 0) - rmtree(self.storedir) - - def test_submit(self): - # This call should create an entry in the database with a PK - rinfo = submit(DummyProcess) - self.assertIsNotNone(rinfo) - self.assertIsNotNone(load_node(pk=rinfo.pid)) - - def test_tick(self): - registry = ProcessRegistry() - - rinfo = submit(ProcessEventsTester, _jobs_store=self.storage) - # Tick the engine a number of times or until there is no more work - i = 0 - while daemon.launch_pending_jobs(self.storage): - self.assertLess(i, 10, "Engine not done after 10 ticks") - i += 1 - self.assertTrue(registry.has_finished(rinfo.pid)) - - def test_multiple_processes(self): - submit(DummyProcess, _jobs_store=self.storage) - submit(ExceptionProcess, _jobs_store=self.storage) - submit(ExceptionProcess, _jobs_store=self.storage) - submit(DummyProcess, _jobs_store=self.storage) - - self.assertFalse(daemon.launch_pending_jobs(self.storage)) - - def test_create_fail(self): - registry = ProcessRegistry() - - dp_rinfo = submit(DummyProcess, _jobs_store=self.storage) - fail_rinfo = submit(FailCreateFromSavedStateProcess, _jobs_store=self.storage) - - # Tick the engine a number of times or until there is no more work - i = 0 - while daemon.launch_pending_jobs(self.storage): - self.assertLess(i, 10, "Engine not done after 10 ticks") - i += 1 - - self.assertTrue(registry.has_finished(dp_rinfo.pid)) - self.assertFalse(registry.has_finished(fail_rinfo.pid)) - - -@unittest.skip("Moving to new daemon") -class TestJobCalculationDaemon(AiidaTestCase): - def test_launch_pending_submitted(self): - num_at_start = len(work_daemon.get_all_pending_job_calculations()) - - # Create the calclation - calc_params = { - 'computer': self.computer, - 'resources': {'num_machines': 1, - 'num_mpiprocs_per_machine': 1} - } - c = JobCalculation(**calc_params) - c.store() - c.submit() - - self.assertIsNone(c.get_attr(CalculationHeartbeat.HEARTBEAT_EXPIRES, None)) - pending = work_daemon.get_all_pending_job_calculations() - self.assertEqual(len(pending), num_at_start + 1) - self.assertIn(c.pk, [p.pk for p in pending]) - - def test_launch_pending_expired(self): - num_at_start = len(work_daemon.get_all_pending_job_calculations()) - - calc_params = { - 'computer': self.computer, - 'resources': {'num_machines': 1, - 'num_mpiprocs_per_machine': 1} - } - c = JobCalculation(**calc_params) - c._set_attr(CalculationHeartbeat.HEARTBEAT_EXPIRES, 0) - c.store() - c.submit() - - pending = work_daemon.get_all_pending_job_calculations() - self.assertEqual(len(pending), num_at_start + 1) - self.assertIn(c.pk, [p.pk for p in pending]) + pass diff --git a/aiida/backends/tests/work/test_rmq.py b/aiida/backends/tests/work/test_rmq.py index b055cc44a2..0d137f4506 100644 --- a/aiida/backends/tests/work/test_rmq.py +++ b/aiida/backends/tests/work/test_rmq.py @@ -8,6 +8,7 @@ import aiida.work.test_utils as test_utils from aiida.orm.data import base import aiida.work as work +from aiida.work import rmq from aiida.orm.calculation.work import WorkCalculation __copyright__ = u"Copyright (c), This file is part of the AiiDA platform. For further information please visit http://www.aiida.net/. All rights reserved." @@ -16,13 +17,13 @@ __version__ = "0.7.0" -class TestProcess(AiidaTestCase): +class TestProcessControl(AiidaTestCase): """ Test AiiDA's RabbitMQ functionalities. """ def setUp(self): - super(TestProcess, self).setUp() + super(TestProcessControl, self).setUp() prefix = "{}.{}".format(self.__class__.__name__, uuid.uuid4()) self.loop = plum.new_event_loop() @@ -104,16 +105,6 @@ def test_kill(self): # TODO: Check kill message self.assertTrue(result) - # def test_launch_and_get_status(self): - # a = base.Int(5) - # b = base.Int(10) - # - # calc_node = self.runner.submit(test_utils.AddProcess, a=a, b=b) - # self._wait_for_calc(calc_node) - # future = self.runner.rmq.request_status(calc_node.pk) - # result = plum.run_until_complete(future, self.loop) - # self.assertIsNotNone(result) - def _wait_for_calc(self, calc_node, timeout=5.): def stop(*args): self.loop.stop() diff --git a/aiida/cmdline/commands/work.py b/aiida/cmdline/commands/work.py index e53da630a4..daa1c6e550 100644 --- a/aiida/cmdline/commands/work.py +++ b/aiida/cmdline/commands/work.py @@ -334,18 +334,18 @@ def kill_old(pks): def kill(pks): from aiida import try_load_dbenv try_load_dbenv() - import plum from aiida import work - runner = work.get_runner() + control_panel = work.new_blocking_control_panel() - futures = [] for pk in pks: - future = runner.rmq.kill_process(pk) - future.add_done_callback(partial(_action_done, "kill", pk)) - futures.append(future) - - runner.run_until_complete(plum.gather(*futures)) + try: + if control_panel.kill_process(pk): + click.echo("Killed '{}'".format(pk)) + else: + click.echo("Problem killing '{}'".format(pk)) + except (work.RemoteException, work.DeliveryFailed) as e: + print("Failed to kill '{}': {}".format(pk, e.message)) @work.command('pause', context_settings=CONTEXT_SETTINGS) @@ -353,18 +353,18 @@ def kill(pks): def pause(pks): from aiida import try_load_dbenv try_load_dbenv() - import plum from aiida import work - runner = work.get_runner() + control_panel = work.new_blocking_control_panel() - futures = [] for pk in pks: - future = runner.rmq.pause_process(pk) - future.add_done_callback(partial(_action_done, "pause", pk)) - futures.append(future) - - runner.run_until_complete(plum.gather(*futures)) + try: + if control_panel.pause_process(pk): + click.echo("Paused '{}'".format(pk)) + else: + click.echo("Problem pausing '{}'".format(pk)) + except (work.RemoteException, work.DeliveryFailed) as e: + print("Failed to pause '{}': {}".format(pk, e.message)) @work.command('play', context_settings=CONTEXT_SETTINGS) @@ -372,25 +372,18 @@ def pause(pks): def play(pks): from aiida import try_load_dbenv try_load_dbenv() - import plum from aiida import work - runner = work.get_runner() + control_panel = work.new_blocking_control_panel() - futures = [] for pk in pks: - future = runner.rmq.play_process(pk) - future.add_done_callback(partial(_action_done, "play", pk)) - futures.append(future) - - runner.run_until_complete(plum.gather(*futures)) - - -def _action_done(intent, pk, future): - if future.exception() is not None: - click.echo("Failed to {} process {}: {}".format(intent, pk, future.exception())) - else: - click.echo("{} {} OK".format(intent, pk)) + try: + if control_panel.play_process(pk): + click.echo("Played '{}'".format(pk)) + else: + click.echo("Problem playing '{}'".format(pk)) + except (work.RemoteException, work.DeliveryFailed) as e: + print("Failed to play '{}': {}".format(pk, e.message)) @work.command('status', context_settings=CONTEXT_SETTINGS) diff --git a/aiida/daemon/new.py b/aiida/daemon/new.py index 440945f3a3..2afb4b531c 100644 --- a/aiida/daemon/new.py +++ b/aiida/daemon/new.py @@ -1,6 +1,8 @@ from functools import partial import logging +import aiida.work.rmq + def tick_legacy_workflows(runner): tasks.workflow_stepper() @@ -37,7 +39,7 @@ def tick_legacy_workflows(runner): # TODO: Add the profile name to the RMQ prefix rmq_config = { 'url': 'amqp://localhost', - 'prefix': 'aiida', + 'prefix': aiida.work.rmq._get_prefix(), } runner = work.DaemonRunner(rmq_config=rmq_config, rmq_submit=True) work.set_runner(runner) diff --git a/aiida/daemon/tasks.py b/aiida/daemon/tasks.py index 3c4d8464dc..f435e27a60 100644 --- a/aiida/daemon/tasks.py +++ b/aiida/daemon/tasks.py @@ -89,23 +89,12 @@ def retriever(): set_daemon_timestamp(task_name='retriever', when='stop') -@periodic_task( - run_every=timedelta( - seconds=config.get("DAEMON_INTERVALS_TICK_WORKFLOWS", DAEMON_INTERVALS_TICK_WORKFLOWS) - ) -) -def tick_work(): - configure_logging(daemon=True, daemon_log_file=DAEMON_LOG_FILE) - from aiida.work.daemon import launch_pending_jobs - print "aiida.daemon.tasks.tick_workflows: Ticking workflows" - launch_pending_jobs() - @periodic_task( run_every=timedelta( seconds=config.get("DAEMON_INTERVALS_WFSTEP", DAEMON_INTERVALS_WFSTEP) ) ) -def workflow_stepper(): # daemon for legacy workflow +def workflow_stepper(): # daemon for legacy workflow configure_logging(daemon=True, daemon_log_file=DAEMON_LOG_FILE) from aiida.daemon.workflowmanager import execute_steps print "aiida.daemon.tasks.workflowmanager: Checking for workflows to manage" @@ -131,13 +120,9 @@ def workflow_stepper(): # daemon for legacy workflow def manual_tick_all(): from aiida.daemon.execmanager import submit_jobs, update_jobs, retrieve_jobs - from aiida.work.daemon import launch_pending_jobs, launch_all_pending_job_calculations from aiida.daemon.workflowmanager import execute_steps - if DAEMON_USE_NEW: - tick_work() - launch_all_pending_job_calculations() - else: - submit_jobs() - update_jobs() - retrieve_jobs() - execute_steps() # legacy workflows \ No newline at end of file + + submit_jobs() + update_jobs() + retrieve_jobs() + execute_steps() # legacy workflows diff --git a/aiida/orm/calculation/job/sum.py b/aiida/orm/calculation/job/sum.py new file mode 100644 index 0000000000..9f176c3795 --- /dev/null +++ b/aiida/orm/calculation/job/sum.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### + +from aiida.orm import JobCalculation +from aiida.orm.data.parameter import ParameterData +from aiida.common.utils import classproperty +from aiida.common.exceptions import InputValidationError +from aiida.common.exceptions import ValidationError +from aiida.common.datastructures import CalcInfo, CodeInfo +import json + +class SumCalculation(JobCalculation): + """ + A generic plugin for adding two numbers. + """ + + def _init_internal_params(self): + super(SumCalculation, self)._init_internal_params() + + self._DEFAULT_INPUT_FILE = 'in.json' + self._DEFAULT_OUTPUT_FILE = 'out.json' + self._default_parser = 'sum' + + @classproperty + def _use_methods(cls): + """ + Additional use_* methods for the namelists class. + """ + retdict = JobCalculation._use_methods + retdict.update({ + "parameters": { + 'valid_types': ParameterData, + 'additional_parameter': None, + 'linkname': 'parameters', + 'docstring': ("Use a node that specifies the input parameters " + "for the namelists"), + }, + }) + return retdict + + def _prepare_for_submission(self,tempfolder, inputdict): + """ + This is the routine to be called when you want to create + the input files and related stuff with a plugin. + + :param tempfolder: a aiida.common.folders.Folder subclass where + the plugin should put all its files. + :param inputdict: a dictionary with the input nodes, as they would + be returned by get_inputs_dict (with the Code!) + """ + try: + parameters = inputdict.pop(self.get_linkname('parameters')) + except KeyError: + raise InputValidationError("No parameters specified for this " + "calculation") + if not isinstance(parameters, ParameterData): + raise InputValidationError("parameters is not of type " + "ParameterData") + try: + code = inputdict.pop(self.get_linkname('code')) + except KeyError: + raise InputValidationError("No code specified for this " + "calculation") + if inputdict: + raise ValidationError("Cannot add other nodes beside parameters") + + ############################## + # END OF INITIAL INPUT CHECK # + ############################## + + input_json = parameters.get_dict() + + # write all the input to a file + input_filename = tempfolder.get_abs_path(self._DEFAULT_INPUT_FILE) + with open(input_filename, 'w') as infile: + json.dump(input_json, infile) + + # ============================ calcinfo ================================ + + calcinfo = CalcInfo() + calcinfo.uuid = self.uuid + calcinfo.local_copy_list = [] + calcinfo.remote_copy_list = [] + calcinfo.retrieve_list = [self._DEFAULT_OUTPUT_FILE] + + codeinfo = CodeInfo() + codeinfo.cmdline_params = [self._DEFAULT_INPUT_FILE,self._DEFAULT_OUTPUT_FILE] + codeinfo.code_uuid = code.uuid + calcinfo.codes_info = [codeinfo] + + return calcinfo + diff --git a/aiida/parsers/sum.py b/aiida/parsers/sum.py new file mode 100644 index 0000000000..d171483052 --- /dev/null +++ b/aiida/parsers/sum.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### + +from aiida.orm.calculation.job.sum import SumCalculation +from aiida.parsers.parser import Parser +from aiida.parsers.exceptions import OutputParsingError +from aiida.orm.data.parameter import ParameterData + +import json + + +class SumParser(Parser): + """ + This class is the implementation of the Parser class for Sum. + """ + + def parse_with_retrieved(self, retrieved): + """ + Parses the datafolder, stores results. + This parser for this simple code does simply store in the DB a node + representing the file of forces in real space + """ + + successful = True + # select the folder object + # Check that the retrieved folder is there + try: + out_folder = retrieved[self._calc._get_linkname_retrieved()] + except KeyError: + self.logger.error("No retrieved folder found") + return False, () + + # check what is inside the folder + list_of_files = out_folder.get_folder_list() + # at least the stdout should exist + if self._calc._DEFAULT_OUTPUT_FILE not in list_of_files: + successful = False + self.logger.error("Output json not found") + return successful, () + + try: + with open(out_folder.get_abs_path(self._calc._DEFAULT_OUTPUT_FILE)) as f: + out_dict = json.load(f) + except ValueError: + successful = False + self.logger.error("Error parsing the output json") + return successful, () + + output_data = ParameterData(dict=out_dict) + link_name = self.get_linkname_outparams() + new_nodes_list = [(link_name, output_data)] + + return successful, new_nodes_list + + diff --git a/aiida/work/__init__.py b/aiida/work/__init__.py index 4664c22724..fd674f4238 100644 --- a/aiida/work/__init__.py +++ b/aiida/work/__init__.py @@ -14,6 +14,7 @@ from .job_processes import * from .launch import * from .processes import * +from .rmq import * from .runners import * from .utils import * from .workfunctions import * @@ -21,4 +22,5 @@ __all__ = (processes.__all__ + runners.__all__ + utils.__all__ + workchain.__all__ + launch.__all__ + workfunctions.__all__ + - ['ProcessState'] + class_loader.__all__ + job_processes.__all__) + ['ProcessState'] + class_loader.__all__ + job_processes.__all__ + + rmq.__all__) diff --git a/aiida/work/daemon.py b/aiida/work/daemon.py deleted file mode 100644 index 03709eb6be..0000000000 --- a/aiida/work/daemon.py +++ /dev/null @@ -1,112 +0,0 @@ -# -*- coding: utf-8 -*- -########################################################################### -# Copyright (c), The AiiDA team. All rights reserved. # -# This file is part of the AiiDA code. # -# # -# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # -# For further information on the license, see the LICENSE.txt file # -# For further information please visit http://www.aiida.net # -########################################################################### -import logging -import time - -import aiida.work.globals -import aiida.work.persistence -from aiida.orm.calculation.job import JobCalculation -from aiida.orm.mixins import Sealable -from aiida.orm.querybuilder import QueryBuilder -from aiida.work.job_processes import ContinueJobCalculation -from aiida.work.utils import CalculationHeartbeat -from plum.exceptions import LockError -from . import runners - -# Until we fix the broken daemon logger https://github.com/aiidateam/aiida_core/issues/943 -# _LOGGER = logging.getLogger(__name__) -_LOGGER = logging.getLogger('daemon') - -import traceback -import aiida.work.persistence - - -def launch_pending_jobs(storage=None, loop=None): - if storage is None: - storage = aiida.work.globals.get_persistence() - if loop is None: - loop = runners.get_runner() - - executor = aiida.work.globals.get_thread_executor() - for proc in _load_all_processes(storage, loop): - # if executor.has_process(proc.pid): - # # If already playing, skip - # continue - - try: - storage.persist_process(proc) - f = executor.play(proc) - except LockError: - pass - except BaseException: - _LOGGER.error("Failed to play process '{}':\n{}".format( - proc.pid, traceback.format_exc())) - - -def _load_all_processes(storage, loop): - procs = [] - for cp in storage.get_checkpoints(): - try: - procs.append(loop.create(cp)) - except KeyboardInterrupt: - raise - except BaseException as exception: - import traceback - _LOGGER.warning("Failed to load process from checkpoint with " - "pid '{}'\n{}: {}".format(cp['pid'], exception.__class__.__name__, exception)) - _LOGGER.error(traceback.format_exc()) - return procs - - -def launch_all_pending_job_calculations(): - """ - Launch all JobCalculations that are not currently being processed - """ - storage = aiida.work.globals.get_persistence() - executor = aiida.work.globals.get_thread_executor() - for calc in get_all_pending_job_calculations(): - try: - # if executor.has_process(calc.pk): - # # If already playing, skip - # continue - - proc = ContinueJobCalculation(inputs={'_calc': calc}) - storage.persist_process(proc) - f = executor.play(proc) - except BaseException: - _LOGGER.error("Failed to launch job '{}'\n{}".format( - calc.pk, traceback.format_exc())) - else: - # Check if the process finished or was stopped early - if not proc.has_finished(): - more_work = True - - -def get_all_pending_job_calculations(): - """ - Get all JobCalculations that are in an active state but have no heartbeat - - :return: A list of those calculations - :rtype: list - """ - q = QueryBuilder() - q.append( - JobCalculation, - filters={ - 'state': {'in': ContinueJobCalculation.ACTIVE_CALC_STATES}, - 'attributes': {'!has_key': Sealable.SEALED_KEY}, - 'or': [ - {'attributes': {'!has_key': CalculationHeartbeat.HEARTBEAT_EXPIRES}}, - {'attributes.{}'.format(CalculationHeartbeat.HEARTBEAT_EXPIRES): {'<': time.time()}} - ], - }, - ) - - return [_[0] for _ in q.all()] diff --git a/aiida/work/events.py b/aiida/work/events.py index 954ec188ec..138e5abf79 100644 --- a/aiida/work/events.py +++ b/aiida/work/events.py @@ -4,4 +4,4 @@ def run_until_complete(future, loop): - plum.run_until_complete(future, loop) + return plum.run_until_complete(future, loop) diff --git a/aiida/work/globals.py b/aiida/work/globals.py deleted file mode 100644 index d97d174301..0000000000 --- a/aiida/work/globals.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- -import plum.class_loader -from aiida.work.class_loader import ClassLoader - -# Have globals that can be used by all of AiiDA -class_loader = plum.class_loader.ClassLoader(ClassLoader()) -_loop = None -_thread_executor = None -_rmq_control_panel = None -_persistence = None - -def get_thread_executor(): - """ - Dinges - """ - from aiida.work.runners import get_runner - return get_runner() - - -def get_persistence(): - """ - Get the global persistence object - - :return: The persistence object - """ - from aiida.work.persistence import get_global_persistence - return get_global_persistence() - - -def enable_rmq_subscribers(): - """ - Enable RMQ subscribers for the global process manager. This means that RMQ - launch, status and control messages will be listened for and processed. - - Use this to enable RMQ support for AiiDA. - """ - import aiida.work.rmq as rmq - rmq.enable_subscribers(get_thread_executor(), "aiida") - - -def enable_rmq_event_publisher(): - import aiida.work.rmq as rmq - rmq.enable_process_event_publisher() - - -def enable_rmq_all(): - enable_rmq_subscribers() - enable_rmq_event_publisher() - - -def get_rmq_control_panel(): - global _rmq_control_panel - if _rmq_control_panel is None: - from aiida.work.rmq import ProcessControlPanel - cp = ProcessControlPanel("aiida") - _rmq_control_panel = cp - - return _rmq_control_panel diff --git a/aiida/work/launch.py b/aiida/work/launch.py index f33b567490..6cdbdc2e09 100644 --- a/aiida/work/launch.py +++ b/aiida/work/launch.py @@ -10,9 +10,9 @@ from collections import namedtuple -from . import persistence -from . import rmq +import aiida.orm from . import runners +from . import rmq from . import utils __all__ = ['run', 'run_get_pid', 'run_get_node', 'submit'] @@ -20,30 +20,14 @@ RunningInfo = namedtuple("RunningInfo", ["type", "pid"]) _persister = None -_rmq_control_panel = None - - -def get_rmq_control_panel(): - global _rmq_control_panel - if _rmq_control_panel is None: - _rmq_control_panel = rmq.BlockingProcessControlPanel('aiida') - return _rmq_control_panel - - -def _get_persister(): - global _persister - if _persister is None: - _persister = persistence.AiiDAPersister() - return _persister def submit(process_class, **inputs): assert not utils.is_workfunction(process_class), "Cannot submit a workfunction" - process = runners._create_process(process_class, None, input_kwargs=inputs) - _get_persister().save_checkpoint(process) - get_rmq_control_panel().launch.continue_process(process.pid) - return process.calc + pid = rmq.new_blocking_control_panel().execute_process_start( + process_class, init_kwargs={'inputs': inputs}) + return aiida.orm.load_node(pid) def run(process, *args, **inputs): diff --git a/aiida/work/rmq.py b/aiida/work/rmq.py index bd35bfca23..bb59c96e8d 100644 --- a/aiida/work/rmq.py +++ b/aiida/work/rmq.py @@ -1,15 +1,19 @@ import json import plum +import plum.rmq from aiida.utils.serialize import serialize_data, deserialize_data - -from aiida.common.exceptions import MultipleObjectsError, NotExistent from aiida.common.setup import get_profile_config, RMQ_PREFIX_KEY from aiida.backends import settings -from plum import rmq from . import events +__all__ = ['new_blocking_control_panel', 'BlockingProcessControlPanel', + 'RemoteException', 'DeliveryFailed'] + +RemoteException = plum.RemoteException +DeliveryFailed = plum.DeliveryFailed + _MESSAGE_EXCHANGE = 'messages' _LAUNCH_QUEUE = 'process.queue' @@ -59,7 +63,7 @@ def __init__(self, prefix, rmq_connector, testing_mode=False): exchange_name=message_exchange) task_queue_name = "{}.{}".format(prefix, _LAUNCH_QUEUE) - self._launch = rmq.launch.ProcessLaunchPublisher( + self._launch = plum.rmq.ProcessLaunchPublisher( self._connector, exchange_name=get_message_exchange_name(prefix), task_queue_name=task_queue_name, @@ -72,68 +76,70 @@ def ready_future(self): self.communicator.initialised_future()) def pause_process(self, pid): - return self.communicator.rpc_send(pid, plum.PAUSE_MSG) + return self.execute_action(plum.PauseAction(pid)) def play_process(self, pid): - return self.communicator.rpc_send(pid, plum.PLAY_MSG) + return self.execute_action(plum.PlayAction(pid)) def kill_process(self, pid, msg=None): - return self.communicator.rpc_send(pid, plum.CANCEL_MSG) + return self.execute_action(plum.CancelAction(pid)) def request_status(self, pid): - return self.communicator.rpc_send(pid, plum.STATUS_MSG) + return self.execute_action(plum.StatusAction(pid)) - @property - def launch(self): - return self._launch + def launch_process(self, process_class, init_args=None, init_kwargs=None): + action = plum.rmq.LaunchProcessAction(process_class, init_args, init_kwargs) + action.execute(self._launch) + return action + def continue_process(self, pid): + action = plum.rmq.ContinueProcessAction(pid) + action.execute(self._launch) + return action -class BlockingProcessControlPanel(object): - """ - A blocking adapter for the ProcessControlPanel. - """ + def execute_process(self, process_class, init_args=None, init_kwargs=None): + action = plum.rmq.ExecuteProcessAction(process_class, init_args, init_kwargs) + action.execute(self._launch) + return action - class _Launcher(object): - def __init__(self, parent): - self._parent = parent + def execute_action(self, action): + action.execute(self.communicator) + return action - def launch_process(self, process_class, init_args=None, init_kwargs=None): - future = self._parent._control_panel.launch.launch_process(process_class, init_args, init_kwargs) - return self._parent._run(future) - def continue_process(self, pid, tag=None): - future = self._parent._control_panel.launch.continue_process(pid, tag) - return self._parent._run(future) +class BlockingProcessControlPanel(ProcessControlPanel): + """ + A blocking adapter for the ProcessControlPanel. + """ def __init__(self, prefix, rmq_config=None, testing_mode=False): if rmq_config is None: rmq_config = { 'url': 'amqp://localhost', - 'prefix': 'aiida', + 'prefix': prefix, } self._loop = events.new_event_loop() self._connector = plum.rmq.RmqConnector(amqp_url=rmq_config['url'], loop=self._loop) - self._control_panel = ProcessControlPanel(prefix, self._connector, testing_mode) - self.launch = self._Launcher(self) + super(BlockingProcessControlPanel, self).__init__(prefix, self._connector, testing_mode) self._connector.connect() - def pause_process(self, pid): - future = self._control_panel.pause_process(pid) - return self._run(future) + def execute_process_start(self, process_class, init_args=None, init_kwargs=None): + action = plum.rmq.ExecuteProcessAction(process_class, init_args, init_kwargs) + action.execute(self._launch) + return events.run_until_complete(action.get_launch_future(), self._loop) - def play_process(self, pid): - future = self._control_panel.pause_process(pid) - return self._run(future) + def execute_action(self, action): + action.execute(self.communicator) + return events.run_until_complete(action, self._loop) - def kill_process(self, pid, msg=None): - future = self._control_panel.kill_process(pid, msg) - return self._run(future) - def request_status(self, pid): - future = self._control_panel.request_status(pid) - return self._run(future) +def new_blocking_control_panel(): + """ + Create a new blocking control panel based on the current profile configuration - def _run(self, future): - return events.run_until_complete(future, self._loop) + :return: A new control panel instance + :rtype: :class:`BlockingProcessControlPanel` + """ + return BlockingProcessControlPanel(_get_prefix()) diff --git a/aiida/work/runners.py b/aiida/work/runners.py index fd31141b09..e7717ca864 100644 --- a/aiida/work/runners.py +++ b/aiida/work/runners.py @@ -22,6 +22,7 @@ _runner = None + def get_runner(): global _runner if _runner is None: @@ -40,7 +41,7 @@ def new_runner(**kwargs): if 'rmq_config' not in kwargs: kwargs['rmq_config'] = { 'url': 'amqp://localhost', - 'prefix': 'aiida', + 'prefix': rmq._get_prefix(), } return Runner(**kwargs) @@ -51,10 +52,6 @@ def new_daemon_runner(rmq_prefix='aiida', rmq_create_connection=None): return runner -def create_connector(rmq_config): - return plum.rmq.RmqConnector(amqp_url=rmq_config['url'], loop=self._loop) - - def convert_to_inputs(workfunction, *args, **kwargs): """ """ @@ -206,7 +203,7 @@ def submit(self, process_class, *args, **inputs): if self._rmq_submit: process = _create_process(process_class, self, input_args=args, input_kwargs=inputs) self.persister.save_checkpoint(process) - self.rmq.launch.continue_process(process.pid) + self.rmq.continue_process(process.pid) return process.calc else: # Run in this runner diff --git a/bin/daemon.py b/bin/daemon.py deleted file mode 100644 index a6839814f1..0000000000 --- a/bin/daemon.py +++ /dev/null @@ -1,33 +0,0 @@ -import click -import logging -import time - -from aiida.backends.utils import load_dbenv, is_dbenv_loaded - -if not is_dbenv_loaded(): - load_dbenv() - -from aiida.work.daemon import launch_pending_jobs -import aiida.work.daemon as work_daemon - - -@click.command() -@click.option('-v', '--verbose', count=True) -def run_daemon(verbose): - if verbose is not None: - if verbose == 1: - level = logging.INFO - else: - level = logging.DEBUG - - FORMAT = "[%(filename)s:%(lineno)s - %(funcName)s()] %(message)s" - logging.basicConfig(level=level, format=FORMAT) - - while True: - launch_pending_jobs() - #work_daemon.launch_all_pending_job_calculations() - time.sleep(10) - - -if __name__ == "__main__": - run_daemon() From e36da4ea3eee329b5f8a9f1bed621ad2afc2cf80 Mon Sep 17 00:00:00 2001 From: Martin Uhrin Date: Thu, 18 Jan 2018 18:09:24 +0100 Subject: [PATCH 7/8] Removing files accidentally committed --- aiida/orm/calculation/job/sum.py | 99 --------------------- aiida/parsers/sum.py | 62 -------------- examples/work/eos.py | 108 ----------------------- examples/work/eos_workfunction.py | 138 ------------------------------ 4 files changed, 407 deletions(-) delete mode 100644 aiida/orm/calculation/job/sum.py delete mode 100644 aiida/parsers/sum.py delete mode 100644 examples/work/eos.py delete mode 100644 examples/work/eos_workfunction.py diff --git a/aiida/orm/calculation/job/sum.py b/aiida/orm/calculation/job/sum.py deleted file mode 100644 index 9f176c3795..0000000000 --- a/aiida/orm/calculation/job/sum.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -########################################################################### -# Copyright (c), The AiiDA team. All rights reserved. # -# This file is part of the AiiDA code. # -# # -# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # -# For further information on the license, see the LICENSE.txt file # -# For further information please visit http://www.aiida.net # -########################################################################### - -from aiida.orm import JobCalculation -from aiida.orm.data.parameter import ParameterData -from aiida.common.utils import classproperty -from aiida.common.exceptions import InputValidationError -from aiida.common.exceptions import ValidationError -from aiida.common.datastructures import CalcInfo, CodeInfo -import json - -class SumCalculation(JobCalculation): - """ - A generic plugin for adding two numbers. - """ - - def _init_internal_params(self): - super(SumCalculation, self)._init_internal_params() - - self._DEFAULT_INPUT_FILE = 'in.json' - self._DEFAULT_OUTPUT_FILE = 'out.json' - self._default_parser = 'sum' - - @classproperty - def _use_methods(cls): - """ - Additional use_* methods for the namelists class. - """ - retdict = JobCalculation._use_methods - retdict.update({ - "parameters": { - 'valid_types': ParameterData, - 'additional_parameter': None, - 'linkname': 'parameters', - 'docstring': ("Use a node that specifies the input parameters " - "for the namelists"), - }, - }) - return retdict - - def _prepare_for_submission(self,tempfolder, inputdict): - """ - This is the routine to be called when you want to create - the input files and related stuff with a plugin. - - :param tempfolder: a aiida.common.folders.Folder subclass where - the plugin should put all its files. - :param inputdict: a dictionary with the input nodes, as they would - be returned by get_inputs_dict (with the Code!) - """ - try: - parameters = inputdict.pop(self.get_linkname('parameters')) - except KeyError: - raise InputValidationError("No parameters specified for this " - "calculation") - if not isinstance(parameters, ParameterData): - raise InputValidationError("parameters is not of type " - "ParameterData") - try: - code = inputdict.pop(self.get_linkname('code')) - except KeyError: - raise InputValidationError("No code specified for this " - "calculation") - if inputdict: - raise ValidationError("Cannot add other nodes beside parameters") - - ############################## - # END OF INITIAL INPUT CHECK # - ############################## - - input_json = parameters.get_dict() - - # write all the input to a file - input_filename = tempfolder.get_abs_path(self._DEFAULT_INPUT_FILE) - with open(input_filename, 'w') as infile: - json.dump(input_json, infile) - - # ============================ calcinfo ================================ - - calcinfo = CalcInfo() - calcinfo.uuid = self.uuid - calcinfo.local_copy_list = [] - calcinfo.remote_copy_list = [] - calcinfo.retrieve_list = [self._DEFAULT_OUTPUT_FILE] - - codeinfo = CodeInfo() - codeinfo.cmdline_params = [self._DEFAULT_INPUT_FILE,self._DEFAULT_OUTPUT_FILE] - codeinfo.code_uuid = code.uuid - calcinfo.codes_info = [codeinfo] - - return calcinfo - diff --git a/aiida/parsers/sum.py b/aiida/parsers/sum.py deleted file mode 100644 index d171483052..0000000000 --- a/aiida/parsers/sum.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- -########################################################################### -# Copyright (c), The AiiDA team. All rights reserved. # -# This file is part of the AiiDA code. # -# # -# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core # -# For further information on the license, see the LICENSE.txt file # -# For further information please visit http://www.aiida.net # -########################################################################### - -from aiida.orm.calculation.job.sum import SumCalculation -from aiida.parsers.parser import Parser -from aiida.parsers.exceptions import OutputParsingError -from aiida.orm.data.parameter import ParameterData - -import json - - -class SumParser(Parser): - """ - This class is the implementation of the Parser class for Sum. - """ - - def parse_with_retrieved(self, retrieved): - """ - Parses the datafolder, stores results. - This parser for this simple code does simply store in the DB a node - representing the file of forces in real space - """ - - successful = True - # select the folder object - # Check that the retrieved folder is there - try: - out_folder = retrieved[self._calc._get_linkname_retrieved()] - except KeyError: - self.logger.error("No retrieved folder found") - return False, () - - # check what is inside the folder - list_of_files = out_folder.get_folder_list() - # at least the stdout should exist - if self._calc._DEFAULT_OUTPUT_FILE not in list_of_files: - successful = False - self.logger.error("Output json not found") - return successful, () - - try: - with open(out_folder.get_abs_path(self._calc._DEFAULT_OUTPUT_FILE)) as f: - out_dict = json.load(f) - except ValueError: - successful = False - self.logger.error("Error parsing the output json") - return successful, () - - output_data = ParameterData(dict=out_dict) - link_name = self.get_linkname_outparams() - new_nodes_list = [(link_name, output_data)] - - return successful, new_nodes_list - - diff --git a/examples/work/eos.py b/examples/work/eos.py deleted file mode 100644 index c6a51ba596..0000000000 --- a/examples/work/eos.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env runaiida -# -*- coding: utf-8 -*- - -from aiida.backends.utils import load_dbenv, is_dbenv_loaded - -if not is_dbenv_loaded(): - load_dbenv() - -from aiida.orm import CalculationFactory, DataFactory, Code -from aiida.orm.data.base import BaseType, Float, Str -from aiida.orm.data.structure import StructureData -from aiida.orm.data.parameter import ParameterData -from aiida.orm.data.array.kpoints import KpointsData -from aiida.work.utils import ProcessStack -from aiida.work.workfunctions import workfunction -from aiida.work.workchain import WorkChain, ToContext, while_, Outputs -from aiida import work -from examples.work import common - -PwCalculation = CalculationFactory('quantumespresso.pw') - - -@workfunction -def create_diamond_fcc(element, alat): - """ - Workfunction to create a diamond crystal structure with a given element. - - :param element: The element to create the structure with. - :return: The structure. - """ - from numpy import array - the_cell = array([[0., 0.5, 0.5], - [0.5, 0., 0.5], - [0.5, 0.5, 0.]]) * alat - StructureData = DataFactory("structure") - structure = StructureData(cell=the_cell) - structure.append_atom(position=(0., 0., 0.), symbols=str(element)) - structure.append_atom(position=(0.25 * alat, 0.25 * alat, 0.25 * alat), symbols=str(element)) - return structure - - -@workfunction -def rescale(structure, scale): - """ - Workfunction to rescale a structure - - :param structure: An AiiDA structure to rescale - :param scale: The scale factor - :return: The rescaled structure - """ - the_ase = structure.get_ase() - new_ase = the_ase.copy() - new_ase.set_cell(the_ase.get_cell() * float(scale), scale_atoms=True) - new_structure = DataFactory('structure')(ase=new_ase) - return new_structure - - -class EquationOfState(WorkChain): - @classmethod - def define(cls, spec): - super(EquationOfState, cls).define(spec) - spec.input("structure", valid_type=StructureData) - spec.input("codename", valid_type=BaseType) - spec.input("pseudo_family", valid_type=BaseType) - spec.outline( - cls.begin, - while_(cls.not_finished)( - cls.run_pw, - cls.print_result - ) - ) - - def begin(self): - self.ctx.scales = (0.94, 0.96, 0.98, 1., 1.02, 1.04, 1.06) - self.ctx.i = 0 - - def not_finished(self): - return self.ctx.i < len(self.ctx.scales) - - def run_pw(self): - scale = self.ctx.scales[self.ctx.i] - scaled = rescale(self.inputs.structure, Float(scale)) - - inputs = common.generate_scf_input_params( - scaled, self.inputs.codename, self.inputs.pseudo_family) - - # Launch the code - process = PwCalculation.process() - future = self.submit(process, **inputs) - - return ToContext(result=Outputs(future)) - - def print_result(self): - print self.ctx.scales[self.ctx.i], self.ctx.result['output_parameters'].dict.energy - self.ctx.i += 1 - - -structure = create_diamond_fcc(Str('C'), Float(3.57)) -codename = 'pw.x@localhost' -pseudo_family_name = 'SSSP_eff_PBE_0.7' - -result = work.submit(EquationOfState, - structure=structure, - codename=Str(codename), - pseudo_family=Str(pseudo_family_name) - ) - -print(result) diff --git a/examples/work/eos_workfunction.py b/examples/work/eos_workfunction.py deleted file mode 100644 index 6e84b0af81..0000000000 --- a/examples/work/eos_workfunction.py +++ /dev/null @@ -1,138 +0,0 @@ -# -*- coding: utf-8 -*- - -from aiida.backends.utils import load_dbenv, is_dbenv_loaded - -if not is_dbenv_loaded(): - load_dbenv() - -import numpy as np -import matplotlib.pyplot as plt -from aiida.orm.code import Code -from aiida.orm.utils import DataFactory -from aiida.orm.data.base import * -from aiida.work import * - - -@workfunction -def create_diamond_fcc(element, alat): - from numpy import array - the_cell = array([[0., 0.5, 0.5], - [0.5, 0., 0.5], - [0.5, 0.5, 0.]]) * alat - StructureData = DataFactory("structure") - structure = StructureData(cell=the_cell) - structure.append_atom(position=(0., 0., 0.), symbols=str(element)) - structure.append_atom(position=(0.25 * alat, 0.25 * alat, 0.25 * alat), symbols=str(element)) - return structure - - -@workfunction -def rescale(structure, scale): - the_ase = structure.get_ase() - new_ase = the_ase.copy() - new_ase.set_cell(the_ase.get_cell() * float(scale), scale_atoms=True) - new_structure = DataFactory('structure')(ase=new_ase) - return new_structure - - -def get_pseudos(structure, family_name): - from collections import defaultdict - from aiida.orm.data.upf import get_pseudos_from_structure - - # A dict {kind_name: pseudo_object} - kind_pseudo_dict = get_pseudos_from_structure(structure, family_name) - - # We have to group the species by pseudo, I use the pseudo PK - # pseudo_dict will just map PK->pseudo_object - pseudo_dict = {} - # Will contain a list of all species of the pseudo with given PK - pseudo_species = defaultdict(list) - - for kindname, pseudo in kind_pseudo_dict.iteritems(): - pseudo_dict[pseudo.pk] = pseudo - pseudo_species[pseudo.pk].append(kindname) - - pseudos = {} - for pseudo_pk in pseudo_dict: - pseudo = pseudo_dict[pseudo_pk] - kinds = pseudo_species[pseudo_pk] - for kind in kinds: - pseudos[kind] = pseudo - - return pseudos - - -def generate_scf_input_params(structure, codename, pseudo_family): - # The inputs - inputs = PwCalculation.process().get_inputs_template() - - # The structure - inputs.structure = structure - - inputs.code = Code.get_from_string(codename.value) - inputs._options.resources = {"num_machines": 1} - inputs._options.max_wallclock_seconds = 30 * 60 - - # Kpoints - KpointsData = DataFactory("array.kpoints") - kpoints = KpointsData() - kpoints_mesh = 2 - kpoints.set_kpoints_mesh([kpoints_mesh, kpoints_mesh, kpoints_mesh]) - inputs.kpoints = kpoints - - # Calculation parameters - parameters_dict = { - "CONTROL": {"calculation": "scf", - "tstress": True, # Important that this stays to get stress - "tprnfor": True, }, - "SYSTEM": {"ecutwfc": 30., - "ecutrho": 200., }, - "ELECTRONS": {"conv_thr": 1.e-6, } - } - ParameterData = DataFactory("parameter") - inputs.parameters = ParameterData(dict=parameters_dict) - - # Pseudopotentials - inputs.pseudo = get_pseudos(structure, str(pseudo_family)) - - return inputs - - -from aiida.orm.data.base import Str -from aiida.orm.utils import CalculationFactory - -PwCalculation = CalculationFactory('quantumespresso.pw') -PwProcess = PwCalculation.process() - - -@workfunction -def eos(structure, codename, pseudo_family): - scales = (0.96, 0.98, 0.99, 1.0, 1.02, 1.04) - # Plotting - fig, ax = plt.subplots() - ax.set_xlim([scales[0], scales[-1]]) - ax.set_xlabel(u"Cell length [Å]") - ax.set_ylabel(u"Total energy [eV]") - line, = ax.plot([], []) - - # Loop over calculating energies at given scales - energies = [] - for i, s in enumerate(scales): - rescaled = rescale(structure, Float(s)) - inputs = generate_scf_input_params(rescaled, codename, pseudo_family) - outputs = run(PwProcess, **inputs) - energy = outputs['output_parameters'].dict.energy - energies.append(energy) - - # Plot - line.set_xdata(scales[0:len(energies)]) - line.set_ydata(energies) - ax.relim() - ax.autoscale_view() - fig.canvas.draw() - - -structure = create_diamond_fcc(Str('C'), Float(3.65)) -codename = Str('pw.x@localhost') -pseudo_family = Str('SSSP_eff_PBE_0.7') -eos(structure, codename, pseudo_family) From 6b8ba827527de1312342a3a717fd26d4853bf2d4 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Fri, 19 Jan 2018 12:29:38 +0100 Subject: [PATCH 8/8] Remove stale references from work documentation --- docs/source/work/dev.rst | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/source/work/dev.rst b/docs/source/work/dev.rst index 9bb67322b5..355a01c7c7 100644 --- a/docs/source/work/dev.rst +++ b/docs/source/work/dev.rst @@ -15,15 +15,9 @@ This section describes the different classes related to workflows, workfunctions .. automodule:: aiida.work.context :members: -.. automodule:: aiida.work.daemon - :members: - .. automodule:: aiida.work.db_types :members: -.. automodule:: aiida.work.globals - :members: - .. automodule:: aiida.work.launch :members: