Revert PR #4416 (#4519)

Merged · 1 commit · Oct 27, 2020
37 changes: 14 additions & 23 deletions aiida/engine/daemon/execmanager.py
@@ -26,7 +26,7 @@

REMOTE_WORK_DIRECTORY_LOST_FOUND = 'lost+found'

-EXEC_LOGGER = AIIDA_LOGGER.getChild('execmanager')
+execlogger = AIIDA_LOGGER.getChild('execmanager')


def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=False):
@@ -47,7 +47,7 @@ def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=
    # chance to perform the state transition. Upon reloading this calculation, it will re-attempt the upload.
    link_label = 'remote_folder'
    if node.get_outgoing(RemoteData, link_label_filter=link_label).first():
-        EXEC_LOGGER.warning(f'CalcJobNode<{node.pk}> already has a `{link_label}` output: skipping upload')
+        execlogger.warning(f'CalcJobNode<{node.pk}> already has a `{link_label}` output: skipping upload')
        return calc_info

    computer = node.computer
@@ -57,7 +57,7 @@ def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=

    logger_extra = get_dblogger_extra(node)
    transport.set_logger_extra(logger_extra)
-    logger = LoggerAdapter(logger=EXEC_LOGGER, extra=logger_extra)
+    logger = LoggerAdapter(logger=execlogger, extra=logger_extra)

    if not dry_run and node.has_cached_links():
        raise ValueError(
@@ -159,16 +159,6 @@ def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=
    remote_symlink_list = calc_info.remote_symlink_list or []
    provenance_exclude_list = calc_info.provenance_exclude_list or []

-    # First creates the directory structure locally before copying the sandbox folder, so that all the intermediate
-    # folders for the files in the copy_lists are there before calling the copy methods of the transport (or else
-    # these will fail).
-    # Alternatively, one would have to call the path creation methods of the transports just before calling the
-    # copy methods to make sure each path is there, unnecessarily duplicating the number of connections requested.
-    for _, _, target_relpath in local_copy_list + remote_copy_list + remote_symlink_list:
-        dirname = os.path.dirname(target_relpath)
-        if dirname:
-            os.makedirs(os.path.join(folder.abspath, dirname), exist_ok=True)
-
    for uuid, filename, target in local_copy_list:
        logger.debug(f'[submission of calculation {node.uuid}] copying local file/folder to {target}')

@@ -200,6 +190,9 @@ def find_data_node(inputs, uuid):
        if data_node is None:
            logger.warning(f'failed to load Node<{uuid}> specified in the `local_copy_list`')
        else:
+            dirname = os.path.dirname(target)
+            if dirname:
+                os.makedirs(os.path.join(folder.abspath, dirname), exist_ok=True)
            with folder.open(target, 'wb') as handle:
                with data_node.open(filename, 'rb') as source:
                    shutil.copyfileobj(source, handle)
@@ -343,15 +336,15 @@ def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
    logger_extra = get_dblogger_extra(calculation)
    workdir = calculation.get_remote_workdir()

-    EXEC_LOGGER.debug(f'Retrieving calc {calculation.pk}', extra=logger_extra)
-    EXEC_LOGGER.debug(f'[retrieval of calc {calculation.pk}] chdir {workdir}', extra=logger_extra)
+    execlogger.debug(f'Retrieving calc {calculation.pk}', extra=logger_extra)
+    execlogger.debug(f'[retrieval of calc {calculation.pk}] chdir {workdir}', extra=logger_extra)

    # If the calculation already has a `retrieved` folder, simply return. The retrieval was apparently already completed
    # before, which can happen if the daemon is restarted and it shuts down after retrieving but before getting the
    # chance to perform the state transition. Upon reloading this calculation, it will re-attempt the retrieval.
    link_label = calculation.link_label_retrieved
    if calculation.get_outgoing(FolderData, link_label_filter=link_label).first():
-        EXEC_LOGGER.warning(
+        execlogger.warning(
            f'CalcJobNode<{calculation.pk}> already has a `{link_label}` output folder: skipping retrieval'
        )
        return
@@ -384,13 +377,13 @@ def retrieve_calculation(calculation, transport, retrieved_temporary_folder):

    # Log the files that were retrieved in the temporary folder
    for filename in os.listdir(retrieved_temporary_folder):
-        EXEC_LOGGER.debug(
+        execlogger.debug(
            f"[retrieval of calc {calculation.pk}] Retrieved temporary file or folder '{filename}'",
            extra=logger_extra
        )

    # Store everything
-    EXEC_LOGGER.debug(
+    execlogger.debug(
        f'[retrieval of calc {calculation.pk}] Storing retrieved_files={retrieved_files.pk}', extra=logger_extra
    )
    retrieved_files.store()
@@ -427,9 +420,7 @@ def kill_calculation(calculation, transport):
        if job is not None and job.job_state != JobState.DONE:
            raise exceptions.RemoteOperationError(f'scheduler.kill({job_id}) was unsuccessful')
        else:
-            EXEC_LOGGER.warning(
-                'scheduler.kill() failed but job<{%s}> no longer seems to be running regardless', job_id
-            )
+            execlogger.warning('scheduler.kill() failed but job<{%s}> no longer seems to be running regardless', job_id)

    return True

@@ -438,7 +429,7 @@ def _retrieve_singlefiles(job, transport, folder, retrieve_file_list, logger_ext
    """Retrieve files specified through the singlefile list mechanism."""
    singlefile_list = []
    for (linkname, subclassname, filename) in retrieve_file_list:
-        EXEC_LOGGER.debug(
+        execlogger.debug(
            '[retrieval of calc {}] Trying '
            "to retrieve remote singlefile '{}'".format(job.pk, filename),
            extra=logger_extra
@@ -459,7 +450,7 @@ def _retrieve_singlefiles(job, transport, folder, retrieve_file_list, logger_ext
        singlefiles.append(singlefile)

    for fil in singlefiles:
-        EXEC_LOGGER.debug(f'[retrieval of calc {job.pk}] Storing retrieved_singlefile={fil.pk}', extra=logger_extra)
+        execlogger.debug(f'[retrieval of calc {job.pk}] Storing retrieved_singlefile={fil.pk}', extra=logger_extra)
        fil.store()

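Taken together, the execmanager.py hunks revert two things: the module logger returns to its original execlogger name, and the up-front loop that pre-created every intermediate directory for all three copy lists is replaced by creating each file's parent directory lazily, right before the file is written in the local_copy_list branch. Below is a minimal sketch of the restored pattern, assuming a sandbox-like folder object with abspath and open(), and with resolve_node standing in for the find_data_node lookup in the diff above; all names are illustrative, not the module's actual API.

import os
import shutil


def copy_local_files(folder, resolve_node, local_copy_list):
    """Sketch of the restored per-file pattern (names illustrative).

    ``folder`` is assumed to expose ``abspath`` and ``open()`` like the
    engine's sandbox folder; ``resolve_node`` stands in for the UUID lookup.
    """
    for uuid, filename, target in local_copy_list:
        data_node = resolve_node(uuid)
        if data_node is None:
            continue  # the real code logs a warning here instead

        # Create only the parent directories this target needs, immediately
        # before writing it, rather than pre-building the tree for all lists.
        dirname = os.path.dirname(target)
        if dirname:
            os.makedirs(os.path.join(folder.abspath, dirname), exist_ok=True)

        with folder.open(target, 'wb') as handle:
            with data_node.open(filename, 'rb') as source:
                shutil.copyfileobj(source, handle)

One visible consequence: targets in remote_copy_list and remote_symlink_list no longer have their intermediate directories created locally before the transport copy methods run, which is the pre-created layout the removed test below asserted.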
75 changes: 1 addition & 74 deletions tests/engine/daemon/test_execmanager.py
@@ -10,12 +10,9 @@
"""Tests for the :mod:`aiida.engine.daemon.execmanager` module."""
import io
import os
-import pathlib
import pytest

-from aiida import orm
from aiida.engine.daemon import execmanager
-from aiida.common.datastructures import CalcInfo, CodeInfo
from aiida.transports.plugins.local import LocalTransport


@@ -66,6 +63,7 @@ def test_upload_local_copy_list(fixture_sandbox, aiida_localhost, aiida_local_co

    Specifically, verify that files in the ``local_copy_list`` do not end up in the repository of the node.
    """
+    from aiida.common.datastructures import CalcInfo, CodeInfo
    from aiida.orm import CalcJobNode, SinglefileData

    inputs = {
@@ -92,74 +90,3 @@ def test_upload_local_copy_list(fixture_sandbox, aiida_localhost, aiida_local_co
        execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)

    assert node.list_object_names() == []
-
-
-def test_upload_calculation(aiida_localhost, aiida_local_code_factory, fixture_sandbox, tmp_path):
-    """Test the `upload_calculation` function, and specifically the copy lists."""
-
-    # FolderData: needs to be stored when is checked by upload_calculation
-    folder_node = orm.FolderData()
-    folder_node.put_object_from_filelike(io.StringIO('dummy_content_1'), 'file_1.txt')
-    folder_node.store()
-
-    # RemoteData: upload_calculation will not check if it is stored (should it?)
-    remote_node = orm.RemoteData(computer=aiida_localhost, remote_path=str(tmp_path))
-
-    source_path2 = str(tmp_path / 'file_2.txt')
-    with open(source_path2, 'w') as handle:
-        handle.write('dummy_content_2')
-        handle.flush()
-
-    source_path3 = str(tmp_path / 'file_3.txt')
-    with open(source_path3, 'w') as handle:
-        handle.write('dummy_content_3')
-        handle.flush()
-
-    # CodeInfo: needs to be set up as normal for any CalcJob
-    code_node = aiida_local_code_factory('arithmetic.add', '/bin/bash')
-    code_info = CodeInfo()
-    code_info.code_uuid = code_node.uuid
-
-    # CalcInfo: besides normal setups, we also need to manually set calc_info.uuid
-    # as we are skipping the step of the engine were this happens.
-    calc_node = orm.CalcJobNode(computer=aiida_localhost).store()
-    calc_info = CalcInfo()
-    calc_info.uuid = calc_node.uuid
-    calc_info.codes_info = [code_info]
-    calc_info.local_copy_list = [(folder_node.uuid, 'file_1.txt', 'local_file/file_1.txt')]
-    calc_info.remote_copy_list = [(remote_node.computer.uuid, source_path2, 'remote_file/file_2.txt')]
-    calc_info.remote_symlink_list = [(remote_node.computer.uuid, source_path3, 'symlink_file/file_3.sym')]
-
-    # We need to manually open a transport and pass it to upload_calculation, together
-    # with a pre-set up sandbox folder (in this case it can be empty since we are mostly
-    # checking through the copy lists)
-    with LocalTransport() as transport:
-        execmanager.upload_calculation(calc_node, transport, calc_info, fixture_sandbox)
-
-    calc_folder_path = pathlib.Path(calc_node.get_remote_workdir())
-
-    # Although in principle not necessary, the checks are performed increasingly from the
-    # existence of folders to content of contained files so it will be easier to identify
-    # which part of the copying is broken.
-    list_of_folders = [
-        calc_folder_path / 'local_file',
-        calc_folder_path / 'remote_file',
-        calc_folder_path / 'symlink_file',
-    ]
-    assert sorted(list(calc_folder_path.iterdir())) == sorted(list_of_folders)
-
-    assert list((calc_folder_path / 'local_file').iterdir()) == [calc_folder_path / 'local_file/file_1.txt']
-    assert list((calc_folder_path / 'remote_file').iterdir()) == [calc_folder_path / 'remote_file/file_2.txt']
-    assert list((calc_folder_path / 'symlink_file').iterdir()) == [calc_folder_path / 'symlink_file/file_3.sym']
-
-    full_file_path = str(calc_folder_path / 'local_file/file_1.txt')
-    with open(full_file_path, 'r') as handle:
-        assert handle.read() == 'dummy_content_1'
-
-    full_file_path = str(calc_folder_path / 'remote_file/file_2.txt')
-    with open(full_file_path, 'r') as handle:
-        assert handle.read() == 'dummy_content_2'
-
-    full_file_path = str(calc_folder_path / 'symlink_file/file_3.sym')
-    with open(full_file_path, 'r') as handle:
-        assert handle.read() == 'dummy_content_3'
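
For reference, the removed test documents the tuple shapes of the three copy lists that upload_calculation consumes. The sketch below distills just those shapes; the helper name and literal paths are purely illustrative, with only the tuple layouts taken from the test above.

from aiida.common.datastructures import CalcInfo


def build_calc_info(calc_node_uuid, folder_node_uuid, computer_uuid):
    """Illustrative helper: tuple shapes of the three copy lists."""
    calc_info = CalcInfo()
    calc_info.uuid = calc_node_uuid  # normally set by the engine; set by hand in the test
    calc_info.local_copy_list = [
        # (uuid of a stored Data node, name of a file inside it, target relative path)
        (folder_node_uuid, 'file_1.txt', 'local_file/file_1.txt'),
    ]
    calc_info.remote_copy_list = [
        # (uuid of the computer, absolute source path on it, target relative path)
        (computer_uuid, '/tmp/file_2.txt', 'remote_file/file_2.txt'),
    ]
    calc_info.remote_symlink_list = [
        # same shape as remote_copy_list, but a symlink is created instead of a copy
        (computer_uuid, '/tmp/file_3.txt', 'symlink_file/file_3.sym'),
    ]
    return calc_info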