Skip to content

Commit

Permalink
Add traverse_graph / AGE as engine for export
Browse files Browse the repository at this point in the history
The export function now uses the get_nodes_export function (with the
traverse_graph underlying interface using AGE as the main engine) to
collect the extra nodes that are needed to keep a consistent provenance.
This task was previously performed by the 'retrieve_linked_nodes'
function, which issued a separate query for each new node added in the
previous query step; the new implementation instead performs a single
query for all the nodes that were added in the previous step. These
changes are therefore not only important as a first step towards
homogenizing graph traversal throughout the whole code base: an
improvement in the performance of the export procedure is expected as well.
  • Loading branch information
lekah authored and ramirezfranciscof committed Jan 10, 2020
1 parent cadad4c commit 19d9f5b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 304 deletions.
31 changes: 27 additions & 4 deletions aiida/tools/importexport/dbexport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)
from aiida.tools.importexport.common.utils import export_shard_uuid
from aiida.tools.importexport.dbexport.utils import (
check_licenses, fill_in_query, serialize_dict, check_process_nodes_sealed, retrieve_linked_nodes
check_licenses, fill_in_query, serialize_dict, check_process_nodes_sealed
)

from .zip import ZipFolder
Expand Down Expand Up @@ -138,6 +138,7 @@ def export_tree(
:raises `~aiida.common.exceptions.LicensingException`: if any node is licensed under forbidden license.
"""
from collections import defaultdict
from aiida.tools.graph.graph_traversers import get_nodes_export

if not silent:
print('STARTING EXPORT...')
Expand Down Expand Up @@ -223,9 +224,31 @@ def export_tree(
if not silent:
print('RETRIEVING LINKED NODES AND STORING LINKS...')

to_be_exported, links_uuid, graph_traversal_rules = retrieve_linked_nodes(
given_calculation_entry_ids, given_data_entry_ids, **kwargs
)
initial_nodes_ids = given_calculation_entry_ids.union(given_data_entry_ids)
traverse_output = get_nodes_export(starting_pks=initial_nodes_ids, get_links=True, **kwargs)
to_be_exported = traverse_output['nodes']
graph_traversal_rules = traverse_output['rules']

# I create a utility dictionary for mapping pk to uuid.
if traverse_output['nodes']:
qbuilder = orm.QueryBuilder().append(
orm.Node,
project=('id', 'uuid'),
filters={'id': {
'in': traverse_output['nodes']
}},
)
pk_2_uuid_dict = dict(qbuilder.all())
else:
pk_2_uuid_dict = {}

# The set of tuples now has to be transformed to a list of dicts
links_uuid = [{
'input': pk_2_uuid_dict[link.source_id],
'output': pk_2_uuid_dict[link.target_id],
'label': link.link_label,
'type': link.link_type
} for link in traverse_output['links']]

## Universal "entities" attributed to all types of nodes
# Logs
Expand Down
300 changes: 0 additions & 300 deletions aiida/tools/importexport/dbexport/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,303 +271,3 @@ def check_process_nodes_sealed(nodes):
'All ProcessNodes must be sealed before they can be exported. '
'Node(s) with PK(s): {} is/are not sealed.'.format(', '.join(str(pk) for pk in nodes - sealed_nodes))
)


def _retrieve_linked_nodes_query(current_node, input_type, output_type, direction, link_type_value):
    """Helper function for :py:func:`~aiida.tools.importexport.dbexport.utils.retrieve_linked_nodes`.

    Run a single :py:class:`~aiida.orm.querybuilder.QueryBuilder` query retrieving the Nodes
    linked to *current_node* through links of the given type and direction, returning both the
    link information (for the export archive) and the PKs of the Nodes found.

    :param current_node: The current Node's PK.
    :type current_node: int
    :param input_type: Source Node class for Link
    :type input_type: :py:class:`~aiida.orm.nodes.data.data.Data`,
        :py:class:`~aiida.orm.nodes.process.process.ProcessNode`.
    :param output_type: Target Node class for Link
    :type output_type: :py:class:`~aiida.orm.nodes.data.data.Data`,
        :py:class:`~aiida.orm.nodes.process.process.ProcessNode`.
    :param direction: Link direction, must be either ``'forward'`` or ``'backward'``.
    :type direction: str
    :param link_type_value: A :py:class:`~aiida.common.links.LinkType` value, e.g. ``LinkType.RETURN.value``.
    :type link_type_value: str
    :return: Dictionary of link information to be used for the export archive and set of found Nodes.
    :raises `~aiida.tools.importexport.common.exceptions.ExportValidationError`: if *direction*
        is not one of the two allowed values.
    """
    # Validate the direction up front with a guard clause.
    if direction not in ('forward', 'backward'):
        raise exceptions.ExportValidationError('direction must be either "forward" or "backward"')

    # Anchor the query on the current node: at the source end for 'forward'
    # traversal, at the target end for 'backward' traversal.
    source_filters = {'id': current_node} if direction == 'forward' else {}
    target_filters = {'id': current_node} if direction == 'backward' else {}

    query = QueryBuilder()
    query.append(input_type, project=['uuid', 'id'], tag='input', filters=source_filters)
    query.append(
        output_type,
        project=['uuid', 'id'],
        with_incoming='input',
        filters=target_filters,
        edge_filters={'type': link_type_value},
        edge_project=['label', 'type']
    )

    link_entries = {}
    neighbour_pks = set()
    for source_uuid, source_pk, target_uuid, target_pk, label, ltype in query.iterall():
        entry = {
            'input': str(source_uuid),
            'output': str(target_uuid),
            'label': str(label),
            'type': str(ltype)
        }
        # Key by a frozenset of the items so identical links deduplicate.
        link_entries[frozenset(entry.items())] = entry

        # The "found" node is the one on the far side of the traversal direction.
        neighbour_pks.add(target_pk if direction == 'forward' else source_pk)

    return link_entries, neighbour_pks


def retrieve_linked_nodes(process_nodes, data_nodes, **kwargs):
    """Recursively retrieve linked Nodes and the links.

    The rules for recursively following links/edges in the provenance graph are as follows,
    where the Node types in bold symbolize the Node that is currently being exported, i.e.,
    it is this Node onto which the Link in question has been found.

    +----------------------+---------------------+---------------------+----------------+---------+
    |**LinkType_Direction**| **From**            | **To**              |Follow (default)|Togglable|
    +======================+=====================+=====================+================+=========+
    | INPUT_CALC_FORWARD   | **Data**            | CalculationNode     | False          | True    |
    +----------------------+---------------------+---------------------+----------------+---------+
    | INPUT_CALC_BACKWARD  | Data                | **CalculationNode** | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CREATE_FORWARD       | **CalculationNode** | Data                | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CREATE_BACKWARD      | CalculationNode     | **Data**            | True           | True    |
    +----------------------+---------------------+---------------------+----------------+---------+
    | RETURN_FORWARD       | **WorkflowNode**    | Data                | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | RETURN_BACKWARD      | WorkflowNode        | **Data**            | False          | True    |
    +----------------------+---------------------+---------------------+----------------+---------+
    | INPUT_WORK_FORWARD   | **Data**            | WorkflowNode        | False          | True    |
    +----------------------+---------------------+---------------------+----------------+---------+
    | INPUT_WORK_BACKWARD  | Data                | **WorkflowNode**    | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CALL_CALC_FORWARD    | **WorkflowNode**    | CalculationNode     | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CALL_CALC_BACKWARD   | WorkflowNode        | **CalculationNode** | False          | True    |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CALL_WORK_FORWARD    | **WorkflowNode**    | WorkflowNode        | True           | False   |
    +----------------------+---------------------+---------------------+----------------+---------+
    | CALL_WORK_BACKWARD   | WorkflowNode        | **WorkflowNode**    | False          | True    |
    +----------------------+---------------------+---------------------+----------------+---------+

    :param process_nodes: Set of :py:class:`~aiida.orm.nodes.process.process.ProcessNode` node PKs.
    :param data_nodes: Set of :py:class:`~aiida.orm.nodes.data.data.Data` node PKs.
    :param input_calc_forward: Follow INPUT_CALC links in the forward direction (recursively).
    :param create_backward: Follow CREATE links in the backward direction (recursively).
    :param return_backward: Follow RETURN links in the backward direction (recursively).
    :param input_work_forward: Follow INPUT_WORK links in the forward direction (recursively).
    :param call_calc_backward: Follow CALL_CALC links in the backward direction (recursively).
    :param call_work_backward: Follow CALL_WORK links in the backward direction (recursively).
    :return: Set of retrieved Nodes, list of links information, and updated dict of LINK_FLAGS.
    :raises `~aiida.tools.importexport.common.exceptions.ExportValidationError`: if wrong or too many kwargs are given.
    """
    from aiida.common.links import LinkType, GraphTraversalRules
    from aiida.orm import Data

    # Initialization and set flags according to rules
    retrieved_nodes = set()
    links_uuid_dict = {}
    traversal_rules = {}

    # Create the dictionary with graph traversal rules to be used in determining
    # the complete node set to be exported.
    for name, rule in GraphTraversalRules.EXPORT.value.items():

        # Check that rules that are not toggleable are not specified in the keyword arguments
        if not rule.toggleable and name in kwargs:
            raise exceptions.ExportValidationError('traversal rule {} is not toggleable'.format(name))

        # Use the rule value passed in the keyword arguments, or if not the case, use the default
        traversal_rules[name] = kwargs.pop(name, rule.default)

    # NOTE(review): any leftover unknown kwargs are silently ignored at this point,
    # although the docstring promises a validation error -- confirm intended behavior.

    # Traversal rule tables, one entry per rule as:
    #   (rule name, link source class, link target class, direction, link type,
    #    True if the node found on the far side is a Data node).
    # The order of entries matches the order in which the queries were historically issued.
    process_node_rules = (
        ('input_calc_backward', Data, ProcessNode, 'backward', LinkType.INPUT_CALC, True),
        ('create_forward', ProcessNode, Data, 'forward', LinkType.CREATE, True),
        ('return_forward', ProcessNode, Data, 'forward', LinkType.RETURN, True),
        ('input_work_backward', Data, ProcessNode, 'backward', LinkType.INPUT_WORK, True),
        ('call_calc_forward', ProcessNode, ProcessNode, 'forward', LinkType.CALL_CALC, False),
        ('call_calc_backward', ProcessNode, ProcessNode, 'backward', LinkType.CALL_CALC, False),
        ('call_work_forward', ProcessNode, ProcessNode, 'forward', LinkType.CALL_WORK, False),
        ('call_work_backward', ProcessNode, ProcessNode, 'backward', LinkType.CALL_WORK, False),
    )
    data_node_rules = (
        ('input_calc_forward', Data, ProcessNode, 'forward', LinkType.INPUT_CALC, False),
        ('create_backward', ProcessNode, Data, 'backward', LinkType.CREATE, False),
        ('return_backward', ProcessNode, Data, 'backward', LinkType.RETURN, False),
        ('input_work_forward', Data, ProcessNode, 'forward', LinkType.INPUT_WORK, False),
    )

    # We repeat until there are no further nodes to be visited. ProcessNodes are
    # drained first; Data nodes only when no ProcessNodes are pending.
    while process_nodes or data_nodes:

        if process_nodes:
            current_node_pk = process_nodes.pop()
            active_rules = process_node_rules
        else:
            current_node_pk = data_nodes.pop()
            active_rules = data_node_rules

        # If it is already visited, continue to the next node
        if current_node_pk in retrieved_nodes:
            continue

        # Otherwise mark it as a node to be exported
        retrieved_nodes.add(current_node_pk)

        # Fire one query per enabled traversal rule for the current node
        for rule_name, input_type, output_type, direction, link_type, target_is_data in active_rules:
            if not traversal_rules[rule_name]:
                continue

            links_uuids, found_nodes = _retrieve_linked_nodes_query(
                current_node_pk,
                input_type=input_type,
                output_type=output_type,
                direction=direction,
                link_type_value=link_type.value
            )
            # Queue only nodes not already exported, in the set matching their type
            destination = data_nodes if target_is_data else process_nodes
            destination.update(found_nodes - retrieved_nodes)
            links_uuid_dict.update(links_uuids)

    return retrieved_nodes, list(links_uuid_dict.values()), traversal_rules

0 comments on commit 19d9f5b

Please sign in to comment.