Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2872 from Netflix/taskmapper_refactoring
Browse files Browse the repository at this point in the history
TaskMapperContext refactoring.
  • Loading branch information
aravindanr committed Mar 30, 2022
2 parents a0e7314 + 0b6750f commit 91adde7
Show file tree
Hide file tree
Showing 44 changed files with 441 additions and 709 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ExecutorService executorService(ConductorProperties conductorProperties)
}

@Bean
@Qualifier("taskProcessorsMap")
@Qualifier("taskMappersByTaskType")
public Map<TaskType, TaskMapper> getTaskMappers(List<TaskMapper> taskMappers) {
return taskMappers.stream().collect(Collectors.toMap(TaskMapper::getTaskType, identity()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public DeciderService(
MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
SystemTaskRegistry systemTaskRegistry,
@Qualifier("taskProcessorsMap") Map<TaskType, TaskMapper> taskMappers,
@Qualifier("taskMappersByTaskType") Map<TaskType, TaskMapper> taskMappers,
@Value("${conductor.app.taskPendingTimeThreshold:60m}")
Duration taskPendingTimeThreshold) {
this.metadataDAO = metadataDAO;
Expand Down Expand Up @@ -835,10 +835,9 @@ public List<TaskModel> getTasksToBeScheduled(
String taskId = IDGenerator.generate();
TaskMapperContext taskMapperContext =
TaskMapperContext.newBuilder()
.withWorkflowDefinition(workflow.getWorkflowDefinition())
.withWorkflowInstance(workflow)
.withWorkflowModel(workflow)
.withTaskDefinition(taskToSchedule.getTaskDefinition())
.withTaskToSchedule(taskToSchedule)
.withWorkflowTask(taskToSchedule)
.withTaskInput(input)
.withRetryCount(retryCount)
.withRetryTaskId(retriedTaskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,39 +75,30 @@ public TaskType getTaskType() {
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
LOGGER.debug("TaskMapperContext {} in DecisionTaskMapper", taskMapperContext);
List<TaskModel> tasksToBeScheduled = new LinkedList<>();
WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
WorkflowModel workflowInstance = taskMapperContext.getWorkflowInstance();
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
Map<String, Object> taskInput = taskMapperContext.getTaskInput();
int retryCount = taskMapperContext.getRetryCount();
String taskId = taskMapperContext.getTaskId();

// get the expression to be evaluated
String caseValue = getEvaluatedCaseValue(taskToSchedule, taskInput);
String caseValue = getEvaluatedCaseValue(workflowTask, taskInput);

// QQ why is the case value and the caseValue passed and caseOutput passes as the same ??
TaskModel decisionTask = new TaskModel();
TaskModel decisionTask = taskMapperContext.createTaskModel();
decisionTask.setTaskType(TaskType.TASK_TYPE_DECISION);
decisionTask.setTaskDefName(TaskType.TASK_TYPE_DECISION);
decisionTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());
decisionTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
decisionTask.setWorkflowType(workflowInstance.getWorkflowName());
decisionTask.setCorrelationId(workflowInstance.getCorrelationId());
decisionTask.setScheduledTime(System.currentTimeMillis());
decisionTask.addInput("case", caseValue);
decisionTask.addOutput("caseOutput", Collections.singletonList(caseValue));
decisionTask.setTaskId(taskId);
decisionTask.setStartTime(System.currentTimeMillis());
decisionTask.setStatus(TaskModel.Status.IN_PROGRESS);
decisionTask.setWorkflowTask(taskToSchedule);
decisionTask.setWorkflowPriority(workflowInstance.getPriority());
tasksToBeScheduled.add(decisionTask);

// get the list of tasks based on the decision
List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(caseValue);
List<WorkflowTask> selectedTasks = workflowTask.getDecisionCases().get(caseValue);
// if the tasks returned are empty based on evaluated case value, then get the default case
// if there is one
if (selectedTasks == null || selectedTasks.isEmpty()) {
selectedTasks = taskToSchedule.getDefaultCase();
selectedTasks = workflowTask.getDefaultCase();
}
// once there are selected tasks that need to proceeded as part of the decision, get the
// next task to be scheduled by using the decider service
Expand All @@ -120,7 +111,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
taskMapperContext
.getDeciderService()
.getTasksToBeScheduled(
workflowInstance,
workflowModel,
selectedTask,
retryCount,
taskMapperContext.getRetryTaskId());
Expand All @@ -134,14 +125,14 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
* This method evaluates the case expression of a decision task and returns a string
* representation of the evaluated result.
*
* @param taskToSchedule: The decision task that has the case expression to be evaluated.
* @param workflowTask: The decision task that has the case expression to be evaluated.
* @param taskInput: the input which has the values that will be used in evaluating the case
* expression.
* @return A String representation of the evaluated result
*/
@VisibleForTesting
String getEvaluatedCaseValue(WorkflowTask taskToSchedule, Map<String, Object> taskInput) {
String expression = taskToSchedule.getCaseExpression();
String getEvaluatedCaseValue(WorkflowTask workflowTask, Map<String, Object> taskInput) {
String expression = workflowTask.getCaseExpression();
String caseValue;
if (StringUtils.isNotBlank(expression)) {
LOGGER.debug("Case being evaluated using decision expression: {}", expression);
Expand All @@ -159,7 +150,7 @@ String getEvaluatedCaseValue(WorkflowTask taskToSchedule, Map<String, Object> ta
// representation of caseValue
LOGGER.debug(
"No Expression available on the decision task, case value being assigned as param name");
String paramName = taskToSchedule.getCaseValueParam();
String paramName = workflowTask.getCaseValueParam();
caseValue = "" + taskInput.get(paramName);
}
return caseValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {

LOGGER.debug("TaskMapperContext {} in DoWhileTaskMapper", taskMapperContext);

WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
WorkflowModel workflowInstance = taskMapperContext.getWorkflowInstance();
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();

TaskModel task = workflowInstance.getTaskByRefName(taskToSchedule.getTaskReferenceName());
TaskModel task = workflowModel.getTaskByRefName(workflowTask.getTaskReferenceName());
if (task != null && task.getStatus().isTerminal()) {
// Since loopTask is already completed no need to schedule task again.
return Collections.emptyList();
}

String taskId = taskMapperContext.getTaskId();
List<TaskModel> tasksToBeScheduled = new ArrayList<>();
int retryCount = taskMapperContext.getRetryCount();
TaskDef taskDefinition =
Expand All @@ -84,30 +83,22 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
() ->
Optional.ofNullable(
metadataDAO.getTaskDef(
taskToSchedule.getName()))
workflowTask.getName()))
.orElseGet(TaskDef::new));

TaskModel loopTask = new TaskModel();
TaskModel loopTask = taskMapperContext.createTaskModel();
loopTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
loopTask.setTaskDefName(taskToSchedule.getName());
loopTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());
loopTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
loopTask.setCorrelationId(workflowInstance.getCorrelationId());
loopTask.setWorkflowType(workflowInstance.getWorkflowName());
loopTask.setScheduledTime(System.currentTimeMillis());
loopTask.setTaskId(taskId);
loopTask.setIteration(1);
loopTask.setStatus(TaskModel.Status.IN_PROGRESS);
loopTask.setWorkflowTask(taskToSchedule);
loopTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
loopTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());

tasksToBeScheduled.add(loopTask);
List<WorkflowTask> loopOverTasks = taskToSchedule.getLoopOver();
List<WorkflowTask> loopOverTasks = workflowTask.getLoopOver();
List<TaskModel> tasks2 =
taskMapperContext
.getDeciderService()
.getTasksToBeScheduled(workflowInstance, loopOverTasks.get(0), retryCount);
.getTasksToBeScheduled(workflowModel, loopOverTasks.get(0), retryCount);
tasks2.forEach(
t -> {
t.setReferenceTaskName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,43 +70,38 @@ public TaskType getTaskType() {
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
throws TerminateWorkflowException {
LOGGER.debug("TaskMapperContext {} in DynamicTaskMapper", taskMapperContext);
WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
Map<String, Object> taskInput = taskMapperContext.getTaskInput();
WorkflowModel workflowInstance = taskMapperContext.getWorkflowInstance();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
int retryCount = taskMapperContext.getRetryCount();
String retriedTaskId = taskMapperContext.getRetryTaskId();

String taskNameParam = taskToSchedule.getDynamicTaskNameParam();
String taskNameParam = workflowTask.getDynamicTaskNameParam();
String taskName = getDynamicTaskName(taskInput, taskNameParam);
taskToSchedule.setName(taskName);
TaskDef taskDefinition = getDynamicTaskDefinition(taskToSchedule);
taskToSchedule.setTaskDefinition(taskDefinition);
workflowTask.setName(taskName);
TaskDef taskDefinition = getDynamicTaskDefinition(workflowTask);
workflowTask.setTaskDefinition(taskDefinition);

Map<String, Object> input =
parametersUtils.getTaskInput(
taskToSchedule.getInputParameters(),
workflowInstance,
workflowTask.getInputParameters(),
workflowModel,
taskDefinition,
taskMapperContext.getTaskId());
TaskModel dynamicTask = new TaskModel();
dynamicTask.setStartDelayInSeconds(taskToSchedule.getStartDelay());
dynamicTask.setTaskId(taskMapperContext.getTaskId());
dynamicTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());

// IMPORTANT: The WorkflowTask that is inside TaskMapperContext is changed above
// createTaskModel() must be called here so the changes are reflected in the created
// TaskModel
TaskModel dynamicTask = taskMapperContext.createTaskModel();
dynamicTask.setStartDelayInSeconds(workflowTask.getStartDelay());
dynamicTask.setInputData(input);
dynamicTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
dynamicTask.setWorkflowType(workflowInstance.getWorkflowName());
dynamicTask.setStatus(TaskModel.Status.SCHEDULED);
dynamicTask.setTaskType(taskToSchedule.getType());
dynamicTask.setTaskDefName(taskToSchedule.getName());
dynamicTask.setCorrelationId(workflowInstance.getCorrelationId());
dynamicTask.setScheduledTime(System.currentTimeMillis());
dynamicTask.setRetryCount(retryCount);
dynamicTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay());
dynamicTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
dynamicTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds());
dynamicTask.setWorkflowTask(taskToSchedule);
dynamicTask.setTaskType(taskName);
dynamicTask.setRetriedTaskId(retriedTaskId);
dynamicTask.setWorkflowPriority(workflowInstance.getPriority());
dynamicTask.setWorkflowPriority(workflowModel.getPriority());
return Collections.singletonList(dynamicTask);
}

Expand Down Expand Up @@ -140,26 +135,25 @@ String getDynamicTaskName(Map<String, Object> taskInput, String taskNameParam)
/**
* This method gets the TaskDefinition for a specific {@link WorkflowTask}
*
* @param taskToSchedule: An instance of {@link WorkflowTask} which has the name of the using
* @param workflowTask: An instance of {@link WorkflowTask} which has the name of the using
* which the {@link TaskDef} can be retrieved.
* @return An instance of TaskDefinition
* @throws TerminateWorkflowException : in case of no workflow definition available
*/
@VisibleForTesting
TaskDef getDynamicTaskDefinition(WorkflowTask taskToSchedule)
TaskDef getDynamicTaskDefinition(WorkflowTask workflowTask)
throws TerminateWorkflowException { // TODO this is a common pattern in code base can
// be moved to DAO
return Optional.ofNullable(taskToSchedule.getTaskDefinition())
return Optional.ofNullable(workflowTask.getTaskDefinition())
.orElseGet(
() ->
Optional.ofNullable(
metadataDAO.getTaskDef(taskToSchedule.getName()))
Optional.ofNullable(metadataDAO.getTaskDef(workflowTask.getName()))
.orElseThrow(
() -> {
String reason =
String.format(
"Invalid task specified. Cannot find task by name %s in the task definitions",
taskToSchedule.getName());
workflowTask.getName());
return new TerminateWorkflowException(reason);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -51,34 +50,26 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {

LOGGER.debug("TaskMapperContext {} in EventTaskMapper", taskMapperContext);

WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
WorkflowModel workflowInstance = taskMapperContext.getWorkflowInstance();
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
String taskId = taskMapperContext.getTaskId();

taskToSchedule.getInputParameters().put("sink", taskToSchedule.getSink());
taskToSchedule.getInputParameters().put("asyncComplete", taskToSchedule.isAsyncComplete());
workflowTask.getInputParameters().put("sink", workflowTask.getSink());
workflowTask.getInputParameters().put("asyncComplete", workflowTask.isAsyncComplete());
Map<String, Object> eventTaskInput =
parametersUtils.getTaskInputV2(
taskToSchedule.getInputParameters(), workflowInstance, taskId, null);
workflowTask.getInputParameters(), workflowModel, taskId, null);
String sink = (String) eventTaskInput.get("sink");
Boolean asynComplete = (Boolean) eventTaskInput.get("asyncComplete");

TaskModel eventTask = new TaskModel();
TaskModel eventTask = taskMapperContext.createTaskModel();
eventTask.setTaskType(TASK_TYPE_EVENT);
eventTask.setTaskDefName(taskToSchedule.getName());
eventTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());
eventTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
eventTask.setWorkflowType(workflowInstance.getWorkflowName());
eventTask.setCorrelationId(workflowInstance.getCorrelationId());
eventTask.setScheduledTime(System.currentTimeMillis());
eventTask.setStatus(TaskModel.Status.SCHEDULED);

eventTask.setInputData(eventTaskInput);
eventTask.getInputData().put("sink", sink);
eventTask.getInputData().put("asyncComplete", asynComplete);
eventTask.setTaskId(taskId);
eventTask.setStatus(TaskModel.Status.SCHEDULED);
eventTask.setWorkflowPriority(workflowInstance.getPriority());
eventTask.setWorkflowTask(taskToSchedule);

return Collections.singletonList(eventTask);
return List.of(eventTask);
}
}
Loading

0 comments on commit 91adde7

Please sign in to comment.