From 19d9f5b3c5a09dc3f86748f57bfc0804f678da6a Mon Sep 17 00:00:00 2001 From: Leonid Kahle Date: Wed, 18 Dec 2019 12:48:45 +0100 Subject: [PATCH] Add traverse_graph / AGE as engine for export The export function now uses the get_nodes_export function (with the traverse_graph underlying interface using AGE as the main engine) to collect the extra nodes that are needed to keep a consistent provenance. This was previously performed by the 'retrieve_linked_nodes' function, which is removed here. Whereas previously a different query was performed for each new node added in the previous query step, this new implementation should do a single new query for all the nodes that were added in the previous query step. So these changes are not only important as a first step to homogenize graph traversal throughout the whole code: an improvement in the export procedure is expected as well. --- aiida/tools/importexport/dbexport/__init__.py | 31 +- aiida/tools/importexport/dbexport/utils.py | 300 ------------------ 2 files changed, 27 insertions(+), 304 deletions(-) diff --git a/aiida/tools/importexport/dbexport/__init__.py b/aiida/tools/importexport/dbexport/__init__.py index 9d5043e8ff..0ac216392a 100644 --- a/aiida/tools/importexport/dbexport/__init__.py +++ b/aiida/tools/importexport/dbexport/__init__.py @@ -29,7 +29,7 @@ ) from aiida.tools.importexport.common.utils import export_shard_uuid from aiida.tools.importexport.dbexport.utils import ( - check_licenses, fill_in_query, serialize_dict, check_process_nodes_sealed, retrieve_linked_nodes + check_licenses, fill_in_query, serialize_dict, check_process_nodes_sealed ) from .zip import ZipFolder @@ -138,6 +138,7 @@ def export_tree( :raises `~aiida.common.exceptions.LicensingException`: if any node is licensed under forbidden license. 
""" from collections import defaultdict + from aiida.tools.graph.graph_traversers import get_nodes_export if not silent: print('STARTING EXPORT...') @@ -223,9 +224,31 @@ def export_tree( if not silent: print('RETRIEVING LINKED NODES AND STORING LINKS...') - to_be_exported, links_uuid, graph_traversal_rules = retrieve_linked_nodes( - given_calculation_entry_ids, given_data_entry_ids, **kwargs - ) + initial_nodes_ids = given_calculation_entry_ids.union(given_data_entry_ids) + traverse_output = get_nodes_export(starting_pks=initial_nodes_ids, get_links=True, **kwargs) + to_be_exported = traverse_output['nodes'] + graph_traversal_rules = traverse_output['rules'] + + # I create a utility dictionary for mapping pk to uuid. + if traverse_output['nodes']: + qbuilder = orm.QueryBuilder().append( + orm.Node, + project=('id', 'uuid'), + filters={'id': { + 'in': traverse_output['nodes'] + }}, + ) + pk_2_uuid_dict = dict(qbuilder.all()) + else: + pk_2_uuid_dict = {} + + # The set of tuples now has to be transformed to a list of dicts + links_uuid = [{ + 'input': pk_2_uuid_dict[link.source_id], + 'output': pk_2_uuid_dict[link.target_id], + 'label': link.link_label, + 'type': link.link_type + } for link in traverse_output['links']] ## Universal "entities" attributed to all types of nodes # Logs diff --git a/aiida/tools/importexport/dbexport/utils.py b/aiida/tools/importexport/dbexport/utils.py index b539ee4a35..a51a83d3b5 100644 --- a/aiida/tools/importexport/dbexport/utils.py +++ b/aiida/tools/importexport/dbexport/utils.py @@ -271,303 +271,3 @@ def check_process_nodes_sealed(nodes): 'All ProcessNodes must be sealed before they can be exported. 
' 'Node(s) with PK(s): {} is/are not sealed.'.format(', '.join(str(pk) for pk in nodes - sealed_nodes)) ) - - -def _retrieve_linked_nodes_query(current_node, input_type, output_type, direction, link_type_value): - """Helper function for :py:func:`~aiida.tools.importexport.dbexport.utils.retrieve_linked_nodes` - - A general :py:class:`~aiida.orm.querybuilder.QueryBuilder` query, retrieving linked Nodes and returning link - information and the found Nodes. - - :param current_node: The current Node's PK. - :type current_node: int - - :param input_type: Source Node class for Link - :type input_type: :py:class:`~aiida.orm.nodes.data.data.Data`, - :py:class:`~aiida.orm.nodes.process.process.ProcessNode`. - - :param output_type: Target Node class for Link - :type output_type: :py:class:`~aiida.orm.nodes.data.data.Data`, - :py:class:`~aiida.orm.nodes.process.process.ProcessNode`. - - :param direction: Link direction, must be either ``'forward'`` or ``'backward'``. - :type direction: str - - :param link_type_value: A :py:class:`~aiida.common.links.LinkType` value, e.g. ``LinkType.RETURN.value``. - :type link_type_value: str - - :return: Dictionary of link information to be used for the export archive and set of found Nodes. 
- """ - found_nodes = set() - links_uuid_dict = {} - filters_input = {} - filters_output = {} - - if direction == 'forward': - filters_input['id'] = current_node - elif direction == 'backward': - filters_output['id'] = current_node - else: - raise exceptions.ExportValidationError('direction must be either "forward" or "backward"') - - builder = QueryBuilder() - builder.append(input_type, project=['uuid', 'id'], tag='input', filters=filters_input) - builder.append( - output_type, - project=['uuid', 'id'], - with_incoming='input', - filters=filters_output, - edge_filters={'type': link_type_value}, - edge_project=['label', 'type'] - ) - - for input_uuid, input_pk, output_uuid, output_pk, link_label, link_type in builder.iterall(): - links_uuid_entry = { - 'input': str(input_uuid), - 'output': str(output_uuid), - 'label': str(link_label), - 'type': str(link_type) - } - links_uuid_dict[frozenset(links_uuid_entry.items())] = links_uuid_entry - - node_pk = output_pk if direction == 'forward' else input_pk - found_nodes.add(node_pk) - - return links_uuid_dict, found_nodes - - -def retrieve_linked_nodes(process_nodes, data_nodes, **kwargs): # pylint: disable=too-many-statements - """Recursively retrieve linked Nodes and the links - - The rules for recursively following links/edges in the provenance graph are as follows, - where the Node types in bold symbolize the Node that is currently being exported, i.e., - it is this Node onto which the Link in question has been found. 
- - +----------------------+---------------------+---------------------+----------------+---------+ - |**LinkType_Direction**| **From** | **To** |Follow (default)|Togglable| - +======================+=====================+=====================+================+=========+ - | INPUT_CALC_FORWARD | **Data** | CalculationNode | False | True | - +----------------------+---------------------+---------------------+----------------+---------+ - | INPUT_CALC_BACKWARD | Data | **CalculationNode** | True | False | - +----------------------+---------------------+---------------------+----------------+---------+ - | CREATE_FORWARD | **CalculationNode** | Data | True | False | - +----------------------+---------------------+---------------------+----------------+---------+ - | CREATE_BACKWARD | CalculationNode | **Data** | True | True | - +----------------------+---------------------+---------------------+----------------+---------+ - | RETURN_FORWARD | **WorkflowNode** | Data | True | False | - +----------------------+---------------------+---------------------+----------------+---------+ - | RETURN_BACKWARD | WorkflowNode | **Data** | False | True | - +----------------------+---------------------+---------------------+----------------+---------+ - | INPUT_WORK_FORWARD | **Data** | WorkflowNode | False | True | - +----------------------+---------------------+---------------------+----------------+---------+ - | INPUT_WORK_BACKWARD | Data | **WorkflowNode** | True | False | - +----------------------+---------------------+---------------------+----------------+---------+ - | CALL_CALC_FORWARD | **WorkflowNode** | CalculationNode | True | False | - +----------------------+---------------------+---------------------+----------------+---------+ - | CALL_CALC_BACKWARD | WorkflowNode | **CalculationNode** | False | True | - +----------------------+---------------------+---------------------+----------------+---------+ - | CALL_WORK_FORWARD | **WorkflowNode** | WorkflowNode | True | 
False | - +----------------------+---------------------+---------------------+----------------+---------+ - | CALL_WORK_BACKWARD | WorkflowNode | **WorkflowNode** | False | True | - +----------------------+---------------------+---------------------+----------------+---------+ - - :param process_nodes: Set of :py:class:`~aiida.orm.nodes.process.process.ProcessNode` node PKs. - :param data_nodes: Set of :py:class:`~aiida.orm.nodes.data.data.Data` node PKs. - - :param input_calc_forward: Follow INPUT_CALC links in the forward direction (recursively). - :param create_backward: Follow CREATE links in the backward direction (recursively). - :param return_backward: Follow RETURN links in the backward direction (recursively). - :param input_work_forward: Follow INPUT_WORK links in the forward direction (recursively - :param call_calc_backward: Follow CALL_CALC links in the backward direction (recursively). - :param call_work_backward: Follow CALL_WORK links in the backward direction (recursively). - - :return: Set of retrieved Nodes, list of links information, and updated dict of LINK_FLAGS. - - :raises `~aiida.tools.importexport.common.exceptions.ExportValidationError`: if wrong or too many kwargs are given. 
- """ - from aiida.common.links import LinkType, GraphTraversalRules - from aiida.orm import Data - - # Initialization and set flags according to rules - retrieved_nodes = set() - links_uuid_dict = {} - traversal_rules = {} - - # Create the dictionary with graph traversal rules to be used in determing complete node set to be exported - for name, rule in GraphTraversalRules.EXPORT.value.items(): - - # Check that rules that are not toggleable are not specified in the keyword arguments - if not rule.toggleable and name in kwargs: - raise exceptions.ExportValidationError('traversal rule {} is not toggleable'.format(name)) - - # Use the rule value passed in the keyword arguments, or if not the case, use the default - traversal_rules[name] = kwargs.pop(name, rule.default) - - # We repeat until there are no further nodes to be visited - while process_nodes or data_nodes: - - # If is is a ProcessNode - if process_nodes: - current_node_pk = process_nodes.pop() - # If it is already visited continue to the next node - if current_node_pk in retrieved_nodes: - continue - - # Otherwise say that it is a node to be exported - retrieved_nodes.add(current_node_pk) - - # INPUT_CALC(Data, CalculationNode) - Backward - if traversal_rules['input_calc_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=Data, - output_type=ProcessNode, - direction='backward', - link_type_value=LinkType.INPUT_CALC.value - ) - data_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CREATE(CalculationNode, Data) - Forward - if traversal_rules['create_forward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=Data, - direction='forward', - link_type_value=LinkType.CREATE.value - ) - data_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # RETURN(WorkflowNode, Data) - Forward - if traversal_rules['return_forward']: - 
links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=Data, - direction='forward', - link_type_value=LinkType.RETURN.value - ) - data_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # INPUT_WORK(Data, WorkflowNode) - Backward - if traversal_rules['input_work_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=Data, - output_type=ProcessNode, - direction='backward', - link_type_value=LinkType.INPUT_WORK.value - ) - data_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CALL_CALC(WorkflowNode, CalculationNode) - Forward - if traversal_rules['call_calc_forward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=ProcessNode, - direction='forward', - link_type_value=LinkType.CALL_CALC.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CALL_CALC(WorkflowNode, CalculationNode) - Backward - if traversal_rules['call_calc_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=ProcessNode, - direction='backward', - link_type_value=LinkType.CALL_CALC.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CALL_WORK(WorkflowNode, WorkflowNode) - Forward - if traversal_rules['call_work_forward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=ProcessNode, - direction='forward', - link_type_value=LinkType.CALL_WORK.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CALL_WORK(WorkflowNode, WorkflowNode) - Backward - if traversal_rules['call_work_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - 
current_node_pk, - input_type=ProcessNode, - output_type=ProcessNode, - direction='backward', - link_type_value=LinkType.CALL_WORK.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # If it is a Data - else: - current_node_pk = data_nodes.pop() - # If it is already visited continue to the next node - if current_node_pk in retrieved_nodes: - continue - - # Otherwise say that it is a node to be exported - retrieved_nodes.add(current_node_pk) - - # INPUT_CALC(Data, CalculationNode) - Forward - if traversal_rules['input_calc_forward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=Data, - output_type=ProcessNode, - direction='forward', - link_type_value=LinkType.INPUT_CALC.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # CREATE(CalculationNode, Data) - Backward - if traversal_rules['create_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=Data, - direction='backward', - link_type_value=LinkType.CREATE.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # RETURN(WorkflowNode, Data) - Backward - if traversal_rules['return_backward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=ProcessNode, - output_type=Data, - direction='backward', - link_type_value=LinkType.RETURN.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) - - # INPUT_WORK(Data, WorkflowNode) - Forward - if traversal_rules['input_work_forward']: - links_uuids, found_nodes = _retrieve_linked_nodes_query( - current_node_pk, - input_type=Data, - output_type=ProcessNode, - direction='forward', - link_type_value=LinkType.INPUT_WORK.value - ) - process_nodes.update(found_nodes - retrieved_nodes) - links_uuid_dict.update(links_uuids) 
- - return retrieved_nodes, list(links_uuid_dict.values()), traversal_rules