diff --git a/src/aiida/calculations/monitors/base.py b/src/aiida/calculations/monitors/base.py
index e165647ea4..459f4eba9d 100644
--- a/src/aiida/calculations/monitors/base.py
+++ b/src/aiida/calculations/monitors/base.py
@@ -3,6 +3,7 @@ from __future__ import annotations

 import tempfile
+from pathlib import Path

 from aiida.orm import CalcJobNode
 from aiida.transports import Transport
@@ -19,7 +20,11 @@ def always_kill(node: CalcJobNode, transport: Transport) -> str | None:
     :returns: A string if the job should be killed, `None` otherwise.
     """
     with tempfile.NamedTemporaryFile('w+') as handle:
-        transport.getfile('_aiidasubmit.sh', handle.name)
+        cwd = node.get_remote_workdir()
+        if cwd is None:
+            raise ValueError('The remote work directory cannot be None')
+
+        transport.getfile(str(Path(cwd).joinpath('_aiidasubmit.sh')), handle.name)
         handle.seek(0)
         output = handle.read()
diff --git a/src/aiida/cmdline/commands/cmd_computer.py b/src/aiida/cmdline/commands/cmd_computer.py
index 4cdfc2176f..42feb36d25 100644
--- a/src/aiida/cmdline/commands/cmd_computer.py
+++ b/src/aiida/cmdline/commands/cmd_computer.py
@@ -134,11 +134,7 @@ def _computer_create_temp_file(transport, scheduler, authinfo, computer):
     file_content = f"Test from 'verdi computer test' on {datetime.datetime.now().isoformat()}"
     workdir = authinfo.get_workdir().format(username=transport.whoami())

-    try:
-        transport.chdir(workdir)
-    except OSError:
-        transport.makedirs(workdir)
-        transport.chdir(workdir)
+    transport.makedirs(workdir, ignore_existing=True)

     with tempfile.NamedTemporaryFile(mode='w+') as tempf:
         fname = os.path.split(tempf.name)[1]
diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py
index 045347404c..73c30cab61 100644
--- a/src/aiida/engine/daemon/execmanager.py
+++ b/src/aiida/engine/daemon/execmanager.py
@@ -15,10 +15,10 @@ from __future__ import annotations

 import os
-import pathlib
 import shutil
 from collections.abc import Mapping
 from logging import LoggerAdapter
+from pathlib import Path
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
 from typing import Mapping as MappingType
@@ -103,7 +103,7 @@ def upload_calculation(
     # If we are performing a dry-run, the working directory should actually be a local folder that should already exist
     if dry_run:
-        workdir = transport.getcwd()
+        workdir = Path(folder.abspath)
     else:
         remote_user = transport.whoami()
         remote_working_directory = computer.get_workdir().format(username=remote_user)
@@ -114,16 +114,13 @@ def upload_calculation(
             )

         # If it already exists, no exception is raised
-        try:
-            transport.chdir(remote_working_directory)
-        except OSError:
+        if not transport.path_exists(remote_working_directory):
             logger.debug(
-                f'[submission of calculation {node.pk}] Unable to '
-                f'chdir in {remote_working_directory}, trying to create it'
+                f'[submission of calculation {node.pk}] Path '
+                f'{remote_working_directory} does not exist, trying to create it'
             )
             try:
                 transport.makedirs(remote_working_directory)
-                transport.chdir(remote_working_directory)
             except EnvironmentError as exc:
                 raise exceptions.ConfigurationError(
                     f'[submission of calculation {node.pk}] '
@@ -135,20 +132,18 @@ def upload_calculation(
        # in the calculation properties using _set_remote_dir
        # and I do not have to know the logic, but I just need to
        # read the absolute path from the calculation properties.
-        transport.mkdir(calc_info.uuid[:2], ignore_existing=True)
-        transport.chdir(calc_info.uuid[:2])
-        transport.mkdir(calc_info.uuid[2:4], ignore_existing=True)
-        transport.chdir(calc_info.uuid[2:4])
+        workdir = Path(remote_working_directory).joinpath(calc_info.uuid[:2], calc_info.uuid[2:4])
+        transport.makedirs(str(workdir), ignore_existing=True)

         try:
             # The final directory may already exist, most likely because this function was already executed once, but
-            # failed and as a result was rescheduled by the eninge. In this case it would be fine to delete the folder
+            # failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
             # and create it from scratch, except that we cannot be sure that this the actual case. Therefore, to err on
             # the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
-            transport.mkdir(calc_info.uuid[4:])
+            transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
         except OSError:
             # Move the existing directory to lost+found, log a warning and create a clean directory anyway
-            path_existing = os.path.join(transport.getcwd(), calc_info.uuid[4:])
+            path_existing = os.path.join(str(workdir), calc_info.uuid[4:])
             path_lost_found = os.path.join(remote_working_directory, REMOTE_WORK_DIRECTORY_LOST_FOUND)
             path_target = os.path.join(path_lost_found, calc_info.uuid)
             logger.warning(
@@ -161,13 +156,11 @@ def upload_calculation(
             transport.rmtree(path_existing)

             # Now we can create a clean folder for this calculation
-            transport.mkdir(calc_info.uuid[4:])
+            transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
         finally:
-            transport.chdir(calc_info.uuid[4:])
+            workdir = workdir.joinpath(calc_info.uuid[4:])

-    # I store the workdir of the calculation for later file retrieval
-    workdir = transport.getcwd()
-    node.set_remote_workdir(workdir)
+    node.set_remote_workdir(str(workdir))

     # I first create the code files, so that the code can put
     # default files to be overwritten by the plugin itself.
@@ -178,22 +171,25 @@ def upload_calculation(
             # Note: this will possibly overwrite files
             for root, dirnames, filenames in code.base.repository.walk():
                 # mkdir of root
-                transport.makedirs(str(root), ignore_existing=True)
+                transport.makedirs(str(workdir.joinpath(root)), ignore_existing=True)

                 # remotely mkdir first
                 for dirname in dirnames:
-                    transport.makedirs(str(root / dirname), ignore_existing=True)
+                    transport.makedirs(str(workdir.joinpath(root, dirname)), ignore_existing=True)

                 # Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
                 # combination with the new `Transport.put_object_from_filelike`
                 # Since the content of the node could potentially be binary, we read the raw bytes and pass them on
                 for filename in filenames:
                     with NamedTemporaryFile(mode='wb+') as handle:
-                        content = code.base.repository.get_object_content((pathlib.Path(root) / filename), mode='rb')
+                        content = code.base.repository.get_object_content(Path(root) / filename, mode='rb')
                         handle.write(content)
                         handle.flush()
-                        transport.put(handle.name, str(root / filename))
-            transport.chmod(str(code.filepath_executable), 0o755)  # rwxr-xr-x
+                        transport.put(handle.name, str(workdir.joinpath(root, filename)))
+            if code.filepath_executable.is_absolute():
+                transport.chmod(str(code.filepath_executable), 0o755)  # rwxr-xr-x
+            else:
+                transport.chmod(str(workdir.joinpath(code.filepath_executable)), 0o755)  # rwxr-xr-x

     # local_copy_list is a list of tuples, each with (uuid, dest_path, rel_path)
     # NOTE: validation of these lists are done inside calculation.presubmit()
@@ -210,20 +206,22 @@ def upload_calculation(
     for file_copy_operation in file_copy_operation_order:
         if file_copy_operation is FileCopyOperation.LOCAL:
-            _copy_local_files(logger, node, transport, inputs, local_copy_list)
+            _copy_local_files(logger, node, transport, inputs, local_copy_list, workdir=workdir)
         elif file_copy_operation is FileCopyOperation.REMOTE:
             if not dry_run:
-                _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list)
+                _copy_remote_files(
+                    logger, node, computer, transport, remote_copy_list, remote_symlink_list, workdir=workdir
+                )
         elif file_copy_operation is FileCopyOperation.SANDBOX:
             if not dry_run:
-                _copy_sandbox_files(logger, node, transport, folder)
+                _copy_sandbox_files(logger, node, transport, folder, workdir=workdir)
         else:
             raise RuntimeError(f'file copy operation {file_copy_operation} is not yet implemented.')

     # In a dry_run, the working directory is the raw input folder, which will already contain these resources
     if dry_run:
         if remote_copy_list:
-            filepath = os.path.join(workdir, '_aiida_remote_copy_list.txt')
+            filepath = os.path.join(str(workdir), '_aiida_remote_copy_list.txt')
             with open(filepath, 'w', encoding='utf-8') as handle:  # type: ignore[assignment]
                 for _, remote_abs_path, dest_rel_path in remote_copy_list:
                     handle.write(
@@ -232,7 +230,7 @@ def upload_calculation(
                     )

         if remote_symlink_list:
-            filepath = os.path.join(workdir, '_aiida_remote_symlink_list.txt')
+            filepath = os.path.join(str(workdir), '_aiida_remote_symlink_list.txt')
             with open(filepath, 'w', encoding='utf-8') as handle:  # type: ignore[assignment]
                 for _, remote_abs_path, dest_rel_path in remote_symlink_list:
                     handle.write(
@@ -276,12 +274,12 @@ def upload_calculation(
     node.base.repository._update_repository_metadata()

     if not dry_run:
-        return RemoteData(computer=computer, remote_path=workdir)
+        return RemoteData(computer=computer, remote_path=str(workdir))

     return None


-def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list):
+def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list, workdir: Path):
     """Perform the copy instructions of the ``remote_copy_list`` and ``remote_symlink_list``."""
     for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_copy_list:
         if remote_computer_uuid == computer.uuid:
@@ -290,7 +288,7 @@ def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remo
                 f'remotely, directly on the machine {computer.label}'
             )
             try:
-                transport.copy(remote_abs_path, dest_rel_path)
+                transport.copy(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
             except FileNotFoundError:
                 logger.warning(
                     f'[submission of calculation {node.pk}] Unable to copy remote '
@@ -314,10 +312,10 @@ def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remo
                 f'[submission of calculation {node.pk}] copying {dest_rel_path} remotely, '
                 f'directly on the machine {computer.label}'
             )
-            remote_dirname = pathlib.Path(dest_rel_path).parent
+            remote_dirname = Path(dest_rel_path).parent
             try:
-                transport.makedirs(remote_dirname, ignore_existing=True)
-                transport.symlink(remote_abs_path, dest_rel_path)
+                transport.makedirs(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
+                transport.symlink(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
             except OSError:
                 logger.warning(
                     f'[submission of calculation {node.pk}] Unable to create remote symlink '
@@ -330,9 +328,8 @@ def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remo
                 )


-def _copy_local_files(logger, node, transport, inputs, local_copy_list):
+def _copy_local_files(logger, node, transport, inputs, local_copy_list, workdir: Path):
     """Perform the copy instructions of the ``local_copy_list``."""
-
     for uuid, filename, target in local_copy_list:
         logger.debug(f'[submission of calculation {node.uuid}] copying local file/folder to {target}')

@@ -348,7 +345,7 @@ def _copy_local_files(logger, node, transport, inputs, local_copy_list):
         # The transport class can only copy files directly from the file system, so the files in the source node's repo
         # have to first be copied to a temporary directory on disk.
         with TemporaryDirectory() as tmpdir:
-            dirpath = pathlib.Path(tmpdir)
+            dirpath = Path(tmpdir)

             # If no explicit source filename is defined, we assume the top-level directory
             filename_source = filename or '.'
@@ -359,14 +356,14 @@ def _copy_local_files(logger, node, transport, inputs, local_copy_list):
             # The logic below takes care of an edge case where the source is a file but the target is a directory. In
             # this case, the v2.5.1 implementation would raise an `IsADirectoryError` exception, because it would try
             # to open the directory in the sandbox folder as a file when writing the contents.
-            if file_type_source == FileType.FILE and target and transport.isdir(target):
+            if file_type_source == FileType.FILE and target and transport.isdir(str(workdir.joinpath(target))):
                 raise IsADirectoryError

             # In case the source filename is specified and it is a directory that already exists in the remote, we
             # want to avoid nested directories in the target path to replicate the behavior of v2.5.1. This is done by
             # setting the target filename to '.', which means the contents of the node will be copied in the top level
             # of the temporary directory, whose contents are then copied into the target directory.
-            if filename and transport.isdir(filename):
+            if filename and transport.isdir(str(workdir.joinpath(filename))):
                 filename_target = '.'

             filepath_target = (dirpath / filename_target).resolve().absolute()
@@ -375,21 +372,25 @@ def _copy_local_files(logger, node, transport, inputs, local_copy_list):
             if file_type_source == FileType.DIRECTORY:
                 # If the source object is a directory, we copy its entire contents
                 data_node.base.repository.copy_tree(filepath_target, filename_source)
-                transport.put(f'{dirpath}/*', target or '.', overwrite=True)
+                transport.put(
+                    f'{dirpath}/*',
+                    str(workdir.joinpath(target)) if target else str(workdir.joinpath('.')),
+                    overwrite=True,
+                )
             else:
                 # Otherwise, simply copy the file
                 with filepath_target.open('wb') as handle:
                     with data_node.base.repository.open(filename_source, 'rb') as source:
                         shutil.copyfileobj(source, handle)
-                transport.makedirs(str(pathlib.Path(target).parent), ignore_existing=True)
-                transport.put(str(filepath_target), target)
+                transport.makedirs(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
+                transport.put(str(filepath_target), str(workdir.joinpath(target)))


-def _copy_sandbox_files(logger, node, transport, folder):
+def _copy_sandbox_files(logger, node, transport, folder, workdir: Path):
     """Copy the contents of the sandbox folder to the working directory."""
     for filename in folder.get_content_list():
         logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
-        transport.put(folder.get_abs_path(filename), filename)
+        transport.put(folder.get_abs_path(filename), str(workdir.joinpath(filename)))


 def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
@@ -454,14 +455,14 @@ def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
     EXEC_LOGGER.debug(f'stashing files for calculation<{calculation.pk}>: {source_list}', extra=logger_extra)

     uuid = calculation.uuid
-    source_basepath = pathlib.Path(calculation.get_remote_workdir())
-    target_basepath = pathlib.Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]
+    source_basepath = Path(calculation.get_remote_workdir())
+    target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]

     for source_filename in source_list:
         if transport.has_magic(source_filename):
             copy_instructions = []
             for globbed_filename in transport.glob(str(source_basepath / source_filename)):
-                target_filepath = target_basepath / pathlib.Path(globbed_filename).relative_to(source_basepath)
+                target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
                 copy_instructions.append((globbed_filename, target_filepath))
         else:
             copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]
@@ -523,8 +524,6 @@ def retrieve_calculation(
     retrieved_files = FolderData()

     with transport:
-        transport.chdir(workdir)
-
         # First, retrieve the files of folderdata
         retrieve_list = calculation.get_retrieve_list()
         retrieve_temporary_list = calculation.get_retrieve_temporary_list()
@@ -601,10 +600,10 @@ def retrieve_files_from_list(
         * a string
         * a list

-    If it is a string, it represents the remote absolute filepath of the file.
+    If it is a string, it represents the remote absolute or relative filepath of the file.
     If the item is a list, the elements will correspond to the following:

-        * remotepath
+        * remotepath (relative path)
         * localpath
         * depth

@@ -616,18 +615,21 @@ def retrieve_files_from_list(
     :param folder: an absolute path to a folder that contains the files to copy.
     :param retrieve_list: the list of files to retrieve.
     """
+    workdir = Path(calculation.get_remote_workdir())
     for item in retrieve_list:
         if isinstance(item, (list, tuple)):
             tmp_rname, tmp_lname, depth = item
             # if there are more than one file I do something differently
             if transport.has_magic(tmp_rname):
-                remote_names = transport.glob(tmp_rname)
+                remote_names = transport.glob(str(workdir.joinpath(tmp_rname)))
                 local_names = []
                 for rem in remote_names:
+                    # get the relative path so to make local_names relative
+                    rel_rem = os.path.relpath(rem, str(workdir))
                     if depth is None:
-                        local_names.append(os.path.join(tmp_lname, rem))
+                        local_names.append(os.path.join(tmp_lname, rel_rem))
                     else:
-                        to_append = rem.split(os.path.sep)[-depth:] if depth > 0 else []
+                        to_append = rel_rem.split(os.path.sep)[-depth:] if depth > 0 else []
                         local_names.append(os.path.sep.join([tmp_lname] + to_append))
             else:
                 remote_names = [tmp_rname]
@@ -638,13 +640,22 @@ def retrieve_files_from_list(
                 new_folder = os.path.join(folder, os.path.split(this_local_file)[0])
                 if not os.path.exists(new_folder):
                     os.makedirs(new_folder)
-        elif transport.has_magic(item):  # it is a string
-            remote_names = transport.glob(item)
-            local_names = [os.path.split(rem)[1] for rem in remote_names]
         else:
-            remote_names = [item]
-            local_names = [os.path.split(item)[1]]
+            abs_item = item if item.startswith('/') else str(workdir.joinpath(item))
+
+            if transport.has_magic(abs_item):
+                remote_names = transport.glob(abs_item)
+                local_names = [os.path.split(rem)[1] for rem in remote_names]
+            else:
+                remote_names = [abs_item]
+                local_names = [os.path.split(abs_item)[1]]

         for rem, loc in zip(remote_names, local_names):
             transport.logger.debug(f"[retrieval of calc {calculation.pk}] Trying to retrieve remote item '{rem}'")
-            transport.get(rem, os.path.join(folder, loc), ignore_nonexisting=True)
+
+            if rem.startswith('/'):
+                to_get = rem
+            else:
+                to_get = str(workdir.joinpath(rem))
+
+            transport.get(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
diff --git a/src/aiida/engine/processes/calcjobs/calcjob.py b/src/aiida/engine/processes/calcjobs/calcjob.py
index 70a72ba307..d5acfca5fc 100644
--- a/src/aiida/engine/processes/calcjobs/calcjob.py
+++ b/src/aiida/engine/processes/calcjobs/calcjob.py
@@ -643,7 +643,6 @@ def _perform_dry_run(self):
         with LocalTransport() as transport:
             with SubmitTestFolder() as folder:
                 calc_info = self.presubmit(folder)
-                transport.chdir(folder.abspath)
                 upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
                 self.node.dry_run_info = {  # type: ignore[attr-defined]
                     'folder': folder.abspath,
diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py
index 8b8231634f..1059d277ba 100644
--- a/src/aiida/engine/processes/calcjobs/tasks.py
+++ b/src/aiida/engine/processes/calcjobs/tasks.py
@@ -253,7 +253,6 @@ async def task_monitor_job(
     async def do_monitor():
         with transport_queue.request_transport(authinfo) as request:
             transport = await cancellable.with_interrupt(request)
-            transport.chdir(node.get_remote_workdir())
             return monitors.process(node, transport)

     try:
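The engine-side changes above all follow the same pattern: compose the sharded remote work directory once with pathlib and hand absolute paths to every transport call, instead of relying on a transport-side current directory. A minimal, hedged sketch of that pattern (the helper name build_remote_workdir and the example paths are illustrative, not part of the patch; the transport calls assume the AiiDA Transport interface shown in this diff):

from pathlib import Path


def build_remote_workdir(remote_working_directory: str, calc_uuid: str) -> Path:
    """Compose the sharded calculation directory <base>/<uuid[:2]>/<uuid[2:4]>/<uuid[4:]>."""
    return Path(remote_working_directory).joinpath(calc_uuid[:2], calc_uuid[2:4], calc_uuid[4:])


# Illustrative usage with an already-open transport:
#   workdir = build_remote_workdir('/scratch/alice/aiida', calc_info.uuid)
#   transport.makedirs(str(workdir.parent), ignore_existing=True)
#   transport.mkdir(str(workdir))
#   transport.put(local_script, str(workdir.joinpath('_aiidasubmit.sh')))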
diff --git a/src/aiida/orm/nodes/data/remote/base.py b/src/aiida/orm/nodes/data/remote/base.py
index 9147a58d10..1fc691d113 100644
--- a/src/aiida/orm/nodes/data/remote/base.py
+++ b/src/aiida/orm/nodes/data/remote/base.py
@@ -58,13 +58,10 @@ def is_empty(self):
         transport = authinfo.get_transport()

         with transport:
-            try:
-                transport.chdir(self.get_remote_path())
-            except OSError:
-                # If the transport OSError the directory no longer exists and was deleted
+            if not transport.isdir(self.get_remote_path()):
                 return True

-            return not transport.listdir()
+            return not transport.listdir(self.get_remote_path())

     def getfile(self, relpath, destpath):
         """Connects to the remote folder and retrieves the content of a file.
@@ -96,22 +93,15 @@ def listdir(self, relpath='.'):
         authinfo = self.get_authinfo()

         with authinfo.get_transport() as transport:
-            try:
-                full_path = os.path.join(self.get_remote_path(), relpath)
-                transport.chdir(full_path)
-            except OSError as exception:
-                if exception.errno in (2, 20):  # directory not existing or not a directory
-                    exc = OSError(
-                        f'The required remote folder {full_path} on {self.computer.label} does not exist, is not a '
-                        'directory or has been deleted.'
-                    )
-                    exc.errno = exception.errno
-                    raise exc from exception
-                else:
-                    raise
+            full_path = os.path.join(self.get_remote_path(), relpath)
+            if not transport.isdir(full_path):
+                raise OSError(
+                    f'The required remote folder {full_path} on {self.computer.label} does not exist, is not a '
+                    'directory or has been deleted.'
+                )

             try:
-                return transport.listdir()
+                return transport.listdir(full_path)
             except OSError as exception:
                 if exception.errno in (2, 20):  # directory not existing or not a directory
                     exc = OSError(
@@ -132,22 +122,15 @@ def listdir_withattributes(self, path='.'):
         authinfo = self.get_authinfo()

         with authinfo.get_transport() as transport:
-            try:
-                full_path = os.path.join(self.get_remote_path(), path)
-                transport.chdir(full_path)
-            except OSError as exception:
-                if exception.errno in (2, 20):  # directory not existing or not a directory
-                    exc = OSError(
-                        f'The required remote folder {full_path} on {self.computer.label} does not exist, is not a '
-                        'directory or has been deleted.'
-                    )
-                    exc.errno = exception.errno
-                    raise exc from exception
-                else:
-                    raise
+            full_path = os.path.join(self.get_remote_path(), path)
+            if not transport.isdir(full_path):
+                raise OSError(
+                    f'The required remote folder {full_path} on {self.computer.label} does not exist, is not a '
+                    'directory or has been deleted.'
+                )

             try:
-                return transport.listdir_withattributes()
+                return transport.listdir_withattributes(full_path)
             except OSError as exception:
                 if exception.errno in (2, 20):  # directory not existing or not a directory
                     exc = OSError(
diff --git a/src/aiida/orm/utils/remote.py b/src/aiida/orm/utils/remote.py
index 2518791fb8..f55cedc35a 100644
--- a/src/aiida/orm/utils/remote.py
+++ b/src/aiida/orm/utils/remote.py
@@ -39,11 +39,8 @@ def clean_remote(transport: Transport, path: str) -> None:
     if not transport.is_open:
         raise ValueError('the transport should already be open')

-    basedir, relative_path = os.path.split(path)
-
     try:
-        transport.chdir(basedir)
-        transport.rmtree(relative_path)
+        transport.rmtree(path)
     except OSError:
         pass
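For downstream code that used to chdir into a calculation's folder before fetching files, the monitor change at the top of this diff suggests the replacement: resolve the node's remote work directory once and join relative names onto it. A short sketch under that assumption (the helper name read_remote_file is illustrative and not part of AiiDA's API):

import tempfile
from pathlib import Path


def read_remote_file(node, transport, relpath: str) -> str:
    """Return the text content of ``relpath`` inside the calculation's remote work directory."""
    workdir = node.get_remote_workdir()
    if workdir is None:
        raise ValueError('the remote work directory is not set')

    with tempfile.NamedTemporaryFile('w+') as handle:
        # The absolute remote path is passed directly; no transport.chdir() involved.
        transport.getfile(str(Path(workdir).joinpath(relpath)), handle.name)
        handle.seek(0)
        return handle.read()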
diff --git a/src/aiida/schedulers/plugins/bash.py b/src/aiida/schedulers/plugins/bash.py
index 0511a4cb99..f2e1da6db6 100644
--- a/src/aiida/schedulers/plugins/bash.py
+++ b/src/aiida/schedulers/plugins/bash.py
@@ -26,11 +26,12 @@ class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta):
     def submit_job(self, working_directory: str, filename: str) -> str | ExitCode:
         """Submit a job.

-        :param working_directory: The absolute filepath to the working directory where the job is to be exectued.
+        :param working_directory: The absolute filepath to the working directory where the job is to be executed.
         :param filename: The filename of the submission script relative to the working directory.
         """
-        self.transport.chdir(working_directory)
-        result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename)))
+        result = self.transport.exec_command_wait(
+            self._get_submit_command(escape_for_bash(filename)), workdir=working_directory
+        )
         return self._parse_submit_output(*result)

     def get_jobs(
diff --git a/src/aiida/schedulers/scheduler.py b/src/aiida/schedulers/scheduler.py
index 5168762f80..3cd4136984 100644
--- a/src/aiida/schedulers/scheduler.py
+++ b/src/aiida/schedulers/scheduler.py
@@ -129,7 +129,7 @@ def create_job_resource(cls, **kwargs):
     def submit_job(self, working_directory: str, filename: str) -> str | ExitCode:
         """Submit a job.

-        :param working_directory: The absolute filepath to the working directory where the job is to be exectued.
+        :param working_directory: The absolute filepath to the working directory where the job is to be executed.
         :param filename: The filename of the submission script relative to the working directory.
         :returns:
         """
diff --git a/src/aiida/transports/plugins/local.py b/src/aiida/transports/plugins/local.py
index b8263620d3..755476a066 100644
--- a/src/aiida/transports/plugins/local.py
+++ b/src/aiida/transports/plugins/local.py
@@ -8,14 +8,6 @@
 ###########################################################################
 """Local transport"""

-###
-### GP: a note on the local transport:
-### I believe that we must not use os.chdir to keep track of the folder
-### in which we are, since this may have very nasty side effects in other
-### parts of code, and make things not thread-safe.
-### we should instead keep track internally of the 'current working directory'
-### in the exact same way as paramiko does already.
-
 import contextlib
 import errno
 import glob
@@ -101,7 +93,11 @@ def curdir(self):
         raise TransportInternalError('Error, local method called for LocalTransport without opening the channel first')

     def chdir(self, path):
-        """Changes directory to path, emulated internally.
+        """
+        PLEASE DON'T USE `chdir()` IN NEW DEVELOPMENTS, INSTEAD DIRECTLY PASS ABSOLUTE PATHS TO INTERFACE.
+        `chdir()` is DEPRECATED and will be removed in the next major version.
+
+        Changes directory to path, emulated internally.
         :param path: path to cd into
         :raise OSError: if the directory does not have read attributes.
         """
@@ -123,7 +119,11 @@ def normalize(self, path='.'):
         return os.path.realpath(os.path.join(self.curdir, path))

     def getcwd(self):
-        """Returns the current working directory, emulated by the transport"""
+        """
+        PLEASE DON'T USE `getcwd()` IN NEW DEVELOPMENTS, INSTEAD DIRECTLY PASS ABSOLUTE PATHS TO INTERFACE.
+        `getcwd()` is DEPRECATED and will be removed in the next major version.
+
+        Returns the current working directory, emulated by the transport"""
         return self.curdir

     @staticmethod
@@ -695,11 +695,9 @@ def isfile(self, path):
         return os.path.isfile(os.path.join(self.curdir, path))

     @contextlib.contextmanager
-    def _exec_command_internal(self, command, **kwargs):
+    def _exec_command_internal(self, command, workdir=None, **kwargs):
         """Executes the specified command in bash login shell.

-        Before the command is executed, changes directory to the current
-        working directory as returned by self.getcwd().
         For executing commands and waiting for them to finish, use exec_command_wait.
@@ -710,6 +708,10 @@ def _exec_command_internal(self, command, **kwargs):
         :param command: the command to execute. The command is assumed to be
             already escaped using :py:func:`aiida.common.escaping.escape_for_bash`.
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
+            if None, the command will be executed in the current working directory,
+            from DEPRECATED `self.getcwd()`.

         :return: a tuple with (stdin, stdout, stderr, proc),
             where stdin, stdout and stderr behave as file-like objects,
@@ -724,26 +726,35 @@ def _exec_command_internal(self, command, **kwargs):

         command = bash_commmand + escape_for_bash(command)

+        if workdir:
+            cwd = workdir
+        else:
+            cwd = self.getcwd()
+
         with subprocess.Popen(
             command,
             shell=True,
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            cwd=self.getcwd(),
+            cwd=cwd,
             start_new_session=True,
         ) as process:
             yield process

-    def exec_command_wait_bytes(self, command, stdin=None, **kwargs):
+    def exec_command_wait_bytes(self, command, stdin=None, workdir=None, **kwargs):
         """Executes the specified command and waits for it to finish.

         :param command: the command to execute
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
+            if None, the command will be executed in the current working directory,
+            from DEPRECATED `self.getcwd()`.
         :return: a tuple with (return_value, stdout, stderr) where stdout and stderr
             are both bytes and the return_value is an int.
         """
-        with self._exec_command_internal(command) as process:
+        with self._exec_command_internal(command, workdir) as process:
             if stdin is not None:
                 # Implicitly assume that the desired encoding is 'utf-8' if I receive a string.
                 # Also, if I get a StringIO, I just read it all in memory and put it into a BytesIO.
diff --git a/src/aiida/transports/plugins/ssh.py b/src/aiida/transports/plugins/ssh.py
index c62f17a67b..d6159fe46f 100644
--- a/src/aiida/transports/plugins/ssh.py
+++ b/src/aiida/transports/plugins/ssh.py
@@ -581,7 +581,11 @@ def __str__(self):
         return f"{'OPEN' if self._is_open else 'CLOSED'} [{conn_info}]"

     def chdir(self, path):
-        """Change directory of the SFTP session. Emulated internally by paramiko.
+        """
+        PLEASE DON'T USE `chdir()` IN NEW DEVELOPMENTS, INSTEAD DIRECTLY PASS ABSOLUTE PATHS TO INTERFACE.
+        `chdir()` is DEPRECATED and will be removed in the next major version.
+
+        Change directory of the SFTP session. Emulated internally by paramiko.

         Differently from paramiko, if you pass None to chdir, nothing happens
         and the cwd is unchanged.
@@ -646,7 +650,11 @@ def lstat(self, path):
         return self.sftp.lstat(path)

     def getcwd(self):
-        """Return the current working directory for this SFTP session, as
+        """
+        PLEASE DON'T USE `getcwd()` IN NEW DEVELOPMENTS, INSTEAD DIRECTLY PASS ABSOLUTE PATHS TO INTERFACE.
+        `getcwd()` is DEPRECATED and will be removed in the next major version.
+
+        Return the current working directory for this SFTP session, as
         emulated by paramiko. If no directory has been set with chdir,
         this method will return None. But in __enter__ this is set explicitly,
         so this should never happen within this class.
@@ -1218,17 +1226,18 @@ def listdir(self, path='.', pattern=None):
         :param pattern: returns the list of files matching pattern. Unix only.
             (Use to emulate ``ls *`` for example)
         """
-        if not pattern:
-            return self.sftp.listdir(path)
         if path.startswith('/'):
-            base_dir = path
+            abs_dir = path
         else:
-            base_dir = os.path.join(self.getcwd(), path)
+            abs_dir = os.path.join(self.getcwd(), path)

-        filtered_list = self.glob(os.path.join(base_dir, pattern))
-        if not base_dir.endswith('/'):
-            base_dir += '/'
-        return [re.sub(base_dir, '', i) for i in filtered_list]
+        if not pattern:
+            return self.sftp.listdir(abs_dir)
+
+        filtered_list = self.glob(os.path.join(abs_dir, pattern))
+        if not abs_dir.endswith('/'):
+            abs_dir += '/'
+        return [re.sub(abs_dir, '', i) for i in filtered_list]

     def remove(self, path):
         """Remove a single file at 'path'"""
@@ -1276,11 +1285,9 @@ def isfile(self, path):
                 return False
             raise  # Typically if I don't have permissions (errno=13)

-    def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1):
+    def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1, workdir=None):
         """Executes the specified command in bash login shell.

-        Before the command is executed, changes directory to the current
-        working directory as returned by self.getcwd().
         For executing commands and waiting for them to finish, use exec_command_wait.
@@ -1291,6 +1298,10 @@ def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1):
             stderr on the same buffer (i.e., stdout). Note: If combine_stderr is True,
             stderr will always be empty.
         :param bufsize: same meaning of the one used by paramiko.
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
+            if None, the command will be executed in the current working directory,
+            from DEPRECATED `self.getcwd()`, if that has a value.

         :return: a tuple with (stdin, stdout, stderr, channel),
             where stdin, stdout and stderr behave as file-like objects,
@@ -1300,8 +1311,10 @@ def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1):
         channel = self.sshclient.get_transport().open_session()
         channel.set_combine_stderr(combine_stderr)

-        if self.getcwd() is not None:
-            escaped_folder = escape_for_bash(self.getcwd())
+        if workdir is not None:
+            command_to_execute = f'cd {workdir} && ( {command} )'
+        elif (cwd := self.getcwd()) is not None:
+            escaped_folder = escape_for_bash(cwd)
             command_to_execute = f'cd {escaped_folder} && ( {command} )'
         else:
             command_to_execute = command
@@ -1320,7 +1333,9 @@ def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1):

         return stdin, stdout, stderr, channel

-    def exec_command_wait_bytes(self, command, stdin=None, combine_stderr=False, bufsize=-1, timeout=0.01):
+    def exec_command_wait_bytes(
+        self, command, stdin=None, combine_stderr=False, bufsize=-1, timeout=0.01, workdir=None
+    ):
         """Executes the specified command and waits for it to finish.

         :param command: the command to execute
         :param stdin: (optional,default=None) can be a string or a file-like object.
         :param combine_stderr: (optional,default=False) see docstring of
             self._exec_command_internal()
         :param bufsize: same meaning of paramiko.
         :param timeout: ssh channel timeout for stdout, stderr.
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.

         :return: a tuple with (return_value, stdout, stderr) where stdout and stderr
             are both bytes and the return_value is an int.
@@ -1337,7 +1354,9 @@ def exec_command_wait_bytes(self, command, stdin=None, combine_stderr=False, buf
         import socket
         import time

-        ssh_stdin, stdout, stderr, channel = self._exec_command_internal(command, combine_stderr, bufsize=bufsize)
+        ssh_stdin, stdout, stderr, channel = self._exec_command_internal(
+            command, combine_stderr, bufsize=bufsize, workdir=workdir
+        )

         if stdin is not None:
             if isinstance(stdin, str):
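With both the local and SSH plugins above accepting the new keyword, a hedged usage sketch of executing a command in an explicit working directory (the computer label and path are placeholders, and it assumes a configured AiiDA profile):

from aiida import load_profile, orm

load_profile()

computer = orm.load_computer('my-cluster')  # placeholder label

with computer.get_transport() as transport:
    # Previously: transport.chdir('/scratch/alice/job'); transport.exec_command_wait('ls')
    # Now the working directory is passed explicitly per call:
    retval, stdout, stderr = transport.exec_command_wait('ls', workdir='/scratch/alice/job')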
diff --git a/src/aiida/transports/transport.py b/src/aiida/transports/transport.py
index b144a02485..a6d755973e 100644
--- a/src/aiida/transports/transport.py
+++ b/src/aiida/transports/transport.py
@@ -14,9 +14,11 @@
 import re
 import sys
 from collections import OrderedDict
+from pathlib import Path

 from aiida.common.exceptions import InternalError
 from aiida.common.lang import classproperty
+from aiida.common.warnings import warn_deprecation

 __all__ = ('Transport',)
@@ -245,13 +247,21 @@ def get_safe_open_interval(self):

     @abc.abstractmethod
     def chdir(self, path):
-        """Change directory to 'path'
+        """
+        DEPRECATED: This method is deprecated and should be removed in the next major version.
+        PLEASE DON'T USE IT IN THE INTERFACE!!
+        Change directory to 'path'.

         :param str path: path to change working directory into.
         :raises: OSError, if the requested path does not exist
         :rtype: str
         """
+        warn_deprecation(
+            '`chdir()` is deprecated and will be removed in the next major version.',
+            version=3,
+        )
+
     @abc.abstractmethod
     def chmod(self, path, mode):
         """Change permissions of a path.
@@ -363,35 +373,38 @@ def copy_from_remote_to_remote(self, transportdestination, remotesource, remoted
         transportdestination.put(os.path.join(sandbox.abspath, filename), remotedestination, **kwargs_put)

     @abc.abstractmethod
-    def _exec_command_internal(self, command, **kwargs):
+    def _exec_command_internal(self, command, workdir=None, **kwargs):
         """Execute the command on the shell, similarly to os.system.

-        Enforce the execution to be run from the cwd (as given by
-        self.getcwd), if this is not None.
+        Enforce the execution to be run from `workdir`.

         If possible, use the higher-level exec_command_wait function.

         :param str command: execute the command given as a string
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
         :return: stdin, stdout, stderr and the session, when this exists \
                  (can be None).
         """

     @abc.abstractmethod
-    def exec_command_wait_bytes(self, command, stdin=None, **kwargs):
+    def exec_command_wait_bytes(self, command, stdin=None, workdir=None, **kwargs):
         """Execute the command on the shell, waits for it to finish,
         and return the retcode, the stdout and the stderr as bytes.

-        Enforce the execution to be run from the pwd (as given by self.getcwd), if this is not None.
+        Enforce the execution to be run from workdir, if this is not None.

         The command implementation can have some additional plugin-specific kwargs.

         :param str command: execute the command given as a string
         :param stdin: (optional,default=None) can be a string or a file-like object.
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
         :return: a tuple: the retcode (int), stdout (bytes) and stderr (bytes).
         """

-    def exec_command_wait(self, command, stdin=None, encoding='utf-8', **kwargs):
+    def exec_command_wait(self, command, stdin=None, encoding='utf-8', workdir=None, **kwargs):
         """Executes the specified command and waits for it to finish.

         :note: this function also decodes the bytes received into a string with the specified encoding,
@@ -406,11 +419,15 @@ def exec_command_wait(self, command, stdin=None, encoding='utf-8', **kwargs):
         :param command: the command to execute
         :param stdin: (optional,default=None) can be a string or a file-like object.
         :param encoding: the encoding to use to decode the byte stream received from the remote command execution.
+        :param workdir: (optional, default=None) if set, the command will be executed
+            in the specified working directory.
         :return: a tuple with (return_value, stdout, stderr) where stdout and stderr are both strings, decoded with
             the specified encoding.
         """
-        retval, stdout_bytes, stderr_bytes = self.exec_command_wait_bytes(command=command, stdin=stdin, **kwargs)
+        retval, stdout_bytes, stderr_bytes = self.exec_command_wait_bytes(
+            command=command, stdin=stdin, workdir=workdir, **kwargs
+        )

         # Return the decoded strings
         return (retval, stdout_bytes.decode(encoding), stderr_bytes.decode(encoding))
@@ -443,11 +460,19 @@ def gettree(self, remotepath, localpath, *args, **kwargs):

     @abc.abstractmethod
     def getcwd(self):
-        """Get working directory
+        """
+        DEPRECATED: This method is deprecated and should be removed in the next major version.
+        PLEASE DON'T USE IT IN THE INTERFACE!!
+        Get working directory

         :return: a string identifying the current working directory
         """
+        warn_deprecation(
+            '`getcwd()` is deprecated and will be removed in the next major version.',
+            version=3,
+        )
+
     @abc.abstractmethod
     def get_attribute(self, path):
         """Return an object FixedFieldsAttributeDict for file in a given path,
@@ -514,6 +539,8 @@ def listdir_withattributes(self, path='.', pattern=None):
         entries '.' and '..' even if they are present in the directory.

         :param str path: path to list (default to '.')
+            if using a relative path, it is relative to the current working directory,
+            taken from DEPRECATED `self.getcwd()`.
         :param str pattern: if used, listdir returns a list of files matching filters in
             Unix style. Unix only.
         :return: a list of dictionaries, one per entry.
@@ -531,9 +558,17 @@ def listdir_withattributes(self, path='.', pattern=None):
             transport.get_attribute();
             isdir is a boolean indicating if the object is a directory or not.
         """
         retlist = []
-        full_path = self.getcwd()
-        for file_name in self.listdir():
-            filepath = os.path.join(full_path, file_name)
+        if path.startswith('/'):
+            cwd = Path(path).resolve().as_posix()
+        else:
+            warn_deprecation(
+                'Using relative paths in `listdir_withattributes` is no longer supported '
+                'and will be removed in the next major version.',
+                version=3,
+            )
+            cwd = self.getcwd()
+        for file_name in self.listdir(cwd):
+            filepath = os.path.join(cwd, file_name)
             attributes = self.get_attribute(filepath)
             retlist.append({'name': file_name, 'attributes': attributes, 'isdir': self.isdir(filepath)})
         return retlist
@@ -689,7 +724,18 @@ def glob(self, pathname):
         """Return a list of paths matching a pathname pattern.

         The pattern may contain simple shell-style wildcards a la fnmatch.
+
+        :param str pathname: the pathname pattern to match.
+            It should only be absolute path.
+            DEPRECATED: using relative path is deprecated.
+        :return: a list of paths matching the pattern.
         """
+        if not pathname.startswith('/'):
+            warn_deprecation(
+                'Using relative paths across transport in `glob` is deprecated '
+                'and will be removed in the next major version.',
+                version=3,
+            )
         return list(self.iglob(pathname))

     def iglob(self, pathname):
@@ -757,6 +803,7 @@ def glob0(self, dirname, basename):
             return []

     def has_magic(self, string):
+        """Return True if the given string contains any special shell characters."""
         return self._MAGIC_CHECK.search(string) is not None

     def _gotocomputer_string(self, remotedir):
diff --git a/tests/conftest.py b/tests/conftest.py
index 1dcb644b21..2166cc06c0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -202,6 +202,36 @@ def _generate_work_chain(entry_point, inputs=None):
     return _generate_work_chain


+@pytest.fixture
+def generate_calcjob_node():
+    """Generate an instance of a `CalcJobNode`."""
+    from aiida.engine import ProcessState
+
+    def _generate_calcjob_node(
+        process_state: ProcessState = ProcessState.FINISHED,
+        exit_status: int | None = None,
+        entry_point: str | None = None,
+        workdir: pathlib.Path | None = None,
+    ):
+        """Generate an instance of a `CalcJobNode`..
+
+        :param process_state: state to set
+        :param exit_status: optional exit status, will be set to `0` if `process_state` is `ProcessState.FINISHED`
+        :return: a `CalcJobNode` instance.
+        """
+        from aiida.orm import CalcJobNode
+
+        if process_state is ProcessState.FINISHED and exit_status is None:
+            exit_status = 0
+
+        calcjob_node = CalcJobNode(process_type=entry_point)
+        calcjob_node.set_remote_workdir(workdir)
+
+        return calcjob_node
+
+    return _generate_calcjob_node
+
+
 @pytest.fixture
 def generate_calculation_node():
     """Generate an instance of a `CalculationNode`."""
diff --git a/tests/engine/daemon/test_execmanager.py b/tests/engine/daemon/test_execmanager.py
index 5b34f099a7..79692b689b 100644
--- a/tests/engine/daemon/test_execmanager.py
+++ b/tests/engine/daemon/test_execmanager.py
@@ -111,7 +111,7 @@ def test_hierarchy_utility(file_hierarchy, tmp_path, create_file_hierarchy, seri
 )
 def test_retrieve_files_from_list(
     tmp_path_factory,
-    generate_calculation_node,
+    generate_calcjob_node,
     file_hierarchy,
     retrieve_list,
     expected_hierarchy,
@@ -125,8 +125,7 @@ def test_retrieve_files_from_list(
     create_file_hierarchy(file_hierarchy, source)

     with LocalTransport() as transport:
-        node = generate_calculation_node()
-        transport.chdir(source)
+        node = generate_calcjob_node(workdir=source)
         execmanager.retrieve_files_from_list(node, transport, target, retrieve_list)

     assert serialize_file_hierarchy(target, read_bytes=False) == expected_hierarchy
diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py
index c536b196a2..0a25add0a7 100644
--- a/tests/transports/test_all_plugins.py
+++ b/tests/transports/test_all_plugins.py
@@ -1305,8 +1305,7 @@ def test_asynchronous_execution(custom_transport):
             tmpf.write(b'#!/bin/bash\nsleep 10\n')
             tmpf.flush()

-            transport.chdir('/tmp')
-            transport.putfile(tmpf.name, script_fname)
+            transport.putfile(tmpf.name, os.path.join('/tmp', script_fname))

             timestamp_before = time.time()
             job_id_string = scheduler.submit_job('/tmp', script_fname)