diff --git a/maestrowf/datastructures/core/executiongraph.py b/maestrowf/datastructures/core/executiongraph.py index 9ebdbf1e..0cc0dcbd 100644 --- a/maestrowf/datastructures/core/executiongraph.py +++ b/maestrowf/datastructures/core/executiongraph.py @@ -688,7 +688,7 @@ def write_status(self, path): def _check_study_completion(self): # We cancelled, return True marking study as complete. - if self.is_canceled: + if self.is_canceled and not self.in_progress: LOGGER.info("Cancelled -- completing study.") return StudyStatus.CANCELLED @@ -753,7 +753,7 @@ def execute_ready_steps(self): # For the status of each currently in progress job, check its # state. cleanup_steps = set() # Steps that are in progress showing failed. - + cancel_steps = set() # Steps that have dependencies to mark cancelled for name, status in job_status.items(): LOGGER.debug("Checking job '%s' with status %s.", name, status) record = self.values[name] @@ -841,12 +841,18 @@ def execute_ready_steps(self): LOGGER.info("Step '%s' was cancelled.", name) self.in_progress.remove(name) record.mark_end(State.CANCELLED) + cancel_steps.update(self.bfs_subtree(name)[0]) # Let's handle all the failed steps in one go. for node in cleanup_steps: self.failed_steps.add(node) self.values[node].mark_end(State.FAILED) + # Handle dependent steps that need cancelling + for node in cancel_steps: + self.cancelled_steps.add(node) + self.values[node].mark_end(State.CANCELLED) + # Now that we've checked the statuses of existing jobs we need to make # sure dependencies haven't been met. 
for key in self.values.keys(): @@ -872,9 +878,9 @@ def execute_ready_steps(self): "Unfulfilled dependencies: %s", self._dependencies[key]) - s_completed = filter( + s_completed = list(filter( lambda x: x in self.completed_steps, - self._dependencies[key]) + self._dependencies[key])) self._dependencies[key] = \ self._dependencies[key] - set(s_completed) LOGGER.debug( diff --git a/maestrowf/interfaces/script/_flux/flux0_26_0.py b/maestrowf/interfaces/script/_flux/flux0_26_0.py index 1ebb37c5..f0188e2a 100644 --- a/maestrowf/interfaces/script/_flux/flux0_26_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_26_0.py @@ -232,9 +232,15 @@ def cancel(cls, joblist): "\n".join(str(j) for j in joblist), ) + # NOTE: cannot pickle JobID instances, so must store as strings and + # reconstruct for use + jobs_rpc = flux.job.list.JobList( + cls.flux_handle, + ids=[flux.job.JobID(jid) for jid in joblist]) + cancel_code = CancelCode.OK cancel_rcode = 0 - for job in joblist: + for job in jobs_rpc.jobs(): try: - LOGGER.debug("Cancelling Job %s...", job) - flux.job.cancel(cls.flux_handle, int(job)) + LOGGER.debug("Cancelling Job %s...", str(job.id.f58)) + flux.job.cancel(cls.flux_handle, int(job.id)) diff --git a/maestrowf/interfaces/script/_flux/flux0_49_0.py b/maestrowf/interfaces/script/_flux/flux0_49_0.py index 4e870170..869c62bc 100644 --- a/maestrowf/interfaces/script/_flux/flux0_49_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_49_0.py @@ -20,7 +20,7 @@ class FluxInterface_0490(FluxInterface): - # This utility class is for Flux 0.26.0 + # This utility class is for Flux 0.49.0 key = "0.49.0" flux_handle = None @@ -242,14 +242,20 @@ def cancel(cls, joblist): "\n".join(str(j) for j in joblist), ) + # NOTE: cannot pickle JobID instances, so must store as strings and + # reconstruct for use + jobs_rpc = flux.job.list.JobList( + cls.flux_handle, + ids=[flux.job.JobID(jid) for jid in joblist]) + cancel_code = CancelCode.OK cancel_rcode = 0 - for job in joblist: + for job in jobs_rpc.jobs(): try: - LOGGER.debug("Cancelling Job %s...", job) - flux.job.cancel(cls.flux_handle, int(job)) + 
LOGGER.debug("Cancelling Job %s...", str(job.id.f58)) + flux.job.cancel(cls.flux_handle, int(job.id)) except Exception as exception: - LOGGER.error(str(exception)) + LOGGER.error("Job %s: %s", str(job.id.f58), str(exception)) cancel_code = CancelCode.ERROR cancel_rcode = 1 diff --git a/pyproject.toml b/pyproject.toml index 0dfc0924..710dda7f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool] [tool.poetry] name = "maestrowf" -version = "1.1.10dev4" +version = "1.1.10dev5" description = "A tool to easily orchestrate general computational workflows both locally and on supercomputers." license = "MIT License" classifiers = [