From 21abd2f1447d012837d124919770d1c54e812d25 Mon Sep 17 00:00:00 2001 From: Manan Bhatt Date: Wed, 7 Dec 2022 08:42:42 +0530 Subject: [PATCH] Do_while task relevant loop over task calculation fix (#3351) * Simplifying the logic to check relevant task for do_while task The cases where decision/ fork_join, dynamic_fork is part of loop over task then ideally do_while task must wait before all the task gets completed. * integration tests * spotless * proper fix * spotless * integration test for remaining scenario. * removed not necessary test case. The test case was when decision task is part of loop over task and decision task contains two tasks and only one task is scheduled. But there will not be any case where we schedule only one task given the decision case has two tasks. * spotless * spotless 2 * spotless 3 --- .../core/execution/tasks/DoWhile.java | 13 ++- .../test/integration/DoWhileSpec.groovy | 87 +++++++++++++++++++ .../do_while_with_decision_task.json | 62 +++++++++++++ 3 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 test-harness/src/test/resources/do_while_with_decision_task.json diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java index 9e14cf31a0..3dbc72eb9e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java @@ -185,7 +185,18 @@ private boolean isIterationComplete( break; } } - return allTasksTerminal; + + if (!allTasksTerminal) { + // Cases where tasks directly inside loop over are not completed. + // loopOver -> [task1 -> COMPLETED, task2 -> IN_PROGRESS] + return false; + } + + // Check all the tasks in referenceNameToModel are completed or not. These are set of tasks + // which are not directly inside loopOver tasks, but they are under hierarchy + // loopOver -> [decisionTask -> COMPLETED [ task1 -> COMPLETED, task2 -> IN_PROGRESS]] + return referenceNameToModel.values().stream() + .noneMatch(taskModel -> !taskModel.getStatus().isTerminal()); } boolean scheduleNextIteration( diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy index b94786b8db..33ba261c8f 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.metadata.tasks.TaskResult import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.common.utils.TaskUtils import com.netflix.conductor.core.execution.tasks.Join @@ -41,6 +42,7 @@ class DoWhileSpec extends AbstractSpecification { 'do_while_sub_workflow_integration_test.json', 'do_while_five_loop_over_integration_test.json', 'do_while_system_tasks.json', + 'do_while_with_decision_task.json', 'do_while_set_variable_fix.json') } @@ -1144,6 +1146,91 @@ class DoWhileSpec extends AbstractSpecification { } } + def "Test workflow with Do While task contains decision task"() { + given: "The loop condition is set to use set variable" + def workflowInput = new HashMap() + def array = new ArrayList() + array.add(1); + array.add(2); + workflowInput['list'] = array + + when: "A do_while workflow is started" + def workflowInstanceId = startWorkflow("DO_While_with_Decision_task", 1, "looptest", workflowInput, null) + + then: "Verify that the loop over task is waiting for the wait task to get completed" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].taskType == 'DO_WHILE' + tasks[0].status == Task.Status.IN_PROGRESS + tasks[1].taskType == 'INLINE' + tasks[1].status == Task.Status.COMPLETED + tasks[2].taskType == 'SWITCH' + tasks[2].status == Task.Status.COMPLETED + tasks[3].taskType == 'WAIT' + tasks[3].status == Task.Status.IN_PROGRESS + } + + when: "The wait task is completed" + def waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[3] + waitTask.status = Task.Status.COMPLETED + workflowExecutor.updateTask(new TaskResult(waitTask)) + + then: "Verify that the next iteration is scheduled and workflow is in running state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 8 + tasks[0].taskType == 'DO_WHILE' + tasks[0].status == Task.Status.IN_PROGRESS + tasks[0].iteration == 2 + tasks[1].taskType == 'INLINE' + tasks[1].status == Task.Status.COMPLETED + tasks[2].taskType == 'SWITCH' + tasks[2].status == Task.Status.COMPLETED + tasks[3].taskType == 'WAIT' + tasks[3].status == Task.Status.COMPLETED + tasks[4].taskType == 'INLINE' + tasks[4].status == Task.Status.COMPLETED + tasks[5].taskType == 'INLINE' + tasks[5].status == Task.Status.COMPLETED + tasks[6].taskType == 'SWITCH' + tasks[6].status == Task.Status.COMPLETED + tasks[7].taskType == 'WAIT' + tasks[7].status == Task.Status.IN_PROGRESS + } + + when: "The wait task is completed" + waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[7] + waitTask.status = Task.Status.COMPLETED + workflowExecutor.updateTask(new TaskResult(waitTask)) + + then: "Verify that the workflow is completed" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 9 + tasks[0].taskType == 'DO_WHILE' + tasks[0].status == Task.Status.COMPLETED + tasks[0].iteration == 2 + tasks[1].taskType == 'INLINE' + tasks[1].status == Task.Status.COMPLETED + tasks[2].taskType == 'SWITCH' + tasks[2].status == Task.Status.COMPLETED + tasks[3].taskType == 'WAIT' + tasks[3].status == Task.Status.COMPLETED + tasks[4].taskType == 'INLINE' + tasks[4].status == Task.Status.COMPLETED + tasks[5].taskType == 'INLINE' + tasks[5].status == Task.Status.COMPLETED + tasks[6].taskType == 'SWITCH' + tasks[6].status == Task.Status.COMPLETED + tasks[7].taskType == 'WAIT' + tasks[7].status == Task.Status.COMPLETED + tasks[8].taskType == 'INLINE' + tasks[8].status == Task.Status.COMPLETED + } + } + + void verifyTaskIteration(Task task, int iteration) { assert task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration())) assert task.iteration == iteration diff --git a/test-harness/src/test/resources/do_while_with_decision_task.json b/test-harness/src/test/resources/do_while_with_decision_task.json new file mode 100644 index 0000000000..3211081945 --- /dev/null +++ b/test-harness/src/test/resources/do_while_with_decision_task.json @@ -0,0 +1,62 @@ +{ + "name": "DO_While_with_Decision_task", + "description": "Program for testing loop behaviour", + "version": 1, + "schemaVersion": 2, + "ownerEmail": "xyz@company.eu", + "tasks": [ + { + "name": "LoopTask", + "taskReferenceName": "LoopTask", + "type": "DO_WHILE", + "inputParameters": { + "list": "${workflow.input.list}" + }, + "loopCondition": "$.LoopTask['iteration'] < $.list.length", + "loopOver": [ + { + "name": "GetNumberAtIndex", + "taskReferenceName": "GetNumberAtIndex", + "type": "INLINE", + "inputParameters": { + "evaluatorType": "javascript", + "list": "${workflow.input.list}", + "iterator": "${LoopTask.output.iteration}", + "expression": "function getElement() { return $.list.get($.iterator - 1); } getElement();" + } + }, + { + "name": "SwitchTask", + "taskReferenceName": "SwitchTask", + "type": "SWITCH", + "evaluatorType": "javascript", + "inputParameters": { + "param": "${GetNumberAtIndex.output.result}" + }, + "expression": "$.param > 0", + "decisionCases": { + "true": [ + { + "name": "WaitTask", + "taskReferenceName": "WaitTask", + "type": "WAIT", + "inputParameters": { + } + }, + { + "name": "ComputeNumber", + "taskReferenceName": "ComputeNumber", + "type": "INLINE", + "inputParameters": { + "evaluatorType": "javascript", + "number": "${GetNumberAtIndex.output.result.number}", + "expression": "function compute() { return $.number+10; } compute();" + } + } + ] + } + } + ] + } + ] +} \ No newline at end of file