From 911c47a6c5bbd7333dfdcd98795fdbc49fb0f696 Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Wed, 3 Aug 2022 16:58:22 -0700 Subject: [PATCH 1/2] do not synchronously eval workflow under select criteria --- .../core/execution/WorkflowExecutor.java | 51 ++++++++++++-- .../netflix/conductor/metrics/Monitors.java | 4 ++ .../core/execution/TestWorkflowExecutor.java | 69 +++++++++++++++++++ .../test/integration/DecisionTaskSpec.groovy | 3 + .../test/integration/DoWhileSpec.groovy | 3 + .../integration/DynamicForkJoinSpec.groovy | 15 ++++ .../ExternalPayloadStorageSpec.groovy | 3 + .../test/integration/ForkJoinSpec.groovy | 23 ++++++- .../test/integration/SwitchTaskSpec.groovy | 3 + .../terminate_task_parent_workflow.json | 2 +- 10 files changed, 167 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index e38f050f51..fa82a05a26 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -34,11 +34,7 @@ import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.dal.ExecutionDAOFacade; -import com.netflix.conductor.core.exception.ConflictException; -import com.netflix.conductor.core.exception.NonTransientException; -import com.netflix.conductor.core.exception.NotFoundException; -import com.netflix.conductor.core.exception.TerminateWorkflowException; -import com.netflix.conductor.core.exception.TransientException; +import com.netflix.conductor.core.exception.*; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.execution.tasks.Terminate; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; @@ -1193,7 +1189,50 @@ public void updateTask(TaskResult taskResult) { task.getTaskDefName(), lastDuration, false, task.getStatus()); } - _decide(workflowId); + // evaluate workflow only if the task is not within a forked branch + if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { + _decide(workflowId); + } + } + + /** + * Determines if a workflow can be lazily evaluated, if it meets any of these criteria + * + * + * + * @param workflowDef The workflow definition of the workflow for which evaluation decision is + * to be made + * @param task The task which is attempting to trigger the evaluation + * @return true if workflow can be lazily evaluated, false otherwise + */ + @VisibleForTesting + boolean isLazyEvaluateWorkflow(WorkflowDef workflowDef, TaskModel task) { + if (task.isLoopOverTask()) { + return false; + } + + String taskRefName = task.getReferenceTaskName(); + List workflowTasks = workflowDef.collectTasks(); + + List forkTasks = + workflowTasks.stream() + .filter(t -> t.getType().equals(TaskType.FORK_JOIN.name())) + .collect(Collectors.toList()); + + List joinTasks = + workflowTasks.stream() + .filter(t -> t.getType().equals(TaskType.JOIN.name())) + .collect(Collectors.toList()); + + if (forkTasks.stream().anyMatch(fork -> fork.has(taskRefName))) { + return joinTasks.stream().anyMatch(join -> join.getJoinOn().contains(taskRefName)); + } + + return workflowTasks.stream().noneMatch(t -> t.getTaskReferenceName().equals(taskRefName)); } public TaskModel getTask(String taskId) { diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 0a1163ae54..a8d49ab858 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -168,6 +168,10 @@ public static void recordGauge(String name, long count) { gauge(classQualifier, name, count); } + public static void recordCounter(String name, long count, String... additionalTags) { + getCounter(classQualifier, name, additionalTags).increment(count); + } + public static void recordQueueWaitTime(String taskType, long queueWaitTime) { getTimer(classQualifier, "task_queue_wait", "taskType", taskType) .record(queueWaitTime, TimeUnit.MILLISECONDS); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 1a3800648b..55b336eac6 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -2446,6 +2446,75 @@ public void testUpdateTaskWithOutCallbackAfterSeconds() { argumentCaptor.getAllValues().get(0).getWorkflowInstanceId()); } + @Test + public void testIsLazyEvaluateWorkflow() { + // setup + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("lazyEvaluate"); + workflowDef.setVersion(1); + + WorkflowTask simpleTask = new WorkflowTask(); + simpleTask.setType(SIMPLE.name()); + simpleTask.setName("simple"); + simpleTask.setTaskReferenceName("simple"); + + WorkflowTask forkTask = new WorkflowTask(); + forkTask.setType(FORK_JOIN.name()); + forkTask.setName("fork"); + forkTask.setTaskReferenceName("fork"); + + WorkflowTask branchTask1 = new WorkflowTask(); + branchTask1.setType(SIMPLE.name()); + branchTask1.setName("branchTask1"); + branchTask1.setTaskReferenceName("branchTask1"); + + WorkflowTask branchTask2 = new WorkflowTask(); + branchTask2.setType(SIMPLE.name()); + branchTask2.setName("branchTask2"); + branchTask2.setTaskReferenceName("branchTask2"); + + forkTask.getForkTasks().add(Arrays.asList(branchTask1, branchTask2)); + + WorkflowTask joinTask = new WorkflowTask(); + joinTask.setType(JOIN.name()); + joinTask.setName("join"); + joinTask.setTaskReferenceName("join"); + joinTask.setJoinOn(List.of("branchTask2")); + + WorkflowTask doWhile = new WorkflowTask(); + doWhile.setType(DO_WHILE.name()); + doWhile.setName("doWhile"); + doWhile.setTaskReferenceName("doWhile"); + + WorkflowTask loopTask = new WorkflowTask(); + loopTask.setType(SIMPLE.name()); + loopTask.setName("loopTask"); + loopTask.setTaskReferenceName("loopTask"); + + doWhile.setLoopOver(List.of(loopTask)); + + workflowDef.getTasks().addAll(List.of(simpleTask, forkTask, joinTask, doWhile)); + + TaskModel task = new TaskModel(); + + // when: + task.setReferenceTaskName("dynamic"); + assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task)); + + task.setReferenceTaskName("branchTask1"); + assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task)); + + task.setReferenceTaskName("branchTask2"); + assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task)); + + task.setReferenceTaskName("simple"); + assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task)); + + task.setReferenceTaskName("loopTask__1"); + task.setIteration(1); + assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task)); + } + private WorkflowModel generateSampleWorkflow() { // setup WorkflowModel workflow = new WorkflowModel(); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy index 66012e6fbf..ff6e3e685b 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy @@ -183,6 +183,9 @@ class DecisionTaskSpec extends AbstractSpecification { when: "the task 'integration_task_20' is polled and completed" def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker') + and: "the workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the task is completed and acknowledged" verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy index 0c92dc01b7..c2c4bbc2f7 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy @@ -1045,6 +1045,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "the workflow is evaluated" + sweep(workflowInstanceId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy index 46ec3342ef..6deaa812be 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy @@ -97,6 +97,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker', ['ok1': 'ov1']) + and: "workflow is evaluated by the reconciler" + sweep(workflowInstanceId) + then: "verify that the tasks were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2']) @@ -194,6 +197,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { when: "Poll and fail 'integration_task_2'" def pollAndCompleteTask2Try = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..') + and: "workflow is evaluated by the reconciler" + sweep(workflowInstanceId) + then: "verify that the tasks were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1']) @@ -275,6 +281,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { when: "Poll and fail 'integration_task_2'" def pollAndCompleteTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..') + and: "workflow is evaluated by the reconciler" + sweep(workflowInstanceId) + then: "verify that the tasks were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try1, ['k1': 'v1']) @@ -319,6 +328,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { def pollAndCompleteTask3Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker', ['ok1': 'ov1']) + and: "workflow is evaluated by the reconciler" + sweep(workflowInstanceId) + then: "verify that the tasks were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try2, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try1, ['k2': 'v2']) @@ -727,6 +739,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker') def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker') + and: "workflow is evaluated by the reconciler" + sweep(workflowInstanceId) + then: "verify that the tasks were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2']) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index 44907f3dfb..8fd38f3339 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -342,6 +342,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { when: "the second task of the left fork is polled and completed with external payload storage" polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_3', 'task3.integration.worker', taskOutputPath) + and: "the workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the 'integration_task_3' was polled and acknowledged" verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 2351badb2c..bde98ff889 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -133,6 +133,9 @@ class ForkJoinSpec extends AbstractSpecification { when: "The other node of the fork is completed by completing 'integration_task_2'" def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.worker') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the 'integration_task_2' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) @@ -215,6 +218,9 @@ class ForkJoinSpec extends AbstractSpecification { def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2', 'task1.worker', 'Failed....') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the 'integration_task_2' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) @@ -282,6 +288,9 @@ class ForkJoinSpec extends AbstractSpecification { def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_0_RT_2', 'task1.worker', 'Failed....') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the 'integration_task_0_RT_1' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) @@ -326,10 +335,12 @@ class ForkJoinSpec extends AbstractSpecification { then: "verify that the 'integration_task_3' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1) - when: "The other node of the fork is completed by completing 'integration_task_0_RT_2'" def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_task_0_RT_2', 'task1.worker') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the 'integration_task_2' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2) @@ -399,7 +410,6 @@ class ForkJoinSpec extends AbstractSpecification { def polledAndAckTask12Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_12', 'task12.worker') def polledAndAckTask13Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_13', 'task13.worker') - then: "verify that tasks 'integration_task_11', 'integration_task_12' and 'integration_task_13' were polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask11Try1) workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask12Try1) @@ -486,6 +496,9 @@ class ForkJoinSpec extends AbstractSpecification { when: "Poll and Complete tasks: 'integration_task_20'" def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that tasks 'integration_task_20'polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1) @@ -700,6 +713,9 @@ class ForkJoinSpec extends AbstractSpecification { when: "poll and complete the 'integration_task_20'" def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the task was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1) @@ -1022,6 +1038,9 @@ class ForkJoinSpec extends AbstractSpecification { when: "the simple task is polled and completed" def polledAndCompletedSimpleTask = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker') + and: "workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the task was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSimpleTask) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy index 64e62aa6af..d12f089ecc 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy @@ -183,6 +183,9 @@ class SwitchTaskSpec extends AbstractSpecification { when: "the task 'integration_task_20' is polled and completed" def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker') + and: "the workflow is evaluated" + sweep(workflowInstanceId) + then: "verify that the task is completed and acknowledged" verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1) diff --git a/test-harness/src/test/resources/terminate_task_parent_workflow.json b/test-harness/src/test/resources/terminate_task_parent_workflow.json index f38790e66f..e88f873e07 100644 --- a/test-harness/src/test/resources/terminate_task_parent_workflow.json +++ b/test-harness/src/test/resources/terminate_task_parent_workflow.json @@ -67,7 +67,7 @@ "taskReferenceName": "thejoin", "type": "JOIN", "joinOn": [ - "basicJavaA", + "test_terminate_subworkflow", "basicJavaB" ] } From 03a6251ee3627eec0ad146aa68174d4885e71a2d Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Fri, 5 Aug 2022 11:40:49 -0700 Subject: [PATCH 2/2] expedite lazy evaluation with higher priority --- .../core/execution/WorkflowExecutor.java | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index fa82a05a26..a321b6050a 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -60,7 +60,7 @@ public class WorkflowExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class); - private static final int PARENT_WF_PRIORITY = 10; + private static final int EXPEDITED_PRIORITY = 10; private final MetadataDAO metadataDAO; private final QueueDAO queueDAO; @@ -636,7 +636,7 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) { parentWorkflow.setStatus(WorkflowModel.Status.RUNNING); parentWorkflow.setLastRetriedTime(System.currentTimeMillis()); executionDAOFacade.updateWorkflow(parentWorkflow); - pushParentWorkflow(parentWorkflowId); + expediteLazyWorkflowEvaluation(parentWorkflowId); workflow = parentWorkflow; } @@ -874,7 +874,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { workflow.toShortString(), workflow.getParentWorkflowId(), workflow.getParentWorkflowTaskId()); - pushParentWorkflow(workflow.getParentWorkflowId()); + expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId()); } executionLockService.releaseLock(workflow.getWorkflowId()); @@ -961,7 +961,7 @@ public WorkflowModel terminateWorkflow( workflow.toShortString(), workflow.getParentWorkflowId(), workflow.getParentWorkflowTaskId()); - pushParentWorkflow(workflow.getParentWorkflowId()); + expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId()); } if (!StringUtils.isBlank(failureWorkflow)) { @@ -1189,8 +1189,10 @@ public void updateTask(TaskResult taskResult) { task.getTaskDefName(), lastDuration, false, task.getStatus()); } - // evaluate workflow only if the task is not within a forked branch - if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { + // sync evaluate workflow only if the task is not within a forked branch + if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { + expediteLazyWorkflowEvaluation(workflowId); + } else { _decide(workflowId); } } @@ -2005,14 +2007,18 @@ private void executeSubworkflowTaskAndSyncData( subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this); } - /** Pushes parent workflow id into the decider queue with a priority. */ - private void pushParentWorkflow(String parentWorkflowId) { - if (queueDAO.containsMessage(DECIDER_QUEUE, parentWorkflowId)) { - queueDAO.postpone(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); + /** + * Pushes workflow id into the decider queue with a higher priority to expedite evaluation. + * + * @param workflowId The workflow to be evaluated at higher priority + */ + private void expediteLazyWorkflowEvaluation(String workflowId) { + if (queueDAO.containsMessage(DECIDER_QUEUE, workflowId)) { + queueDAO.postpone(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0); } else { - queueDAO.push(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); + queueDAO.push(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0); } - LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, DECIDER_QUEUE); + LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE); } }