From 98ccb588b510f58636aa3349cba4b7defbe762de Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Tue, 8 Mar 2022 11:28:55 -0800 Subject: [PATCH 1/2] ModelMapper.getLeanCopy() copies tasks --- .../conductor/core/dal/ModelMapper.java | 9 ++-- .../core/execution/WorkflowExecutor.java | 6 +-- .../conductor/model/WorkflowModel.java | 2 +- .../conductor/core/dal/ModelMapperSpec.groovy | 2 + .../ExternalPayloadStorageSpec.groovy | 47 ++++++++++++++++++- ...workflow_with_synchronous_system_task.json | 34 ++++++++++++++ 6 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 test-harness/src/test/resources/workflow_with_synchronous_system_task.json diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java index f566d5ea32..b2f0be8b34 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ModelMapper.java @@ -16,8 +16,6 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Component; @@ -33,8 +31,6 @@ @Component public class ModelMapper { - private static final Logger LOGGER = LoggerFactory.getLogger(ModelMapper.class); - private final ExternalPayloadStorageUtils externalPayloadStorageUtils; public ModelMapper(ExternalPayloadStorageUtils externalPayloadStorageUtils) { @@ -61,7 +57,10 @@ public WorkflowModel getFullCopy(WorkflowModel workflowModel) { public WorkflowModel getLeanCopy(WorkflowModel workflowModel) { WorkflowModel leanWorkflowModel = workflowModel.copy(); externalizeWorkflowData(leanWorkflowModel); - workflowModel.getTasks().forEach(this::getLeanCopy); + leanWorkflowModel.setTasks( + workflowModel.getTasks().stream() + .map(this::getLeanCopy) + 
.collect(Collectors.toList())); return leanWorkflowModel; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index e4083fd137..1fd88f9096 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1715,15 +1715,15 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { String.valueOf(workflow.getWorkflowVersion())); // Save the tasks in the DAO - createdTasks = executionDAOFacade.createTasks(tasks); + executionDAOFacade.createTasks(tasks); List systemTasks = - createdTasks.stream() + tasks.stream() .filter(task -> systemTaskRegistry.isSystemTask(task.getTaskType())) .collect(Collectors.toList()); tasksToBeQueued = - createdTasks.stream() + tasks.stream() .filter(task -> !systemTaskRegistry.isSystemTask(task.getTaskType())) .collect(Collectors.toList()); diff --git a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java index 53230f19db..9685f91de1 100644 --- a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java +++ b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java @@ -377,7 +377,7 @@ public TaskModel getTaskByRefName(String refName) { return found.getLast(); } - /** @return a deep copy of the workflow instance */ + /** @return a copy of the workflow instance */ public WorkflowModel copy() { WorkflowModel copy = new WorkflowModel(); BeanUtils.copyProperties(this, copy); diff --git a/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy index 713970f497..8f4851883b 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy +++ 
b/core/src/test/groovy/com/netflix/conductor/core/dal/ModelMapperSpec.groovy @@ -104,9 +104,11 @@ class ModelMapperSpec extends Specification { externalOutputPayloadStoragePath == '/relative/workflow/path' endTime == 100L tasks.size() == 2 + !tasks[0].is(workflowModel.tasks[0]) // check if TaskModel is copied tasks[0].taskId == 'taskId1' tasks[0].status == TaskModel.Status.SCHEDULED tasks[0].inputData == ['key1': 'value1'] + !tasks[1].is(workflowModel.tasks[1]) // check if TaskModel is copied tasks[1].taskId == 'taskId2' tasks[1].status == TaskModel.Status.IN_PROGRESS tasks[1].externalInputPayloadStoragePath == '/relative/task/path' diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index 935ce94f69..0b71317dd8 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -48,6 +48,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { @Shared def WORKFLOW_WITH_DECISION_AND_TERMINATE = 'ConditionalTerminateWorkflow' + @Shared + def WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK = 'workflow_with_synchronous_system_task' + @Autowired UserTask userTask @@ -59,7 +62,8 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { 'conditional_system_task_workflow_integration_test.json', 'fork_join_integration_test.json', 'simple_workflow_with_sub_workflow_inline_def_integration_test.json', - 'decision_and_terminate_integration_test.json' + 'decision_and_terminate_integration_test.json', + 'workflow_with_synchronous_system_task.json' ) } @@ -125,6 +129,47 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { } } + def "Test workflow with synchronous system task using external payload storage"() { + given: "An existing 
workflow definition with a simple task followed by a sync system task" + metadataService.getWorkflowDef(WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK, 1) + + and: "input required to start large payload workflow" + def correlationId = 'wf_external_storage' + def workflowInputPath = INITIAL_WORKFLOW_INPUT_PATH + + when: "the workflow is started" + def workflowInstanceId = workflowExecutor.startWorkflow(WORKFLOW_WITH_SYNCHRONOUS_SYSTEM_TASK, 1, correlationId, null, workflowInputPath, null, null) + + then: "verify that the workflow is in a RUNNING state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.SCHEDULED + } + + when: "poll and complete the 'integration_task_1' with external payload storage" + def taskOutputPath = TASK_OUTPUT_PATH + def pollAndCompleteLargePayloadTask = workflowTestUtil.pollAndCompleteLargePayloadTask('integration_task_1', 'task1.integration.worker', taskOutputPath) + + then: "verify that the 'integration_task_1' was polled and acknowledged" + verifyPolledAndAcknowledgedLargePayloadTask(pollAndCompleteLargePayloadTask) + + and: "verify that the 'integration_task_1' is complete and the next task is scheduled" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 2 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.COMPLETED + tasks[0].outputData.isEmpty() + tasks[0].externalOutputPayloadStoragePath == taskOutputPath + tasks[1].taskType == 'JSON_JQ_TRANSFORM' + tasks[1].status == Task.Status.COMPLETED + tasks[1].externalInputPayloadStoragePath == INPUT_PAYLOAD_PATH + tasks[1].outputData['result'] == 104 // output of .tp2.TEST_SAMPLE | length expression from output.json. 
On assertion failure, check workflow definition and output.json + } + } + def "Test conditional workflow with system task using external payload storage"() { given: "An existing workflow definition" diff --git a/test-harness/src/test/resources/workflow_with_synchronous_system_task.json b/test-harness/src/test/resources/workflow_with_synchronous_system_task.json new file mode 100644 index 0000000000..c56fda7451 --- /dev/null +++ b/test-harness/src/test/resources/workflow_with_synchronous_system_task.json @@ -0,0 +1,34 @@ +{ + "name": "workflow_with_synchronous_system_task", + "description": "A workflow with a simple task followed by a synchronous task", + "version": 1, + "tasks": [ + { + "name": "integration_task_1", + "taskReferenceName": "t1", + "type": "SIMPLE" + }, + { + "name": "jsonjq", + "taskReferenceName": "jsonjq", + "inputParameters": { + "queryExpression": ".tp2.TEST_SAMPLE | length", + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "JSON_JQ_TRANSFORM" + } + ], + "inputParameters": [], + "outputParameters": { + "data": "${jsonjq.output.resources}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "example@email.com", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {} +} From d7c5f495be4f7debe1201e3713eb97c9db42a006 Mon Sep 17 00:00:00 2001 From: Aravindan Ramkumar <1028385+aravindanr@users.noreply.github.com> Date: Tue, 8 Mar 2022 15:58:05 -0800 Subject: [PATCH 2/2] Tasks are ordered using TaskModel.getSeq() --- .../com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java | 4 +--- .../netflix/conductor/postgres/dao/PostgresExecutionDAO.java | 4 +--- .../com/netflix/conductor/redis/dao/RedisExecutionDAO.java | 4 +--- .../test/integration/WorkflowAndTaskConfigurationSpec.groovy | 2 +- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git 
a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java index fb07329854..9c50ad7d64 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java @@ -333,9 +333,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { if (workflow != null) { if (includeTasks) { List tasks = getTasksForWorkflow(workflowId); - tasks.sort( - Comparator.comparingLong(TaskModel::getScheduledTime) - .thenComparingInt(TaskModel::getSeq)); + tasks.sort(Comparator.comparingInt(TaskModel::getSeq)); workflow.setTasks(tasks); } } diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java index ed42620ac9..510563d363 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java @@ -335,9 +335,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { if (workflow != null) { if (includeTasks) { List tasks = getTasksForWorkflow(workflowId); - tasks.sort( - Comparator.comparingLong(TaskModel::getScheduledTime) - .thenComparingInt(TaskModel::getSeq)); + tasks.sort(Comparator.comparingInt(TaskModel::getSeq)); workflow.setTasks(tasks); } } diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java index f41d413569..e653d3d6f5 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java +++ 
b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java @@ -499,9 +499,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { "getWorkflow", json.length(), "n/a", workflow.getWorkflowName()); if (includeTasks) { List tasks = getTasksForWorkflow(workflowId); - tasks.sort( - Comparator.comparingLong(TaskModel::getScheduledTime) - .thenComparingInt(TaskModel::getSeq)); + tasks.sort(Comparator.comparingInt(TaskModel::getSeq)); workflow.setTasks(tasks); } } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy index 408c884e49..d0dd8be3ec 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy @@ -714,7 +714,7 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { } def "Test wait time out task based simple workflow"() { - when: "Start a workflow based on a task that has a registered wiat time out" + when: "Start a workflow based on a task that has a registered wait time out" def workflowInstanceId = workflowExecutor.startWorkflow(WAIT_TIME_OUT_WORKFLOW, 1, '', [:], null, null, null)