Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fix for #1677, inspired from #2881. Loop-over tasks are created in th…
Browse files Browse the repository at this point in the history
…e DoWhile.execute method. The DO_WHILE TaskModel is persisted before the loop-over tasks are created. Therefore, the loop-over tasks will have access the iteration field in DO_WHILE TaskModel.
  • Loading branch information
aravindanr committed May 16, 2022
1 parent 0c972ef commit 9190e11
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class DeciderService {

private static final Logger LOGGER = LoggerFactory.getLogger(DeciderService.class);

@VisibleForTesting static final String MAX_TASK_LIMIT = "conductor.app.max-task-limit";

private final IDGenerator idGenerator;
private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,10 +1302,11 @@ public boolean decide(String workflowId) {
List<TaskModel> tasksToBeScheduled = outcome.tasksToBeScheduled;
setTaskDomains(tasksToBeScheduled, workflow);
List<TaskModel> tasksToBeUpdated = outcome.tasksToBeUpdated;
boolean stateChanged = false;

tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled);

boolean stateChanged = scheduleTask(workflow, tasksToBeScheduled); // start

for (TaskModel task : outcome.tasksToBeScheduled) {
if (systemTaskRegistry.isSystemTask(task.getTaskType())
&& NON_TERMINAL_TASK.test(task)) {
Expand All @@ -1324,12 +1325,9 @@ public boolean decide(String workflowId) {
executionDAOFacade.updateWorkflow(workflow);
}

stateChanged = scheduleTask(workflow, tasksToBeScheduled) || stateChanged;

if (stateChanged) {
decide(workflowId);
}

} catch (TerminateWorkflowException twe) {
LOGGER.info("Execution terminated of workflow: {}", workflowId, twe);
terminate(workflow, twe);
Expand Down Expand Up @@ -1685,7 +1683,6 @@ private long getTaskDuration(long s, TaskModel task) {

@VisibleForTesting
boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
List<TaskModel> createdTasks;
List<TaskModel> tasksToBeQueued;
boolean startedSystemTasks = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

Expand All @@ -26,7 +24,6 @@
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
Expand Down Expand Up @@ -63,7 +60,6 @@ public TaskType getTaskType() {
*/
@Override
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {

LOGGER.debug("TaskMapperContext {} in DoWhileTaskMapper", taskMapperContext);

WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
Expand All @@ -72,11 +68,9 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
TaskModel task = workflowModel.getTaskByRefName(workflowTask.getTaskReferenceName());
if (task != null && task.getStatus().isTerminal()) {
// Since loopTask is already completed no need to schedule task again.
return Collections.emptyList();
return List.of();
}

List<TaskModel> tasksToBeScheduled = new ArrayList<>();
int retryCount = taskMapperContext.getRetryCount();
TaskDef taskDefinition =
Optional.ofNullable(taskMapperContext.getTaskDefinition())
.orElseGet(
Expand All @@ -86,28 +80,14 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
workflowTask.getName()))
.orElseGet(TaskDef::new));

TaskModel loopTask = taskMapperContext.createTaskModel();
loopTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
loopTask.setIteration(1);
loopTask.setStatus(TaskModel.Status.IN_PROGRESS);
loopTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
loopTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());

tasksToBeScheduled.add(loopTask);
List<WorkflowTask> loopOverTasks = workflowTask.getLoopOver();
List<TaskModel> tasks2 =
taskMapperContext
.getDeciderService()
.getTasksToBeScheduled(workflowModel, loopOverTasks.get(0), retryCount);
tasks2.forEach(
t -> {
t.setReferenceTaskName(
TaskUtils.appendIteration(
t.getReferenceTaskName(), loopTask.getIteration()));
t.setIteration(loopTask.getIteration());
});
tasksToBeScheduled.addAll(tasks2);
TaskModel doWhileTask = taskMapperContext.createTaskModel();
doWhileTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
doWhileTask.setStatus(TaskModel.Status.IN_PROGRESS);
doWhileTask.setStartTime(System.currentTimeMillis());
doWhileTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
doWhileTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
doWhileTask.setRetryCount(taskMapperContext.getRetryCount());

return tasksToBeScheduled;
return List.of(doWhileTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,49 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor exec

@Override
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
WorkflowModel workflow, TaskModel doWhileTaskModel, WorkflowExecutor workflowExecutor) {

boolean allDone = true;
boolean hasFailures = false;
StringBuilder failureReason = new StringBuilder();
Map<String, Object> output = new HashMap<>();
task.getOutputData().put("iteration", task.getIteration());

/*
* Get the latest set of tasks (the ones that have the highest retry count). We don't want to evaluate any tasks
* that have already failed if there is a more current one (a later retry count).
*/
Map<String, TaskModel> relevantTasks = new LinkedHashMap<>();
TaskModel relevantTask = null;
TaskModel relevantTask;
for (TaskModel t : workflow.getTasks()) {
if (task.getWorkflowTask()
if (doWhileTaskModel
.getWorkflowTask()
.has(TaskUtils.removeIterationFromTaskRefName(t.getReferenceTaskName()))
&& !task.getReferenceTaskName().equals(t.getReferenceTaskName())
&& task.getIteration() == t.getIteration()) {
&& !doWhileTaskModel.getReferenceTaskName().equals(t.getReferenceTaskName())
&& doWhileTaskModel.getIteration() == t.getIteration()) {
relevantTask = relevantTasks.get(t.getReferenceTaskName());
if (relevantTask == null || t.getRetryCount() > relevantTask.getRetryCount()) {
relevantTasks.put(t.getReferenceTaskName(), t);
}
}
}
Collection<TaskModel> loopOver = relevantTasks.values();
Collection<TaskModel> loopOverTasks = relevantTasks.values();
LOGGER.debug(
"Workflow {} waiting for tasks {} to complete iteration {}",
workflow.getWorkflowId(),
loopOver.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toList()),
task.getIteration());
for (TaskModel loopOverTask : loopOver) {
loopOverTasks.stream()
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toList()),
doWhileTaskModel.getIteration());

// if the loopOver collection is empty, no tasks inside the loop have been scheduled.
// so schedule it and exit the method.
if (loopOverTasks.isEmpty()) {
doWhileTaskModel.setIteration(1);
doWhileTaskModel.addOutput("iteration", doWhileTaskModel.getIteration());
return scheduleNextIteration(doWhileTaskModel, workflow, workflowExecutor);
}

for (TaskModel loopOverTask : loopOverTasks) {
TaskModel.Status taskStatus = loopOverTask.getStatus();
hasFailures = !taskStatus.isSuccessful();
if (hasFailures) {
Expand All @@ -97,43 +108,53 @@ public boolean execute(
break;
}
}
task.getOutputData().put(String.valueOf(task.getIteration()), output);
doWhileTaskModel
.getOutputData()
.put(String.valueOf(doWhileTaskModel.getIteration()), output);
if (hasFailures) {
LOGGER.debug(
"taskid {} failed in {} iteration", task.getTaskId(), task.getIteration() + 1);
return updateLoopTask(task, TaskModel.Status.FAILED, failureReason.toString());
"Task {} failed in {} iteration",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return updateLoopTask(
doWhileTaskModel, TaskModel.Status.FAILED, failureReason.toString());
} else if (!allDone) {
return false;
}
boolean shouldContinue;
try {
shouldContinue = getEvaluatedCondition(workflow, task, workflowExecutor);
LOGGER.debug("taskid {} condition evaluated to {}", task.getTaskId(), shouldContinue);
shouldContinue = getEvaluatedCondition(workflow, doWhileTaskModel, workflowExecutor);
LOGGER.debug(
"Task {} condition evaluated to {}",
doWhileTaskModel.getTaskId(),
shouldContinue);
if (shouldContinue) {
task.setIteration(task.getIteration() + 1);
return scheduleNextIteration(task, workflow, workflowExecutor);
doWhileTaskModel.setIteration(doWhileTaskModel.getIteration() + 1);
doWhileTaskModel.getOutputData().put("iteration", doWhileTaskModel.getIteration());
return scheduleNextIteration(doWhileTaskModel, workflow, workflowExecutor);
} else {
LOGGER.debug(
"taskid {} took {} iterations to complete",
task.getTaskId(),
task.getIteration() + 1);
return markLoopTaskSuccess(task);
"Task {} took {} iterations to complete",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return markLoopTaskSuccess(doWhileTaskModel);
}
} catch (ScriptException e) {
String message =
String.format(
"Unable to evaluate condition %s , exception %s",
task.getWorkflowTask().getLoopCondition(), e.getMessage());
doWhileTaskModel.getWorkflowTask().getLoopCondition(), e.getMessage());
LOGGER.error(message);
LOGGER.error("Marking task {} failed with error.", task.getTaskId());
return updateLoopTask(task, TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, message);
LOGGER.error("Marking task {} failed with error.", doWhileTaskModel.getTaskId());
return updateLoopTask(
doWhileTaskModel, TaskModel.Status.FAILED_WITH_TERMINAL_ERROR, message);
}
}

boolean scheduleNextIteration(
TaskModel task, WorkflowModel workflow, WorkflowExecutor workflowExecutor) {
LOGGER.debug(
"Scheduling loop tasks for taskid {} as condition {} evaluated to true",
"Scheduling loop tasks for task {} as condition {} evaluated to true",
task.getTaskId(),
task.getWorkflowTask().getLoopCondition());
workflowExecutor.scheduleNextIteration(task, workflow);
Expand All @@ -149,7 +170,7 @@ boolean updateLoopTask(TaskModel task, TaskModel.Status status, String failureRe

boolean markLoopTaskSuccess(TaskModel task) {
LOGGER.debug(
"taskid {} took {} iterations to complete",
"task {} took {} iterations to complete",
task.getTaskId(),
task.getIteration() + 1);
task.setStatus(TaskModel.Status.COMPLETED);
Expand Down Expand Up @@ -197,7 +218,7 @@ boolean getEvaluatedCondition(
boolean shouldContinue = false;
if (condition != null) {
LOGGER.debug("Condition: {} is being evaluated", condition);
// Evaluate the expression by using the Nashhorn based script evaluator
// Evaluate the expression by using the Nashorn based script evaluator
shouldContinue = ScriptEvaluator.evalBool(condition, taskInput);
}
return shouldContinue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public Map<String, Object> getTaskInputV2(
"reasonForIncompletion", task.getReasonForIncompletion());
taskParams.put("callbackAfterSeconds", task.getCallbackAfterSeconds());
taskParams.put("workerId", task.getWorkerId());
taskParams.put("iteration", task.getIteration());
inputMap.put(
task.isLoopOverTask()
? TaskUtils.removeIterationFromTaskRefName(
Expand Down
Loading

0 comments on commit 9190e11

Please sign in to comment.