This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

ModelMapper.getLeanCopy() copies tasks #2829

Merged: 2 commits, Mar 9, 2022
@@ -16,8 +16,6 @@
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;

@@ -33,8 +31,6 @@
@Component
public class ModelMapper {

private static final Logger LOGGER = LoggerFactory.getLogger(ModelMapper.class);

private final ExternalPayloadStorageUtils externalPayloadStorageUtils;

public ModelMapper(ExternalPayloadStorageUtils externalPayloadStorageUtils) {
@@ -61,7 +57,10 @@ public WorkflowModel getFullCopy(WorkflowModel workflowModel) {
public WorkflowModel getLeanCopy(WorkflowModel workflowModel) {
WorkflowModel leanWorkflowModel = workflowModel.copy();
externalizeWorkflowData(leanWorkflowModel);
workflowModel.getTasks().forEach(this::getLeanCopy);
leanWorkflowModel.setTasks(
workflowModel.getTasks().stream()
.map(this::getLeanCopy)
.collect(Collectors.toList()));
return leanWorkflowModel;
}
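The removed line, workflowModel.getTasks().forEach(this::getLeanCopy), called getLeanCopy(TaskModel) only for its side effects and discarded the returned copies, so the lean workflow still referenced the original, non-lean task objects. A minimal standalone sketch of the difference, with plain strings standing in for TaskModel (the real getLeanCopy also externalizes large payloads):

import java.util.List;
import java.util.stream.Collectors;

public class LeanCopyDemo {

    // Stand-in for ModelMapper.getLeanCopy(TaskModel): returns a new value and
    // leaves its argument untouched.
    static String lean(String task) {
        return task + "-lean";
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("t1", "t2");

        // Old pattern: the return values of lean(...) are silently dropped.
        tasks.forEach(LeanCopyDemo::lean);
        System.out.println(tasks);          // [t1, t2]

        // New pattern: map and collect the copies, then set them on the lean workflow.
        List<String> leanTasks = tasks.stream()
                .map(LeanCopyDemo::lean)
                .collect(Collectors.toList());
        System.out.println(leanTasks);      // [t1-lean, t2-lean]
    }
}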

@@ -1715,15 +1715,15 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
String.valueOf(workflow.getWorkflowVersion()));

// Save the tasks in the DAO
createdTasks = executionDAOFacade.createTasks(tasks);
executionDAOFacade.createTasks(tasks);
Collaborator: 💯

List<TaskModel> systemTasks =
createdTasks.stream()
tasks.stream()
.filter(task -> systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());

tasksToBeQueued =
createdTasks.stream()
tasks.stream()
.filter(task -> !systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());
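With the createdTasks variable gone, the two filters above split the incoming tasks list into system tasks and worker tasks to be queued. A minimal standalone sketch of that split, with a plain predicate standing in for systemTaskRegistry.isSystemTask(taskType) and made-up task types:

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class TaskSplitDemo {

    public static void main(String[] args) {
        // Hypothetical task types; only the split logic mirrors the diff above.
        List<String> taskTypes = List.of("JSON_JQ_TRANSFORM", "SIMPLE", "SIMPLE");
        Predicate<String> isSystemTask = "JSON_JQ_TRANSFORM"::equals;

        List<String> systemTasks = taskTypes.stream()
                .filter(isSystemTask)
                .collect(Collectors.toList());

        List<String> tasksToBeQueued = taskTypes.stream()
                .filter(isSystemTask.negate())
                .collect(Collectors.toList());

        System.out.println(systemTasks);      // [JSON_JQ_TRANSFORM]
        System.out.println(tasksToBeQueued);  // [SIMPLE, SIMPLE]
    }
}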

@@ -377,7 +377,7 @@ public TaskModel getTaskByRefName(String refName) {
return found.getLast();
}

/** @return a deep copy of the workflow instance */
/** @return a copy of the workflow instance */
public WorkflowModel copy() {
WorkflowModel copy = new WorkflowModel();
BeanUtils.copyProperties(this, copy);
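The javadoc change is accurate because BeanUtils.copyProperties performs a shallow, property-by-property copy: collection-valued properties end up shared between source and target, which is why getLeanCopy above has to copy the task list explicitly. A minimal standalone sketch of that behaviour, assuming spring-beans on the classpath (Box is a made-up bean, not Conductor code):

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.BeanUtils;

public class ShallowCopyDemo {

    public static class Box {
        private List<String> items = new ArrayList<>();
        public List<String> getItems() { return items; }
        public void setItems(List<String> items) { this.items = items; }
    }

    public static void main(String[] args) {
        Box original = new Box();
        original.getItems().add("a");

        Box copy = new Box();
        BeanUtils.copyProperties(original, copy);   // copies the list reference, not the list

        copy.getItems().add("b");
        System.out.println(original.getItems());    // [a, b] -- one list shared by both beans
    }
}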
@@ -104,9 +104,11 @@ class ModelMapperSpec extends Specification {
externalOutputPayloadStoragePath == '/relative/workflow/path'
endTime == 100L
tasks.size() == 2
!tasks[0].is(workflowModel.tasks[0]) // check if TaskModel is copied
tasks[0].taskId == 'taskId1'
tasks[0].status == TaskModel.Status.SCHEDULED
tasks[0].inputData == ['key1': 'value1']
!tasks[1].is(workflowModel.tasks[1]) // check if TaskModel is copied
tasks[1].taskId == 'taskId2'
tasks[1].status == TaskModel.Status.IN_PROGRESS
tasks[1].externalInputPayloadStoragePath == '/relative/task/path'
@@ -333,9 +333,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
if (workflow != null) {
if (includeTasks) {
List<TaskModel> tasks = getTasksForWorkflow(workflowId);
tasks.sort(
Comparator.comparingLong(TaskModel::getScheduledTime)
.thenComparingInt(TaskModel::getSeq));
tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
}
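The sort is reduced to a single key; the same simplification appears in the next two getWorkflow hunks. A minimal standalone sketch of the old two-key ordering versus the new seq-only ordering, with a made-up Item class standing in for TaskModel:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TaskOrderDemo {

    static class Item {
        final int seq;
        final long scheduledTime;
        Item(int seq, long scheduledTime) { this.seq = seq; this.scheduledTime = scheduledTime; }
        int getSeq() { return seq; }
        long getScheduledTime() { return scheduledTime; }
        @Override public String toString() { return "seq=" + seq; }
    }

    public static void main(String[] args) {
        // seq=2 happens to carry the earlier scheduledTime in this made-up data.
        List<Item> tasks = new ArrayList<>(List.of(new Item(2, 50L), new Item(1, 100L)));

        tasks.sort(Comparator.comparingLong(Item::getScheduledTime)
                .thenComparingInt(Item::getSeq));
        System.out.println(tasks);   // [seq=2, seq=1]  (old ordering)

        tasks.sort(Comparator.comparingInt(Item::getSeq));
        System.out.println(tasks);   // [seq=1, seq=2]  (new ordering)
    }
}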
@@ -335,9 +335,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
if (workflow != null) {
if (includeTasks) {
List<TaskModel> tasks = getTasksForWorkflow(workflowId);
tasks.sort(
Comparator.comparingLong(TaskModel::getScheduledTime)
.thenComparingInt(TaskModel::getSeq));
tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
}
@@ -499,9 +499,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
"getWorkflow", json.length(), "n/a", workflow.getWorkflowName());
if (includeTasks) {
List<TaskModel> tasks = getTasksForWorkflow(workflowId);
tasks.sort(
Comparator.comparingLong(TaskModel::getScheduledTime)
.thenComparingInt(TaskModel::getSeq));
tasks.sort(Comparator.comparingInt(TaskModel::getSeq));
workflow.setTasks(tasks);
}
}
@@ -48,6 +48,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
@Shared
def WORKFLOW_WITH_DECISION_AND_TERMINATE = 'ConditionalTerminateWorkflow'

@Shared
def WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK = 'workflow_with_synchronous_system_task'

@Autowired
UserTask userTask

@@ -59,7 +62,8 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
'conditional_system_task_workflow_integration_test.json',
'fork_join_integration_test.json',
'simple_workflow_with_sub_workflow_inline_def_integration_test.json',
'decision_and_terminate_integration_test.json'
'decision_and_terminate_integration_test.json',
'workflow_with_synchronous_system_task.json'
)
}

@@ -125,6 +129,47 @@
}
}

def "Test workflow with synchronous system task using external payload storage"() {
given: "An existing workflow definition with sync system task followed by a simple task"
metadataService.getWorkflowDef(WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK, 1)

and: "input required to start large payload workflow"
def correlationId = 'wf_external_storage'
def workflowInputPath = INITIAL_WORKFLOW_INPUT_PATH

when: "the workflow is started"
def workflowInstanceId = workflowExecutor.startWorkflow(WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK, 1, correlationId, null, workflowInputPath, null, null)

then: "verify that the workflow is in a RUNNING state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
}

when: "poll and complete the 'integration_task_1' with external payload storage"
def taskOutputPath = TASK_OUTPUT_PATH
def pollAndCompleteLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_1', 'task1.integration.worker', taskOutputPath)

then: "verify that the 'integration_task_1' was polled and acknowledged"
verifyPolledAndAcknowledgedLargePayloadTask(pollAndCompleteLargePayloadTask)

and: "verify that the 'integration_task1' is complete and the next task is scheduled"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 2
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.COMPLETED
tasks[0].outputData.isEmpty()
tasks[0].externalOutputPayloadStoragePath == taskOutputPath
tasks[1].taskType == 'JSON_JQ_TRANSFORM'
tasks[1].status == Task.Status.COMPLETED
tasks[1].externalInputPayloadStoragePath == INPUT_PAYLOAD_PATH
Collaborator: Consider adding an assert on the json task output

tasks[1].outputData['result'] == 104 // output of .tp2.TEST_SAMPLE | length expression from output.json. On assertion failure, check workflow definition and output.json
}
}

def "Test conditional workflow with system task using external payload storage"() {

given: "An existing workflow definition"
@@ -714,7 +714,7 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification {
}

def "Test wait time out task based simple workflow"() {
when: "Start a workflow based on a task that has a registered wiat time out"
when: "Start a workflow based on a task that has a registered wait time out"
def workflowInstanceId = workflowExecutor.startWorkflow(WAIT_TIME_OUT_WORKFLOW, 1,
'', [:], null, null, null)

@@ -0,0 +1,34 @@
{
"name": "workflow_with_synchronous_system_task",
"description": "A workflow with a simple task followed a synchronous task",
"version": 1,
"tasks": [
{
"name": "integration_task_1",
"taskReferenceName": "t1",
"type": "SIMPLE"
},
{
"name": "jsonjq",
"taskReferenceName": "jsonjq",
"inputParameters": {
"queryExpression": ".tp2.TEST_SAMPLE | length",
"tp1": "${workflow.input.param1}",
"tp2": "${t1.output.op}"
},
"type": "JSON_JQ_TRANSFORM"
}
],
"inputParameters": [],
"outputParameters": {
"data": "${jsonjq.output.resources}"
},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"ownerEmail": "example@email.com",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"variables": {},
"inputTemplate": {}
}