Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redefine the structure of the CalcInfo.local_copy_list #2581

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiida/calculations/plugins/templatereplacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def prepare_for_submission(self, folder):
raise exceptions.InputValidationError("You are asking to copy a file link {}, "
"but there is no input link with such a name".format(link_name))
if isinstance(fileobj, orm.SinglefileData):
local_copy_list.append((fileobj.get_file_abs_path(), dest_rel_path))
local_copy_list.append((fileobj.uuid, fileobj.filename, dest_rel_path))
elif isinstance(fileobj, orm.RemoteData): # can be a folder
remote_copy_list.append((fileobj.computer.uuid, fileobj.get_remote_path(), dest_rel_path))
else:
Expand Down
2 changes: 1 addition & 1 deletion aiida/common/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class CalcInfo(DefaultFieldsAttributeDict):
('linkname_from calc to singlefile', 'subclass of singlefile', 'filename')
Each tuple represents a file that will be retrieved from cluster and saved in SinglefileData nodes

* local_copy_list: a list of tuples with format ('localabspath', 'relativedestpath')
* local_copy_list: a list of tuples with format ('node_uuid', 'filename', relativedestpath')
* remote_copy_list: a list of tuples with format ('remotemachinename', 'remoteabspath', 'relativedestpath')
* remote_symlink_list: a list of tuples with format ('remotemachinename', 'remoteabspath', 'relativedestpath')
* codes_info: a list of dictionaries used to pass the info of the execution of a code
Expand Down
118 changes: 54 additions & 64 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,58 +34,55 @@
execlogger = AIIDA_LOGGER.getChild('execmanager')


def upload_calculation(calculation, transport, calc_info, script_filename):
"""
Upload a calculation
def upload_calculation(node, transport, calc_info, script_filename):
"""Upload a `CalcJob` instance

:param calculation: the instance of CalcJobNode to submit.
:param node: the `CalcJobNode`.
:param transport: an already opened transport to use to submit the calculation.
:param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
:param script_filename: the job launch script returned by `CalcJobNode._presubmit`
:param calc_info: the calculation info datastructure returned by `CalcJobNode.presubmit`
:param script_filename: the job launch script returned by `CalcJobNode.presubmit`
"""
from logging import LoggerAdapter
from tempfile import NamedTemporaryFile
from aiida.orm import load_node, Code, RemoteData

computer = calculation.computer
computer = node.computer

if not computer.is_enabled():
return

codes_info = calc_info.codes_info
input_codes = [load_node(_.code_uuid, sub_classes=(Code,)) for _ in codes_info]

logger_extra = get_dblogger_extra(calculation)
logger_extra = get_dblogger_extra(node)
transport.set_logger_extra(logger_extra)
logger = LoggerAdapter(logger=execlogger, extra=logger_extra)

if calculation.has_cached_links():
raise ValueError("Cannot submit calculation {} because it has "
"cached input links! If you "
"just want to test the submission, use the "
"test_submit() method, otherwise store all links"
"first".format(calculation.pk))
if node.has_cached_links():
raise ValueError("Cannot submit calculation {} because it has cached input links! If you just want to test the "
"submission, use the test_submit() method, otherwise store all links first".format(node.pk))

folder = calculation._raw_input_folder
folder = node._raw_input_folder

# NOTE: some logic is partially replicated in the 'test_submit'
# method of CalcJobNode. If major logic changes are done
# here, make sure to update also the test_submit routine
remote_user = transport.whoami()
# TODO Doc: {username} field
# TODO: if something is changed here, fix also 'verdi computer test'
remote_working_directory = computer.get_workdir().format(
username=remote_user)
remote_working_directory = computer.get_workdir().format(username=remote_user)
if not remote_working_directory.strip():
raise exceptions.ConfigurationError(
"[submission of calculation {}] "
"No remote_working_directory configured for computer "
"'{}'".format(calculation.pk, computer.name))
"[submission of calculation {}] No remote_working_directory configured for computer '{}'".format(
node.pk, computer.name))

# If it already exists, no exception is raised
try:
transport.chdir(remote_working_directory)
except IOError:
execlogger.debug(
logger.debug(
"[submission of calculation {}] Unable to chdir in {}, trying to create it".format(
calculation.pk, remote_working_directory), extra=logger_extra)
node.pk, remote_working_directory))
try:
transport.makedirs(remote_working_directory)
transport.chdir(remote_working_directory)
Expand All @@ -94,7 +91,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename):
"[submission of calculation {}] "
"Unable to create the remote directory {} on "
"computer '{}': {}".format(
calculation.pk, remote_working_directory, computer.name, exc))
node.pk, remote_working_directory, computer.name, exc))
# Store remotely with sharding (here is where we choose
# the folder structure of remote jobs; then I store this
# in the calculation properties using _set_remote_dir
Expand All @@ -116,7 +113,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename):
path_existing = os.path.join(transport.getcwd(), calc_info.uuid[4:])
path_lost_found = os.path.join(remote_working_directory, REMOTE_WORK_DIRECTORY_LOST_FOUND)
path_target = os.path.join(path_lost_found, calc_info.uuid)
execlogger.warning('tried to create path {} but it already exists, moving the entire folder to {}'.format(
logger.warning('tried to create path {} but it already exists, moving the entire folder to {}'.format(
path_existing, path_target))

# Make sure the lost+found directory exists, then copy the existing folder there and delete the original
Expand All @@ -131,7 +128,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename):

# I store the workdir of the calculation for later file retrieval
workdir = transport.getcwd()
calculation.set_remote_workdir(workdir)
node.set_remote_workdir(workdir)

# I first create the code files, so that the code can put
# default files to be overwritten by the plugin itself.
Expand All @@ -146,74 +143,67 @@ def upload_calculation(calculation, transport, calc_info, script_filename):

# copy all files, recursively with folders
for f in folder.get_content_list():
execlogger.debug("[submission of calculation {}] "
"copying file/folder {}...".format(calculation.pk, f),
extra=logger_extra)
logger.debug("[submission of calculation {}] copying file/folder {}...".format(node.pk, f))
transport.put(folder.get_abs_path(f), f)

# local_copy_list is a list of tuples,
# each with (src_abs_path, dest_rel_path)
# NOTE: validation of these lists are done
# inside calculation._presubmit()
# each with (uuid, dest_rel_path)
# NOTE: validation of these lists are done inside calculation.presubmit()
local_copy_list = calc_info.local_copy_list
remote_copy_list = calc_info.remote_copy_list
remote_symlink_list = calc_info.remote_symlink_list

if local_copy_list is not None:
for src_abs_path, dest_rel_path in local_copy_list:
execlogger.debug("[submission of calculation {}] "
"copying local file/folder to {}".format(
calculation.pk, dest_rel_path),
extra=logger_extra)
transport.put(src_abs_path, dest_rel_path)
for uuid, filename, target in local_copy_list:
logger.debug("[submission of calculation {}] copying local file/folder to {}".format(node.pk, target))

try:
data_node = load_node(uuid=uuid)
except exceptions.NotExistent:
logger.warning('failed to load Node<{}> specified in the `local_copy_list`'.format(uuid))

# Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
# combination with the new `Transport.put_object_from_filelike`
with NamedTemporaryFile(mode='w+') as handle:
handle.write(data_node.get_object_content(filename))
handle.flush()
transport.put(handle.name, target)

if remote_copy_list is not None:
for (remote_computer_uuid, remote_abs_path,
dest_rel_path) in remote_copy_list:
for (remote_computer_uuid, remote_abs_path, dest_rel_path) in remote_copy_list:
if remote_computer_uuid == computer.uuid:
execlogger.debug("[submission of calculation {}] "
"copying {} remotely, directly on the machine "
"{}".format(calculation.pk, dest_rel_path, computer.name))
logger.debug("[submission of calculation {}] copying {} remotely, directly on the machine {}".format(
node.pk, dest_rel_path, computer.name))
try:
transport.copy(remote_abs_path, dest_rel_path)
except (IOError, OSError):
execlogger.warning("[submission of calculation {}] "
"Unable to copy remote resource from {} to {}! "
"Stopping.".format(calculation.pk,
remote_abs_path, dest_rel_path),
extra=logger_extra)
logger.warning("[submission of calculation {}] Unable to copy remote resource from {} to {}! "
"Stopping.".format(node.pk, remote_abs_path, dest_rel_path))
raise
else:
# TODO: implement copy between two different
# machines!
# TODO: implement copy between two different machines!
raise NotImplementedError(
"[presubmission of calculation {}] "
"Remote copy between two different machines is "
"not implemented yet".format(calculation.pk))
"[submission of calculation {}] Remote copy between two different machines is "
"not implemented yet".format(node.pk))

if remote_symlink_list is not None:
for (remote_computer_uuid, remote_abs_path,
dest_rel_path) in remote_symlink_list:
if remote_computer_uuid == computer.uuid:
execlogger.debug("[submission of calculation {}] "
"copying {} remotely, directly on the machine "
"{}".format(calculation.pk, dest_rel_path, computer.name))
logger.debug("[submission of calculation {}] copying {} remotely, directly on the machine {}".format(
node.pk, dest_rel_path, computer.name))
try:
transport.symlink(remote_abs_path, dest_rel_path)
except (IOError, OSError):
execlogger.warning("[submission of calculation {}] "
"Unable to create remote symlink from {} to {}! "
"Stopping.".format(calculation.pk,
remote_abs_path, dest_rel_path),
extra=logger_extra)
logger.warning("[submission of calculation {}] Unable to create remote symlink from {} to {}! "
"Stopping.".format(node.pk, remote_abs_path, dest_rel_path))
raise
else:
raise IOError("It is not possible to create a symlink "
"between two different machines for "
"calculation {}".format(calculation.pk))
raise IOError("It is not possible to create a symlink between two different machines for "
"calculation {}".format(node.pk))

remotedata = RemoteData(computer=computer, remote_path=workdir)
remotedata.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_folder')
remotedata.add_incoming(node, link_type=LinkType.CREATE, link_label='remote_folder')
remotedata.store()

return calc_info, script_filename
Expand Down
2 changes: 1 addition & 1 deletion aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def presubmit(self, folder):
this_pk = self.node.pk if self.node.pk is not None else "[UNSTORED]"
local_copy_list = calcinfo.local_copy_list
try:
validate_list_of_string_tuples(local_copy_list, tuple_length=2)
validate_list_of_string_tuples(local_copy_list, tuple_length=3)
except ValidationError as exc:
raise PluginInternalError("[presubmission of calc {}] "
"local_copy_list format problem: {}".format(this_pk, exc))
Expand Down
8 changes: 6 additions & 2 deletions aiida/orm/nodes/data/singlefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@ def filename(self):
"""
return self.get_attribute('filename')

def open(self, mode='r'): # pylint: disable=arguments-differ
def open(self, key=None, mode='r'):
"""Return an open file handle to the content of this data node.

:param key: optional key within the repository, by default is the `filename` set in the attributes
:param mode: the mode with which to open the file handle
:return: a file handle in read mode
"""
return self._repository.open(self.filename, mode=mode)
if key is None:
key = self.filename

return self._repository.open(key, mode=mode)

def get_content(self):
"""Return the content of the single file stored for this data node.
Expand Down
8 changes: 4 additions & 4 deletions aiida/orm/nodes/data/upf.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,11 @@ def store(self, *args, **kwargs):
if self.is_stored:
return self

with self.open('r') as handle:
with self.open(mode='r') as handle:
parsed_data = parse_upf(handle)

# Open in binary mode which is required for generating the md5 checksum
with self.open('rb') as handle:
with self.open(mode='rb') as handle:
md5 = md5_from_filelike(handle)

try:
Expand Down Expand Up @@ -457,11 +457,11 @@ def _validate(self):

super(UpfData, self)._validate()

with self.open('r') as handle:
with self.open(mode='r') as handle:
parsed_data = parse_upf(handle)

# Open in binary mode which is required for generating the md5 checksum
with self.open('rb') as handle:
with self.open(mode='rb') as handle:
md5 = md5_from_filelike(handle)

try:
Expand Down
6 changes: 3 additions & 3 deletions aiida/orm/utils/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def represent_node(dumper, node):
:return: the representation
"""
if not node.is_stored:
raise ValueError("The node must be stored to be able to represent")
raise ValueError('node {}<{}> cannot be represented because it is not stored'.format(type(node), node.uuid))
return dumper.represent_scalar(_NODE_TAG, u'%s' % node.uuid)


Expand All @@ -71,7 +71,7 @@ def represent_group(dumper, group):
:return: the representation
"""
if not group.is_stored:
raise ValueError("The group must be stored to be able to represent")
raise ValueError('group {} cannot be represented because it is not stored'.format(group))
return dumper.represent_scalar(_GROUP_TAG, u'%s' % group.uuid)


Expand All @@ -98,7 +98,7 @@ def represent_computer(dumper, computer):
:return: the representation
"""
if not computer.is_stored:
raise ValueError("The computer must be stored to be able to represent")
raise ValueError('computer {} cannot be represented because it is not stored'.format(computer))
return dumper.represent_scalar(_COMPUTER_TAG, u'%s' % computer.uuid)


Expand Down