Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Do_while task relevant loop over task calculation fix (#3351)
Browse files Browse the repository at this point in the history
* Simplifying the logic to check relevant task for do_while task

The cases where decision/ fork_join, dynamic_fork is part of loop over task then ideally do_while task must wait before all the task gets completed.

* integration tests

* spotless

* proper fix

* spotless

* integration test for remaining scenario.

* removed not necessary test case.

The test case was when decision task is part of loop over task and decision task contains two tasks and only one task is scheduled. But there will not be any case where we schedule only one task given the decision case has two tasks.

* spotless

* spotless 2

* spotless 3
  • Loading branch information
manan164 authored Dec 7, 2022
1 parent 12663ca commit 21abd2f
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,18 @@ private boolean isIterationComplete(
break;
}
}
return allTasksTerminal;

if (!allTasksTerminal) {
// Cases where tasks directly inside loop over are not completed.
// loopOver -> [task1 -> COMPLETED, task2 -> IN_PROGRESS]
return false;
}

// Check all the tasks in referenceNameToModel are completed or not. These are set of tasks
// which are not directly inside loopOver tasks, but they are under hierarchy
// loopOver -> [decisionTask -> COMPLETED [ task1 -> COMPLETED, task2 -> IN_PROGRESS]]
return referenceNameToModel.values().stream()
.noneMatch(taskModel -> !taskModel.getStatus().isTerminal());
}

boolean scheduleNextIteration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.tasks.TaskResult
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.common.utils.TaskUtils
import com.netflix.conductor.core.execution.tasks.Join
Expand All @@ -41,6 +42,7 @@ class DoWhileSpec extends AbstractSpecification {
'do_while_sub_workflow_integration_test.json',
'do_while_five_loop_over_integration_test.json',
'do_while_system_tasks.json',
'do_while_with_decision_task.json',
'do_while_set_variable_fix.json')
}

Expand Down Expand Up @@ -1144,6 +1146,91 @@ class DoWhileSpec extends AbstractSpecification {
}
}

def "Test workflow with Do While task contains decision task"() {
given: "The loop condition is set to use set variable"
def workflowInput = new HashMap()
def array = new ArrayList()
array.add(1);
array.add(2);
workflowInput['list'] = array

when: "A do_while workflow is started"
def workflowInstanceId = startWorkflow("DO_While_with_Decision_task", 1, "looptest", workflowInput, null)

then: "Verify that the loop over task is waiting for the wait task to get completed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.IN_PROGRESS
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "The wait task is completed"
def waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[3]
waitTask.status = Task.Status.COMPLETED
workflowExecutor.updateTask(new TaskResult(waitTask))

then: "Verify that the next iteration is scheduled and workflow is in running state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 8
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.IN_PROGRESS
tasks[0].iteration == 2
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'INLINE'
tasks[4].status == Task.Status.COMPLETED
tasks[5].taskType == 'INLINE'
tasks[5].status == Task.Status.COMPLETED
tasks[6].taskType == 'SWITCH'
tasks[6].status == Task.Status.COMPLETED
tasks[7].taskType == 'WAIT'
tasks[7].status == Task.Status.IN_PROGRESS
}

when: "The wait task is completed"
waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[7]
waitTask.status = Task.Status.COMPLETED
workflowExecutor.updateTask(new TaskResult(waitTask))

then: "Verify that the workflow is completed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 9
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.COMPLETED
tasks[0].iteration == 2
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'INLINE'
tasks[4].status == Task.Status.COMPLETED
tasks[5].taskType == 'INLINE'
tasks[5].status == Task.Status.COMPLETED
tasks[6].taskType == 'SWITCH'
tasks[6].status == Task.Status.COMPLETED
tasks[7].taskType == 'WAIT'
tasks[7].status == Task.Status.COMPLETED
tasks[8].taskType == 'INLINE'
tasks[8].status == Task.Status.COMPLETED
}
}


void verifyTaskIteration(Task task, int iteration) {
assert task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))
assert task.iteration == iteration
Expand Down
62 changes: 62 additions & 0 deletions test-harness/src/test/resources/do_while_with_decision_task.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"name": "DO_While_with_Decision_task",
"description": "Program for testing loop behaviour",
"version": 1,
"schemaVersion": 2,
"ownerEmail": "xyz@company.eu",
"tasks": [
{
"name": "LoopTask",
"taskReferenceName": "LoopTask",
"type": "DO_WHILE",
"inputParameters": {
"list": "${workflow.input.list}"
},
"loopCondition": "$.LoopTask['iteration'] < $.list.length",
"loopOver": [
{
"name": "GetNumberAtIndex",
"taskReferenceName": "GetNumberAtIndex",
"type": "INLINE",
"inputParameters": {
"evaluatorType": "javascript",
"list": "${workflow.input.list}",
"iterator": "${LoopTask.output.iteration}",
"expression": "function getElement() { return $.list.get($.iterator - 1); } getElement();"
}
},
{
"name": "SwitchTask",
"taskReferenceName": "SwitchTask",
"type": "SWITCH",
"evaluatorType": "javascript",
"inputParameters": {
"param": "${GetNumberAtIndex.output.result}"
},
"expression": "$.param > 0",
"decisionCases": {
"true": [
{
"name": "WaitTask",
"taskReferenceName": "WaitTask",
"type": "WAIT",
"inputParameters": {
}
},
{
"name": "ComputeNumber",
"taskReferenceName": "ComputeNumber",
"type": "INLINE",
"inputParameters": {
"evaluatorType": "javascript",
"number": "${GetNumberAtIndex.output.result.number}",
"expression": "function compute() { return $.number+10; } compute();"
}
}
]
}
}
]
}
]
}

0 comments on commit 21abd2f

Please sign in to comment.