Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

do not synchronously eval workflow during update task for select criteria #3146

Merged
merged 2 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
import com.netflix.conductor.core.WorkflowContext;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.exception.*;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
Expand All @@ -64,7 +60,7 @@
public class WorkflowExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class);
private static final int PARENT_WF_PRIORITY = 10;
private static final int EXPEDITED_PRIORITY = 10;

private final MetadataDAO metadataDAO;
private final QueueDAO queueDAO;
Expand Down Expand Up @@ -640,7 +636,7 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) {
parentWorkflow.setStatus(WorkflowModel.Status.RUNNING);
parentWorkflow.setLastRetriedTime(System.currentTimeMillis());
executionDAOFacade.updateWorkflow(parentWorkflow);
pushParentWorkflow(parentWorkflowId);
expediteLazyWorkflowEvaluation(parentWorkflowId);

workflow = parentWorkflow;
}
Expand Down Expand Up @@ -878,7 +874,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
workflow.toShortString(),
workflow.getParentWorkflowId(),
workflow.getParentWorkflowTaskId());
pushParentWorkflow(workflow.getParentWorkflowId());
expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId());
}

executionLockService.releaseLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -965,7 +961,7 @@ public WorkflowModel terminateWorkflow(
workflow.toShortString(),
workflow.getParentWorkflowId(),
workflow.getParentWorkflowTaskId());
pushParentWorkflow(workflow.getParentWorkflowId());
expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId());
}

if (!StringUtils.isBlank(failureWorkflow)) {
Expand Down Expand Up @@ -1193,7 +1189,52 @@ public void updateTask(TaskResult taskResult) {
task.getTaskDefName(), lastDuration, false, task.getStatus());
}

_decide(workflowId);
// sync evaluate workflow only if the task is not within a forked branch
if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
expediteLazyWorkflowEvaluation(workflowId);
} else {
_decide(workflowId);
apanicker-nflx marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Determines if a workflow can be lazily evaluated, if it meets any of these criteria
*
* <ul>
* <li>The task is NOT a loop task within DO_WHILE
* <li>The task is one of the intermediate tasks in a branch within a FORK_JOIN
* <li>The task is forked from a FORK_JOIN_DYNAMIC
* </ul>
*
* @param workflowDef The workflow definition of the workflow for which evaluation decision is
* to be made
* @param task The task which is attempting to trigger the evaluation
* @return true if workflow can be lazily evaluated, false otherwise
*/
@VisibleForTesting
boolean isLazyEvaluateWorkflow(WorkflowDef workflowDef, TaskModel task) {
if (task.isLoopOverTask()) {
return false;
}

String taskRefName = task.getReferenceTaskName();
List<WorkflowTask> workflowTasks = workflowDef.collectTasks();

List<WorkflowTask> forkTasks =
workflowTasks.stream()
.filter(t -> t.getType().equals(TaskType.FORK_JOIN.name()))
.collect(Collectors.toList());

List<WorkflowTask> joinTasks =
workflowTasks.stream()
.filter(t -> t.getType().equals(TaskType.JOIN.name()))
.collect(Collectors.toList());

if (forkTasks.stream().anyMatch(fork -> fork.has(taskRefName))) {
return joinTasks.stream().anyMatch(join -> join.getJoinOn().contains(taskRefName));
}

return workflowTasks.stream().noneMatch(t -> t.getTaskReferenceName().equals(taskRefName));
}

public TaskModel getTask(String taskId) {
Expand Down Expand Up @@ -1966,14 +2007,18 @@ private void executeSubworkflowTaskAndSyncData(
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
}

/** Pushes parent workflow id into the decider queue with a priority. */
private void pushParentWorkflow(String parentWorkflowId) {
if (queueDAO.containsMessage(DECIDER_QUEUE, parentWorkflowId)) {
queueDAO.postpone(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
/**
* Pushes workflow id into the decider queue with a higher priority to expedite evaluation.
*
* @param workflowId The workflow to be evaluated at higher priority
*/
private void expediteLazyWorkflowEvaluation(String workflowId) {
if (queueDAO.containsMessage(DECIDER_QUEUE, workflowId)) {
queueDAO.postpone(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0);
} else {
queueDAO.push(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
queueDAO.push(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0);
}

LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, DECIDER_QUEUE);
LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public static void recordGauge(String name, long count) {
gauge(classQualifier, name, count);
}

public static void recordCounter(String name, long count, String... additionalTags) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to use this method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i noticed that we have a public method for gauge but not for a counter. Further, I plan to use the counter.

getCounter(classQualifier, name, additionalTags).increment(count);
}

public static void recordQueueWaitTime(String taskType, long queueWaitTime) {
getTimer(classQualifier, "task_queue_wait", "taskType", taskType)
.record(queueWaitTime, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,75 @@ public void testUpdateTaskWithOutCallbackAfterSeconds() {
argumentCaptor.getAllValues().get(0).getWorkflowInstanceId());
}

@Test
public void testIsLazyEvaluateWorkflow() {
// setup
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("lazyEvaluate");
workflowDef.setVersion(1);

WorkflowTask simpleTask = new WorkflowTask();
simpleTask.setType(SIMPLE.name());
simpleTask.setName("simple");
simpleTask.setTaskReferenceName("simple");

WorkflowTask forkTask = new WorkflowTask();
forkTask.setType(FORK_JOIN.name());
forkTask.setName("fork");
forkTask.setTaskReferenceName("fork");

WorkflowTask branchTask1 = new WorkflowTask();
branchTask1.setType(SIMPLE.name());
branchTask1.setName("branchTask1");
branchTask1.setTaskReferenceName("branchTask1");

WorkflowTask branchTask2 = new WorkflowTask();
branchTask2.setType(SIMPLE.name());
branchTask2.setName("branchTask2");
branchTask2.setTaskReferenceName("branchTask2");

forkTask.getForkTasks().add(Arrays.asList(branchTask1, branchTask2));

WorkflowTask joinTask = new WorkflowTask();
joinTask.setType(JOIN.name());
joinTask.setName("join");
joinTask.setTaskReferenceName("join");
joinTask.setJoinOn(List.of("branchTask2"));

WorkflowTask doWhile = new WorkflowTask();
doWhile.setType(DO_WHILE.name());
doWhile.setName("doWhile");
doWhile.setTaskReferenceName("doWhile");

WorkflowTask loopTask = new WorkflowTask();
loopTask.setType(SIMPLE.name());
loopTask.setName("loopTask");
loopTask.setTaskReferenceName("loopTask");

doWhile.setLoopOver(List.of(loopTask));

workflowDef.getTasks().addAll(List.of(simpleTask, forkTask, joinTask, doWhile));

TaskModel task = new TaskModel();

// when:
task.setReferenceTaskName("dynamic");
assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("branchTask1");
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("branchTask2");
assertTrue(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("simple");
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));

task.setReferenceTaskName("loopTask__1");
task.setIteration(1);
assertFalse(workflowExecutor.isLazyEvaluateWorkflow(workflowDef, task));
}

private WorkflowModel generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class DecisionTaskSpec extends AbstractSpecification {
when: "the task 'integration_task_20' is polled and completed"
def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this changes in tests? same comment for the others?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the changes introduced, the workflow will be evaluated lazily for a join task, so we simulate the lazy evaluation by calling a sweep here.


then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,9 @@ class DoWhileSpec extends AbstractSpecification {
when: "Polling and completing third task"
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "Verify that the task was polled and acknowledged and workflow is in completed state"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker',
['ok1': 'ov1'])

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])
Expand Down Expand Up @@ -194,6 +197,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
when: "Poll and fail 'integration_task_2'"
def pollAndCompleteTask2Try = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])

Expand Down Expand Up @@ -275,6 +281,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
when: "Poll and fail 'integration_task_2'"
def pollAndCompleteTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try1, ['k1': 'v1'])

Expand Down Expand Up @@ -319,6 +328,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask3Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker',
['ok1': 'ov1'])

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try2, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try1, ['k2': 'v2'])
Expand Down Expand Up @@ -727,6 +739,9 @@ class DynamicForkJoinSpec extends AbstractSpecification {
def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')
def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker')

and: "workflow is evaluated by the reconciler"
sweep(workflowInstanceId)

then: "verify that the tasks were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1'])
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
when: "the second task of the left fork is polled and completed with external payload storage"
polledAndAckLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_3', 'task3.integration.worker', taskOutputPath)

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_3' was polled and acknowledged"
verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "The other node of the fork is completed by completing 'integration_task_2'"
def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -215,6 +218,9 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2',
'task1.worker', 'Failed....')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -282,6 +288,9 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_0_RT_2',
'task1.worker', 'Failed....')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_0_RT_1' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)

Expand Down Expand Up @@ -326,10 +335,12 @@ class ForkJoinSpec extends AbstractSpecification {
then: "verify that the 'integration_task_3' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1)


when: "The other node of the fork is completed by completing 'integration_task_0_RT_2'"
def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_task_0_RT_2', 'task1.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the 'integration_task_2' was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2)

Expand Down Expand Up @@ -399,7 +410,6 @@ class ForkJoinSpec extends AbstractSpecification {
def polledAndAckTask12Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_12', 'task12.worker')
def polledAndAckTask13Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_13', 'task13.worker')


then: "verify that tasks 'integration_task_11', 'integration_task_12' and 'integration_task_13' were polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask11Try1)
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask12Try1)
Expand Down Expand Up @@ -486,6 +496,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "Poll and Complete tasks: 'integration_task_20'"
def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that tasks 'integration_task_20'polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1)

Expand Down Expand Up @@ -700,6 +713,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "poll and complete the 'integration_task_20'"
def polledAndAckTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task20.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1)

Expand Down Expand Up @@ -1022,6 +1038,9 @@ class ForkJoinSpec extends AbstractSpecification {
when: "the simple task is polled and completed"
def polledAndCompletedSimpleTask = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')

and: "workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSimpleTask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class SwitchTaskSpec extends AbstractSpecification {
when: "the task 'integration_task_20' is polled and completed"
def polledAndCompletedTask20Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_20', 'task1.integration.worker')

and: "the workflow is evaluated"
sweep(workflowInstanceId)

then: "verify that the task is completed and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"taskReferenceName": "thejoin",
"type": "JOIN",
"joinOn": [
"basicJavaA",
"test_terminate_subworkflow",
"basicJavaB"
]
}
Expand Down