From bf44b1ec624a52364e4d05c9f38c718ae1f1b371 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Mon, 9 Oct 2023 10:34:10 -0700 Subject: [PATCH 1/3] add patch for skewed sample hierarchy/additional samples --- merlin/common/tasks.py | 48 ++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index f1d06077a..fcd4fef9d 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -298,27 +298,33 @@ def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914 LOG.debug("chain added to chord") else: # recurse down the sample_index hierarchy - LOG.debug("recursing down sample_index hierarchy") - for next_index in sample_index.children.values(): - next_index.name = os.path.join(sample_index.name, next_index.name) - LOG.debug("generating next step") - next_step = add_merlin_expanded_chain_to_chord.s( - task_type, - chain_, - samples[next_index.min - min_sample_id : next_index.max - min_sample_id], - labels, - next_index, - adapter_config, - next_index.min, - ) - next_step.set(queue=chain_[0].get_task_queue()) - LOG.debug(f"recursing with range {next_index.min}:{next_index.max}, {next_index.name} {signature(next_step)}") - LOG.debug(f"queuing samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}...") - if self.request.is_eager: - next_step.delay() - else: - self.add_to_chord(next_step, lazy=False) - LOG.debug(f"queued for samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}") + try: + LOG.debug("recursing down sample_index hierarchy") + for next_index in sample_index.children.values(): + next_index_name_before = next_index.name + next_index.name = os.path.join(sample_index.name, next_index.name) + LOG.debug("generating next step") + next_step = add_merlin_expanded_chain_to_chord.s( + task_type, + chain_, + samples[next_index.min - min_sample_id : next_index.max - min_sample_id], + labels, + next_index, + adapter_config, + next_index.min, + ) + next_step.set(queue=chain_[0].get_task_queue()) + LOG.debug(f"recursing with range {next_index.min}:{next_index.max}, {next_index.name} {signature(next_step)}") + LOG.debug(f"queuing samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}...") + if self.request.is_eager: + next_step.delay() + else: + self.add_to_chord(next_step, lazy=False) + LOG.debug(f"queued for samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}") + except Exception as e: + # Reset the index to what it was before so we don't accidentally create a bunch of extra samples upon restart + next_index.name = next_index_name_before + raise e return ReturnCode.OK From 4ee6d2e9406a99384c3b473071e2afc21f1410bd Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Mon, 9 Oct 2023 10:51:17 -0700 Subject: [PATCH 2/3] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 914d8616a..f994a4d05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - A bug where the filenames in iterative workflows kept appending `.out`, `.partial`, or `.expanded` to the filenames stored in the `merlin_info/` subdirectory +- A bug where a skewed sample hierarchy was created when a restart was necessary in the `add_merlin_expanded_chain_to_chord` task ## [1.10.3] ### Added From 7276c21d9190f53d2e473e5540dd7f86b4d67360 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Mon, 9 Oct 2023 12:17:47 -0700 Subject: [PATCH 3/3] catch narrower range of exceptions --- merlin/common/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index fcd4fef9d..fbd401826 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -321,7 +321,7 @@ def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914 else: self.add_to_chord(next_step, lazy=False) LOG.debug(f"queued for samples[{next_index.min}:{next_index.max}] in for {chain_} in {next_index.name}") - except Exception as e: + except retry_exceptions as e: # Reset the index to what it was before so we don't accidentally create a bunch of extra samples upon restart next_index.name = next_index_name_before raise e