From 49767120222572c7a18595bf0f5e061929d8bae7 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Wed, 6 Mar 2019 18:52:00 +0100 Subject: [PATCH] Redefine the structure of the `CalcInfo.local_copy_list` With the upcoming change of the node repository, files within it can no longer be assumed to necessarily be stored on the local file system. It is therefore impossible to address files within it through an absolute file path; however, that is exactly what the `local_copy_list` expects as the first item of its tuples. Now, the `Node` class only exposes methods to get either the content of a repository file or a filelike object. To comply with this change in design the structure of the `local_copy_list` tuples is changed to tuples of length three where each element represents: * UUID of the node * Relative key of the file within the node repository * Relative path within the working directory where to copy the file The `upload_calculation` function of the execmanager has been updated to this interface change, but because our `Transport` interface does not yet provide a method to put an object from a filelike object, we have to create a temporary file on the local file system whose absolute filepath can be passed to the `Transport.put` call. Once the transport is updated to provide a `put_object_from_filelike`, this inefficiency can be removed. 
--- .../calculations/plugins/templatereplacer.py | 2 +- aiida/common/datastructures.py | 2 +- aiida/engine/daemon/execmanager.py | 118 ++++++++---------- aiida/engine/processes/calcjobs/calcjob.py | 2 +- aiida/orm/nodes/data/singlefile.py | 8 +- aiida/orm/nodes/data/upf.py | 8 +- aiida/orm/utils/serialize.py | 6 +- 7 files changed, 70 insertions(+), 76 deletions(-) diff --git a/aiida/calculations/plugins/templatereplacer.py b/aiida/calculations/plugins/templatereplacer.py index 4fd5b608db..4ac381d52e 100644 --- a/aiida/calculations/plugins/templatereplacer.py +++ b/aiida/calculations/plugins/templatereplacer.py @@ -137,7 +137,7 @@ def prepare_for_submission(self, folder): raise exceptions.InputValidationError("You are asking to copy a file link {}, " "but there is no input link with such a name".format(link_name)) if isinstance(fileobj, orm.SinglefileData): - local_copy_list.append((fileobj.get_file_abs_path(), dest_rel_path)) + local_copy_list.append((fileobj.uuid, fileobj.filename, dest_rel_path)) elif isinstance(fileobj, orm.RemoteData): # can be a folder remote_copy_list.append((fileobj.computer.uuid, fileobj.get_remote_path(), dest_rel_path)) else: diff --git a/aiida/common/datastructures.py b/aiida/common/datastructures.py index 8b55d6c2a8..24ccc484a8 100644 --- a/aiida/common/datastructures.py +++ b/aiida/common/datastructures.py @@ -64,7 +64,7 @@ class CalcInfo(DefaultFieldsAttributeDict): ('linkname_from calc to singlefile', 'subclass of singlefile', 'filename') Each tuple represents a file that will be retrieved from cluster and saved in SinglefileData nodes - * local_copy_list: a list of tuples with format ('localabspath', 'relativedestpath') + * local_copy_list: a list of tuples with format ('node_uuid', 'filename', relativedestpath') * remote_copy_list: a list of tuples with format ('remotemachinename', 'remoteabspath', 'relativedestpath') * remote_symlink_list: a list of tuples with format ('remotemachinename', 'remoteabspath', 'relativedestpath') * 
codes_info: a list of dictionaries used to pass the info of the execution of a code diff --git a/aiida/engine/daemon/execmanager.py b/aiida/engine/daemon/execmanager.py index c730d6dd8c..cc25939551 100644 --- a/aiida/engine/daemon/execmanager.py +++ b/aiida/engine/daemon/execmanager.py @@ -34,18 +34,19 @@ execlogger = AIIDA_LOGGER.getChild('execmanager') -def upload_calculation(calculation, transport, calc_info, script_filename): - """ - Upload a calculation +def upload_calculation(node, transport, calc_info, script_filename): + """Upload a `CalcJob` instance - :param calculation: the instance of CalcJobNode to submit. + :param node: the `CalcJobNode`. :param transport: an already opened transport to use to submit the calculation. - :param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit` - :param script_filename: the job launch script returned by `CalcJobNode._presubmit` + :param calc_info: the calculation info datastructure returned by `CalcJobNode.presubmit` + :param script_filename: the job launch script returned by `CalcJobNode.presubmit` """ + from logging import LoggerAdapter + from tempfile import NamedTemporaryFile from aiida.orm import load_node, Code, RemoteData - computer = calculation.computer + computer = node.computer if not computer.is_enabled(): return @@ -53,17 +54,15 @@ def upload_calculation(calculation, transport, calc_info, script_filename): codes_info = calc_info.codes_info input_codes = [load_node(_.code_uuid, sub_classes=(Code,)) for _ in codes_info] - logger_extra = get_dblogger_extra(calculation) + logger_extra = get_dblogger_extra(node) transport.set_logger_extra(logger_extra) + logger = LoggerAdapter(logger=execlogger, extra=logger_extra) - if calculation.has_cached_links(): - raise ValueError("Cannot submit calculation {} because it has " - "cached input links! 
If you " - "just want to test the submission, use the " - "test_submit() method, otherwise store all links" - "first".format(calculation.pk)) + if node.has_cached_links(): + raise ValueError("Cannot submit calculation {} because it has cached input links! If you just want to test the " + "submission, use the test_submit() method, otherwise store all links first".format(node.pk)) - folder = calculation._raw_input_folder + folder = node._raw_input_folder # NOTE: some logic is partially replicated in the 'test_submit' # method of CalcJobNode. If major logic changes are done @@ -71,21 +70,19 @@ def upload_calculation(calculation, transport, calc_info, script_filename): remote_user = transport.whoami() # TODO Doc: {username} field # TODO: if something is changed here, fix also 'verdi computer test' - remote_working_directory = computer.get_workdir().format( - username=remote_user) + remote_working_directory = computer.get_workdir().format(username=remote_user) if not remote_working_directory.strip(): raise exceptions.ConfigurationError( - "[submission of calculation {}] " - "No remote_working_directory configured for computer " - "'{}'".format(calculation.pk, computer.name)) + "[submission of calculation {}] No remote_working_directory configured for computer '{}'".format( + node.pk, computer.name)) # If it already exists, no exception is raised try: transport.chdir(remote_working_directory) except IOError: - execlogger.debug( + logger.debug( "[submission of calculation {}] Unable to chdir in {}, trying to create it".format( - calculation.pk, remote_working_directory), extra=logger_extra) + node.pk, remote_working_directory)) try: transport.makedirs(remote_working_directory) transport.chdir(remote_working_directory) @@ -94,7 +91,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename): "[submission of calculation {}] " "Unable to create the remote directory {} on " "computer '{}': {}".format( - calculation.pk, remote_working_directory, 
computer.name, exc)) + node.pk, remote_working_directory, computer.name, exc)) # Store remotely with sharding (here is where we choose # the folder structure of remote jobs; then I store this # in the calculation properties using _set_remote_dir @@ -116,7 +113,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename): path_existing = os.path.join(transport.getcwd(), calc_info.uuid[4:]) path_lost_found = os.path.join(remote_working_directory, REMOTE_WORK_DIRECTORY_LOST_FOUND) path_target = os.path.join(path_lost_found, calc_info.uuid) - execlogger.warning('tried to create path {} but it already exists, moving the entire folder to {}'.format( + logger.warning('tried to create path {} but it already exists, moving the entire folder to {}'.format( path_existing, path_target)) # Make sure the lost+found directory exists, then copy the existing folder there and delete the original @@ -131,7 +128,7 @@ def upload_calculation(calculation, transport, calc_info, script_filename): # I store the workdir of the calculation for later file retrieval workdir = transport.getcwd() - calculation.set_remote_workdir(workdir) + node.set_remote_workdir(workdir) # I first create the code files, so that the code can put # default files to be overwritten by the plugin itself. 
@@ -146,74 +143,67 @@ def upload_calculation(calculation, transport, calc_info, script_filename): # copy all files, recursively with folders for f in folder.get_content_list(): - execlogger.debug("[submission of calculation {}] " - "copying file/folder {}...".format(calculation.pk, f), - extra=logger_extra) + logger.debug("[submission of calculation {}] copying file/folder {}...".format(node.pk, f)) transport.put(folder.get_abs_path(f), f) # local_copy_list is a list of tuples, - # each with (src_abs_path, dest_rel_path) - # NOTE: validation of these lists are done - # inside calculation._presubmit() + # each with (uuid, dest_rel_path) + # NOTE: validation of these lists are done inside calculation.presubmit() local_copy_list = calc_info.local_copy_list remote_copy_list = calc_info.remote_copy_list remote_symlink_list = calc_info.remote_symlink_list if local_copy_list is not None: - for src_abs_path, dest_rel_path in local_copy_list: - execlogger.debug("[submission of calculation {}] " - "copying local file/folder to {}".format( - calculation.pk, dest_rel_path), - extra=logger_extra) - transport.put(src_abs_path, dest_rel_path) + for uuid, filename, target in local_copy_list: + logger.debug("[submission of calculation {}] copying local file/folder to {}".format(node.pk, target)) + + try: + data_node = load_node(uuid=uuid) + except exceptions.NotExistent: + logger.warning('failed to load Node<{}> specified in the `local_copy_list`'.format(uuid)) + + # Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in + # combination with the new `Transport.put_object_from_filelike` + with NamedTemporaryFile(mode='w+') as handle: + handle.write(data_node.get_object_content(filename)) + handle.flush() + transport.put(handle.name, target) if remote_copy_list is not None: - for (remote_computer_uuid, remote_abs_path, - dest_rel_path) in remote_copy_list: + for (remote_computer_uuid, remote_abs_path, dest_rel_path) in remote_copy_list: if 
remote_computer_uuid == computer.uuid: - execlogger.debug("[submission of calculation {}] " - "copying {} remotely, directly on the machine " - "{}".format(calculation.pk, dest_rel_path, computer.name)) + logger.debug("[submission of calculation {}] copying {} remotely, directly on the machine {}".format( + node.pk, dest_rel_path, computer.name)) try: transport.copy(remote_abs_path, dest_rel_path) except (IOError, OSError): - execlogger.warning("[submission of calculation {}] " - "Unable to copy remote resource from {} to {}! " - "Stopping.".format(calculation.pk, - remote_abs_path, dest_rel_path), - extra=logger_extra) + logger.warning("[submission of calculation {}] Unable to copy remote resource from {} to {}! " + "Stopping.".format(node.pk, remote_abs_path, dest_rel_path)) raise else: - # TODO: implement copy between two different - # machines! + # TODO: implement copy between two different machines! raise NotImplementedError( - "[presubmission of calculation {}] " - "Remote copy between two different machines is " - "not implemented yet".format(calculation.pk)) + "[submission of calculation {}] Remote copy between two different machines is " + "not implemented yet".format(node.pk)) if remote_symlink_list is not None: for (remote_computer_uuid, remote_abs_path, dest_rel_path) in remote_symlink_list: if remote_computer_uuid == computer.uuid: - execlogger.debug("[submission of calculation {}] " - "copying {} remotely, directly on the machine " - "{}".format(calculation.pk, dest_rel_path, computer.name)) + logger.debug("[submission of calculation {}] copying {} remotely, directly on the machine {}".format( + node.pk, dest_rel_path, computer.name)) try: transport.symlink(remote_abs_path, dest_rel_path) except (IOError, OSError): - execlogger.warning("[submission of calculation {}] " - "Unable to create remote symlink from {} to {}! 
" - "Stopping.".format(calculation.pk, - remote_abs_path, dest_rel_path), - extra=logger_extra) + logger.warning("[submission of calculation {}] Unable to create remote symlink from {} to {}! " + "Stopping.".format(node.pk, remote_abs_path, dest_rel_path)) raise else: - raise IOError("It is not possible to create a symlink " - "between two different machines for " - "calculation {}".format(calculation.pk)) + raise IOError("It is not possible to create a symlink between two different machines for " + "calculation {}".format(node.pk)) remotedata = RemoteData(computer=computer, remote_path=workdir) - remotedata.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_folder') + remotedata.add_incoming(node, link_type=LinkType.CREATE, link_label='remote_folder') remotedata.store() return calc_info, script_filename diff --git a/aiida/engine/processes/calcjobs/calcjob.py b/aiida/engine/processes/calcjobs/calcjob.py index 41c52de156..8c1e5c7500 100644 --- a/aiida/engine/processes/calcjobs/calcjob.py +++ b/aiida/engine/processes/calcjobs/calcjob.py @@ -374,7 +374,7 @@ def presubmit(self, folder): this_pk = self.node.pk if self.node.pk is not None else "[UNSTORED]" local_copy_list = calcinfo.local_copy_list try: - validate_list_of_string_tuples(local_copy_list, tuple_length=2) + validate_list_of_string_tuples(local_copy_list, tuple_length=3) except ValidationError as exc: raise PluginInternalError("[presubmission of calc {}] " "local_copy_list format problem: {}".format(this_pk, exc)) diff --git a/aiida/orm/nodes/data/singlefile.py b/aiida/orm/nodes/data/singlefile.py index 71b1c712aa..3be944dd88 100644 --- a/aiida/orm/nodes/data/singlefile.py +++ b/aiida/orm/nodes/data/singlefile.py @@ -36,13 +36,17 @@ def filename(self): """ return self.get_attribute('filename') - def open(self, mode='r'): # pylint: disable=arguments-differ + def open(self, key=None, mode='r'): """Return an open file handle to the content of this data node. 
+ :param key: optional key within the repository, by default is the `filename` set in the attributes :param mode: the mode with which to open the file handle :return: a file handle in read mode """ - return self._repository.open(self.filename, mode=mode) + if key is None: + key = self.filename + + return self._repository.open(key, mode=mode) def get_content(self): """Return the content of the single file stored for this data node. diff --git a/aiida/orm/nodes/data/upf.py b/aiida/orm/nodes/data/upf.py index 2b803a9fdf..4ee54df0ea 100644 --- a/aiida/orm/nodes/data/upf.py +++ b/aiida/orm/nodes/data/upf.py @@ -381,11 +381,11 @@ def store(self, *args, **kwargs): if self.is_stored: return self - with self.open('r') as handle: + with self.open(mode='r') as handle: parsed_data = parse_upf(handle) # Open in binary mode which is required for generating the md5 checksum - with self.open('rb') as handle: + with self.open(mode='rb') as handle: md5 = md5_from_filelike(handle) try: @@ -457,11 +457,11 @@ def _validate(self): super(UpfData, self)._validate() - with self.open('r') as handle: + with self.open(mode='r') as handle: parsed_data = parse_upf(handle) # Open in binary mode which is required for generating the md5 checksum - with self.open('rb') as handle: + with self.open(mode='rb') as handle: md5 = md5_from_filelike(handle) try: diff --git a/aiida/orm/utils/serialize.py b/aiida/orm/utils/serialize.py index ad8513d6dd..9d51fcfa15 100644 --- a/aiida/orm/utils/serialize.py +++ b/aiida/orm/utils/serialize.py @@ -44,7 +44,7 @@ def represent_node(dumper, node): :return: the representation """ if not node.is_stored: - raise ValueError("The node must be stored to be able to represent") + raise ValueError('node {}<{}> cannot be represented because it is not stored'.format(type(node), node.uuid)) return dumper.represent_scalar(_NODE_TAG, u'%s' % node.uuid) @@ -71,7 +71,7 @@ def represent_group(dumper, group): :return: the representation """ if not group.is_stored: - raise 
ValueError("The group must be stored to be able to represent") + raise ValueError('group {} cannot be represented because it is not stored'.format(group)) return dumper.represent_scalar(_GROUP_TAG, u'%s' % group.uuid) @@ -98,7 +98,7 @@ def represent_computer(dumper, computer): :return: the representation """ if not computer.is_stored: - raise ValueError("The computer must be stored to be able to represent") + raise ValueError('computer {} cannot be represented because it is not stored'.format(computer)) return dumper.represent_scalar(_COMPUTER_TAG, u'%s' % computer.uuid)