From 79909b711e4946609898b7c566eb149881878cd2 Mon Sep 17 00:00:00 2001 From: Jiaofen Xu Date: Thu, 20 Jan 2022 20:37:39 -0800 Subject: [PATCH] Use workflow to find failed task --- .../core/execution/WorkflowExecutor.java | 47 ++++++++++++------- .../core/execution/TestWorkflowExecutor.java | 12 +++-- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index af041d9bf5..1efd71cede 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -908,18 +909,6 @@ public void terminateWorkflow(String workflowId, String reason) { */ public WorkflowModel terminateWorkflow( WorkflowModel workflow, String reason, String failureWorkflow) { - return terminateWorkflow(workflow, null, reason, failureWorkflow); - } - - /** - * @param workflow the workflow to be terminated - * @param failedTask the failed task that caused the workflow termination, can be null - * @param reason the reason for termination - * @param failureWorkflow the failure workflow (if any) to be triggered as a result of this - * termination - */ - public Workflow terminateWorkflow( - Workflow workflow, Task failedTask, String reason, String failureWorkflow) { try { executionLockService.acquireLock(workflow.getWorkflowId(), 60000); @@ -990,6 +979,7 @@ public Workflow terminateWorkflow( input.put("workflowId", workflowId); input.put("reason", reason); input.put("failureStatus", workflow.getStatus().toString()); + Task failedTask = findLastFailedTask(workflow); if (failedTask != null) { input.put("failureTaskId", failedTask.getTaskId()); } @@ -1809,10 +1799,7 @@ private WorkflowModel terminate( executionDAOFacade.updateTask(terminateWorkflowException.getTask()); } return terminateWorkflow( - workflow, - terminateWorkflowException.getTask(), - terminateWorkflowException.getMessage(), - failureWorkflow); + workflow, terminateWorkflowException.getMessage(), failureWorkflow); } private boolean rerunWF( @@ -2008,4 +1995,32 @@ private void pushParentWorkflow(String parentWorkflowId) { queueDAO.push(Utils.DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); } } + + private Task findLastFailedTask(Workflow workflow) { + Stream tasks = workflow.getTasks().stream().filter(UNSUCCESSFUL_TERMINAL_TASK); + return tasks.reduce((a, b) -> b).orElse(findFailedTerminateTask(workflow)); + } + + private Task findFailedTerminateTask(Workflow workflow) { + Optional terminateTask = + workflow.getTasks().stream() + .filter( + t -> + TERMINATE.name().equals(t.getTaskType()) + && t.getStatus().isTerminal() + && t.getStatus().isSuccessful()) + .findFirst(); + if (terminateTask.isPresent()) { + Task task = terminateTask.get(); + String terminationStatus = + (String) + task.getWorkflowTask() + .getInputParameters() + .get(Terminate.getTerminationStatusParameter()); + if (WorkflowStatus.FAILED.name().equals(terminationStatus)) { + return task; + } + } + return null; + } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index eda35038bb..5aac83b754 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -2037,11 +2037,16 @@ public void testTerminateWorkflowWithFailureWorkflow() { workflow.setEndTime(100L); workflow.setOutput(Collections.EMPTY_MAP); + Task successTask = new Task(); + successTask.setTaskId("taskid1"); + successTask.setReferenceTaskName("success"); + successTask.setStatus(Status.COMPLETED); + Task failedTask = new Task(); - failedTask.setTaskId("taskid"); + failedTask.setTaskId("taskid2"); failedTask.setReferenceTaskName("failed"); failedTask.setStatus(Status.FAILED); - workflow.getTasks().addAll(Arrays.asList(failedTask)); + workflow.getTasks().addAll(Arrays.asList(successTask, failedTask)); WorkflowDef failureWorkflowDef = new WorkflowDef(); failureWorkflowDef.setName("failure workflow"); @@ -2056,8 +2061,7 @@ public void testTerminateWorkflowWithFailureWorkflow() { when(executionLockService.acquireLock(anyString())).thenReturn(true); - workflowExecutor.terminateWorkflow( - workflow, failedTask, "reason", failureWorkflowDef.getName()); + workflowExecutor.terminateWorkflow(workflow, "reason", failureWorkflowDef.getName()); assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus()); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Workflow.class);