
Fix for failure workflow
- fix correlationId for failure workflow
- add failure taskId to input
jxu-nflx committed Feb 1, 2022
1 parent cd528e1 commit bc5d073
Showing 5 changed files with 325 additions and 6 deletions.
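
In short, two behaviors of WorkflowExecutor change: the failure workflow is now started with the parent workflow's correlationId (previously the parent's workflowId was passed in that argument position), and the id of the failed task is added to the failure workflow's input. Below is a minimal sketch of the input handed to the failure workflow after this change, using only the names that appear in the hunks that follow; seeding of the map and the surrounding method are assumptions, not shown in the diff.

    // Sketch only (java.util.Map/HashMap): input passed to the failure workflow, per the hunks below.
    Map<String, Object> input = new HashMap<>();              // how the map is seeded is assumed
    input.put("workflowId", workflowId);
    input.put("reason", reason);
    input.put("failureStatus", workflow.getStatus().toString());
    if (failedTask != null) {
        // New in this commit: expose the failed task to the failure workflow.
        input.put("failureTaskId", failedTask.getTaskId());
    }
    // The failure workflow is then started with workflow.getCorrelationId() where the parent
    // workflowId was previously passed, so parent and failure workflow share the same correlation id.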
@@ -801,9 +801,13 @@ private void endExecution(WorkflowModel workflow) {
String.format(
"Workflow is %s by TERMINATE task: %s",
terminationStatus, terminateTask.get().getTaskId());
if (WorkflowModel.Status.FAILED.name().equals(terminationStatus)) {
workflow.setStatus(WorkflowModel.Status.FAILED);
workflow = terminate(workflow, new TerminateWorkflowException(reason));
if (WorkflowStatus.FAILED.name().equals(terminationStatus)) {
workflow.setStatus(WorkflowStatus.FAILED);
workflow =
terminate(
workflow,
new TerminateWorkflowException(
reason, workflow.getStatus(), terminateTask.get()));
} else {
workflow.setReasonForIncompletion(reason);
workflow = completeWorkflow(workflow);
@@ -904,6 +908,18 @@ public void terminateWorkflow(String workflowId, String reason) {
*/
public WorkflowModel terminateWorkflow(
WorkflowModel workflow, String reason, String failureWorkflow) {
return terminateWorkflow(workflow, null, reason, failureWorkflow);
}

/**
* @param workflow the workflow to be terminated
* @param failedTask the failed task that caused the workflow termination, can be null
* @param reason the reason for termination
* @param failureWorkflow the failure workflow (if any) to be triggered as a result of this
* termination
*/
public Workflow terminateWorkflow(
Workflow workflow, Task failedTask, String reason, String failureWorkflow) {
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

@@ -974,6 +990,9 @@ public WorkflowModel terminateWorkflow(
input.put("workflowId", workflowId);
input.put("reason", reason);
input.put("failureStatus", workflow.getStatus().toString());
if (failedTask != null) {
input.put("failureTaskId", failedTask.getTaskId());
}

try {
WorkflowDef latestFailureWorkflow =
@@ -990,7 +1009,7 @@ public WorkflowModel terminateWorkflow(
latestFailureWorkflow,
input,
null,
workflowId,
workflow.getCorrelationId(),
null,
workflow.getTaskToDomain());

@@ -1790,7 +1809,10 @@ private WorkflowModel terminate(
executionDAOFacade.updateTask(terminateWorkflowException.getTask());
}
return terminateWorkflow(
workflow, terminateWorkflowException.getMessage(), failureWorkflow);
workflow,
terminateWorkflowException.getTask(),
terminateWorkflowException.getMessage(),
failureWorkflow);
}

private boolean rerunWF(
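
The new overload documented above takes the failed task alongside the reason and the failure workflow name. A hedged caller-side sketch, not part of this commit: workflow and workflowExecutor are assumed to be in scope, and the null handling mirrors the Javadoc's note that failedTask can be null.

    // Sketch only: invoking the new overload so the failed task's id reaches the failure workflow.
    Task failedTask =
            workflow.getTasks().stream()
                    .filter(t -> t.getStatus() == Task.Status.FAILED)
                    .findFirst()
                    .orElse(null);                      // the overload accepts a null failedTask
    workflowExecutor.terminateWorkflow(
            workflow,
            failedTask,
            "Terminated because a task failed",
            workflow.getWorkflowDefinition().getFailureWorkflow());  // assumes a failure workflow is configured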
@@ -2024,7 +2024,56 @@ public void testResumeWorkflow() {
verify(queueDAO, times(1)).push(anyString(), anyString(), anyInt(), anyLong());
}

private WorkflowModel generateSampleWorkflow() {
@Test
@SuppressWarnings("unchecked")
public void testTerminateWorkflowWithFailureWorkflow() {
Workflow workflow = new Workflow();
workflow.setWorkflowId("1");
workflow.setCorrelationId("testid");
workflow.setWorkflowDefinition(new WorkflowDef());
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
workflow.setOwnerApp("junit_test");
workflow.setStartTime(10L);
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);

Task failedTask = new Task();
failedTask.setTaskId("taskid");
failedTask.setReferenceTaskName("failed");
failedTask.setStatus(Status.FAILED);
workflow.getTasks().addAll(Arrays.asList(failedTask));

WorkflowDef failureWorkflowDef = new WorkflowDef();
failureWorkflowDef.setName("failure workflow");
when(metadataDAO.getLatestWorkflowDef(failureWorkflowDef.getName()))
.thenReturn(Optional.of(failureWorkflowDef));

Workflow failureWorkflow = new Workflow();
failureWorkflow.setWorkflowId("2");
failureWorkflow.setWorkflowDefinition(failureWorkflowDef);
when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean()))
.thenReturn(failureWorkflow);

when(executionLockService.acquireLock(anyString())).thenReturn(true);

workflowExecutor.terminateWorkflow(
workflow, failedTask, "reason", failureWorkflowDef.getName());

assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus());
ArgumentCaptor<Workflow> argumentCaptor = ArgumentCaptor.forClass(Workflow.class);
verify(executionDAOFacade, times(1)).createWorkflow(argumentCaptor.capture());
assertEquals(
workflow.getCorrelationId(),
argumentCaptor.getAllValues().get(0).getCorrelationId());
assertEquals(
workflow.getWorkflowId(),
argumentCaptor.getAllValues().get(0).getInput().get("workflowId"));
assertEquals(
failedTask.getTaskId(),
argumentCaptor.getAllValues().get(0).getInput().get("failureTaskId"));
}

private Workflow generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("testRetryWorkflowId");
@@ -0,0 +1,146 @@
/*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.test.integration

import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.execution.tasks.SubWorkflow
import com.netflix.conductor.test.base.AbstractSpecification

import spock.lang.Shared

class FailureWorkflowSpec extends AbstractSpecification {

@Shared
def WORKFLOW_WITH_TERMINATE_TASK_FAILED = 'test_terminate_task_failed_wf'

@Shared
def PARENT_WORKFLOW_WITH_FAILURE_TASK = 'test_task_failed_parent_wf'

@Autowired
SubWorkflow subWorkflowTask

def setup() {
workflowTestUtil.registerWorkflows(
'failure_workflow_for_terminate_task_workflow.json',
'terminate_task_failed_workflow_integration.json',
'test_task_failed_parent_workflow.json',
'test_task_failed_sub_workflow.json'
)
}

def "Test workflow with a task that failed"() {
given: "workflow input"
def workflowInput = new HashMap()
workflowInput['a'] = 1

when: "Start the workflow which has the failed task"
def testId = 'testId'
def workflowInstanceId = workflowExecutor.startWorkflow(WORKFLOW_WITH_TERMINATE_TASK_FAILED, 1,
testId, workflowInput, null, null, null)

then: "Verify that the workflow has failed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 2
reasonForIncompletion.contains('Workflow is FAILED by TERMINATE task')
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'LAMBDA'
tasks[0].seq == 1
tasks[1].status == Task.Status.COMPLETED
tasks[1].taskType == 'TERMINATE'
tasks[1].seq == 2
output
def failedWorkflowId = output['conductor.failure_workflow'] as String
def workflowCorrelationId = correlationId
def workflowFailureTaskId = tasks[1].taskId
with(workflowExecutionService.getExecutionStatus(failedWorkflowId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
correlationId == workflowCorrelationId
input['workflowId'] == workflowInstanceId
input['failureTaskId'] == workflowFailureTaskId
tasks.size() == 1
tasks[0].taskType == 'LAMBDA'
}
}
}

def "Test workflow with a task failed in subworkflow"() {
given: "workflow input"
def workflowInput = new HashMap()
workflowInput['a'] = 1

when: "Start the workflow which has the subworkflow task"
def workflowInstanceId = workflowExecutor.startWorkflow(PARENT_WORKFLOW_WITH_FAILURE_TASK, 1,
'', workflowInput, null, null, null)

then: "verify that the workflow has started and the tasks are as expected"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 2
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'LAMBDA'
tasks[0].referenceTaskName == 'lambdaTask1'
tasks[0].seq == 1
tasks[1].status == Task.Status.SCHEDULED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].seq == 2
}

when: "subworkflow is retrieved"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = workflow.getTaskByRefName("test_task_failed_sub_wf").getTaskId()
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)
workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowId = workflow.getTaskByRefName("test_task_failed_sub_wf").subWorkflowId

then: "verify that the sub workflow has failed"
with(workflowExecutionService.getExecutionStatus(subWorkflowId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 2
reasonForIncompletion.contains('Workflow is FAILED by TERMINATE task')
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'LAMBDA'
tasks[0].seq == 1
tasks[1].status == Task.Status.COMPLETED
tasks[1].taskType == 'TERMINATE'
tasks[1].seq == 2
}

then: "Verify that the workflow has failed and correct inputs passed into the failure workflow"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 2
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'LAMBDA'
tasks[0].referenceTaskName == 'lambdaTask1'
tasks[0].seq == 1
tasks[1].status == Task.Status.FAILED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].seq == 2
def failedWorkflowId = output['conductor.failure_workflow'] as String
def workflowCorrelationId = correlationId
def workflowFailureTaskId = tasks[1].taskId
with(workflowExecutionService.getExecutionStatus(failedWorkflowId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
correlationId == workflowCorrelationId
input['workflowId'] == workflowInstanceId
input['failureTaskId'] == workflowFailureTaskId
tasks.size() == 1
tasks[0].taskType == 'LAMBDA'
}
}
}
}
@@ -0,0 +1,37 @@
{
"name": "test_task_failed_parent_wf",
"version": 1,
"tasks": [
{
"name": "test_lambda_task1",
"taskReferenceName": "lambdaTask1",
"inputParameters": {
"lambdaValue": "${workflow.input.lambdaValue}",
"scriptExpression": "var i = 10; if ($.lambdaValue == 1){ return {testvalue: 'Lambda value was 1', iValue: i} } else { return {testvalue: 'Lambda value was NOT 1', iValue: i + 3} }"
},
"type": "LAMBDA"
},
{
"name": "test_task_failed_sub_wf",
"taskReferenceName": "test_task_failed_sub_wf",
"inputParameters": {
},
"type": "SUB_WORKFLOW",
"subWorkflowParam": {
"name": "test_task_failed_sub_wf"
}
},
{
"name": "test_lambda_task2",
"taskReferenceName": "lambdaTask2",
"inputParameters": {
"lambdaValue": "${workflow.input.lambdaValue}",
"scriptExpression": "var i = 10; if ($.lambdaValue == 1){ return {testvalue: 'Lambda value was 1', iValue: i} } else { return {testvalue: 'Lambda value was NOT 1', iValue: i + 3} }"
},
"type": "LAMBDA"
}
],
"schemaVersion": 2,
"ownerEmail": "test@harness.com",
"failureWorkflow": "failure_workflow"
}
65 changes: 65 additions & 0 deletions test-harness/src/test/resources/test_task_failed_sub_workflow.json
@@ -0,0 +1,65 @@
{
"name": "test_task_failed_sub_wf",
"version": 1,
"tasks": [
{
"name": "lambda",
"taskReferenceName": "lambda0",
"inputParameters": {
"input": "${workflow.input}",
"scriptExpression": "if ($.input.a==1){return {testvalue: true}} else{return {testvalue: false}}"
},
"type": "LAMBDA",
"decisionCases": {},
"defaultCase": [],
"forkTasks": [],
"startDelay": 0,
"joinOn": [],
"optional": false,
"defaultExclusiveJoinTask": [],
"asyncComplete": false,
"loopOver": []
},
{
"name": "terminate",
"taskReferenceName": "terminate0",
"inputParameters": {
"terminationStatus": "FAILED",
"workflowOutput": "${lambda0.output}"
},
"type": "TERMINATE",
"decisionCases": {},
"defaultCase": [],
"forkTasks": [],
"startDelay": 0,
"joinOn": [],
"optional": false,
"defaultExclusiveJoinTask": [],
"asyncComplete": false,
"loopOver": []
},
{
"name": "integration_task_2",
"taskReferenceName": "t2",
"inputParameters": {},
"type": "SIMPLE",
"decisionCases": {},
"defaultCase": [],
"forkTasks": [],
"startDelay": 0,
"joinOn": [],
"optional": false,
"defaultExclusiveJoinTask": [],
"asyncComplete": false,
"loopOver": []
}
],
"inputParameters": [],
"outputParameters": {},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"ownerEmail": "test@harness.com"
}
