Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Add failedTaskId to WorkflowModel
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx committed Feb 1, 2022
1 parent 25590ad commit c57ebd8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -569,6 +568,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) {

workflow.getTasks().clear();
workflow.setReasonForIncompletion(null);
workflow.setFailedTaskId(null);
workflow.setCreateTime(System.currentTimeMillis());
workflow.setEndTime(0);
workflow.setLastRetriedTime(0);
Expand Down Expand Up @@ -854,6 +854,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
workflow.setTasks(workflow.getTasks());
workflow.setOutput(workflow.getOutput());
workflow.setReasonForIncompletion(workflow.getReasonForIncompletion());
workflow.setFailedTaskId(workflow.getFailedTaskId());
workflow.setExternalOutputPayloadStoragePath(
workflow.getExternalOutputPayloadStoragePath());

Expand Down Expand Up @@ -979,9 +980,8 @@ public WorkflowModel terminateWorkflow(
input.put("workflowId", workflowId);
input.put("reason", reason);
input.put("failureStatus", workflow.getStatus().toString());
TaskModel failedTask = findLastFailedTask(workflow);
if (failedTask != null) {
input.put("failureTaskId", failedTask.getTaskId());
if (workflow.getFailedTaskId() != null) {
input.put("failureTaskId", workflow.getFailedTaskId());
}

try {
Expand Down Expand Up @@ -1787,6 +1787,10 @@ private WorkflowModel terminate(
workflow.setStatus(terminateWorkflowException.getWorkflowStatus());
}

if (terminateWorkflowException.getTask() != null && workflow.getFailedTaskId() == null) {
workflow.setFailedTaskId(terminateWorkflowException.getTask().getTaskId());
}

String failureWorkflow = workflow.getWorkflowDefinition().getFailureWorkflow();
if (failureWorkflow != null) {
if (failureWorkflow.startsWith("$")) {
Expand Down Expand Up @@ -1821,6 +1825,7 @@ private boolean rerunWF(
workflow.setStatus(WorkflowModel.Status.RUNNING);
// Reset failure reason from previous run to default
workflow.setReasonForIncompletion(null);
workflow.setFailedTaskId(null);
workflow.setFailedReferenceTaskNames(new HashSet<>());
if (correlationId != null) {
workflow.setCorrelationId(correlationId);
Expand Down Expand Up @@ -1867,6 +1872,7 @@ private boolean rerunWF(
workflow.setStatus(WorkflowModel.Status.RUNNING);
// Reset failure reason from previous run to default
workflow.setReasonForIncompletion(null);
workflow.setFailedTaskId(null);
workflow.setFailedReferenceTaskNames(new HashSet<>());
if (correlationId != null) {
workflow.setCorrelationId(correlationId);
Expand Down Expand Up @@ -1995,32 +2001,4 @@ private void pushParentWorkflow(String parentWorkflowId) {
queueDAO.push(Utils.DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
}
}

private TaskModel findLastFailedTask(WorkflowModel workflow) {
Stream<TaskModel> tasks = workflow.getTasks().stream().filter(UNSUCCESSFUL_TERMINAL_TASK);
return tasks.reduce((a, b) -> b).orElse(findFailedTerminateTask(workflow));
}

private TaskModel findFailedTerminateTask(WorkflowModel workflow) {
Optional<TaskModel> terminateTask =
workflow.getTasks().stream()
.filter(
t ->
TaskType.TERMINATE.name().equals(t.getTaskType())
&& t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
TaskModel task = terminateTask.get();
String terminationStatus =
(String)
task.getWorkflowTask()
.getInputParameters()
.get(Terminate.getTerminationStatusParameter());
if (WorkflowModel.Status.FAILED.name().equals(terminationStatus)) {
return task;
}
}
return null;
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public boolean isSuccessful() {

private String updatedBy;

// Capture the failed taskId if the workflow execution failed because of task failure
private String failedTaskId;

public Status getStatus() {
return status;
}
Expand Down Expand Up @@ -307,6 +310,14 @@ public void setUpdatedBy(String updatedBy) {
this.updatedBy = updatedBy;
}

public String getFailedTaskId() {
return failedTaskId;
}

public void setFailedTaskId(String failedTaskId) {
this.failedTaskId = failedTaskId;
}

/**
* Convenience method for accessing the workflow definition name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,10 @@ public void testResumeWorkflow() {
@Test
@SuppressWarnings("unchecked")
public void testTerminateWorkflowWithFailureWorkflow() {
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("workflow");
workflowDef.setFailureWorkflow("failure_workflow");

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("1");
workflow.setCorrelationId("testid");
Expand All @@ -2035,6 +2039,7 @@ public void testTerminateWorkflowWithFailureWorkflow() {
workflow.setOwnerApp("junit_test");
workflow.setEndTime(100L);
workflow.setOutput(Collections.EMPTY_MAP);
workflow.setWorkflowDefinition(workflowDef);

TaskModel successTask = new TaskModel();
successTask.setTaskId("taskid1");
Expand All @@ -2048,21 +2053,17 @@ public void testTerminateWorkflowWithFailureWorkflow() {
workflow.getTasks().addAll(Arrays.asList(successTask, failedTask));

WorkflowDef failureWorkflowDef = new WorkflowDef();
failureWorkflowDef.setName("failure workflow");
failureWorkflowDef.setName("failure_workflow");
when(metadataDAO.getLatestWorkflowDef(failureWorkflowDef.getName()))
.thenReturn(Optional.of(failureWorkflowDef));

WorkflowModel failureWorkflow = new WorkflowModel();
failureWorkflow.setWorkflowId("2");
failureWorkflow.setWorkflowDefinition(failureWorkflowDef);
when(executionDAOFacade.getWorkflowModel(anyString(), anyBoolean()))
.thenReturn(failureWorkflow);

when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true))
.thenReturn(workflow);
when(executionLockService.acquireLock(anyString())).thenReturn(true);

workflowExecutor.terminateWorkflow(workflow, "reason", failureWorkflowDef.getName());
workflowExecutor.decide(workflow.getWorkflowId());

assertEquals(WorkflowModel.Status.TERMINATED, workflow.getStatus());
assertEquals(WorkflowModel.Status.FAILED, workflow.getStatus());
ArgumentCaptor<WorkflowModel> argumentCaptor = ArgumentCaptor.forClass(WorkflowModel.class);
verify(executionDAOFacade, times(1)).createWorkflow(argumentCaptor.capture());
assertEquals(
Expand Down

0 comments on commit c57ebd8

Please sign in to comment.