From b27dd33a41e27a99e9decc4d00be652daa3a5679 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 7 Sep 2023 15:58:21 -0700 Subject: [PATCH] prevent already scheduled tasks from being added to opentasks list (#457) --- .../models/DurableOrchestrationContext.py | 3 ++ .../test_sequential_orchestrator.py | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 8ab6dcee..df101003 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -661,6 +661,9 @@ def history_to_string(event): def _add_to_open_tasks(self, task: TaskBase): + if task._is_scheduled: + return + if isinstance(task, AtomicTask): if task.id is None: task.id = self._sequence_number diff --git a/tests/orchestrator/test_sequential_orchestrator.py b/tests/orchestrator/test_sequential_orchestrator.py index 4cac7334..39413523 100644 --- a/tests/orchestrator/test_sequential_orchestrator.py +++ b/tests/orchestrator/test_sequential_orchestrator.py @@ -95,6 +95,25 @@ def generator_function_reducing_when_all(context): yield context.call_activity("Hello", "London") return "" + +def generator_function_reuse_task_in_whenany(context): + task1 = context.call_activity("Hello", "Tokyo") + task2 = context.call_activity("Hello", "Seattle") + pending_tasks = [task1, task2] + + # Yield until first task is completed + finished_task1 = yield context.task_any(pending_tasks) + + # Remove completed task from pending tasks + pending_tasks.remove(finished_task1) + + task3 = context.call_activity("Hello", "London") + tasks = pending_tasks + [task3] + + # Yield remaining tasks + yield context.task_any(tasks) + return "" + def generator_function_compound_tasks(context): yield context.call_activity("Hello", "Tokyo") @@ -731,6 +750,31 @@ def test_reducing_when_any_pattern(): assert_orchestration_state_equals(expected, result) +def test_reducing_when_any_pattern(): + """Tests that a user can call when_any on a progressively smaller list of already scheduled tasks""" + context_builder = ContextBuilder('generator_function_reuse_task_in_whenany', replay_schema=ReplaySchema.V2) + add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"") + add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"") + add_hello_completed_events(context_builder, 2, "\"Hello London!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_reuse_task_in_whenany) + + # this scenario is only supported for V2 replay + expected_state = base_expected_state("",replay_schema=ReplaySchema.V2) + expected_state._actions = [ + [WhenAnyAction( + [CallActivityAction("Hello", "Seattle"), CallActivityAction("Hello", "Tokyo")]), + WhenAnyAction( + [CallActivityAction("Hello", "London")]) + ] + ] + + expected_state._is_done = True + expected = expected_state.to_json() + + assert_orchestration_state_equals(expected, result) + def test_compound_tasks_return_single_action_in_V2(): """Tests that compound tasks, in the v2 replay schema, are represented as a single "deep" action""" context_builder = ContextBuilder('test_v2_replay_schema', replay_schema=ReplaySchema.V2)