diff --git a/CHANGELOG.md b/CHANGELOG.md index 914d8616a..f994a4d05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - A bug where the filenames in iterative workflows kept appending `.out`, `.partial`, or `.expanded` to the filenames stored in the `merlin_info/` subdirectory +- A bug where a skewed sample hierarchy was created when a restart was necessary in the `add_merlin_expanded_chain_to_chord` task ## [1.10.3] ### Added diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index f1d06077a..fbd401826 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -298,27 +298,33 @@ def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914 LOG.debug("chain added to chord") else: # recurse down the sample_index hierarchy - LOG.debug("recursing down sample_index hierarchy") - for next_index in sample_index.children.values(): - next_index.name = os.path.join(sample_index.name, next_index.name) - LOG.debug("generating next step") - next_step = add_merlin_expanded_chain_to_chord.s( - task_type, - chain_, - samples[next_index.min - min_sample_id : next_index.max - min_sample_id], - labels, - next_index, - adapter_config, - next_index.min, - ) - next_step.set(queue=chain_[0].get_task_queue()) - LOG.debug(f"recursing with range {next_index.min}:{next_index.max}, {next_index.name} {signature(next_step)}") - LOG.debug(f"queuing samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}...") - if self.request.is_eager: - next_step.delay() - else: - self.add_to_chord(next_step, lazy=False) - LOG.debug(f"queued for samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}") + try: + LOG.debug("recursing down sample_index hierarchy") + for next_index in sample_index.children.values(): + next_index_name_before = next_index.name + next_index.name = os.path.join(sample_index.name, next_index.name) + LOG.debug("generating next step") + next_step = add_merlin_expanded_chain_to_chord.s( + task_type, + chain_, + samples[next_index.min - min_sample_id : next_index.max - min_sample_id], + labels, + next_index, + adapter_config, + next_index.min, + ) + next_step.set(queue=chain_[0].get_task_queue()) + LOG.debug(f"recursing with range {next_index.min}:{next_index.max}, {next_index.name} {signature(next_step)}") + LOG.debug(f"queuing samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}...") + if self.request.is_eager: + next_step.delay() + else: + self.add_to_chord(next_step, lazy=False) + LOG.debug(f"queued for samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}") + except retry_exceptions as e: + # Reset the index to what it was before so we don't accidentally create a bunch of extra samples upon restart + next_index.name = next_index_name_before + raise e return ReturnCode.OK