Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
added TaskMapperContext#createTaskModel
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Mar 25, 2022
1 parent 4ed4e1c commit 4f56879
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,18 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
Map<String, Object> taskInput = taskMapperContext.getTaskInput();
int retryCount = taskMapperContext.getRetryCount();
String taskId = taskMapperContext.getTaskId();

// get the expression to be evaluated
String caseValue = getEvaluatedCaseValue(workflowTask, taskInput);

// QQ why is the case value and the caseValue passed and caseOutput passes as the same ??
TaskModel decisionTask = new TaskModel();
TaskModel decisionTask = taskMapperContext.createTaskModel();
decisionTask.setTaskType(TaskType.TASK_TYPE_DECISION);
decisionTask.setTaskDefName(TaskType.TASK_TYPE_DECISION);
decisionTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
decisionTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
decisionTask.setWorkflowType(workflowModel.getWorkflowName());
decisionTask.setCorrelationId(workflowModel.getCorrelationId());
decisionTask.setScheduledTime(System.currentTimeMillis());
decisionTask.addInput("case", caseValue);
decisionTask.addOutput("caseOutput", Collections.singletonList(caseValue));
decisionTask.setTaskId(taskId);
decisionTask.setStartTime(System.currentTimeMillis());
decisionTask.setStatus(TaskModel.Status.IN_PROGRESS);
decisionTask.setWorkflowTask(workflowTask);
decisionTask.setWorkflowPriority(workflowModel.getPriority());
tasksToBeScheduled.add(decisionTask);

// get the list of tasks based on the decision
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
return Collections.emptyList();
}

String taskId = taskMapperContext.getTaskId();
List<TaskModel> tasksToBeScheduled = new ArrayList<>();
int retryCount = taskMapperContext.getRetryCount();
TaskDef taskDefinition =
Expand All @@ -87,18 +86,10 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
workflowTask.getName()))
.orElseGet(TaskDef::new));

TaskModel loopTask = new TaskModel();
TaskModel loopTask = taskMapperContext.createTaskModel();
loopTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
loopTask.setTaskDefName(workflowTask.getName());
loopTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
loopTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
loopTask.setCorrelationId(workflowModel.getCorrelationId());
loopTask.setWorkflowType(workflowModel.getWorkflowName());
loopTask.setScheduledTime(System.currentTimeMillis());
loopTask.setTaskId(taskId);
loopTask.setIteration(1);
loopTask.setStatus(TaskModel.Status.IN_PROGRESS);
loopTask.setWorkflowTask(workflowTask);
loopTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
loopTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,18 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
workflowModel,
taskDefinition,
taskMapperContext.getTaskId());
TaskModel dynamicTask = new TaskModel();

// IMPORTANT: The WorkflowTask that is inside TaskMapperContext is changed above
// createTaskModel() must be called here so the changes are reflected in the created
// TaskModel
TaskModel dynamicTask = taskMapperContext.createTaskModel();
dynamicTask.setStartDelayInSeconds(workflowTask.getStartDelay());
dynamicTask.setTaskId(taskMapperContext.getTaskId());
dynamicTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
dynamicTask.setInputData(input);
dynamicTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
dynamicTask.setWorkflowType(workflowModel.getWorkflowName());
dynamicTask.setStatus(TaskModel.Status.SCHEDULED);
dynamicTask.setTaskType(workflowTask.getType());
dynamicTask.setTaskDefName(workflowTask.getName());
dynamicTask.setCorrelationId(workflowModel.getCorrelationId());
dynamicTask.setScheduledTime(System.currentTimeMillis());
dynamicTask.setRetryCount(retryCount);
dynamicTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
dynamicTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds());
dynamicTask.setWorkflowTask(workflowTask);
dynamicTask.setTaskType(taskName);
dynamicTask.setRetriedTaskId(retriedTaskId);
dynamicTask.setWorkflowPriority(workflowModel.getPriority());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -63,22 +62,14 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
String sink = (String) eventTaskInput.get("sink");
Boolean asynComplete = (Boolean) eventTaskInput.get("asyncComplete");

TaskModel eventTask = new TaskModel();
TaskModel eventTask = taskMapperContext.createTaskModel();
eventTask.setTaskType(TASK_TYPE_EVENT);
eventTask.setTaskDefName(workflowTask.getName());
eventTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
eventTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
eventTask.setWorkflowType(workflowModel.getWorkflowName());
eventTask.setCorrelationId(workflowModel.getCorrelationId());
eventTask.setScheduledTime(System.currentTimeMillis());
eventTask.setStatus(TaskModel.Status.SCHEDULED);

eventTask.setInputData(eventTaskInput);
eventTask.getInputData().put("sink", sink);
eventTask.getInputData().put("asyncComplete", asynComplete);
eventTask.setTaskId(taskId);
eventTask.setStatus(TaskModel.Status.SCHEDULED);
eventTask.setWorkflowPriority(workflowModel.getPriority());
eventTask.setWorkflowTask(workflowTask);

return Collections.singletonList(eventTask);
return List.of(eventTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -24,7 +23,6 @@
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

@Component
public class ExclusiveJoinTaskMapper implements TaskMapper {
Expand All @@ -42,8 +40,6 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
LOGGER.debug("TaskMapperContext {} in ExclusiveJoinTaskMapper", taskMapperContext);

WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
String taskId = taskMapperContext.getTaskId();

Map<String, Object> joinInput = new HashMap<>();
joinInput.put("joinOn", workflowTask.getJoinOn());
Expand All @@ -52,21 +48,13 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
joinInput.put("defaultExclusiveJoinTask", workflowTask.getDefaultExclusiveJoinTask());
}

TaskModel joinTask = new TaskModel();
TaskModel joinTask = taskMapperContext.createTaskModel();
joinTask.setTaskType(TaskType.TASK_TYPE_EXCLUSIVE_JOIN);
joinTask.setTaskDefName(TaskType.TASK_TYPE_EXCLUSIVE_JOIN);
joinTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
joinTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
joinTask.setCorrelationId(workflowModel.getCorrelationId());
joinTask.setWorkflowType(workflowModel.getWorkflowName());
joinTask.setScheduledTime(System.currentTimeMillis());
joinTask.setStartTime(System.currentTimeMillis());
joinTask.setInputData(joinInput);
joinTask.setTaskId(taskId);
joinTask.setStatus(TaskModel.Status.IN_PROGRESS);
joinTask.setWorkflowPriority(workflowModel.getPriority());
joinTask.setWorkflowTask(workflowTask);

return Collections.singletonList(joinTask);
return List.of(joinTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
Map<String, Map<String, Object>> tasksInput = workflowTasksAndInputPair.getRight();

// Create Fork Task which needs to be followed by the dynamic tasks
TaskModel forkDynamicTask =
createDynamicForkTask(workflowTask, workflowModel, taskId, dynForkTasks);
TaskModel forkDynamicTask = createDynamicForkTask(taskMapperContext, dynForkTasks);

mappedTasks.add(forkDynamicTask);

Expand Down Expand Up @@ -212,7 +211,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)

if (joinWorkflowTask == null || !joinWorkflowTask.getType().equals(TaskType.JOIN.name())) {
throw new TerminateWorkflowException(
"Dynamic join definition is not followed by a join task. Check the blueprint");
"Dynamic join definition is not followed by a join task. Check the workflow definition.");
}

// Create Join task
Expand All @@ -239,17 +238,10 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
*/
@VisibleForTesting
TaskModel createDynamicForkTask(
WorkflowTask workflowTask,
WorkflowModel workflowModel,
String taskId,
List<WorkflowTask> dynForkTasks) {
TaskModel forkDynamicTask = new TaskModel();
TaskMapperContext taskMapperContext, List<WorkflowTask> dynForkTasks) {
TaskModel forkDynamicTask = taskMapperContext.createTaskModel();
forkDynamicTask.setTaskType(TaskType.TASK_TYPE_FORK);
forkDynamicTask.setTaskDefName(TaskType.TASK_TYPE_FORK);
forkDynamicTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
forkDynamicTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
forkDynamicTask.setCorrelationId(workflowModel.getCorrelationId());
forkDynamicTask.setScheduledTime(System.currentTimeMillis());
forkDynamicTask.setEndTime(System.currentTimeMillis());
List<String> forkedTaskNames =
dynForkTasks.stream()
Expand All @@ -261,10 +253,7 @@ TaskModel createDynamicForkTask(
.put(
"forkedTaskDefs",
dynForkTasks); // TODO: Remove this parameter in the later releases
forkDynamicTask.setTaskId(taskId);
forkDynamicTask.setStatus(TaskModel.Status.COMPLETED);
forkDynamicTask.setWorkflowTask(workflowTask);
forkDynamicTask.setWorkflowPriority(workflowModel.getPriority());
return forkDynamicTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,13 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
int retryCount = taskMapperContext.getRetryCount();

String taskId = taskMapperContext.getTaskId();

List<TaskModel> tasksToBeScheduled = new LinkedList<>();
TaskModel forkTask = new TaskModel();
TaskModel forkTask = taskMapperContext.createTaskModel();
forkTask.setTaskType(TaskType.TASK_TYPE_FORK);
forkTask.setTaskDefName(TaskType.TASK_TYPE_FORK);
forkTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
forkTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
forkTask.setWorkflowType(workflowModel.getWorkflowName());
forkTask.setCorrelationId(workflowModel.getCorrelationId());
forkTask.setScheduledTime(System.currentTimeMillis());
forkTask.setStartTime(System.currentTimeMillis());
forkTask.setInputData(taskInput);
forkTask.setTaskId(taskId);
forkTask.setStatus(TaskModel.Status.COMPLETED);
forkTask.setWorkflowPriority(workflowModel.getPriority());
forkTask.setWorkflowTask(workflowTask);

tasksToBeScheduled.add(forkTask);
List<List<WorkflowTask>> forkTasks = workflowTask.getForkTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,19 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition);
Boolean asynComplete = (Boolean) input.get("asyncComplete");

TaskModel httpTask = new TaskModel();
httpTask.setTaskType(workflowTask.getType());
httpTask.setTaskDefName(workflowTask.getName());
httpTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
httpTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
httpTask.setWorkflowType(workflowModel.getWorkflowName());
httpTask.setCorrelationId(workflowModel.getCorrelationId());
httpTask.setScheduledTime(System.currentTimeMillis());
httpTask.setTaskId(taskId);
TaskModel httpTask = taskMapperContext.createTaskModel();
httpTask.setInputData(input);
httpTask.getInputData().put("asyncComplete", asynComplete);
httpTask.setStatus(TaskModel.Status.SCHEDULED);
httpTask.setRetryCount(retryCount);
httpTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
httpTask.setWorkflowTask(workflowTask);
httpTask.setWorkflowPriority(workflowModel.getPriority());
if (Objects.nonNull(taskDefinition)) {
httpTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
httpTask.setRateLimitFrequencyInSeconds(
taskDefinition.getRateLimitFrequencyInSeconds());
httpTask.setIsolationGroupId(taskDefinition.getIsolationGroupId());
httpTask.setExecutionNameSpace(taskDefinition.getExecutionNameSpace());
}
return Collections.singletonList(httpTask);
return List.of(httpTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -72,21 +71,12 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
taskId,
taskDefinition);

TaskModel inlineTask = new TaskModel();
TaskModel inlineTask = taskMapperContext.createTaskModel();
inlineTask.setTaskType(TaskType.TASK_TYPE_INLINE);
inlineTask.setTaskDefName(taskMapperContext.getWorkflowTask().getName());
inlineTask.setReferenceTaskName(taskMapperContext.getWorkflowTask().getTaskReferenceName());
inlineTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
inlineTask.setWorkflowType(workflowModel.getWorkflowName());
inlineTask.setCorrelationId(workflowModel.getCorrelationId());
inlineTask.setStartTime(System.currentTimeMillis());
inlineTask.setScheduledTime(System.currentTimeMillis());
inlineTask.setInputData(taskInput);
inlineTask.setTaskId(taskId);
inlineTask.setStatus(TaskModel.Status.IN_PROGRESS);
inlineTask.setWorkflowTask(workflowTask);
inlineTask.setWorkflowPriority(workflowModel.getPriority());

return Collections.singletonList(inlineTask);
return List.of(inlineTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -62,21 +61,13 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
Map<String, Object> joinInput = new HashMap<>();
joinInput.put("joinOn", workflowTask.getJoinOn());

TaskModel joinTask = new TaskModel();
TaskModel joinTask = taskMapperContext.createTaskModel();
joinTask.setTaskType(TaskType.TASK_TYPE_JOIN);
joinTask.setTaskDefName(TaskType.TASK_TYPE_JOIN);
joinTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
joinTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
joinTask.setCorrelationId(workflowModel.getCorrelationId());
joinTask.setWorkflowType(workflowModel.getWorkflowName());
joinTask.setScheduledTime(System.currentTimeMillis());
joinTask.setStartTime(System.currentTimeMillis());
joinTask.setInputData(joinInput);
joinTask.setTaskId(taskId);
joinTask.setStatus(TaskModel.Status.IN_PROGRESS);
joinTask.setWorkflowTask(workflowTask);
joinTask.setWorkflowPriority(workflowModel.getPriority());

return Collections.singletonList(joinTask);
return List.of(joinTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -63,21 +62,11 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
parametersUtils.getTaskInputV2(
workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition);

TaskModel jsonJQTransformTask = new TaskModel();
jsonJQTransformTask.setTaskType(workflowTask.getType());
jsonJQTransformTask.setTaskDefName(workflowTask.getName());
jsonJQTransformTask.setReferenceTaskName(workflowTask.getTaskReferenceName());
jsonJQTransformTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
jsonJQTransformTask.setWorkflowType(workflowModel.getWorkflowName());
jsonJQTransformTask.setCorrelationId(workflowModel.getCorrelationId());
TaskModel jsonJQTransformTask = taskMapperContext.createTaskModel();
jsonJQTransformTask.setStartTime(System.currentTimeMillis());
jsonJQTransformTask.setScheduledTime(System.currentTimeMillis());
jsonJQTransformTask.setInputData(taskInput);
jsonJQTransformTask.setTaskId(taskId);
jsonJQTransformTask.setStatus(TaskModel.Status.IN_PROGRESS);
jsonJQTransformTask.setWorkflowTask(workflowTask);
jsonJQTransformTask.setWorkflowPriority(workflowModel.getPriority());

return Collections.singletonList(jsonJQTransformTask);
return List.of(jsonJQTransformTask);
}
}
Loading

0 comments on commit 4f56879

Please sign in to comment.