Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problem with prematurely erased sandbox folder retrieved_temporary_folder #1168

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis-data/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def create_calculation(code, counter, inputval, use_cache=False):
calc.set_max_wallclock_seconds(5 * 60) # 5 min
calc.set_resources({"num_machines": 1})
calc.set_withmpi(False)
calc.set_parser_name('simpleplugins.templatereplacer.test.doubler')
calc.set_parser_name('simpleplugins.templatereplacer.doubler')

calc.use_parameters(parameters)
calc.use_template(template)
Expand Down
92 changes: 42 additions & 50 deletions aiida/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
the routines make reference to the suitable plugins for all
plugin-specific operations.
"""
import os
from backports import tempfile

from aiida.backends.utils import get_authinfo
from aiida.common.datastructures import calc_states
from aiida.scheduler.datastructures import job_states
Expand All @@ -28,13 +31,9 @@
from aiida.orm import DataFactory
from aiida.orm.data.folder import FolderData
from aiida.common.log import get_dblogger_extra
import os


# Until we fix the broken daemon logger https://github.com/aiidateam/aiida_core/issues/943
# execlogger = aiidalogger.getChild('execmanager')
import logging
execlogger = logging.getLogger('execmanager')
execlogger = aiidalogger.getChild('execmanager')


def update_running_calcs_status(authinfo):
Expand Down Expand Up @@ -694,42 +693,44 @@ def retrieve_and_parse(calc, transport=None):
logger_extra = get_dblogger_extra(calc)
transport._set_logger_extra(logger_extra)

try:
retrieved_temporary_folder = retrieve_all(calc, transport, logger_extra)
except Exception:
import traceback
with tempfile.TemporaryDirectory() as retrieved_temporary_folder:

tb = traceback.format_exc()
newextradict = logger_extra.copy()
newextradict['full_traceback'] = tb
try:
retrieve_all(calc, transport, retrieved_temporary_folder, logger_extra)
except Exception:
import traceback

execlogger.error(
"Error retrieving calc {}. Traceback: {}".format(calc.pk, tb),
extra=newextradict
)
_set_state_noraise(calc, calc_states.RETRIEVALFAILED)
return False
tb = traceback.format_exc()
newextradict = logger_extra.copy()
newextradict['full_traceback'] = tb

try:
parse_results(calc, retrieved_temporary_folder, logger_extra)
except Exception:
import traceback
execlogger.error(
"Error retrieving calc {}. Traceback: {}".format(calc.pk, tb),
extra=newextradict
)
_set_state_noraise(calc, calc_states.RETRIEVALFAILED)
return False

tb = traceback.format_exc()
newextradict = logger_extra.copy()
newextradict['full_traceback'] = tb
execlogger.error(
"Error parsing calc {}. Traceback: {}".format(calc.pk, tb),
extra=newextradict
)
# TODO: add a 'comment' to the calculation
_set_state_noraise(calc, calc_states.PARSINGFAILED)
return False
try:
parse_results(calc, retrieved_temporary_folder, logger_extra)
except Exception:
import traceback

tb = traceback.format_exc()
newextradict = logger_extra.copy()
newextradict['full_traceback'] = tb
execlogger.error(
"Error parsing calc {}. Traceback: {}".format(calc.pk, tb),
extra=newextradict
)
# TODO: add a 'comment' to the calculation
_set_state_noraise(calc, calc_states.PARSINGFAILED)
return False

return True


def retrieve_all(job, transport, logger_extra=None):
def retrieve_all(job, transport, retrieved_temporary_folder, logger_extra=None):
try:
job._set_state(calc_states.RETRIEVING)
except ModificationNotAllowed:
Expand Down Expand Up @@ -764,28 +765,23 @@ def retrieve_all(job, transport, logger_extra=None):
retrieve_singlefile_list = job._get_retrieve_singlefile_list()

with SandboxFolder() as folder:
retrieve_files_from_list(job, transport, folder, retrieve_list)
retrieve_files_from_list(job, transport, folder.abspath, retrieve_list)
# Here I retrieved everything; now I store them inside the calculation
retrieved_files.replace_with_folder(folder.abspath, overwrite=True)

# Second, retrieve the singlefiles
with SandboxFolder() as folder:
_retrieve_singlefiles(job, transport, folder, retrieve_singlefile_list, logger_extra)

# Retrieve the temporary files in a separate temporary folder if any files were
# Retrieve the temporary files in the retrieved_temporary_folder if any files were
# specified in the 'retrieve_temporary_list' key
if retrieve_temporary_list:
retrieved_temporary_folder = FolderData()
with SandboxFolder() as folder:
retrieve_files_from_list(job, transport, folder, retrieve_temporary_list)
retrieved_temporary_folder.replace_with_folder(folder.abspath, overwrite=True)
retrieve_files_from_list(job, transport, retrieved_temporary_folder, retrieve_temporary_list)

# Log the files that were retrieved in the temporary folder
for entry in retrieved_temporary_folder.get_folder_list():
for filename in os.listdir(retrieved_temporary_folder):
execlogger.debug("[retrieval of calc {}] Retrieved temporary file or folder '{}'".format(
job.pk, entry), extra=logger_extra)
else:
retrieved_temporary_folder = None
job.pk, filename), extra=logger_extra)

# Store everything
execlogger.debug(
Expand All @@ -794,8 +790,6 @@ def retrieve_all(job, transport, logger_extra=None):
extra=logger_extra)
retrieved_files.store()

return retrieved_temporary_folder


def parse_results(job, retrieved_temporary_folder=None, logger_extra=None):
job._set_state(calc_states.PARSING)
Expand Down Expand Up @@ -957,11 +951,9 @@ def retrieve_files_from_list(calculation, transport, folder, retrieve_list):
up to what level of the original remotepath nesting the files will be copied.

:param transport: the Transport instance
:param folder: a local Folder instance for the transport to store files into
:param folder: an absolute path to a folder to copy files in
:param retrieve_list: the list of files to retrieve
"""
import os

for item in retrieve_list:
if isinstance(item, list):
tmp_rname, tmp_lname, depth = item
Expand All @@ -979,7 +971,7 @@ def retrieve_files_from_list(calculation, transport, folder, retrieve_list):
if depth > 1: # create directories in the folder, if needed
for this_local_file in local_names:
new_folder = os.path.join(
folder.abspath,
folder,
os.path.split(this_local_file)[0])
if not os.path.exists(new_folder):
os.makedirs(new_folder)
Expand All @@ -993,4 +985,4 @@ def retrieve_files_from_list(calculation, transport, folder, retrieve_list):

for rem, loc in zip(remote_names, local_names):
transport.logger.debug("[retrieval of calc {}] Trying to retrieve remote item '{}'".format(calculation.pk, rem))
transport.get(rem, os.path.join(folder.abspath, loc), ignore_nonexisting=True)
transport.get(rem, os.path.join(folder, loc), ignore_nonexisting=True)
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def parse_with_retrieved(self, retrieved):

:param retrieved: a dictionary of retrieved nodes
"""
import os

output_nodes = []

try:
Expand Down Expand Up @@ -65,18 +67,25 @@ def parse_with_retrieved(self, retrieved):
# FolderData node in the 'retrieved' arguments
if retrieve_temporary_files is not None:
try:
temporary_folder = retrieved[self.retrieved_temporary_folder_key]
retrieved_temporary_folder = retrieved[self.retrieved_temporary_folder_key]
except KeyError:
self.logger.error('the {} was not passed as an argument'.format(self.retrieved_temporary_folder_key))
return False, ()

for retrieved_file in retrieve_temporary_files:
if retrieved_file not in temporary_folder.get_folder_list():
self.logger.error('the file {} was not found in the temporary retrieved folder'.format(retrieved_file))

file_path = os.path.join(retrieved_temporary_folder, retrieved_file)

if not os.path.isfile(file_path):
self.logger.error('the file {} was not found in the temporary retrieved folder {}'
.format(retrieved_file, retrieved_temporary_folder))
return False, ()

with open(file_path, 'r') as handle:
parsed_value = handle.read().strip()

# We always strip the content of the file from whitespace to simplify testing for expected output
output_dict['retrieved_temporary_files'].append((retrieved_file, temporary_folder.get_file_content(retrieved_file).strip()))
output_dict['retrieved_temporary_files'].append((retrieved_file, parsed_value))

output_parameters = ParameterData(dict=output_dict)
output_nodes.append((self.get_linkname_outparams(), output_parameters))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
],
'aiida.cmdline': [],
'aiida.parsers': [
'simpleplugins.templatereplacer.test.doubler = aiida.parsers.simpleplugins.templatereplacer.test:TemplatereplacerDoublerParser',
'simpleplugins.templatereplacer.doubler = aiida.parsers.simpleplugins.templatereplacer.doubler:TemplatereplacerDoublerParser',
],
'aiida.schedulers': [
'direct = aiida.scheduler.plugins.direct:DirectScheduler',
Expand Down