diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 711eac10f4..af041d9bf5 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -801,9 +801,13 @@ private void endExecution(WorkflowModel workflow) { String.format( "Workflow is %s by TERMINATE task: %s", terminationStatus, terminateTask.get().getTaskId()); - if (WorkflowModel.Status.FAILED.name().equals(terminationStatus)) { - workflow.setStatus(WorkflowModel.Status.FAILED); - workflow = terminate(workflow, new TerminateWorkflowException(reason)); + if (WorkflowStatus.FAILED.name().equals(terminationStatus)) { + workflow.setStatus(WorkflowStatus.FAILED); + workflow = + terminate( + workflow, + new TerminateWorkflowException( + reason, workflow.getStatus(), terminateTask.get())); } else { workflow.setReasonForIncompletion(reason); workflow = completeWorkflow(workflow); @@ -904,6 +908,18 @@ public void terminateWorkflow(String workflowId, String reason) { */ public WorkflowModel terminateWorkflow( WorkflowModel workflow, String reason, String failureWorkflow) { + return terminateWorkflow(workflow, null, reason, failureWorkflow); + } + + /** + * @param workflow the workflow to be terminated + * @param failedTask the failed task that caused the workflow termination, can be null + * @param reason the reason for termination + * @param failureWorkflow the failure workflow (if any) to be triggered as a result of this + * termination + */ + public Workflow terminateWorkflow( + Workflow workflow, Task failedTask, String reason, String failureWorkflow) { try { executionLockService.acquireLock(workflow.getWorkflowId(), 60000); @@ -974,6 +990,9 @@ public WorkflowModel terminateWorkflow( input.put("workflowId", workflowId); input.put("reason", reason); input.put("failureStatus", workflow.getStatus().toString()); + if (failedTask != null) { + input.put("failureTaskId", failedTask.getTaskId()); + } try { WorkflowDef latestFailureWorkflow = @@ -990,7 +1009,7 @@ public WorkflowModel terminateWorkflow( latestFailureWorkflow, input, null, - workflowId, + workflow.getCorrelationId(), null, workflow.getTaskToDomain()); @@ -1790,7 +1809,10 @@ private WorkflowModel terminate( executionDAOFacade.updateTask(terminateWorkflowException.getTask()); } return terminateWorkflow( - workflow, terminateWorkflowException.getMessage(), failureWorkflow); + workflow, + terminateWorkflowException.getTask(), + terminateWorkflowException.getMessage(), + failureWorkflow); } private boolean rerunWF( diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 7d202e0d37..eda35038bb 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -2024,7 +2024,56 @@ public void testResumeWorkflow() { verify(queueDAO, times(1)).push(anyString(), anyString(), anyInt(), anyLong()); } - private WorkflowModel generateSampleWorkflow() { + @Test + @SuppressWarnings("unchecked") + public void testTerminateWorkflowWithFailureWorkflow() { + Workflow workflow = new Workflow(); + workflow.setWorkflowId("1"); + workflow.setCorrelationId("testid"); + workflow.setWorkflowDefinition(new WorkflowDef()); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflow.setOwnerApp("junit_test"); + workflow.setStartTime(10L); + workflow.setEndTime(100L); + workflow.setOutput(Collections.EMPTY_MAP); + + Task failedTask = new Task(); + failedTask.setTaskId("taskid"); + failedTask.setReferenceTaskName("failed"); + failedTask.setStatus(Status.FAILED); + workflow.getTasks().addAll(Arrays.asList(failedTask)); + + WorkflowDef failureWorkflowDef = new WorkflowDef(); + failureWorkflowDef.setName("failure workflow"); + when(metadataDAO.getLatestWorkflowDef(failureWorkflowDef.getName())) + .thenReturn(Optional.of(failureWorkflowDef)); + + Workflow failureWorkflow = new Workflow(); + failureWorkflow.setWorkflowId("2"); + failureWorkflow.setWorkflowDefinition(failureWorkflowDef); + when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())) + .thenReturn(failureWorkflow); + + when(executionLockService.acquireLock(anyString())).thenReturn(true); + + workflowExecutor.terminateWorkflow( + workflow, failedTask, "reason", failureWorkflowDef.getName()); + + assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Workflow.class); + verify(executionDAOFacade, times(1)).createWorkflow(argumentCaptor.capture()); + assertEquals( + workflow.getCorrelationId(), + argumentCaptor.getAllValues().get(0).getCorrelationId()); + assertEquals( + workflow.getWorkflowId(), + argumentCaptor.getAllValues().get(0).getInput().get("workflowId")); + assertEquals( + failedTask.getTaskId(), + argumentCaptor.getAllValues().get(0).getInput().get("failureTaskId")); + } + + private Workflow generateSampleWorkflow() { // setup WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("testRetryWorkflowId"); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy new file mode 100644 index 0000000000..12891cea2c --- /dev/null +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/FailureWorkflowSpec.groovy @@ -0,0 +1,146 @@ +/* + * Copyright 2021 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.test.integration + +import org.springframework.beans.factory.annotation.Autowired + +import com.netflix.conductor.common.metadata.tasks.Task +import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.SubWorkflow +import com.netflix.conductor.test.base.AbstractSpecification + +import spock.lang.Shared + +class FailureWorkflowSpec extends AbstractSpecification { + + @Shared + def WORKFLOW_WITH_TERMINATE_TASK_FAILED = 'test_terminate_task_failed_wf' + + @Shared + def PARENT_WORKFLOW_WITH_FAILURE_TASK = 'test_task_failed_parent_wf' + + @Autowired + SubWorkflow subWorkflowTask + + def setup() { + workflowTestUtil.registerWorkflows( + 'failure_workflow_for_terminate_task_workflow.json', + 'terminate_task_failed_workflow_integration.json', + 'test_task_failed_parent_workflow.json', + 'test_task_failed_sub_workflow.json' + ) + } + + def "Test workflow with a task that failed"() { + given: "workflow input" + def workflowInput = new HashMap() + workflowInput['a'] = 1 + + when: "Start the workflow which has the failed task" + def testId = 'testId' + def workflowInstanceId = workflowExecutor.startWorkflow(WORKFLOW_WITH_TERMINATE_TASK_FAILED, 1, + testId, workflowInput, null, null, null) + + then: "Verify that the workflow has failed" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 2 + reasonForIncompletion.contains('Workflow is FAILED by TERMINATE task') + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'LAMBDA' + tasks[0].seq == 1 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'TERMINATE' + tasks[1].seq == 2 + output + def failedWorkflowId = output['conductor.failure_workflow'] as String + def workflowCorrelationId = correlationId + def workflowFailureTaskId = tasks[1].taskId + with(workflowExecutionService.getExecutionStatus(failedWorkflowId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + correlationId == workflowCorrelationId + input['workflowId'] == workflowInstanceId + input['failureTaskId'] == workflowFailureTaskId + tasks.size() == 1 + tasks[0].taskType == 'LAMBDA' + } + } + } + + def "Test workflow with a task failed in subworkflow"() { + given: "workflow input" + def workflowInput = new HashMap() + workflowInput['a'] = 1 + + when: "Start the workflow which has the subworkflow task" + def workflowInstanceId = workflowExecutor.startWorkflow(PARENT_WORKFLOW_WITH_FAILURE_TASK, 1, + '', workflowInput, null, null, null) + + then: "verify that the workflow has started and the tasks are as expected" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'LAMBDA' + tasks[0].referenceTaskName == 'lambdaTask1' + tasks[0].seq == 1 + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'SUB_WORKFLOW' + tasks[1].seq == 2 + } + + when: "subworkflow is retrieved" + def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) + def subWorkflowTaskId = workflow.getTaskByRefName("test_task_failed_sub_wf").getTaskId() + asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) + workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) + def subWorkflowId = workflow.getTaskByRefName("test_task_failed_sub_wf").subWorkflowId + + then: "verify that the sub workflow has failed" + with(workflowExecutionService.getExecutionStatus(subWorkflowId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 2 + reasonForIncompletion.contains('Workflow is FAILED by TERMINATE task') + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'LAMBDA' + tasks[0].seq == 1 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'TERMINATE' + tasks[1].seq == 2 + } + + then: "Verify that the workflow has failed and correct inputs passed into the failure workflow" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 2 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'LAMBDA' + tasks[0].referenceTaskName == 'lambdaTask1' + tasks[0].seq == 1 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'SUB_WORKFLOW' + tasks[1].seq == 2 + def failedWorkflowId = output['conductor.failure_workflow'] as String + def workflowCorrelationId = correlationId + def workflowFailureTaskId = tasks[1].taskId + with(workflowExecutionService.getExecutionStatus(failedWorkflowId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + correlationId == workflowCorrelationId + input['workflowId'] == workflowInstanceId + input['failureTaskId'] == workflowFailureTaskId + tasks.size() == 1 + tasks[0].taskType == 'LAMBDA' + } + } + } +} diff --git a/test-harness/src/test/resources/test_task_failed_parent_workflow.json b/test-harness/src/test/resources/test_task_failed_parent_workflow.json new file mode 100644 index 0000000000..c1c32369e6 --- /dev/null +++ b/test-harness/src/test/resources/test_task_failed_parent_workflow.json @@ -0,0 +1,37 @@ +{ + "name": "test_task_failed_parent_wf", + "version": 1, + "tasks": [ + { + "name": "test_lambda_task1", + "taskReferenceName": "lambdaTask1", + "inputParameters": { + "lambdaValue": "${workflow.input.lambdaValue}", + "scriptExpression": "var i = 10; if ($.lambdaValue == 1){ return {testvalue: 'Lambda value was 1', iValue: i} } else { return {testvalue: 'Lambda value was NOT 1', iValue: i + 3} }" + }, + "type": "LAMBDA" + }, + { + "name": "test_task_failed_sub_wf", + "taskReferenceName": "test_task_failed_sub_wf", + "inputParameters": { + }, + "type": "SUB_WORKFLOW", + "subWorkflowParam": { + "name": "test_task_failed_sub_wf" + } + }, + { + "name": "test_lambda_task2", + "taskReferenceName": "lambdaTask2", + "inputParameters": { + "lambdaValue": "${workflow.input.lambdaValue}", + "scriptExpression": "var i = 10; if ($.lambdaValue == 1){ return {testvalue: 'Lambda value was 1', iValue: i} } else { return {testvalue: 'Lambda value was NOT 1', iValue: i + 3} }" + }, + "type": "LAMBDA" + } + ], + "schemaVersion": 2, + "ownerEmail": "test@harness.com", + "failureWorkflow": "failure_workflow" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/test_task_failed_sub_workflow.json b/test-harness/src/test/resources/test_task_failed_sub_workflow.json new file mode 100644 index 0000000000..5f1e76e9e0 --- /dev/null +++ b/test-harness/src/test/resources/test_task_failed_sub_workflow.json @@ -0,0 +1,65 @@ +{ + "name": "test_task_failed_sub_wf", + "version": 1, + "tasks": [ + { + "name": "lambda", + "taskReferenceName": "lambda0", + "inputParameters": { + "input": "${workflow.input}", + "scriptExpression": "if ($.input.a==1){return {testvalue: true}} else{return {testvalue: false}}" + }, + "type": "LAMBDA", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "terminate", + "taskReferenceName": "terminate0", + "inputParameters": { + "terminationStatus": "FAILED", + "workflowOutput": "${lambda0.output}" + }, + "type": "TERMINATE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": {}, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file