diff --git a/lib/ramble/ramble/application.py b/lib/ramble/ramble/application.py
index 7e48d6919..c2e1a2ed8 100644
--- a/lib/ramble/ramble/application.py
+++ b/lib/ramble/ramble/application.py
@@ -47,6 +47,7 @@
 import ramble.util.stats
 import ramble.util.graph
 import ramble.util.class_attributes
+import ramble.util.lock as lk
 from ramble.util.logger import logger
 from ramble.util.shell_utils import source_str
 from ramble.util.naming import NS_SEPARATOR
@@ -148,6 +149,9 @@ def __init__(self, file_path):
         self._pipeline_graphs = None
         self.package_manager = None
         self.custom_executables = {}
+        self._exp_lock = None
+        self._input_lock = None
+        self._software_lock = None
 
         self.hash_inventory = {
             "application_definition": None,
@@ -171,6 +175,17 @@ def __init__(self, file_path):
 
         ramble.util.directives.define_directive_methods(self)
 
+    def experiment_lock(self):
+        """Create a lock for the experiment directory, and return it"""
+        if self._exp_lock is None:
+            lock_path = os.path.join(
+                self.expander.expand_var("{experiment_run_dir}"), ".ramble-experiment"
+            )
+
+            self._exp_lock = lk.Lock(lock_path)
+
+        return self._exp_lock
+
     def copy(self):
         """Deep copy an application instance"""
         new_copy = type(self)(self._file_path)
@@ -1231,23 +1246,25 @@ def _mirror_inputs(self, workspace, app_inst=None):
         Perform mirroring of inputs within this application class.
         """
+        mirror_lock = lk.Lock(os.path.join(workspace.input_mirror_path, ".ramble-mirror"))
         self._inputs_and_fetchers(self.expander.workload_name)
 
-        for input_file, input_conf in self._input_fetchers.items():
-            mirror_paths = ramble.mirror.mirror_archive_paths(
-                input_conf["fetcher"], os.path.join(self.name, input_file)
-            )
-            fetch_dir = os.path.join(workspace.input_mirror_path, self.name)
-            fs.mkdirp(fetch_dir)
-            stage = ramble.stage.InputStage(
-                input_conf["fetcher"],
-                name=input_conf["namespace"],
-                path=fetch_dir,
-                mirror_paths=mirror_paths,
-                lock=False,
-            )
+        with lk.WriteTransaction(mirror_lock):
+            for input_file, input_conf in self._input_fetchers.items():
+                mirror_paths = ramble.mirror.mirror_archive_paths(
+                    input_conf["fetcher"], os.path.join(self.name, input_file)
+                )
+                fetch_dir = os.path.join(workspace.input_mirror_path, self.name)
+                fs.mkdirp(fetch_dir)
+                stage = ramble.stage.InputStage(
+                    input_conf["fetcher"],
+                    name=input_conf["namespace"],
+                    path=fetch_dir,
+                    mirror_paths=mirror_paths,
+                    lock=False,
+                )
 
-            stage.cache_mirror(workspace.input_mirror_cache, workspace.input_mirror_stats)
+                stage.cache_mirror(workspace.input_mirror_cache, workspace.input_mirror_stats)
 
     register_phase("get_inputs", pipeline="setup")
@@ -1280,23 +1297,26 @@ def _get_inputs(self, workspace, app_inst=None):
                 input_dir = os.path.dirname(input_path)
                 input_base = os.path.basename(input_path)
 
-                with ramble.stage.InputStage(
-                    input_conf["fetcher"],
-                    name=input_namespace,
-                    path=input_dir,
-                    mirror_paths=mirror_paths,
-                ) as stage:
-                    stage.set_subdir(input_base)
-                    stage.fetch()
-                    if input_conf["fetcher"].digest:
-                        stage.check()
-                    stage.cache_local()
-
-                    if input_conf["expand"]:
-                        try:
-                            stage.expand_archive()
-                        except spack.util.executable.ProcessError:
-                            pass
+                input_lock = lk.Lock(os.path.join(input_dir, ".ramble-input"))
+
+                with lk.WriteTransaction(input_lock):
+                    with ramble.stage.InputStage(
+                        input_conf["fetcher"],
+                        name=input_namespace,
+                        path=input_dir,
+                        mirror_paths=mirror_paths,
+                    ) as stage:
+                        stage.set_subdir(input_base)
+                        stage.fetch()
+                        if input_conf["fetcher"].digest:
+                            stage.check()
+                        stage.cache_local()
+
+                        if input_conf["expand"]:
+                            try:
+                                stage.expand_archive()
+                            except spack.util.executable.ProcessError:
+                                pass
 
                 workspace.add_to_cache(input_tuple)
             else:
@@ -1326,10 +1346,12 @@ def _license_includes(self, workspace, app_inst=None):
         for action, conf in app_licenses.items():
             (env_cmds, var_set) = action_funcs[action](conf, var_set, shell=shell)
 
-        with open(self.license_file, "w+") as f:
-            for cmd in env_cmds:
-                if cmd:
-                    f.write(cmd + "\n")
+        lock = lk.Lock(os.path.join(self.license_path, ".ramble-license"))
+        with lk.WriteTransaction(lock):
+            with open(self.license_file, "w+") as f:
+                for cmd in env_cmds:
+                    if cmd:
+                        f.write(cmd + "\n")
 
     register_phase("make_experiments", pipeline="setup", run_after=["get_inputs"])
@@ -1344,24 +1366,30 @@ def _make_experiments(self, workspace, app_inst=None):
 
         _check_shell_support(self)
 
-        experiment_run_dir = self.expander.experiment_run_dir
-        fs.mkdirp(experiment_run_dir)
+        exp_lock = self.experiment_lock()
 
-        exec_vars = {}
+        with lk.WriteTransaction(exp_lock):
+            experiment_run_dir = self.expander.experiment_run_dir
+            fs.mkdirp(experiment_run_dir)
 
-        for mod in self._modifier_instances:
-            exec_vars.update(mod.modded_variables(self))
+            exec_vars = {}
 
-        for template_name, template_conf in workspace.all_templates():
-            expand_path = os.path.join(experiment_run_dir, template_name)
-            logger.msg(f"Writing template {template_name} to {expand_path}")
+            for mod in self._modifier_instances:
+                exec_vars.update(mod.modded_variables(self))
+
+            for template_name, template_conf in workspace.all_templates():
+                expand_path = os.path.join(experiment_run_dir, template_name)
+                logger.msg(f"Writing template {template_name} to {expand_path}")
+
+                with open(expand_path, "w+") as f:
+                    f.write(
+                        self.expander.expand_var(template_conf["contents"], extra_vars=exec_vars)
+                    )
+                os.chmod(expand_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH)
 
-            with open(expand_path, "w+") as f:
-                f.write(self.expander.expand_var(template_conf["contents"], extra_vars=exec_vars))
-            os.chmod(expand_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH)
+            experiment_script = workspace.experiments_script
+            experiment_script.write(self.expander.expand_var("{batch_submit}\n"))
 
-        experiment_script = workspace.experiments_script
-        experiment_script.write(self.expander.expand_var("{batch_submit}\n"))
         self.set_status(status=experiment_status.SETUP)
 
     def _clean_hash_variables(self, workspace, variables):
@@ -1480,8 +1508,9 @@ def _write_inventory(self, workspace, app_inst=None):
         experiment_run_dir = self.expander.experiment_run_dir
         inventory_file = os.path.join(experiment_run_dir, self._inventory_file_name)
 
-        with open(inventory_file, "w+") as f:
-            spack.util.spack_json.dump(self.hash_inventory, f)
+        with lk.WriteTransaction(self.experiment_lock()):
+            with open(inventory_file, "w+") as f:
+                spack.util.spack_json.dump(self.hash_inventory, f)
 
     register_phase("archive_experiments", pipeline="archive")
@@ -1503,40 +1532,43 @@ def _archive_experiments(self, workspace, app_inst=None):
 
         fs.mkdirp(archive_experiment_dir)
 
-        # Copy all of the templates to the archive directory
-        for template_name, _ in workspace.all_templates():
-            src = os.path.join(experiment_run_dir, template_name)
-            if os.path.exists(src):
-                shutil.copy(src, archive_experiment_dir)
+        archive_lock = lk.Lock(os.path.join(archive_experiment_dir, ".ramble-exp-archive"))
 
-        # Copy all figure of merit files
-        criteria_list = workspace.success_list
-        analysis_files, _, _ = self._analysis_dicts(criteria_list)
-        for file, file_conf in analysis_files.items():
-            if os.path.exists(file):
-                shutil.copy(file, archive_experiment_dir)
+        with lk.WriteTransaction(archive_lock):
+            # Copy all of the templates to the archive directory
+            for template_name, _ in workspace.all_templates():
+                src = os.path.join(experiment_run_dir, template_name)
+                if os.path.exists(src):
+                    shutil.copy(src, archive_experiment_dir)
 
-        # Copy all archive patterns
-        archive_patterns = set(self.archive_patterns.keys())
-        if self.package_manager:
-            for pattern in self.package_manager.archive_patterns.keys():
-                archive_patterns.add(pattern)
+            # Copy all figure of merit files
+            criteria_list = workspace.success_list
+            analysis_files, _, _ = self._analysis_dicts(criteria_list)
+            for file, file_conf in analysis_files.items():
+                if os.path.exists(file):
+                    shutil.copy(file, archive_experiment_dir)
 
-        for mod in self._modifier_instances:
-            for pattern in mod.archive_patterns.keys():
-                archive_patterns.add(pattern)
+            # Copy all archive patterns
+            archive_patterns = set(self.archive_patterns.keys())
+            if self.package_manager:
+                for pattern in self.package_manager.archive_patterns.keys():
+                    archive_patterns.add(pattern)
+
+            for mod in self._modifier_instances:
+                for pattern in mod.archive_patterns.keys():
+                    archive_patterns.add(pattern)
 
-        for pattern in archive_patterns:
-            exp_pattern = self.expander.expand_var(pattern)
-            for file in glob.glob(exp_pattern):
-                dest_dir = os.path.dirname(file.replace(workspace.root, ws_archive_dir))
-                fs.mkdirp(dest_dir)
-                shutil.copy(file, dest_dir)
+            for pattern in archive_patterns:
+                exp_pattern = self.expander.expand_var(pattern)
+                for file in glob.glob(exp_pattern):
+                    dest_dir = os.path.dirname(file.replace(workspace.root, ws_archive_dir))
+                    fs.mkdirp(dest_dir)
+                    shutil.copy(file, dest_dir)
 
-        for file_name in [self._inventory_file_name, self._status_file_name]:
-            file = os.path.join(experiment_run_dir, file_name)
-            if os.path.exists(file):
-                shutil.copy(file, archive_experiment_dir)
+            for file_name in [self._inventory_file_name, self._status_file_name]:
+                file = os.path.join(experiment_run_dir, file_name)
+                if os.path.exists(file):
+                    shutil.copy(file, archive_experiment_dir)
 
     register_phase("prepare_analysis", pipeline="analyze")
@@ -1594,84 +1626,89 @@ def format_context(context_match, context_format):
 
         files, contexts, foms = self._analysis_dicts(criteria_list)
 
+        exp_lock = self.experiment_lock()
+
         # Iterate over files. We already know they exist
-        for file, file_conf in files.items():
+        with lk.ReadTransaction(exp_lock):
+            for file, file_conf in files.items():
 
-            # Start with no active contexts in a file.
-            active_contexts = {}
-            logger.debug(f"Reading log file: {file}")
+                # Start with no active contexts in a file.
+                active_contexts = {}
+                logger.debug(f"Reading log file: {file}")
 
-            if not os.path.exists(file):
-                logger.debug(f"Skipping analysis of non-existent file: {file}")
-                continue
-
-            per_file_crit_objs = [
-                criteria_list.find_criteria(c) for c in file_conf["success_criteria"]
-            ]
+                if not os.path.exists(file):
+                    logger.debug(f"Skipping analysis of non-existent file: {file}")
+                    continue
 
-            with open(file) as f:
-                for line in f.readlines():
-                    logger.debug(f"Line: {line}")
-                    new_per_file_crit_objs = []
-                    for crit_obj in per_file_crit_objs:
-                        logger.debug(f"Looking for criteria {crit_obj.name}")
-                        if crit_obj.passed(line, self):
-                            crit_obj.mark_found()
-                        elif crit_obj.anti_matched(line):
-                            crit_obj.mark_anti_found()
-                        else:
-                            new_per_file_crit_objs.append(crit_obj)
-                    per_file_crit_objs = new_per_file_crit_objs
-
-                    for context in file_conf["contexts"]:
-                        context_conf = contexts[context]
-                        context_match = context_conf["regex"].match(line)
-
-                        if context_match:
-                            context_name = format_context(context_match, context_conf["format"])
-                            logger.debug("Line was: %s" % line)
-                            logger.debug(f"    Context match {context} -- {context_name}")
-
-                            active_contexts[context] = context_name
-
-                            if context_name not in fom_values:
-                                fom_values[context_name] = {}
-
-                    for fom in file_conf["foms"]:
-                        logger.debug(f"  Testing for fom {fom}")
-                        fom_conf = foms[fom]
-                        fom_match = fom_conf["regex"].match(line)
-
-                        if fom_match:
-                            fom_vars = {}
-                            for k, v in fom_match.groupdict().items():
-                                fom_vars[k] = v
-                            fom_name = self.expander.expand_var(fom, extra_vars=fom_vars)
-
-                            if fom_conf["group"] in fom_conf["regex"].groupindex:
-                                logger.debug(" --- Matched fom %s" % fom_name)
-                                fom_contexts = []
-                                if fom_conf["contexts"]:
-                                    for context in fom_conf["contexts"]:
-                                        context_name = (
-                                            active_contexts[context]
-                                            if context in active_contexts
-                                            else _NULL_CONTEXT
-                                        )
-                                        fom_contexts.append(context_name)
-                                else:
-                                    fom_contexts.append(_NULL_CONTEXT)
-
-                                for context in fom_contexts:
-                                    if context not in fom_values:
-                                        fom_values[context] = {}
-                                    fom_val = fom_match.group(fom_conf["group"])
-                                    fom_values[context][fom_name] = {
-                                        "value": fom_val,
-                                        "units": fom_conf["units"],
-                                        "origin": fom_conf["origin"],
-                                        "origin_type": fom_conf["origin_type"],
-                                    }
+                per_file_crit_objs = [
+                    criteria_list.find_criteria(c) for c in file_conf["success_criteria"]
+                ]
+
+                with open(file) as f:
+                    for line in f.readlines():
+                        logger.debug(f"Line: {line}")
+                        new_per_file_crit_objs = []
+                        for crit_obj in per_file_crit_objs:
+                            logger.debug(f"Looking for criteria {crit_obj.name}")
+                            if crit_obj.passed(line, self):
+                                crit_obj.mark_found()
+                            elif crit_obj.anti_matched(line):
+                                crit_obj.mark_anti_found()
+                            else:
+                                new_per_file_crit_objs.append(crit_obj)
+                        per_file_crit_objs = new_per_file_crit_objs
+
+                        for context in file_conf["contexts"]:
+                            context_conf = contexts[context]
+                            context_match = context_conf["regex"].match(line)
+
+                            if context_match:
+                                context_name = format_context(
+                                    context_match, context_conf["format"]
+                                )
+                                logger.debug("Line was: %s" % line)
+                                logger.debug(f"    Context match {context} -- {context_name}")
+
+                                active_contexts[context] = context_name
+
+                                if context_name not in fom_values:
+                                    fom_values[context_name] = {}
+
+                        for fom in file_conf["foms"]:
+                            logger.debug(f"  Testing for fom {fom}")
+                            fom_conf = foms[fom]
+                            fom_match = fom_conf["regex"].match(line)
+
+                            if fom_match:
+                                fom_vars = {}
+                                for k, v in fom_match.groupdict().items():
+                                    fom_vars[k] = v
+                                fom_name = self.expander.expand_var(fom, extra_vars=fom_vars)
+
+                                if fom_conf["group"] in fom_conf["regex"].groupindex:
fom_conf["regex"].groupindex: + logger.debug(" --- Matched fom %s" % fom_name) + fom_contexts = [] + if fom_conf["contexts"]: + for context in fom_conf["contexts"]: + context_name = ( + active_contexts[context] + if context in active_contexts + else _NULL_CONTEXT + ) + fom_contexts.append(context_name) + else: + fom_contexts.append(_NULL_CONTEXT) + + for context in fom_contexts: + if context not in fom_values: + fom_values[context] = {} + fom_val = fom_match.group(fom_conf["group"]) + fom_values[context][fom_name] = { + "value": fom_val, + "units": fom_conf["units"], + "origin": fom_conf["origin"], + "origin_type": fom_conf["origin_type"], + } # Test all non-file based success criteria for criteria_obj in criteria_list.all_criteria(): @@ -2057,11 +2094,13 @@ def read_status(self): ) if os.path.isfile(status_path): - with open(status_path) as f: - status_data = spack.util.spack_json.load(f) - self.variables[self.keywords.experiment_status] = status_data[ - self.keywords.experiment_status - ] + exp_lock = self.experiment_lock() + with lk.ReadTransaction(exp_lock): + with open(status_path) as f: + status_data = spack.util.spack_json.load(f) + self.variables[self.keywords.experiment_status] = status_data[ + self.keywords.experiment_status + ] else: self.set_status(experiment_status.UNKNOWN) @@ -2089,8 +2128,10 @@ def _write_status(self, workspace, app_inst=None): status_path = os.path.join(exp_dir, self._status_file_name) if os.path.exists(exp_dir): - with open(status_path, "w+") as f: - spack.util.spack_json.dump(status_data, f) + exp_lock = self.experiment_lock() + with lk.ReadTransaction(exp_lock): + with open(status_path, "w+") as f: + spack.util.spack_json.dump(status_data, f) register_phase("deploy_artifacts", pipeline="pushdeployment") @@ -2106,15 +2147,18 @@ def _copy_files(obj_inst, obj_type, repo_root): repo_path = os.path.join(workspace.named_deployment, "object_repo") - _copy_files(self, ramble.repository.ObjectTypes.applications, repo_path) + repo_lock = lk.Lock(repo_path) - for mod_inst in self._modifier_instances: - _copy_files(mod_inst, ramble.repository.ObjectTypes.modifiers, repo_path) + with lk.WriteTransaction(repo_lock): + _copy_files(self, ramble.repository.ObjectTypes.applications, repo_path) - if self.package_manager is not None: - _copy_files( - self.package_manager, ramble.repository.ObjectTypes.package_managers, repo_path - ) + for mod_inst in self._modifier_instances: + _copy_files(mod_inst, ramble.repository.ObjectTypes.modifiers, repo_path) + + if self.package_manager is not None: + _copy_files( + self.package_manager, ramble.repository.ObjectTypes.package_managers, repo_path + ) register_builtin("env_vars", required=True) diff --git a/lib/ramble/ramble/workspace/workspace.py b/lib/ramble/ramble/workspace/workspace.py index 53ebfdaf6..7e0a31146 100644 --- a/lib/ramble/ramble/workspace/workspace.py +++ b/lib/ramble/ramble/workspace/workspace.py @@ -543,52 +543,54 @@ def _re_read(self): def _read(self): # Create the workspace config section - self.config_sections["workspace"] = { - "filename": self.config_file_path, - "path": self.config_file_path, - "schema": config_schema, - "section_filename": self.config_file_path, - "raw_yaml": None, - "yaml": None, - } + with lk.ReadTransaction(self.txlock): + self.config_sections["workspace"] = { + "filename": self.config_file_path, + "path": self.config_file_path, + "schema": config_schema, + "section_filename": self.config_file_path, + "raw_yaml": None, + "yaml": None, + } - keywords = ramble.keywords.keywords + keywords = 
ramble.keywords.keywords - read_default = not os.path.exists(self.config_file_path) - if read_default: - self._read_config(config_section, default_config_yaml()) - else: - with open(self.config_file_path) as f: - self._read_config(config_section, f) - - read_default_script = self.read_default_template - ext_len = len(workspace_template_extension) - if os.path.exists(self.config_dir): - for filename in os.listdir(self.config_dir): - if filename.endswith(workspace_template_extension): - read_default_script = False - template_name = filename[0:-ext_len] - template_path = os.path.join(self.config_dir, filename) - if keywords.is_reserved(template_name): - raise RambleInvalidTemplateNameError( - f"Template file {filename} results in a " - f"template name of {template_name}" + " which is reserved by ramble." - ) - - with open(template_path) as f: - self._read_template(template_name, f.read()) - - if os.path.exists(self.auxiliary_software_dir): - for filename in os.listdir(self.auxiliary_software_dir): - aux_file_path = os.path.join(self.auxiliary_software_dir, filename) - with open(aux_file_path) as f: - self._read_auxiliary_software_file(filename, f.read()) - - if read_default_script: - template_name = workspace_execution_template[0:-ext_len] - self._read_template(template_name, template_execute_script) - - self._read_all_application_configs() + read_default = not os.path.exists(self.config_file_path) + if read_default: + self._read_config(config_section, default_config_yaml()) + else: + with open(self.config_file_path) as f: + self._read_config(config_section, f) + + read_default_script = self.read_default_template + ext_len = len(workspace_template_extension) + if os.path.exists(self.config_dir): + for filename in os.listdir(self.config_dir): + if filename.endswith(workspace_template_extension): + read_default_script = False + template_name = filename[0:-ext_len] + template_path = os.path.join(self.config_dir, filename) + if keywords.is_reserved(template_name): + raise RambleInvalidTemplateNameError( + f"Template file {filename} results in a " + f"template name of {template_name}" + + " which is reserved by ramble." 
+                            )
+
+                        with open(template_path) as f:
+                            self._read_template(template_name, f.read())
+
+            if os.path.exists(self.auxiliary_software_dir):
+                for filename in os.listdir(self.auxiliary_software_dir):
+                    aux_file_path = os.path.join(self.auxiliary_software_dir, filename)
+                    with open(aux_file_path) as f:
+                        self._read_auxiliary_software_file(filename, f.read())
+
+            if read_default_script:
+                template_name = workspace_execution_template[0:-ext_len]
+                self._read_template(template_name, template_execute_script)
+
+            self._read_all_application_configs()
 
     def _read_all_application_configs(self):
         path_replacements = {"workspace": self.root, "workspace_config": self.config_dir}
@@ -671,29 +673,32 @@ def _read_auxiliary_software_file(self, name, f):
 
     def write(self, software_dir=None, inputs_dir=None):
         """Write an in-memory workspace to its location on disk."""
 
-        # Ensure required directory structure exists
-        fs.mkdirp(self.path)
-        fs.mkdirp(self.config_dir)
-        fs.mkdirp(self.auxiliary_software_dir)
-        fs.mkdirp(self.log_dir)
-        fs.mkdirp(self.experiment_dir)
-
-        if inputs_dir:
-            os.symlink(os.path.abspath(inputs_dir), self.input_dir, target_is_directory=True)
-        elif not os.path.exists(self.input_dir):
-            fs.mkdirp(self.input_dir)
-
-        if software_dir:
-            os.symlink(os.path.abspath(software_dir), self.software_dir, target_is_directory=True)
-        elif not os.path.exists(self.software_dir):
-            fs.mkdirp(self.software_dir)
+        with lk.WriteTransaction(self.txlock, acquire=self._re_read):
+            # Ensure required directory structure exists
+            fs.mkdirp(self.path)
+            fs.mkdirp(self.config_dir)
+            fs.mkdirp(self.auxiliary_software_dir)
+            fs.mkdirp(self.log_dir)
+            fs.mkdirp(self.experiment_dir)
+
+            if inputs_dir:
+                os.symlink(os.path.abspath(inputs_dir), self.input_dir, target_is_directory=True)
+            elif not os.path.exists(self.input_dir):
+                fs.mkdirp(self.input_dir)
+
+            if software_dir:
+                os.symlink(
+                    os.path.abspath(software_dir), self.software_dir, target_is_directory=True
+                )
+            elif not os.path.exists(self.software_dir):
+                fs.mkdirp(self.software_dir)
 
-        fs.mkdirp(self.shared_dir)
-        fs.mkdirp(self.shared_license_dir)
+            fs.mkdirp(self.shared_dir)
+            fs.mkdirp(self.shared_license_dir)
 
-        self._write_config(config_section)
+            self._write_config(config_section)
 
-        self._write_templates()
+            self._write_templates()
 
     def _write_config(self, section):
         """Update YAML config file for this workspace, based on
@@ -1278,12 +1283,17 @@ def active(self):
         """True if this workspace is currently active."""
         return _active_workspace and self.path == _active_workspace.path
 
+    @property
+    def internal_subdir(self):
+        """Subdirectory for housing ramble internals"""
+        return os.path.join(self.root, ".ramble-workspace")
+
     @property
     def _transaction_lock_path(self):
         """The location of the lock file used to synchronize multiple
         processes updating the same workspace.
         """
-        return os.path.join(self.root, "transaction_lock")
+        return os.path.join(self.internal_subdir, "transaction_lock")
 
     @property
     def experiment_dir(self):