diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java index d627ff8ae3..b2fd2fed1a 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java @@ -79,27 +79,18 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); Map taskInput = taskMapperContext.getTaskInput(); int retryCount = taskMapperContext.getRetryCount(); - String taskId = taskMapperContext.getTaskId(); // get the expression to be evaluated String caseValue = getEvaluatedCaseValue(workflowTask, taskInput); // QQ why is the case value and the caseValue passed and caseOutput passes as the same ?? - TaskModel decisionTask = new TaskModel(); + TaskModel decisionTask = taskMapperContext.createTaskModel(); decisionTask.setTaskType(TaskType.TASK_TYPE_DECISION); decisionTask.setTaskDefName(TaskType.TASK_TYPE_DECISION); - decisionTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - decisionTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - decisionTask.setWorkflowType(workflowModel.getWorkflowName()); - decisionTask.setCorrelationId(workflowModel.getCorrelationId()); - decisionTask.setScheduledTime(System.currentTimeMillis()); decisionTask.addInput("case", caseValue); decisionTask.addOutput("caseOutput", Collections.singletonList(caseValue)); - decisionTask.setTaskId(taskId); decisionTask.setStartTime(System.currentTimeMillis()); decisionTask.setStatus(TaskModel.Status.IN_PROGRESS); - decisionTask.setWorkflowTask(workflowTask); - decisionTask.setWorkflowPriority(workflowModel.getPriority()); tasksToBeScheduled.add(decisionTask); // get the list of tasks based on the decision diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DoWhileTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DoWhileTaskMapper.java index 692834dca7..1691e69dfe 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DoWhileTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DoWhileTaskMapper.java @@ -75,7 +75,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { return Collections.emptyList(); } - String taskId = taskMapperContext.getTaskId(); List tasksToBeScheduled = new ArrayList<>(); int retryCount = taskMapperContext.getRetryCount(); TaskDef taskDefinition = @@ -87,18 +86,10 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { workflowTask.getName())) .orElseGet(TaskDef::new)); - TaskModel loopTask = new TaskModel(); + TaskModel loopTask = taskMapperContext.createTaskModel(); loopTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE); - loopTask.setTaskDefName(workflowTask.getName()); - loopTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - loopTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - loopTask.setCorrelationId(workflowModel.getCorrelationId()); - loopTask.setWorkflowType(workflowModel.getWorkflowName()); - loopTask.setScheduledTime(System.currentTimeMillis()); - loopTask.setTaskId(taskId); loopTask.setIteration(1); loopTask.setStatus(TaskModel.Status.IN_PROGRESS); - loopTask.setWorkflowTask(workflowTask); loopTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); loopTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DynamicTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DynamicTaskMapper.java index 189551666c..7eb9372f01 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DynamicTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DynamicTaskMapper.java @@ -88,22 +88,18 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) workflowModel, taskDefinition, taskMapperContext.getTaskId()); - TaskModel dynamicTask = new TaskModel(); + + // IMPORTANT: The WorkflowTask that is inside TaskMapperContext is changed above + // createTaskModel() must be called here so the changes are reflected in the created + // TaskModel + TaskModel dynamicTask = taskMapperContext.createTaskModel(); dynamicTask.setStartDelayInSeconds(workflowTask.getStartDelay()); - dynamicTask.setTaskId(taskMapperContext.getTaskId()); - dynamicTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); dynamicTask.setInputData(input); - dynamicTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - dynamicTask.setWorkflowType(workflowModel.getWorkflowName()); dynamicTask.setStatus(TaskModel.Status.SCHEDULED); dynamicTask.setTaskType(workflowTask.getType()); - dynamicTask.setTaskDefName(workflowTask.getName()); - dynamicTask.setCorrelationId(workflowModel.getCorrelationId()); - dynamicTask.setScheduledTime(System.currentTimeMillis()); dynamicTask.setRetryCount(retryCount); dynamicTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); dynamicTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); - dynamicTask.setWorkflowTask(workflowTask); dynamicTask.setTaskType(taskName); dynamicTask.setRetriedTaskId(retriedTaskId); dynamicTask.setWorkflowPriority(workflowModel.getPriority()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java index 893c396fdb..a581ec17e2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,22 +62,14 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { String sink = (String) eventTaskInput.get("sink"); Boolean asynComplete = (Boolean) eventTaskInput.get("asyncComplete"); - TaskModel eventTask = new TaskModel(); + TaskModel eventTask = taskMapperContext.createTaskModel(); eventTask.setTaskType(TASK_TYPE_EVENT); - eventTask.setTaskDefName(workflowTask.getName()); - eventTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - eventTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - eventTask.setWorkflowType(workflowModel.getWorkflowName()); - eventTask.setCorrelationId(workflowModel.getCorrelationId()); - eventTask.setScheduledTime(System.currentTimeMillis()); + eventTask.setStatus(TaskModel.Status.SCHEDULED); + eventTask.setInputData(eventTaskInput); eventTask.getInputData().put("sink", sink); eventTask.getInputData().put("asyncComplete", asynComplete); - eventTask.setTaskId(taskId); - eventTask.setStatus(TaskModel.Status.SCHEDULED); - eventTask.setWorkflowPriority(workflowModel.getPriority()); - eventTask.setWorkflowTask(workflowTask); - return Collections.singletonList(eventTask); + return List.of(eventTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ExclusiveJoinTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ExclusiveJoinTaskMapper.java index 85a346db02..5fdb86ea2e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ExclusiveJoinTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ExclusiveJoinTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,7 +23,6 @@ import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; @Component public class ExclusiveJoinTaskMapper implements TaskMapper { @@ -42,8 +40,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { LOGGER.debug("TaskMapperContext {} in ExclusiveJoinTaskMapper", taskMapperContext); WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); - WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); - String taskId = taskMapperContext.getTaskId(); Map joinInput = new HashMap<>(); joinInput.put("joinOn", workflowTask.getJoinOn()); @@ -52,21 +48,13 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { joinInput.put("defaultExclusiveJoinTask", workflowTask.getDefaultExclusiveJoinTask()); } - TaskModel joinTask = new TaskModel(); + TaskModel joinTask = taskMapperContext.createTaskModel(); joinTask.setTaskType(TaskType.TASK_TYPE_EXCLUSIVE_JOIN); joinTask.setTaskDefName(TaskType.TASK_TYPE_EXCLUSIVE_JOIN); - joinTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - joinTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - joinTask.setCorrelationId(workflowModel.getCorrelationId()); - joinTask.setWorkflowType(workflowModel.getWorkflowName()); - joinTask.setScheduledTime(System.currentTimeMillis()); joinTask.setStartTime(System.currentTimeMillis()); joinTask.setInputData(joinInput); - joinTask.setTaskId(taskId); joinTask.setStatus(TaskModel.Status.IN_PROGRESS); - joinTask.setWorkflowPriority(workflowModel.getPriority()); - joinTask.setWorkflowTask(workflowTask); - return Collections.singletonList(joinTask); + return List.of(joinTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java index a0479b52a1..21b41f38a8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java @@ -138,8 +138,7 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) Map> tasksInput = workflowTasksAndInputPair.getRight(); // Create Fork Task which needs to be followed by the dynamic tasks - TaskModel forkDynamicTask = - createDynamicForkTask(workflowTask, workflowModel, taskId, dynForkTasks); + TaskModel forkDynamicTask = createDynamicForkTask(taskMapperContext, dynForkTasks); mappedTasks.add(forkDynamicTask); @@ -212,7 +211,7 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) if (joinWorkflowTask == null || !joinWorkflowTask.getType().equals(TaskType.JOIN.name())) { throw new TerminateWorkflowException( - "Dynamic join definition is not followed by a join task. Check the blueprint"); + "Dynamic join definition is not followed by a join task. Check the workflow definition."); } // Create Join task @@ -239,17 +238,10 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) */ @VisibleForTesting TaskModel createDynamicForkTask( - WorkflowTask workflowTask, - WorkflowModel workflowModel, - String taskId, - List dynForkTasks) { - TaskModel forkDynamicTask = new TaskModel(); + TaskMapperContext taskMapperContext, List dynForkTasks) { + TaskModel forkDynamicTask = taskMapperContext.createTaskModel(); forkDynamicTask.setTaskType(TaskType.TASK_TYPE_FORK); forkDynamicTask.setTaskDefName(TaskType.TASK_TYPE_FORK); - forkDynamicTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - forkDynamicTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - forkDynamicTask.setCorrelationId(workflowModel.getCorrelationId()); - forkDynamicTask.setScheduledTime(System.currentTimeMillis()); forkDynamicTask.setEndTime(System.currentTimeMillis()); List forkedTaskNames = dynForkTasks.stream() @@ -261,10 +253,7 @@ TaskModel createDynamicForkTask( .put( "forkedTaskDefs", dynForkTasks); // TODO: Remove this parameter in the later releases - forkDynamicTask.setTaskId(taskId); forkDynamicTask.setStatus(TaskModel.Status.COMPLETED); - forkDynamicTask.setWorkflowTask(workflowTask); - forkDynamicTask.setWorkflowPriority(workflowModel.getPriority()); return forkDynamicTask; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinTaskMapper.java index 7c2572d5e5..653dea36ef 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinTaskMapper.java @@ -69,23 +69,13 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); int retryCount = taskMapperContext.getRetryCount(); - String taskId = taskMapperContext.getTaskId(); - List tasksToBeScheduled = new LinkedList<>(); - TaskModel forkTask = new TaskModel(); + TaskModel forkTask = taskMapperContext.createTaskModel(); forkTask.setTaskType(TaskType.TASK_TYPE_FORK); forkTask.setTaskDefName(TaskType.TASK_TYPE_FORK); - forkTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - forkTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - forkTask.setWorkflowType(workflowModel.getWorkflowName()); - forkTask.setCorrelationId(workflowModel.getCorrelationId()); - forkTask.setScheduledTime(System.currentTimeMillis()); forkTask.setStartTime(System.currentTimeMillis()); forkTask.setInputData(taskInput); - forkTask.setTaskId(taskId); forkTask.setStatus(TaskModel.Status.COMPLETED); - forkTask.setWorkflowPriority(workflowModel.getPriority()); - forkTask.setWorkflowTask(workflowTask); tasksToBeScheduled.add(forkTask); List> forkTasks = workflowTask.getForkTasks(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/HTTPTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/HTTPTaskMapper.java index 60bbd83a3f..cdedc5c2e2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/HTTPTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/HTTPTaskMapper.java @@ -83,22 +83,12 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition); Boolean asynComplete = (Boolean) input.get("asyncComplete"); - TaskModel httpTask = new TaskModel(); - httpTask.setTaskType(workflowTask.getType()); - httpTask.setTaskDefName(workflowTask.getName()); - httpTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - httpTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - httpTask.setWorkflowType(workflowModel.getWorkflowName()); - httpTask.setCorrelationId(workflowModel.getCorrelationId()); - httpTask.setScheduledTime(System.currentTimeMillis()); - httpTask.setTaskId(taskId); + TaskModel httpTask = taskMapperContext.createTaskModel(); httpTask.setInputData(input); httpTask.getInputData().put("asyncComplete", asynComplete); httpTask.setStatus(TaskModel.Status.SCHEDULED); httpTask.setRetryCount(retryCount); httpTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); - httpTask.setWorkflowTask(workflowTask); - httpTask.setWorkflowPriority(workflowModel.getPriority()); if (Objects.nonNull(taskDefinition)) { httpTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); httpTask.setRateLimitFrequencyInSeconds( @@ -106,6 +96,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) httpTask.setIsolationGroupId(taskDefinition.getIsolationGroupId()); httpTask.setExecutionNameSpace(taskDefinition.getExecutionNameSpace()); } - return Collections.singletonList(httpTask); + return List.of(httpTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/InlineTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/InlineTaskMapper.java index 75683f652a..9125f0139b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/InlineTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/InlineTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -72,21 +71,12 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { taskId, taskDefinition); - TaskModel inlineTask = new TaskModel(); + TaskModel inlineTask = taskMapperContext.createTaskModel(); inlineTask.setTaskType(TaskType.TASK_TYPE_INLINE); - inlineTask.setTaskDefName(taskMapperContext.getWorkflowTask().getName()); - inlineTask.setReferenceTaskName(taskMapperContext.getWorkflowTask().getTaskReferenceName()); - inlineTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - inlineTask.setWorkflowType(workflowModel.getWorkflowName()); - inlineTask.setCorrelationId(workflowModel.getCorrelationId()); inlineTask.setStartTime(System.currentTimeMillis()); - inlineTask.setScheduledTime(System.currentTimeMillis()); inlineTask.setInputData(taskInput); - inlineTask.setTaskId(taskId); inlineTask.setStatus(TaskModel.Status.IN_PROGRESS); - inlineTask.setWorkflowTask(workflowTask); - inlineTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(inlineTask); + return List.of(inlineTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java index 811200cdba..34facffaa2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,21 +61,13 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { Map joinInput = new HashMap<>(); joinInput.put("joinOn", workflowTask.getJoinOn()); - TaskModel joinTask = new TaskModel(); + TaskModel joinTask = taskMapperContext.createTaskModel(); joinTask.setTaskType(TaskType.TASK_TYPE_JOIN); joinTask.setTaskDefName(TaskType.TASK_TYPE_JOIN); - joinTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - joinTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - joinTask.setCorrelationId(workflowModel.getCorrelationId()); - joinTask.setWorkflowType(workflowModel.getWorkflowName()); - joinTask.setScheduledTime(System.currentTimeMillis()); joinTask.setStartTime(System.currentTimeMillis()); joinTask.setInputData(joinInput); - joinTask.setTaskId(taskId); joinTask.setStatus(TaskModel.Status.IN_PROGRESS); - joinTask.setWorkflowTask(workflowTask); - joinTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(joinTask); + return List.of(joinTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JsonJQTransformTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JsonJQTransformTaskMapper.java index 8d705af855..7aeabd81c7 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JsonJQTransformTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JsonJQTransformTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -63,21 +62,11 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { parametersUtils.getTaskInputV2( workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition); - TaskModel jsonJQTransformTask = new TaskModel(); - jsonJQTransformTask.setTaskType(workflowTask.getType()); - jsonJQTransformTask.setTaskDefName(workflowTask.getName()); - jsonJQTransformTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - jsonJQTransformTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - jsonJQTransformTask.setWorkflowType(workflowModel.getWorkflowName()); - jsonJQTransformTask.setCorrelationId(workflowModel.getCorrelationId()); + TaskModel jsonJQTransformTask = taskMapperContext.createTaskModel(); jsonJQTransformTask.setStartTime(System.currentTimeMillis()); - jsonJQTransformTask.setScheduledTime(System.currentTimeMillis()); jsonJQTransformTask.setInputData(taskInput); - jsonJQTransformTask.setTaskId(taskId); jsonJQTransformTask.setStatus(TaskModel.Status.IN_PROGRESS); - jsonJQTransformTask.setWorkflowTask(workflowTask); - jsonJQTransformTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(jsonJQTransformTask); + return List.of(jsonJQTransformTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/KafkaPublishTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/KafkaPublishTaskMapper.java index 6a7e0ddd34..de852c4f59 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/KafkaPublishTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/KafkaPublishTaskMapper.java @@ -80,21 +80,11 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) parametersUtils.getTaskInputV2( workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition); - TaskModel kafkaPublishTask = new TaskModel(); - kafkaPublishTask.setTaskType(workflowTask.getType()); - kafkaPublishTask.setTaskDefName(workflowTask.getName()); - kafkaPublishTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - kafkaPublishTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - kafkaPublishTask.setWorkflowType(workflowModel.getWorkflowName()); - kafkaPublishTask.setCorrelationId(workflowModel.getCorrelationId()); - kafkaPublishTask.setScheduledTime(System.currentTimeMillis()); - kafkaPublishTask.setTaskId(taskId); + TaskModel kafkaPublishTask = taskMapperContext.createTaskModel(); kafkaPublishTask.setInputData(input); kafkaPublishTask.setStatus(TaskModel.Status.SCHEDULED); kafkaPublishTask.setRetryCount(retryCount); kafkaPublishTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); - kafkaPublishTask.setWorkflowTask(workflowTask); - kafkaPublishTask.setWorkflowPriority(workflowModel.getPriority()); if (Objects.nonNull(taskDefinition)) { kafkaPublishTask.setExecutionNameSpace(taskDefinition.getExecutionNameSpace()); kafkaPublishTask.setIsolationGroupId(taskDefinition.getIsolationGroupId()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java index d0432fd9da..52e08be777 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -73,21 +72,12 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { taskId, taskDefinition); - TaskModel lambdaTask = new TaskModel(); + TaskModel lambdaTask = taskMapperContext.createTaskModel(); lambdaTask.setTaskType(TaskType.TASK_TYPE_LAMBDA); - lambdaTask.setTaskDefName(taskMapperContext.getWorkflowTask().getName()); - lambdaTask.setReferenceTaskName(taskMapperContext.getWorkflowTask().getTaskReferenceName()); - lambdaTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - lambdaTask.setWorkflowType(workflowModel.getWorkflowName()); - lambdaTask.setCorrelationId(workflowModel.getCorrelationId()); lambdaTask.setStartTime(System.currentTimeMillis()); - lambdaTask.setScheduledTime(System.currentTimeMillis()); lambdaTask.setInputData(taskInput); - lambdaTask.setTaskId(taskId); lambdaTask.setStatus(TaskModel.Status.IN_PROGRESS); - lambdaTask.setWorkflowTask(workflowTask); - lambdaTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(lambdaTask); + return List.of(lambdaTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SetVariableTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SetVariableTaskMapper.java index 58bc3513d9..a5ba3a4437 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SetVariableTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SetVariableTaskMapper.java @@ -12,19 +12,15 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; @Component public class SetVariableTaskMapper implements TaskMapper { @@ -41,26 +37,11 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) throws TerminateWorkflowException { LOGGER.debug("TaskMapperContext {} in SetVariableMapper", taskMapperContext); - WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); - WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); - Map taskInput = taskMapperContext.getTaskInput(); - String taskId = taskMapperContext.getTaskId(); - - TaskModel varTask = new TaskModel(); - varTask.setTaskType(workflowTask.getType()); - varTask.setTaskDefName(workflowTask.getName()); - varTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - varTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - varTask.setWorkflowType(workflowModel.getWorkflowName()); - varTask.setCorrelationId(workflowModel.getCorrelationId()); + TaskModel varTask = taskMapperContext.createTaskModel(); varTask.setStartTime(System.currentTimeMillis()); - varTask.setScheduledTime(System.currentTimeMillis()); - varTask.setInputData(taskInput); - varTask.setTaskId(taskId); + varTask.setInputData(taskMapperContext.getTaskInput()); varTask.setStatus(TaskModel.Status.IN_PROGRESS); - varTask.setWorkflowTask(workflowTask); - varTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(varTask); + return List.of(varTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java index fa8711d683..6f5a1f9916 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -51,7 +50,8 @@ public TaskType getTaskType() { } /** - * This method maps a {@link WorkflowTask} of type {@link TaskType#SIMPLE} to a {@link Task} + * This method maps a {@link WorkflowTask} of type {@link TaskType#SIMPLE} to a {@link + * TaskModel} * * @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link * WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId @@ -86,26 +86,17 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) workflowModel, taskDefinition, taskMapperContext.getTaskId()); - TaskModel simpleTask = new TaskModel(); + TaskModel simpleTask = taskMapperContext.createTaskModel(); + simpleTask.setTaskType(workflowTask.getName()); simpleTask.setStartDelayInSeconds(workflowTask.getStartDelay()); - simpleTask.setTaskId(taskMapperContext.getTaskId()); - simpleTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); simpleTask.setInputData(input); - simpleTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - simpleTask.setWorkflowType(workflowModel.getWorkflowName()); simpleTask.setStatus(TaskModel.Status.SCHEDULED); - simpleTask.setTaskType(workflowTask.getName()); - simpleTask.setTaskDefName(workflowTask.getName()); - simpleTask.setCorrelationId(workflowModel.getCorrelationId()); - simpleTask.setScheduledTime(System.currentTimeMillis()); simpleTask.setRetryCount(retryCount); simpleTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); simpleTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); - simpleTask.setWorkflowTask(workflowTask); simpleTask.setRetriedTaskId(retriedTaskId); - simpleTask.setWorkflowPriority(workflowModel.getPriority()); simpleTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); simpleTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds()); - return Collections.singletonList(simpleTask); + return List.of(simpleTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java index 38d06231d3..4c1a2b8fb4 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java @@ -75,26 +75,17 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { subWorkflowTaskToDomain = (Map) uncheckedTaskToDomain; } - TaskModel subWorkflowTask = new TaskModel(); + TaskModel subWorkflowTask = taskMapperContext.createTaskModel(); subWorkflowTask.setTaskType(TASK_TYPE_SUB_WORKFLOW); - subWorkflowTask.setTaskDefName(workflowTask.getName()); - subWorkflowTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - subWorkflowTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - subWorkflowTask.setWorkflowType(workflowModel.getWorkflowName()); - subWorkflowTask.setCorrelationId(workflowModel.getCorrelationId()); - subWorkflowTask.setScheduledTime(System.currentTimeMillis()); subWorkflowTask.addInput("subWorkflowName", subWorkflowName); subWorkflowTask.addInput("subWorkflowVersion", subWorkflowVersion); subWorkflowTask.addInput("subWorkflowTaskToDomain", subWorkflowTaskToDomain); subWorkflowTask.addInput("subWorkflowDefinition", subWorkflowDefinition); subWorkflowTask.addInput("workflowInput", taskMapperContext.getTaskInput()); - subWorkflowTask.setTaskId(taskId); subWorkflowTask.setStatus(TaskModel.Status.SCHEDULED); - subWorkflowTask.setWorkflowTask(workflowTask); - subWorkflowTask.setWorkflowPriority(workflowModel.getPriority()); subWorkflowTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); LOGGER.debug("SubWorkflowTask {} created to be Scheduled", subWorkflowTask); - return Collections.singletonList(subWorkflowTask); + return List.of(subWorkflowTask); } @VisibleForTesting diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SwitchTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SwitchTaskMapper.java index 410094a2cb..6f860fd868 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SwitchTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SwitchTaskMapper.java @@ -78,7 +78,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); Map taskInput = taskMapperContext.getTaskInput(); int retryCount = taskMapperContext.getRetryCount(); - String taskId = taskMapperContext.getTaskId(); // get the expression to be evaluated String evaluatorType = workflowTask.getEvaluatorType(); @@ -91,21 +90,13 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { String evalResult = "" + evaluator.evaluate(workflowTask.getExpression(), taskInput); // QQ why is the case value and the caseValue passed and caseOutput passes as the same ?? - TaskModel switchTask = new TaskModel(); + TaskModel switchTask = taskMapperContext.createTaskModel(); switchTask.setTaskType(TaskType.TASK_TYPE_SWITCH); switchTask.setTaskDefName(TaskType.TASK_TYPE_SWITCH); - switchTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - switchTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - switchTask.setWorkflowType(workflowModel.getWorkflowName()); - switchTask.setCorrelationId(workflowModel.getCorrelationId()); - switchTask.setScheduledTime(System.currentTimeMillis()); switchTask.getInputData().put("case", evalResult); switchTask.getOutputData().put("evaluationResult", Collections.singletonList(evalResult)); - switchTask.setTaskId(taskId); switchTask.setStartTime(System.currentTimeMillis()); switchTask.setStatus(TaskModel.Status.IN_PROGRESS); - switchTask.setWorkflowTask(workflowTask); - switchTask.setWorkflowPriority(workflowModel.getPriority()); tasksToBeScheduled.add(switchTask); // get the list of tasks based on the evaluated expression diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/TaskMapperContext.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/TaskMapperContext.java index 55aaabc81a..a34c4a0e4f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/TaskMapperContext.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/TaskMapperContext.java @@ -18,6 +18,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.execution.DeciderService; +import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; /** Business Object class used for interaction between the DeciderService and Different Mappers */ @@ -96,6 +97,24 @@ public DeciderService getDeciderService() { return deciderService; } + public TaskModel createTaskModel() { + TaskModel taskModel = new TaskModel(); + taskModel.setReferenceTaskName(workflowTask.getTaskReferenceName()); + taskModel.setWorkflowInstanceId(workflowModel.getWorkflowId()); + taskModel.setWorkflowType(workflowModel.getWorkflowName()); + taskModel.setCorrelationId(workflowModel.getCorrelationId()); + taskModel.setScheduledTime(System.currentTimeMillis()); + + taskModel.setTaskId(taskId); + taskModel.setWorkflowTask(workflowTask); + taskModel.setWorkflowPriority(workflowModel.getPriority()); + + // the following properties are overridden by some TaskMapper implementations + taskModel.setTaskType(workflowTask.getType()); + taskModel.setTaskDefName(workflowTask.getName()); + return taskModel; + } + @Override public String toString() { return "TaskMapperContext{" diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/TerminateTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/TerminateTaskMapper.java index 4032ba1a24..e5fcf5c2b6 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/TerminateTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/TerminateTaskMapper.java @@ -20,15 +20,12 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_TERMINATE; -import static java.util.Collections.singletonList; - @Component public class TerminateTaskMapper implements TaskMapper { @@ -49,7 +46,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { logger.debug("TaskMapperContext {} in TerminateTaskMapper", taskMapperContext); - WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); String taskId = taskMapperContext.getTaskId(); @@ -60,20 +56,11 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { taskId, null); - TaskModel task = new TaskModel(); + TaskModel task = taskMapperContext.createTaskModel(); task.setTaskType(TASK_TYPE_TERMINATE); - task.setTaskDefName(workflowTask.getName()); - task.setReferenceTaskName(workflowTask.getTaskReferenceName()); - task.setWorkflowInstanceId(workflowModel.getWorkflowId()); - task.setWorkflowType(workflowModel.getWorkflowName()); - task.setCorrelationId(workflowModel.getCorrelationId()); - task.setScheduledTime(System.currentTimeMillis()); task.setStartTime(System.currentTimeMillis()); task.setInputData(taskInput); - task.setTaskId(taskId); task.setStatus(TaskModel.Status.IN_PROGRESS); - task.setWorkflowTask(workflowTask); - task.setWorkflowPriority(workflowModel.getPriority()); - return singletonList(task); + return List.of(task); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java index d875aa91c6..c7919d7ddf 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/UserDefinedTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -95,24 +94,15 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) parametersUtils.getTaskInputV2( workflowTask.getInputParameters(), workflowModel, taskId, taskDefinition); - TaskModel userDefinedTask = new TaskModel(); - userDefinedTask.setTaskType(workflowTask.getType()); - userDefinedTask.setTaskDefName(workflowTask.getName()); - userDefinedTask.setReferenceTaskName(workflowTask.getTaskReferenceName()); - userDefinedTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - userDefinedTask.setWorkflowType(workflowModel.getWorkflowName()); - userDefinedTask.setCorrelationId(workflowModel.getCorrelationId()); - userDefinedTask.setScheduledTime(System.currentTimeMillis()); - userDefinedTask.setTaskId(taskId); + TaskModel userDefinedTask = taskMapperContext.createTaskModel(); userDefinedTask.setInputData(input); userDefinedTask.setStatus(TaskModel.Status.SCHEDULED); userDefinedTask.setRetryCount(retryCount); userDefinedTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); - userDefinedTask.setWorkflowTask(workflowTask); - userDefinedTask.setWorkflowPriority(workflowModel.getPriority()); userDefinedTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); userDefinedTask.setRateLimitFrequencyInSeconds( taskDefinition.getRateLimitFrequencyInSeconds()); - return Collections.singletonList(userDefinedTask); + + return List.of(userDefinedTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java index 4c4f6dd9bf..051c7d5049 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.core.execution.mapper; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -55,7 +54,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { LOGGER.debug("TaskMapperContext {} in WaitTaskMapper", taskMapperContext); - WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); String taskId = taskMapperContext.getTaskId(); @@ -66,19 +64,10 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { taskId, null); - TaskModel waitTask = new TaskModel(); + TaskModel waitTask = taskMapperContext.createTaskModel(); waitTask.setTaskType(TASK_TYPE_WAIT); - waitTask.setTaskDefName(taskMapperContext.getWorkflowTask().getName()); - waitTask.setReferenceTaskName(taskMapperContext.getWorkflowTask().getTaskReferenceName()); - waitTask.setWorkflowInstanceId(workflowModel.getWorkflowId()); - waitTask.setWorkflowType(workflowModel.getWorkflowName()); - waitTask.setCorrelationId(workflowModel.getCorrelationId()); - waitTask.setScheduledTime(System.currentTimeMillis()); waitTask.setInputData(waitTaskInput); - waitTask.setTaskId(taskId); waitTask.setStatus(TaskModel.Status.IN_PROGRESS); - waitTask.setWorkflowTask(workflowTask); - waitTask.setWorkflowPriority(workflowModel.getPriority()); - return Collections.singletonList(waitTask); + return List.of(waitTask); } }