Migrate Node to the new backend interface #2481

Merged: 4 commits, Feb 15, 2019
16 changes: 6 additions & 10 deletions .ci/test_daemon.py
@@ -20,11 +20,11 @@
from aiida.common import exceptions
from aiida.manage.caching import enable_caching
from aiida.daemon.client import get_daemon_client
from aiida.orm import Code, CalculationFactory, DataFactory
from aiida.orm import Code, CalculationFactory, DataFactory, load_node
from aiida.orm.node.data.int import Int
from aiida.orm.node.data.str import Str
from aiida.orm.node.data.list import List
from aiida.orm.node import CalcJobNode
from aiida.orm import CalcJobNode
from aiida.work.launch import run_get_node, submit
from aiida.work.persistence import ObjectLoader
from workchains import (
@@ -55,7 +55,6 @@ def print_daemon_log():


def jobs_have_finished(pks):
global load_node # instantiated by the verdi run command
finished_list = [load_node(pk).is_terminated for pk in pks]
node_list = [load_node(pk) for pk in pks]
num_finished = len([_ for _ in finished_list if _])
@@ -79,7 +78,6 @@ def print_report(pk):


def validate_calculations(expected_results):
global load_node # instantiated by the verdi run command
valid = True
actual_dict = {}
for pk, expected_dict in expected_results.items():
@@ -112,7 +110,6 @@ def validate_calculations(expected_results):


def validate_workchains(expected_results):
global load_node # instantiated by the verdi run command
valid = True
for pk, expected_value in expected_results.items():
this_valid = True
@@ -148,7 +145,6 @@ def validate_cached(cached_calcs):
"""
Check that the calculations that were created with caching are indeed cached.
"""
global load_node # instantiated by the verdi run command
valid = True
for calc in cached_calcs:

@@ -158,18 +154,18 @@ def validate_cached(cached_calcs):
print_report(calc.pk)
valid = False

if '_aiida_cached_from' not in calc.extras() or calc.get_hash() != calc.get_extra('_aiida_hash'):
if '_aiida_cached_from' not in calc.extras or calc.get_hash() != calc.get_extra('_aiida_hash'):
print('Cached calculation<{}> has invalid hash'.format(calc.pk))
print_report(calc.pk)
valid = False

if isinstance(calc, CalcJobNode):
if 'raw_input' not in calc.folder.get_content_list():
if 'raw_input' not in calc.repository._get_folder_pathsubfolder.get_content_list():
print("Cached calculation <{}> does not have a 'raw_input' folder".format(calc.pk))
print_report(calc.pk)
valid = False
original_calc = load_node(calc.get_extra('_aiida_cached_from'))
if 'raw_input' not in original_calc.folder.get_content_list():
if 'raw_input' not in original_calc.repository._get_folder_pathsubfolder.get_content_list():
print("Original calculation <{}> does not have a 'raw_input' folder after being cached from."
.format(original_calc.pk))
valid = False
@@ -183,7 +179,7 @@ def launch_calculation(code, counter, inputval):
"""
process, inputs, expected_result = create_calculation_process(code=code, inputval=inputval)
calc = submit(process, **inputs)
print("[{}] launched calculation {}, pk={}".format(counter, calc.uuid, calc.dbnode.pk))
print("[{}] launched calculation {}, pk={}".format(counter, calc.uuid, calc.pk))
return calc, expected_result


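The hunks above move the test script from the `load_node` global injected by `verdi run` to a regular import from `aiida.orm`, read `extras` as a property instead of calling it as a method, and take the pk directly from the node rather than through `dbnode`. A minimal sketch of those updated idioms, not part of the PR; `hash_is_consistent` is a hypothetical helper name:

```python
from aiida.orm import CalcJobNode, load_node  # both now come directly from aiida.orm


def hash_is_consistent(pk):
    """Check a node's cached-hash extra the way validate_cached above does."""
    node = load_node(pk)
    # `extras` is a property on the new node interface, not a method call
    if '_aiida_hash' not in node.extras:
        return False
    if isinstance(node, CalcJobNode):
        print('checking calculation job <{}>'.format(node.pk))  # node.pk, not node.dbnode.pk
    return node.get_hash() == node.get_extra('_aiida_hash')
```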
7 changes: 0 additions & 7 deletions .pre-commit-config.yaml
@@ -151,21 +151,14 @@
aiida/orm/implementation/calculation.py|
aiida/orm/implementation/django/authinfo.py|
aiida/orm/implementation/django/backend.py|
aiida/orm/implementation/django/code.py|
aiida/orm/implementation/django/computer.py|
aiida/orm/implementation/django/__init__.py|
aiida/orm/implementation/django/node.py|
aiida/orm/implementation/django/user.py|
aiida/orm/implementation/django/utils.py|
aiida/orm/implementation/general/__init__.py|
aiida/orm/implementation/general/node.py|
aiida/orm/implementation/__init__.py|
aiida/orm/implementation/sqlalchemy/authinfo.py|
aiida/orm/implementation/sqlalchemy/backend.py|
aiida/orm/implementation/sqlalchemy/code.py|
aiida/orm/implementation/sqlalchemy/computer.py|
aiida/orm/implementation/sqlalchemy/__init__.py|
aiida/orm/implementation/sqlalchemy/node.py|
aiida/orm/implementation/sqlalchemy/querytool.py|
aiida/orm/implementation/sqlalchemy/utils.py|
aiida/orm/__init__.py|
Changes to a Django data migration (file name truncated)
@@ -43,7 +43,7 @@ def delete_trajectory_symbols_array(apps, _):
trajectory = load_node(t_pk)
modifier.del_value_for_node(DbNode.objects.get(pk=trajectory.pk), 'array|symbols')
# Remove the .npy file (using delete_array raises ModificationNotAllowed error)
trajectory._get_folder_pathsubfolder.remove_path('symbols.npy') # pylint: disable=protected-access
trajectory.repository._get_folder_pathsubfolder.remove_path('symbols.npy') # pylint: disable=protected-access


def create_trajectory_symbols_array(apps, _):
@@ -61,12 +61,12 @@ def create_trajectory_symbols_array(apps, _):
'id', flat=True)
for t_pk in trajectories_pk:
trajectory = load_node(t_pk)
symbols = numpy.array(trajectory.get_attr('symbols'))
symbols = numpy.array(trajectory.get_attribute('symbols'))
# Save the .npy file (using set_array raises ModificationNotAllowed error)
with tempfile.NamedTemporaryFile() as _file:
numpy.save(_file, symbols)
_file.flush()
trajectory._get_folder_pathsubfolder.insert_path(_file.name, 'symbols.npy') # pylint: disable=protected-access
trajectory.repository._get_folder_pathsubfolder.insert_path(_file.name, 'symbols.npy') # pylint: disable=protected-access
modifier.set_value_for_node(DbNode.objects.get(pk=trajectory.pk), 'array|symbols', list(symbols.shape))


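In this migration, node attributes are now read with `get_attribute` (previously `get_attr`) and the raw folder is reached through the node's `repository` accessor. A rough sketch of that pattern, not taken from the PR; `dump_symbols` is a hypothetical helper and the protected `_get_folder_pathsubfolder` access simply mirrors the migration above:

```python
import tempfile

import numpy

from aiida.orm import load_node


def dump_symbols(pk):
    """Write a trajectory's `symbols` attribute to `symbols.npy` in its repository folder."""
    trajectory = load_node(pk)
    symbols = numpy.array(trajectory.get_attribute('symbols'))  # `get_attr` is now `get_attribute`
    with tempfile.NamedTemporaryFile() as handle:
        numpy.save(handle, symbols)
        handle.flush()
        # the folder is reached through the `repository` accessor on the node
        trajectory.repository._get_folder_pathsubfolder.insert_path(handle.name, 'symbols.npy')  # pylint: disable=protected-access
```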
219 changes: 1 addition & 218 deletions aiida/backends/djsite/db/models.py
@@ -10,7 +10,6 @@
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import sys

import six
from six.moves import zip, range
@@ -1502,6 +1501,7 @@ class WorkflowDataValueType(Enumerate):
wf_start_call = "start"
wf_exit_call = "exit"
wf_default_call = "none"

@python_2_unicode_compatible
class DbWorkflow(m.Model):
uuid = m.UUIDField(default=get_new_uuid, unique=True)
@@ -1527,127 +1527,6 @@ class DbWorkflow(m.Model):
# Return aiida Node instances or their subclasses instead of DbNode instances
aiidaobjects = AiidaObjectManager()

def set_state(self, _state):
self.state = _state
self.save()

def set_script_md5(self, _md5):

self.script_md5 = _md5
self.save()

def add_data(self, dict, d_type):
try:
for k in dict.keys():
p, create = self.data.get_or_create(name=k, data_type=d_type)
p.set_value(dict[k])
except Exception as e:
raise

def get_data(self, d_type):
try:
dict = {}
for p in self.data.filter(parent=self, data_type=d_type):
dict[p.name] = p.get_value()
return dict
except Exception as e:
raise

def add_parameters(self, dict, force=False):
from aiida.common.datastructures import wf_states, wf_data_types

if not self.state == wf_states.INITIALIZED and not force:
raise ValueError("Cannot add initial parameters to an already initialized workflow")

self.add_data(dict, wf_data_types.PARAMETER)

def add_parameter(self, name, value):
self.add_parameters({name: value})

def get_parameters(self):
from aiida.common.datastructures import wf_data_types

return self.get_data(wf_data_types.PARAMETER)

def get_parameter(self, name):
res = self.get_parameters()
if name in res:
return res[name]
else:
raise ValueError("Error retrieving results: {0}".format(name))

def add_results(self, dict):
from aiida.common.datastructures import wf_data_types

self.add_data(dict, wf_data_types.RESULT)

def add_result(self, name, value):
self.add_results({name: value})

def get_results(self):
from aiida.common.datastructures import wf_data_types

return self.get_data(wf_data_types.RESULT)

def get_result(self, name):
res = self.get_results()
if name in res:
return res[name]
else:
raise ValueError("Error retrieving results: {0}".format(name))

def add_attributes(self, dict):
from aiida.common.datastructures import wf_data_types

self.add_data(dict, wf_data_types.ATTRIBUTE)

def add_attribute(self, name, value):
self.add_attributes({name: value})

def get_attributes(self):
from aiida.common.datastructures import wf_data_types

return self.get_data(wf_data_types.ATTRIBUTE)

def get_attribute(self, name):
res = self.get_attributes()
if name in res:
return res[name]
else:
raise ValueError("Error retrieving results: {0}".format(name))

def clear_report(self):
self.report = ''
self.save()

def append_to_report(self, _text):
from aiida.common.timezone import UTC
import datetime

now = datetime.datetime.utcnow().replace(tzinfo=UTC)
self.report += str(now) + "] " + _text + "\n"
self.save()

def get_calculations(self):
from aiida.orm.node import CalcJobNode

return CalcJobNode.query(workflow_step=self.steps)

def get_sub_workflows(self):
return DbWorkflow.objects.filter(parent_workflow_step=self.steps.all())

def is_subworkflow(self):
"""
Return True if this is a subworkflow, False if it is a root workflow,
launched by the user.
"""
return len(self.parent_workflow_step.all()) > 0

def finish(self):
from aiida.common.datastructures import wf_states

self.state = wf_states.FINISHED

def __str__(self):
simplename = self.module_class
# node pk + type
@@ -1672,40 +1551,6 @@ class DbWorkflowData(m.Model):
class Meta:
unique_together = (("parent", "name", "data_type"))

def set_value(self, arg):
from aiida.orm.node import Node
from aiida.common.datastructures import wf_data_value_types
import aiida.common.json as json

try:
if isinstance(arg, Node) or issubclass(arg.__class__, Node):
if arg.pk is None:
raise ValueError("Cannot add an unstored node as an attribute of a Workflow!")
self.aiida_obj = arg.dbnode
self.value_type = wf_data_value_types.AIIDA
self.save()
else:
self.json_value = json.dumps(arg)
self.value_type = wf_data_value_types.JSON
self.save()
except Exception as exc:
six.reraise(ValueError, "Cannot set the parameter {}".format(self.name), sys.exc_info()[2])

def get_value(self):
from aiida.orm.convert import get_orm_entity
import aiida.common.json as json

from aiida.common.datastructures import wf_data_value_types

if self.value_type == wf_data_value_types.JSON:
return json.loads(self.json_value)
elif self.value_type == wf_data_value_types.AIIDA:
return get_orm_entity(self.aiida_obj)
elif self.value_type == wf_data_value_types.NONE:
return None
else:
raise ValueError("Cannot rebuild the parameter {}".format(self.name))

def __str__(self):
return "Data for workflow {} [{}]: {}".format(
self.parent.module_class, self.parent.pk, self.name)
@@ -1730,68 +1575,6 @@ class DbWorkflowStep(m.Model):
class Meta:
unique_together = (("parent", "name"))

def add_calculation(self, step_calculation):
from aiida.orm.node import CalcJobNode

if (not isinstance(step_calculation, CalcJobNode)):
raise ValueError("Cannot add a non-Calculation object to a workflow step")

try:
self.calculations.add(step_calculation)
except:
raise ValueError("Error adding calculation to step")

def get_calculations(self, state=None):
from aiida.orm.node import CalcJobNode

if (state == None):
return CalcJobNode.query(workflow_step=self)
else:
return CalcJobNode.query(workflow_step=self).filter(
dbattributes__key="state", dbattributes__tval=state)

def remove_calculations(self):
self.calculations.all().delete()

def add_sub_workflow(self, sub_wf):
from aiida.orm.workflow import Workflow

if (not issubclass(sub_wf.__class__, Workflow) and not isinstance(sub_wf, Workflow)):
raise ValueError("Cannot add a workflow not of type Workflow")
try:
self.sub_workflows.add(sub_wf.dbworkflowinstance)
except:
raise ValueError("Error adding calculation to step")

def get_sub_workflows(self):
return self.sub_workflows(manager='aiidaobjects').all()

def remove_sub_workflows(self):
self.sub_workflows.all().delete()

def is_finished(self):
from aiida.common.datastructures import wf_states

return self.state == wf_states.FINISHED

def set_nextcall(self, _nextcall):
self.nextcall = _nextcall
self.save()

def set_state(self, _state):
self.state = _state
self.save()

def reinitialize(self):
from aiida.common.datastructures import wf_states

self.set_state(wf_states.INITIALIZED)

def finish(self):
from aiida.common.datastructures import wf_states

self.set_state(wf_states.FINISHED)

def __str__(self):
return "Step {} for workflow {} [{}]".format(self.name,
self.parent.module_class, self.parent.pk)