Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Use workflow to find failed task
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx committed Feb 1, 2022
1 parent bc5d073 commit 79909b7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -908,18 +909,6 @@ public void terminateWorkflow(String workflowId, String reason) {
*/
public WorkflowModel terminateWorkflow(
WorkflowModel workflow, String reason, String failureWorkflow) {
return terminateWorkflow(workflow, null, reason, failureWorkflow);
}

/**
* @param workflow the workflow to be terminated
* @param failedTask the failed task that caused the workflow termination, can be null
* @param reason the reason for termination
* @param failureWorkflow the failure workflow (if any) to be triggered as a result of this
* termination
*/
public Workflow terminateWorkflow(
Workflow workflow, Task failedTask, String reason, String failureWorkflow) {
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

Expand Down Expand Up @@ -990,6 +979,7 @@ public Workflow terminateWorkflow(
input.put("workflowId", workflowId);
input.put("reason", reason);
input.put("failureStatus", workflow.getStatus().toString());
Task failedTask = findLastFailedTask(workflow);
if (failedTask != null) {
input.put("failureTaskId", failedTask.getTaskId());
}
Expand Down Expand Up @@ -1809,10 +1799,7 @@ private WorkflowModel terminate(
executionDAOFacade.updateTask(terminateWorkflowException.getTask());
}
return terminateWorkflow(
workflow,
terminateWorkflowException.getTask(),
terminateWorkflowException.getMessage(),
failureWorkflow);
workflow, terminateWorkflowException.getMessage(), failureWorkflow);
}

private boolean rerunWF(
Expand Down Expand Up @@ -2008,4 +1995,32 @@ private void pushParentWorkflow(String parentWorkflowId) {
queueDAO.push(Utils.DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
}
}

private Task findLastFailedTask(Workflow workflow) {
Stream<Task> tasks = workflow.getTasks().stream().filter(UNSUCCESSFUL_TERMINAL_TASK);
return tasks.reduce((a, b) -> b).orElse(findFailedTerminateTask(workflow));
}

private Task findFailedTerminateTask(Workflow workflow) {
Optional<Task> terminateTask =
workflow.getTasks().stream()
.filter(
t ->
TERMINATE.name().equals(t.getTaskType())
&& t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
Task task = terminateTask.get();
String terminationStatus =
(String)
task.getWorkflowTask()
.getInputParameters()
.get(Terminate.getTerminationStatusParameter());
if (WorkflowStatus.FAILED.name().equals(terminationStatus)) {
return task;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2037,11 +2037,16 @@ public void testTerminateWorkflowWithFailureWorkflow() {
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);

Task successTask = new Task();
successTask.setTaskId("taskid1");
successTask.setReferenceTaskName("success");
successTask.setStatus(Status.COMPLETED);

Task failedTask = new Task();
failedTask.setTaskId("taskid");
failedTask.setTaskId("taskid2");
failedTask.setReferenceTaskName("failed");
failedTask.setStatus(Status.FAILED);
workflow.getTasks().addAll(Arrays.asList(failedTask));
workflow.getTasks().addAll(Arrays.asList(successTask, failedTask));

WorkflowDef failureWorkflowDef = new WorkflowDef();
failureWorkflowDef.setName("failure workflow");
Expand All @@ -2056,8 +2061,7 @@ public void testTerminateWorkflowWithFailureWorkflow() {

when(executionLockService.acquireLock(anyString())).thenReturn(true);

workflowExecutor.terminateWorkflow(
workflow, failedTask, "reason", failureWorkflowDef.getName());
workflowExecutor.terminateWorkflow(workflow, "reason", failureWorkflowDef.getName());

assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus());
ArgumentCaptor<Workflow> argumentCaptor = ArgumentCaptor.forClass(Workflow.class);
Expand Down

0 comments on commit 79909b7

Please sign in to comment.