Move last_job_info from JSON-serialized string to dictionary (#3651)
For historical reasons, the last job info was stored as a JSON-serialized
string in the `last_jobinfo` attribute of a `CalcJobNode`. This was
cumbersome and made querying very hard.

We now replace the string with a dictionary, thanks to new `JobInfo`
methods that return (and load from) a dictionary with serialized fields,
so that the dictionary remains JSON-serializable. These new methods, as
well as the existing ones, are now covered by tests.

Finally, the attribute key has been renamed from `last_jobinfo` to
`last_job_info`, for consistency with the key `detailed_job_info`
introduced in #3639. Since the change of type already means the field
cannot be used in existing scripts exactly as before, the rename adds no
extra breakage. It should also not cause real backward-incompatibility
problems, since the field existed mostly for debugging purposes.
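
In short, a minimal sketch of the new round trip (the values are
illustrative; all method names below are the ones added or kept by this
commit):

```python
from datetime import datetime

from aiida.schedulers.datastructures import JobInfo, JobState

job_info = JobInfo()
job_info.job_id = '12723'
job_info.job_state = JobState.QUEUED
job_info.submission_time = datetime.now()

# New in this commit: a JSON-serializable dictionary (special fields such
# as dates and job states are serialized to plain types)...
as_dict = job_info.get_dict()

# ...and the inverse classmethod that reconstructs a JobInfo from it.
roundtrip = JobInfo.load_from_dict(as_dict)
assert roundtrip.submission_time == job_info.submission_time

# The string-based API is kept, now implemented on top of the dict form.
assert JobInfo.load_from_serialized(job_info.serialize()).job_id == '12723'
```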
giovannipizzi authored and sphuber committed Dec 13, 2019
1 parent 7895662 commit 9092b93
Showing 8 changed files with 115 additions and 55 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -77,6 +77,7 @@ jobs:
       run: |
         pip install numpy==1.17.4
         pip install -e .[all]
+        pip freeze
     - name: Run pre-commit
       run:
2 changes: 0 additions & 2 deletions .pre-commit-config.yaml
@@ -74,11 +74,9 @@
             aiida/plugins/entry.py|
             aiida/plugins/info.py|
             aiida/plugins/registry.py|
-            aiida/schedulers/datastructures.py|
             aiida/schedulers/plugins/direct.py|
             aiida/schedulers/plugins/lsf.py|
             aiida/schedulers/plugins/pbsbaseclasses.py|
-            aiida/schedulers/plugins/pbsbaseclasses.py|
             aiida/schedulers/plugins/sge.py|
             aiida/schedulers/plugins/slurm.py|
             aiida/schedulers/plugins/test_direct.py|
11 changes: 5 additions & 6 deletions aiida/orm/nodes/process/calculation/calcjob.py
@@ -35,7 +35,7 @@ class CalcJobNode(CalculationNode):
     SCHEDULER_JOB_ID_KEY = 'job_id'
     SCHEDULER_STATE_KEY = 'scheduler_state'
     SCHEDULER_LAST_CHECK_TIME_KEY = 'scheduler_lastchecktime'
-    SCHEDULER_LAST_JOB_INFO_KEY = 'last_jobinfo'
+    SCHEDULER_LAST_JOB_INFO_KEY = 'last_job_info'
     SCHEDULER_DETAILED_JOB_INFO_KEY = 'detailed_job_info'
 
     # Base path within the repository where to put objects by default
@@ -436,7 +436,7 @@ def set_last_job_info(self, last_job_info):
         :param last_job_info: a `JobInfo` object
         """
-        self.set_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, last_job_info.serialize())
+        self.set_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, last_job_info.get_dict())
 
     def get_last_job_info(self):
         """Return the last information asked to the scheduler about the status of the job.
@@ -445,11 +445,10 @@ def get_last_job_info(self):
         """
         from aiida.schedulers.datastructures import JobInfo
 
-        last_job_info_serialized = self.get_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, None)
+        last_job_info_dictserialized = self.get_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, None)
 
-        if last_job_info_serialized is not None:
-            job_info = JobInfo()
-            job_info.load_from_serialized(last_job_info_serialized)
+        if last_job_info_dictserialized is not None:
+            job_info = JobInfo.load_from_dict(last_job_info_dictserialized)
         else:
             job_info = None
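For context, this is the kind of query that the dictionary form enables; a sketch assuming a loaded AiiDA profile, using standard `QueryBuilder` dotted-attribute filters (the specific `job_state` value is illustrative):

```python
from aiida.orm import CalcJobNode, QueryBuilder

# Filter directly on a field inside the `last_job_info` dictionary attribute;
# with the old JSON-serialized string this required deserializing every node.
builder = QueryBuilder()
builder.append(CalcJobNode, filters={'attributes.last_job_info.job_state': 'DONE'})

for node, in builder.iterall():
    print(node.pk, node.get_last_job_info().job_state)
```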
111 changes: 70 additions & 41 deletions aiida/schedulers/datastructures.py
@@ -23,7 +23,9 @@
 
 SCHEDULER_LOGGER = AIIDA_LOGGER.getChild('scheduler')
 
-__all__ = ('JobState', 'JobResource', 'JobTemplate', 'JobInfo', 'NodeNumberJobResource', 'ParEnvJobResource', 'MachineInfo')
+__all__ = (
+    'JobState', 'JobResource', 'JobTemplate', 'JobInfo', 'NodeNumberJobResource', 'ParEnvJobResource', 'MachineInfo'
+)
 
 
 class JobState(Enum):
@@ -115,7 +117,7 @@ def accepts_default_mpiprocs_per_machine(cls):
         """
         return True
 
-    def __init__(self, **kwargs):
+    def __init__(self, **kwargs):  # pylint: disable=too-many-branches,too-many-statements
         """
         Initialize the job resources from the passed arguments (the valid keys can be
         obtained with the function self.get_valid_keys()).
@@ -169,17 +171,21 @@ def __init__(self, **kwargs):
                 raise ValueError('num_cores_per_mpiproc must an integer')
 
         if kwargs:
-            raise TypeError('The following parameters were not recognized for '
-                            'the JobResource: {}'.format(kwargs.keys()))
+            raise TypeError(
+                'The following parameters were not recognized for '
+                'the JobResource: {}'.format(kwargs.keys())
+            )
 
         if num_machines is None:
             # Use default value, if not provided
             if num_mpiprocs_per_machine is None:
                 num_mpiprocs_per_machine = default_mpiprocs_per_machine
 
             if num_mpiprocs_per_machine is None or tot_num_mpiprocs is None:
-                raise TypeError('At least two among num_machines, '
-                                'num_mpiprocs_per_machine or tot_num_mpiprocs must be specified')
+                raise TypeError(
+                    'At least two among num_machines, '
+                    'num_mpiprocs_per_machine or tot_num_mpiprocs must be specified'
+                )
         else:
             # To avoid divisions by zero
             if num_mpiprocs_per_machine <= 0:
@@ -196,8 +202,10 @@ def __init__(self, **kwargs):
 
         if num_mpiprocs_per_machine is None:
             if tot_num_mpiprocs is None:
-                raise TypeError('At least two among num_machines, '
-                                'num_mpiprocs_per_machine or tot_num_mpiprocs must be specified')
+                raise TypeError(
+                    'At least two among num_machines, '
+                    'num_mpiprocs_per_machine or tot_num_mpiprocs must be specified'
+                )
             else:
                 # To avoid divisions by zero
                 if num_machines <= 0:
@@ -209,10 +217,12 @@
 
         if tot_num_mpiprocs is not None:
             if tot_num_mpiprocs != self.num_mpiprocs_per_machine * self.num_machines:
-                raise ValueError('tot_num_mpiprocs must be equal to '
-                                 'num_mpiprocs_per_machine * num_machines, and in particular it '
-                                 'should be a multiple of num_mpiprocs_per_machine and/or '
-                                 'num_machines')
+                raise ValueError(
+                    'tot_num_mpiprocs must be equal to '
+                    'num_mpiprocs_per_machine * num_machines, and in particular it '
+                    'should be a multiple of num_mpiprocs_per_machine and/or '
+                    'num_machines'
+                )
 
         if self.num_mpiprocs_per_machine <= 0:
             raise ValueError('num_mpiprocs_per_machine must be >= 1')
@@ -262,8 +272,10 @@ def __init__(self, **kwargs):
 
         default_mpiprocs_per_machine = kwargs.pop('default_mpiprocs_per_machine', None)
         if default_mpiprocs_per_machine is not None:
-            raise ConfigurationError('default_mpiprocs_per_machine cannot be set '
-                                     'for schedulers that use ParEnvJobResource')
+            raise ConfigurationError(
+                'default_mpiprocs_per_machine cannot be set '
+                'for schedulers that use ParEnvJobResource'
+            )
 
         if self.tot_num_mpiprocs <= 0:
             raise ValueError('tot_num_mpiprocs must be >= 1')
@@ -283,7 +295,7 @@ def accepts_default_mpiprocs_per_machine(cls):
         return False
 
 
-class JobTemplate(DefaultFieldsAttributeDict):
+class JobTemplate(DefaultFieldsAttributeDict):  # pylint: disable=too-many-instance-attributes
     """
     A template for submitting jobs. This contains all required information
     to create the job header.
@@ -374,9 +386,6 @@ class JobTemplate(DefaultFieldsAttributeDict):
     Values are given by aiida.common.datastructures.CodeRunMode.
     """
 
-    # #TODO: validation key? also call the validate function in the proper
-    # place then.
-
     _default_fields = (
         'shebang',
         'submit_as_hold',
@@ -424,7 +433,7 @@ class MachineInfo(DefaultFieldsAttributeDict):
     )
 
 
-class JobInfo(DefaultFieldsAttributeDict):
+class JobInfo(DefaultFieldsAttributeDict):  # pylint: disable=too-many-instance-attributes
     """
     Contains properties for a job in the queue.
     Most of the fields are taken from DRMAA v.2.
@@ -475,10 +484,12 @@ class JobInfo(DefaultFieldsAttributeDict):
     'finished' state, of type datetime.datetime
     """
 
-    _default_fields = ('job_id', 'title', 'exit_status', 'terminating_signal', 'annotation', 'job_state',
-                       'job_substate', 'allocated_machines', 'job_owner', 'num_mpiprocs', 'num_cpus', 'num_machines',
-                       'queue_name', 'account', 'qos', 'wallclock_time_seconds', 'requested_wallclock_time_seconds', 'cpu_time',
-                       'submission_time', 'dispatch_time', 'finish_time')
+    _default_fields = (
+        'job_id', 'title', 'exit_status', 'terminating_signal', 'annotation', 'job_state', 'job_substate',
+        'allocated_machines', 'job_owner', 'num_mpiprocs', 'num_cpus', 'num_machines', 'queue_name', 'account', 'qos',
+        'wallclock_time_seconds', 'requested_wallclock_time_seconds', 'cpu_time', 'submission_time', 'dispatch_time',
+        'finish_time'
+    )
 
     # If some fields require special serializers, specify them here.
     # You then need to define also the respective _serialize_FIELDTYPE and
@@ -522,7 +533,6 @@ def _serialize_date(value):
 
         # is_naive check from django.utils.timezone
         if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
-            # TODO: FIX TIMEZONE
             SCHEDULER_LOGGER.debug('Datetime to serialize in JobInfo is naive, this should be fixed!')
             # v = v.replace(tzinfo = pytz.utc)
             return {'date': value.strftime('%Y-%m-%dT%H:%M:%S.%f'), 'timezone': None}
@@ -545,14 +555,15 @@ def _deserialize_date(value):
         if value['timezone'] is None:
             # naive date
             return datetime.datetime.strptime(value['date'], '%Y-%m-%dT%H:%M:%S.%f')
-        elif value['timezone'] == 'UTC':
+        if value['timezone'] == 'UTC':
             return datetime.datetime.strptime(value['date'], '%Y-%m-%dT%H:%M:%S.%f').replace(tzinfo=pytz.utc)
 
         # Try your best
         return datetime.datetime.strptime(value['date'],
                                           '%Y-%m-%dT%H:%M:%S.%f').replace(tzinfo=pytz.timezone(value['timezone']))
 
-    def serialize_field(self, value, field_type):
+    @classmethod
+    def serialize_field(cls, value, field_type):
         """
         Serialise a particular field value
@@ -564,11 +575,12 @@ def serialize_field(self, value, field_type):
         if field_type is None:
             return value
 
-        serializer_method = getattr(self, '_serialize_{}'.format(field_type))
+        serializer_method = getattr(cls, '_serialize_{}'.format(field_type))
 
         return serializer_method(value)
 
-    def deserialize_field(self, value, field_type):
+    @classmethod
+    def deserialize_field(cls, value, field_type):
         """
         Deserialise the value of a particular field with a type
         :param value: The value
@@ -578,30 +590,47 @@ def deserialize_field(self, value, field_type):
         if field_type is None:
             return value
 
-        deserializer_method = getattr(self, '_deserialize_{}'.format(field_type))
+        deserializer_method = getattr(cls, '_deserialize_{}'.format(field_type))
 
         return deserializer_method(value)
 
     def serialize(self):
         """
-        Serialise the current data
-        :return: A serialised representation of the current data
+        Serialize the current data (as obtained by ``self.get_dict()``) into a JSON string.
+
+        :return: A string with serialised representation of the current data.
         """
         from aiida.common import json
 
-        ser_data = {k: self.serialize_field(v, self._special_serializers.get(k, None)) for k, v in self.items()}
+        return json.dumps(self.get_dict())
 
-        return json.dumps(ser_data)
+    def get_dict(self):
+        """
+        Serialise the current data into a dictionary that is JSON-serializable.
+
+        :return: A dictionary
+        """
+        return {k: self.serialize_field(v, self._special_serializers.get(k, None)) for k, v in self.items()}
 
-    def load_from_serialized(self, data):
+    @classmethod
+    def load_from_dict(cls, data):
         """
-        Load value from serialised data
-        :param data: The data to load from
-        :return: The value after loading
+        Create a new instance loading the values from serialised data in dictionary form
+
+        :param data: The dictionary with the data to load from
         """
-        from aiida.common import json
+        instance = cls()
+        for key, value in data.items():
+            instance[key] = cls.deserialize_field(value, cls._special_serializers.get(key, None))
+        return instance
 
-        deser_data = json.loads(data)
+    @classmethod
+    def load_from_serialized(cls, data):
+        """
+        Create a new instance loading the values from JSON-serialised data as a string
+
+        :param data: The string with the JSON-serialised data to load from
+        """
+        from aiida.common import json
 
-        for key, value in deser_data.items():
-            self[key] = self.deserialize_field(value, self._special_serializers.get(key, None))
+        return cls.load_from_dict(json.loads(data))
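
To illustrate the `_serialize_date`/`_deserialize_date` pair above: dates survive the dictionary round trip with the timezone tracked explicitly. A sketch; the serialized dictionary in the comment is inferred from the format strings shown above, not taken from a real run:

```python
import datetime

import pytz

from aiida.schedulers.datastructures import JobInfo

# An aware datetime: the timezone name is stored next to the date string,
# so deserialize_field can restore an equal aware datetime.
aware = datetime.datetime(2019, 12, 13, 10, 30, 0, tzinfo=pytz.utc)
serialized = JobInfo.serialize_field(aware, 'date')
# e.g. {'date': '2019-12-13T10:30:00.000000', 'timezone': 'UTC'}
assert JobInfo.deserialize_field(serialized, 'date') == aware

# A naive datetime is serialized with timezone None (and logs a debug
# message, as shown in _serialize_date above); it comes back naive.
naive = datetime.datetime(2019, 12, 13, 10, 30, 0)
assert JobInfo.serialize_field(naive, 'date')['timezone'] is None
```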
6 changes: 3 additions & 3 deletions aiida/schedulers/plugins/pbsbaseclasses.py
@@ -517,10 +517,10 @@ def _parse_joblist_output(self, retval, stdout, stderr):
                     node.name, data = exec_host.split('/')
                     data = data.split('*')
                     if len(data) == 1:
-                        node.jobIndex = int(data[0])
+                        node.jobIndex = int(data[0])  # pylint: disable=invalid-name
                         node.num_cpus = 1
                     elif len(data) == 2:
-                        node.jobIndex = int(data[0])
+                        node.jobIndex = int(data[0])  # pylint: disable=invalid-name
                         node.num_cpus = int(data[1])
                     else:
                         raise ValueError('Wrong number of pieces: {} '
@@ -581,7 +581,7 @@ def _parse_joblist_output(self, retval, stdout, stderr):
                 _LOGGER.debug("No 'queue' field for job id {}".format(this_job.job_id))
 
             try:
-                this_job.RequestedWallclockTime = (self._convert_time(raw_data['resource_list.walltime']))
+                this_job.RequestedWallclockTime = (self._convert_time(raw_data['resource_list.walltime']))  # pylint: disable=invalid-name
             except KeyError:
                 _LOGGER.debug("No 'resource_list.walltime' field for job id {}".format(this_job.job_id))
             except ValueError:
2 changes: 1 addition & 1 deletion aiida/schedulers/plugins/slurm.py
@@ -583,7 +583,7 @@ def _parse_joblist_output(self, retval, stdout, stderr):
                 this_job.queue_name = thisjob_dict['partition']
 
             try:
-                this_job.requested_wallclock_time_seconds = (self._convert_time(thisjob_dict['time_limit']))
+                this_job.requested_wallclock_time_seconds = (self._convert_time(thisjob_dict['time_limit']))  # pylint: disable=invalid-name
             except ValueError:
                 self.logger.warning('Error parsing the time limit for job id {}'.format(this_job.job_id))
34 changes: 34 additions & 0 deletions aiida/schedulers/test_datastructures.py
@@ -75,3 +75,37 @@ def test_init(self):
 
         with self.assertRaises(ValueError):
             _ = NodeNumberJobResource(num_mpiprocs_per_machine=8, tot_num_mpiprocs=15)
+
+    def test_serialization(self):
+        """Test the serialization/deserialization of JobInfo classes."""
+        from aiida.schedulers.datastructures import JobInfo, JobState
+        from datetime import datetime
+
+        dict_serialized_content = {
+            'job_id': '12723',
+            'title': 'some title',
+            'queue_name': 'some_queue',
+            'account': 'my_account'
+        }
+
+        to_serialize = {'job_state': (JobState.QUEUED, 'job_state'), 'submission_time': (datetime.now(), 'date')}
+
+        job_info = JobInfo()
+        for key, val in dict_serialized_content.items():
+            setattr(job_info, key, val)
+
+        for key, (val, field_type) in to_serialize.items():
+            setattr(job_info, key, val)
+            # Also append to the dictionary for easier comparison later
+            dict_serialized_content[key] = JobInfo.serialize_field(value=val, field_type=field_type)
+
+        self.assertEqual(job_info.get_dict(), dict_serialized_content)
+        # Full loop via JSON, moving data from job_info to job_info2;
+        # we check that the content is fully preserved
+        job_info2 = JobInfo.load_from_serialized(job_info.serialize())
+        self.assertEqual(job_info2.get_dict(), dict_serialized_content)
+
+        # Check that fields are properly re-serialized with the correct type
+        self.assertEqual(job_info2.job_state, to_serialize['job_state'][0])
+        # Check that fields are properly re-serialized with the correct type
+        self.assertEqual(job_info2.submission_time, to_serialize['submission_time'][0])
3 changes: 1 addition & 2 deletions docs/source/import_export/main.rst
@@ -301,9 +301,8 @@ A sample of the ``data.json`` file follows:
                 "_scheduler-stdout.txt",
                 "_scheduler-stderr.txt"
             ],
-            "last_jobinfo": {
+            "last_job_info": {
                 "job_state": "DONE",
-                "detailedJobinfo": "AiiDA MESSAGE: This scheduler does not implement the routine get_detailed_jobinfo to retrieve the information on a job after it has finished.",
                 "job_id": "13489"
             },
             "scheduler_state": "DONE",
