From a57b40a5f5972a922cafdd15222d9415ca816d25 Mon Sep 17 00:00:00 2001 From: Remco Buddelmeijer Date: Mon, 17 Oct 2022 18:07:51 +0200 Subject: [PATCH 1/8] Remove tasks from Elasticsearch when workflow is removed (#1505) * Add removeTask method for IndexDAO Add removeTask and asyncRemoveTask methods to the IndexDAO interface. Implemented in ElasticSearchDAOV6, ElasticSearchRestDAOV6, and NoopIndexDAO. * Implement removal of tasks for removeWorkflow - Update ExecutionDAOFacade#removeWorkflow with a new parameter: removeTasks - Added ExecutionDAOFacade#removeTaskIndex - Added methods updateTask and asyncUpdateTask to IndexDAO - Added methods updateTask and asyncUpdateTask to ElasticSearchDAOV6 and ElasticSearchRestDAOV6 * Add tests for removing workflow through /workflow/remove - Introduced tests for WorkflowServiceTest - Introduced tests for WorkflowResourceTest - Introduced test for ExecutionDAOFacadeTest - Introduced test for QueueResiliencySpec.groovy * Update Javadoc to include task archival when workflow archived and removeTasks true --- .../core/dal/ExecutionDAOFacade.java | 75 +++++++++++++++- .../core/execution/WorkflowExecutor.java | 3 +- .../conductor/core/index/NoopIndexDAO.java | 17 ++++ .../com/netflix/conductor/dao/IndexDAO.java | 39 ++++++++ .../conductor/service/ExecutionService.java | 4 +- .../conductor/service/WorkflowService.java | 7 +- .../service/WorkflowServiceImpl.java | 8 +- .../core/dal/ExecutionDAOFacadeTest.java | 57 ++++++++++-- .../service/WorkflowServiceTest.java | 36 +++++--- .../es6/dao/index/ElasticSearchDAOV6.java | 90 +++++++++++++++++++ .../es6/dao/index/ElasticSearchRestDAOV6.java | 89 ++++++++++++++++++ .../server/service/WorkflowServiceImpl.java | 3 +- .../rest/controllers/WorkflowResource.java | 6 +- .../controllers/WorkflowResourceTest.java | 14 ++- .../resiliency/QueueResiliencySpec.groovy | 51 +++++++++-- 15 files changed, 457 insertions(+), 42 deletions(-) diff --git 
a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index b8ac050eca..89cb28580d 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -44,6 +44,7 @@ import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; +import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.*; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; @@ -333,10 +334,26 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) { * * @param workflowId the id of the workflow to be removed * @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after - * removal from {@link ExecutionDAO} + * removal from {@link ExecutionDAO}. Next to this if removeTasks is true, the tasks + * associated with the workflow will also be archived in the {@link IndexDAO} after removal + * from {@link ExecutionDAO}. 
+ * @param removeTasks if true, the tasks associated with the workflow will be removed */ - public void removeWorkflow(String workflowId, boolean archiveWorkflow) { + public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { + if (!removeTasks) { + LOGGER.info("Not removing tasks of workflow: {}", workflowId); + } + WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); + List tasks = workflow.getTasks(); + + executionDAO.removeWorkflow(workflowId); + if (removeTasks) { + tasks.forEach( + task -> { + executionDAO.removeTask(task.getTaskId()); + }); + } try { removeWorkflowIndex(workflow, archiveWorkflow); @@ -344,13 +361,42 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { throw new TransientException("Workflow can not be serialized to json", e); } - executionDAO.removeWorkflow(workflowId); + if (removeTasks) { + tasks.forEach( + task -> { + try { + removeTaskIndex(workflow, task, archiveWorkflow); + } catch (JsonProcessingException e) { + throw new TransientException( + String.format( + "Task %s of workflow %s can not be serialized to json", + task.getTaskId(), workflow.getWorkflowId()), + e); + } + }); + } try { queueDAO.remove(DECIDER_QUEUE, workflowId); } catch (Exception e) { LOGGER.info("Error removing workflow: {} from decider queue", workflowId, e); } + + if (removeTasks) { + tasks.forEach( + task -> { + try { + queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); + } catch (Exception e) { + LOGGER.info( + "Error removing task: {} of workflow: {} from {} queue", + workflowId, + task.getTaskId(), + QueueUtils.getQueueName(task), + e); + } + }); + } } private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow) @@ -509,6 +555,29 @@ public void removeTask(String taskId) { executionDAO.removeTask(taskId); } + private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask) + throws JsonProcessingException { + if 
(archiveTask) { + if (task.getStatus().isTerminal()) { + // Only allow archival if task is in terminal state + // DO NOT archive async, since if archival errors out, task data will be lost + indexDAO.updateTask( + workflow.getWorkflowId(), + task.getTaskId(), + new String[] {RAW_JSON_FIELD, ARCHIVED_FIELD}, + new Object[] {objectMapper.writeValueAsString(task), true}); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot archive task: %s of workflow: %s with status: %s", + task.getTaskId(), workflow.getWorkflowId(), task.getStatus())); + } + } else { + // Not archiving, remove task from index + indexDAO.asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId()); + } + } + public void extendLease(TaskModel taskModel) { taskModel.setUpdateTime(System.currentTimeMillis()); executionDAO.updateTask(taskModel); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 83cb435dae..a1c1de3589 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -389,8 +389,9 @@ public String startWorkflow( // It's possible the remove workflow call hits an exception as well, in that case we // want to log both errors to help diagnosis. + // For now the tasks are not removed or archived. 
try { - executionDAOFacade.removeWorkflow(workflowId, false); + executionDAOFacade.removeWorkflow(workflowId, false, false); } catch (Exception rwe) { LOGGER.error("Could not remove the workflowId: " + workflowId, rwe); } diff --git a/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java b/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java index 4a7f427cbc..05d6550b78 100644 --- a/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/core/index/NoopIndexDAO.java @@ -78,6 +78,23 @@ public CompletableFuture asyncUpdateWorkflow( return CompletableFuture.completedFuture(null); } + @Override + public void removeTask(String workflowId, String taskId) {} + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.completedFuture(null); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {} + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.completedFuture(null); + } + @Override public String get(String workflowInstanceId, String key) { return null; diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java index 490758d151..927856657a 100644 --- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java @@ -111,6 +111,45 @@ SearchResult searchTasks( CompletableFuture asyncUpdateWorkflow( String workflowInstanceId, String[] keys, Object[] values); + /** + * Remove the task index + * + * @param workflowId workflow containing task + * @param taskId task to be removed + */ + void removeTask(String workflowId, String taskId); + + /** + * Remove the task index asynchronously + * + * @param workflowId workflow containing task + 
* @param taskId task to be removed + * @return CompletableFuture of type void + */ + CompletableFuture asyncRemoveTask(String workflowId, String taskId); + + /** + * Updates the index + * + * @param workflowId id of the workflow + * @param taskId id of the task + * @param keys keys to be updated + * @param values values. Number of keys and values MUST match. + */ + void updateTask(String workflowId, String taskId, String[] keys, Object[] values); + + /** + * Updates the index + * + * @param workflowId id of the workflow + * @param taskId id of the task + * @param keys keys to be updated + * @param values values. Number of keys and values MUST match. + * @return CompletableFuture of type void + */ + CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values); + /** * Retrieves a specific field from the index * diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 56cefd255c..9835b41b97 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -378,8 +378,8 @@ public List getRunningWorkflows(String workflowName, int version) { return executionDAOFacade.getRunningWorkflowIds(workflowName, version); } - public void removeWorkflow(String workflowId, boolean archiveWorkflow) { - executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow); + public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { + executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow, removeTasks); } public SearchResult search( diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java index 10f9e18425..29359b6b33 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java +++ 
b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java @@ -171,11 +171,14 @@ Workflow getExecutionStatus( * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow. + * @param archiveWorkflow Archives the workflow instead of removing it. Next to this if + * removeTasks is true, the tasks associated with the workflow will be archived as well. + * @param removeTasks Whether to remove the associated tasks from system. */ void deleteWorkflow( @NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId, - boolean archiveWorkflow); + boolean archiveWorkflow, + boolean removeTasks); /** * Retrieves all the running workflows. diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java index 68565d9bc2..cda6a9266c 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java @@ -252,10 +252,12 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) { * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow. + * @param archiveWorkflow Archives the workflow instead of removing it. Next to this if + * removeTasks is true, the tasks associated with the workflow will be archived as well. + * @param removeTasks Whether to remove the associated tasks from system. 
*/ - public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { - executionService.removeWorkflow(workflowId, archiveWorkflow); + public void deleteWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { + executionService.removeWorkflow(workflowId, archiveWorkflow, removeTasks); } /** diff --git a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java index 69a4f83c2a..d251c4eb39 100644 --- a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java +++ b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java @@ -137,24 +137,65 @@ public void testGetWorkflowsByCorrelationId() { } @Test - public void testRemoveWorkflow() { + public void testRemoveWorkflowWithoutTasks() { WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId("workflowId"); workflow.setStatus(WorkflowModel.Status.COMPLETED); + + TaskModel task = new TaskModel(); + task.setTaskId("taskId"); + workflow.setTasks(Collections.singletonList(task)); + + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); + executionDAOFacade.removeWorkflow("workflowId", false, false); + verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); + verify(indexDAO, never()).asyncRemoveTask(anyString(), anyString()); + } + + @Test + public void testRemoveWorkflowWithTasks() { + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId("workflowId"); + workflow.setStatus(WorkflowModel.Status.COMPLETED); + + TaskModel task = new TaskModel(); + task.setTaskId("taskId"); + workflow.setTasks(Collections.singletonList(task)); + + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); + 
executionDAOFacade.removeWorkflow("workflowId", false, true); + verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); + verify(indexDAO, times(1)).asyncRemoveTask(anyString(), anyString()); + } + + @Test + public void testArchiveWorkflowWithoutTasks() throws Exception { + InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json"); + WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class); + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", false); - verify(indexDAO, never()).updateWorkflow(any(), any(), any()); - verify(indexDAO, times(1)).asyncRemoveWorkflow(workflow.getWorkflowId()); + executionDAOFacade.removeWorkflow("workflowId", true, false); + verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, never()).removeWorkflow(anyString()); + verify(indexDAO, never()).removeTask(anyString(), anyString()); } @Test - public void testArchiveWorkflow() throws Exception { + public void testArchiveWorkflowWithTasks() throws Exception { InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json"); WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class); when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", true); - verify(indexDAO, times(1)).updateWorkflow(any(), any(), any()); - verify(indexDAO, never()).removeWorkflow(any()); + executionDAOFacade.removeWorkflow("workflowId", true, true); + verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); + verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any()); + verify(indexDAO, 
never()).removeWorkflow(anyString()); + verify(indexDAO, never()).removeTask(anyString(), anyString()); } @Test diff --git a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java index aef6e5a2eb..0821611453 100644 --- a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java @@ -41,13 +41,7 @@ import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -244,15 +238,33 @@ public void testNotFoundExceptionGetExecutionStatus() { } @Test - public void testDeleteWorkflow() { - workflowService.deleteWorkflow("w123", true); - verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean()); + public void testDeleteWorkflowWithoutTask() { + workflowService.deleteWorkflow("w123", true, false); + verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(false)); + } + + @Test + public void testDeleteWorkflowWithTask() { + workflowService.deleteWorkflow("w123", true, true); + verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(true)); + } + + @Test(expected = ConstraintViolationException.class) + public void testInvalidDeleteWorkflowWithoutTask() { + try { + workflowService.deleteWorkflow(null, true, false); + } catch 
(ConstraintViolationException ex) { + assertEquals(1, ex.getConstraintViolations().size()); + Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); + assertTrue(messages.contains("WorkflowId cannot be null or empty.")); + throw ex; + } } @Test(expected = ConstraintViolationException.class) - public void testInvalidDeleteWorkflow() { + public void testInvalidDeleteWorkflowWithTask() { try { - workflowService.deleteWorkflow(null, true); + workflowService.deleteWorkflow(null, true, true); } catch (ConstraintViolationException ex) { assertEquals(1, ex.getConstraintViolations().size()); Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java index 716be69815..4361eb7438 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java @@ -732,6 +732,96 @@ public CompletableFuture asyncUpdateWorkflow( () -> updateWorkflow(workflowInstanceId, keys, values), executorService); } + @Override + public void removeTask(String workflowId, String taskId) { + try { + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + + SearchResult taskSearchResult = + searchTasks( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("workflowId", workflowId)) + .must(QueryBuilders.termQuery("taskId", taskId)) + .toString(), + "*", + 0, + 1, + null); + + if (taskSearchResult.getTotalHits() == 0) { + LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + } + + DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); + DeleteResponse response = elasticSearchClient.delete(request).actionGet(); + long endTime = Instant.now().toEpochMilli(); + + if (response.getResult() == DocWriteResponse.Result.DELETED) { + LOGGER.error( + "Index removal failed - task not found by id: {} of workflow: {}", + taskId, + workflowId); + } + LOGGER.debug( + "Time taken {} for removing task:{} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("remove_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (Exception e) { + LOGGER.error( + "Failed to remove task: {} of workflow: {} from index", taskId, workflowId, e); + Monitors.error(CLASS_NAME, "remove"); + } + } + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) { + if (keys.length != values.length) { + throw new IllegalArgumentException("Number of keys and values do not match"); + } + + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + + UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId); + Map source = + IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + LOGGER.debug( + "Updating task: {} of workflow: {} in elasticsearch index: {}", + taskId, + workflowId, + taskIndexName); + elasticSearchClient.update(request).actionGet(); + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for updating task: {} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("update_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.runAsync( + () -> updateTask(workflowId, taskId, keys, values), executorService); + } + @Override public String get(String workflowInstanceId, String fieldToGet) { String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride; diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java index 764329ab35..d70a00bcc2 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java @@ -810,6 +810,95 @@ public CompletableFuture asyncUpdateWorkflow( () -> updateWorkflow(workflowInstanceId, keys, values), executorService); } + @Override + public void removeTask(String workflowId, String taskId) { + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + + SearchResult taskSearchResult = + searchTasks( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("workflowId", workflowId)) + .must(QueryBuilders.termQuery("taskId", taskId)) + .toString(), + "*", + 0, + 1, + null); + + if (taskSearchResult.getTotalHits() == 0) { + LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + } + + DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); + + try { + DeleteResponse response = elasticSearchClient.delete(request); + + if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { + LOGGER.error("Index removal failed - task not found by id: {}", workflowId); + } + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for removing task:{} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("remove_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (IOException e) { + LOGGER.error( + "Failed to remove task {} of workflow: {} from index", taskId, workflowId, e); + Monitors.error(className, "remove"); + } + } + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) { + try { + if (keys.length != values.length) { + throw new IllegalArgumentException("Number of keys and values do not match"); + } + + long startTime = Instant.now().toEpochMilli(); + String docType = StringUtils.isBlank(docTypeOverride) ? 
TASK_DOC_TYPE : docTypeOverride; + UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId); + Map source = + IntStream.range(0, keys.length) + .boxed() + .collect(Collectors.toMap(i -> keys[i], i -> values[i])); + request.doc(source); + + LOGGER.debug("Updating task: {} of workflow: {} with {}", taskId, workflowId, source); + elasticSearchClient.update(request, RequestOptions.DEFAULT); + long endTime = Instant.now().toEpochMilli(); + LOGGER.debug( + "Time taken {} for updating task: {} of workflow: {}", + endTime - startTime, + taskId, + workflowId); + Monitors.recordESIndexTime("update_task", docType, endTime - startTime); + Monitors.recordWorkerQueueSize( + "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size()); + } catch (Exception e) { + LOGGER.error("Failed to update task: {} of workflow: {}", taskId, workflowId, e); + Monitors.error(className, "update"); + } + } + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + return CompletableFuture.runAsync( + () -> updateTask(workflowId, taskId, keys, values), executorService); + } + @Override public String get(String workflowInstanceId, String fieldToGet) { diff --git a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java index 3b10b33106..63d27f6cb4 100644 --- a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java +++ b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java @@ -134,7 +134,8 @@ public void removeWorkflow( WorkflowServicePb.RemoveWorkflowRequest req, StreamObserver response) { try { - workflowService.deleteWorkflow(req.getWorkflodId(), req.getArchiveWorkflow()); + // TODO: Follow up with modifying the request grpc + workflowService.deleteWorkflow(req.getWorkflodId(), 
req.getArchiveWorkflow(), false); response.onNext(WorkflowServicePb.RemoveWorkflowResponse.getDefaultInstance()); response.onCompleted(); } catch (Exception e) { diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java index bb986592e4..0bab84544e 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java @@ -111,8 +111,10 @@ public Workflow getExecutionStatus( public void delete( @PathVariable("workflowId") String workflowId, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) - boolean archiveWorkflow) { - workflowService.deleteWorkflow(workflowId, archiveWorkflow); + boolean archiveWorkflow, + @RequestParam(value = "removeTask", defaultValue = "false", required = false) + boolean removeTasks) { + workflowService.deleteWorkflow(workflowId, archiveWorkflow, removeTasks); } @GetMapping("/running/{name}") diff --git a/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java b/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java index d128962147..ad14771cfd 100644 --- a/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java +++ b/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java @@ -130,9 +130,17 @@ public void testGetExecutionStatus() { } @Test - public void testDelete() { - workflowResource.delete("w123", true); - verify(mockWorkflowService, times(1)).deleteWorkflow(anyString(), anyBoolean()); + public void testDeleteWithoutTask() { + workflowResource.delete("w123", true, false); + verify(mockWorkflowService, times(1)) + .deleteWorkflow(anyString(), anyBoolean(), anyBoolean()); + } + + @Test + public void testDeleteWithTask() { + workflowResource.delete("w123", true, true); + 
verify(mockWorkflowService, times(1)) + .deleteWorkflow(anyString(), anyBoolean(), anyBoolean()); } @Test diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy index 3834323f38..4a7391f5ba 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy @@ -23,6 +23,8 @@ import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.common.utils.ExternalPayloadStorage import com.netflix.conductor.core.exception.NotFoundException import com.netflix.conductor.core.exception.TransientException +import com.netflix.conductor.core.utils.QueueUtils +import com.netflix.conductor.core.utils.Utils import com.netflix.conductor.rest.controllers.TaskResource import com.netflix.conductor.rest.controllers.WorkflowResource import com.netflix.conductor.test.base.AbstractResiliencySpecification @@ -237,7 +239,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { notThrown(Exception) } - def "Verify remove workflow succeeds when QueueDAO is unavailable"() { + def "Verify remove workflow without tasks succeeds when QueueDAO is unavailable"() { when: "Start a simple workflow" def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() .withName(SIMPLE_TWO_TASK_WORKFLOW) @@ -252,13 +254,52 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { } when: "We get a workflow when QueueDAO is unavailable" - workflowResource.delete(workflowInstanceId, false) + workflowResource.delete(workflowInstanceId, false, false) then: "Verify queueDAO is called to remove from _deciderQueue" - 1 * queueDAO._ + 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) - when: "We try to get deleted workflow" - 
workflowResource.getExecutionStatus(workflowInstanceId, true) + when: "We try to get deleted workflow, verify the status and check if tasks are not removed from queue" + with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.TERMINATED + tasks.size() == 1 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.CANCELED + 0 * queueDAO.remove(QueueUtils.getQueueName(tasks[0]), _) + } + + then: + thrown(NotFoundException.class) + } + + def "Verify remove workflow with tasks succeeds when QueueDAO is unavailable"() { + when: "Start a simple workflow" + def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() + .withName(SIMPLE_TWO_TASK_WORKFLOW) + .withVersion(1)) + then: "Verify workflow is started" + + with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.SCHEDULED + } + + when: "We get a workflow when QueueDAO is unavailable" + workflowResource.delete(workflowInstanceId, false, true) + + then: "Verify queueDAO is called to remove from _deciderQueue" + 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) + + when: "We try to get deleted workflow, verify the status and check if tasks are removed from queue" + with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.TERMINATED + tasks.size() == 1 + tasks[0].taskType == 'integration_task_1' + tasks[0].status == Task.Status.CANCELED + 1 * queueDAO.remove(QueueUtils.getQueueName(tasks[0]), _) + } then: thrown(NotFoundException.class) From 92a8801b15fe8d55df1ee9e81c968a95f5450f80 Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Mon, 17 Oct 2022 23:21:00 +0200 Subject: [PATCH 2/8] Implement fix for when tasks are not removed (#1505) - Now tasks are search and associated with workflows through a search query directly embedded 
rather than constructed - If a task can not be removed it will error and return - Monitors are updated automatically - Updated TestElasticSearchRestDAOV6 and TestElasticSearchDAOV6 --- .../es6/dao/index/ElasticSearchDAOV6.java | 14 ++-- .../es6/dao/index/ElasticSearchRestDAOV6.java | 13 +-- .../es6/dao/index/TestElasticSearchDAOV6.java | 82 ++++++++++++++++++- .../dao/index/TestElasticSearchRestDAOV6.java | 81 +++++++++++++++++- .../conductor/es6/utils/TestUtils.java | 12 +++ 5 files changed, 186 insertions(+), 16 deletions(-) diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java index 4361eb7438..b99c858549 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java @@ -740,10 +740,8 @@ public void removeTask(String workflowId, String taskId) { SearchResult taskSearchResult = searchTasks( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("workflowId", workflowId)) - .must(QueryBuilders.termQuery("taskId", taskId)) - .toString(), + String.format( + "(taskId='%s') AND (workflowId='%s')", taskId, workflowId), "*", 0, 1, @@ -751,17 +749,21 @@ public void removeTask(String workflowId, String taskId) { if (taskSearchResult.getTotalHits() == 0) { LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + Monitors.error(CLASS_NAME, "removeTask"); + return; } DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); DeleteResponse response = elasticSearchClient.delete(request).actionGet(); long endTime = Instant.now().toEpochMilli(); - if (response.getResult() == DocWriteResponse.Result.DELETED) { + if (response.getResult() != DocWriteResponse.Result.DELETED) { LOGGER.error( "Index removal failed - task not found by id: {} of workflow: {}", taskId, 
workflowId); + Monitors.error(CLASS_NAME, "removeTask"); + return; } LOGGER.debug( "Time taken {} for removing task:{} of workflow: {}", @@ -774,7 +776,7 @@ public void removeTask(String workflowId, String taskId) { } catch (Exception e) { LOGGER.error( "Failed to remove task: {} of workflow: {} from index", taskId, workflowId, e); - Monitors.error(CLASS_NAME, "remove"); + Monitors.error(CLASS_NAME, "removeTask"); } } diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java index d70a00bcc2..6ebf161329 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java @@ -817,10 +817,7 @@ public void removeTask(String workflowId, String taskId) { SearchResult taskSearchResult = searchTasks( - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("workflowId", workflowId)) - .must(QueryBuilders.termQuery("taskId", taskId)) - .toString(), + String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId), "*", 0, 1, @@ -828,6 +825,8 @@ public void removeTask(String workflowId, String taskId) { if (taskSearchResult.getTotalHits() == 0) { LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId); + Monitors.error(className, "removeTask"); + return; } DeleteRequest request = new DeleteRequest(taskIndexName, docType, taskId); @@ -835,8 +834,10 @@ public void removeTask(String workflowId, String taskId) { try { DeleteResponse response = elasticSearchClient.delete(request); - if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { + if (response.getResult() != DocWriteResponse.Result.DELETED) { LOGGER.error("Index removal failed - task not found by id: {}", workflowId); + Monitors.error(className, "removeTask"); + return; } long endTime = 
Instant.now().toEpochMilli(); LOGGER.debug( @@ -850,7 +851,7 @@ public void removeTask(String workflowId, String taskId) { } catch (IOException e) { LOGGER.error( "Failed to remove task {} of workflow: {} from index", taskId, workflowId, e); - Monitors.error(className, "remove"); + Monitors.error(className, "removeTask"); } } diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java index 1597d3cb16..75dd64454c 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchDAOV6.java @@ -34,8 +34,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; public class TestElasticSearchDAOV6 extends ElasticSearchDaoBaseTest { @@ -196,6 +196,84 @@ public void shouldIndexTaskAsync() throws Exception { assertEquals(taskSummary.getTaskId(), tasks.get(0)); } + @Test + public void shouldRemoveTask() { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + 
assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldAsyncRemoveTask() throws Exception { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotRemoveTaskWhenNotAssociatedWithWorkflow() { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask("InvalidWorkflow", taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotAsyncRemoveTaskWhenNotAssociatedWithWorkflow() throws Exception { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask("InvalidWorkflow", taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + @Test public void shouldAddTaskExecutionLogs() { List logs = new ArrayList<>(); diff --git 
a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java index 50117ce138..154b891add 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java +++ b/es6-persistence/src/test/java/com/netflix/conductor/es6/dao/index/TestElasticSearchRestDAOV6.java @@ -38,8 +38,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestElasticSearchRestDAOV6 extends ElasticSearchRestDaoBaseTest { @@ -195,6 +194,84 @@ public void shouldIndexTaskAsync() throws Exception { assertEquals(taskSummary.getTaskId(), tasks.get(0)); } + @Test + public void shouldRemoveTask() { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldAsyncRemoveTask() throws Exception { + WorkflowSummary workflowSummary = + TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary"); + indexDAO.indexWorkflow(workflowSummary); + + // wait for workflow to be indexed + tryFindResults(() -> 
searchWorkflows(workflowSummary.getWorkflowId()), 1); + + TaskSummary taskSummary = + TestUtils.loadTaskSnapshot( + objectMapper, "task_summary", workflowSummary.getWorkflowId()); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertTrue("Task was not removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotRemoveTaskWhenNotAssociatedWithWorkflow() { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.removeTask("InvalidWorkflow", taskSummary.getTaskId()); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + + @Test + public void shouldNotAsyncRemoveTaskWhenNotAssociatedWithWorkflow() throws Exception { + TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary"); + indexDAO.indexTask(taskSummary); + + // Wait for the task to be indexed + List tasks = tryFindResults(() -> searchTasks(taskSummary), 1); + + indexDAO.asyncRemoveTask("InvalidWorkflow", taskSummary.getTaskId()).get(); + + tasks = tryFindResults(() -> searchTasks(taskSummary), 0); + + assertFalse("Task was removed.", tasks.isEmpty()); + } + @Test public void shouldAddTaskExecutionLogs() { List logs = new ArrayList<>(); diff --git a/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java b/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java index 1102cd494d..4cb8d234d7 100644 --- a/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java +++ 
b/es6-persistence/src/test/java/com/netflix/conductor/es6/utils/TestUtils.java @@ -52,6 +52,18 @@ public static TaskSummary loadTaskSnapshot(ObjectMapper objectMapper, String res } } + public static TaskSummary loadTaskSnapshot( + ObjectMapper objectMapper, String resourceFileName, String workflowId) { + try { + String content = loadJsonResource(resourceFileName); + content = content.replace(WORKFLOW_INSTANCE_ID_PLACEHOLDER, workflowId); + + return objectMapper.readValue(content, TaskSummary.class); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + public static String loadJsonResource(String resourceFileName) { try { return FileUtils.readFileToString( From 4ebe63801493d77ced14b10e497bed544357a6e1 Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Tue, 18 Oct 2022 00:01:27 +0200 Subject: [PATCH 3/8] Removed rawJSON field from task archival from index DAO (#1505) --- .../com/netflix/conductor/core/dal/ExecutionDAOFacade.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index 89cb28580d..5fac077fb9 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -564,8 +564,8 @@ private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean arc indexDAO.updateTask( workflow.getWorkflowId(), task.getTaskId(), - new String[] {RAW_JSON_FIELD, ARCHIVED_FIELD}, - new Object[] {objectMapper.writeValueAsString(task), true}); + new String[] {ARCHIVED_FIELD}, + new Object[] {true}); } else { throw new IllegalArgumentException( String.format( From 338297542a7206bcc80f3af924e607a69c56c240 Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Tue, 18 Oct 2022 01:08:43 +0200 Subject: [PATCH 4/8] Removed removeTasks boolean parameter, now always 
removing tasks (#1505) --- .../conductor/client/http/WorkflowClient.java | 3 +- .../core/dal/ExecutionDAOFacade.java | 77 ++++++++----------- .../operation/StartWorkflowOperation.java | 2 +- .../conductor/service/ExecutionService.java | 4 +- .../conductor/service/WorkflowService.java | 7 +- .../service/WorkflowServiceImpl.java | 8 +- .../core/dal/ExecutionDAOFacadeTest.java | 39 +--------- .../service/WorkflowServiceTest.java | 26 +++---- .../server/service/WorkflowServiceImpl.java | 3 +- .../rest/controllers/WorkflowResource.java | 6 +- .../controllers/WorkflowResourceTest.java | 14 +--- 11 files changed, 65 insertions(+), 124 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 57b240dd3d..1098525015 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -254,7 +254,8 @@ private void populateWorkflowOutput(Workflow workflow) { * Removes a workflow from the system * * @param workflowId the id of the workflow to be deleted - * @param archiveWorkflow flag to indicate if the workflow should be archived before deletion + * @param archiveWorkflow flag to indicate if the workflow and associated tasks should be + * archived before deletion */ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { Validate.notBlank(workflowId, "Workflow id cannot be blank"); diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index 5fac077fb9..240b334af1 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -333,27 +333,18 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) { 
* Removes the workflow from the data store. * * @param workflowId the id of the workflow to be removed - * @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after - * removal from {@link ExecutionDAO}. Next to this if removeTasks is true, the tasks - * associated with the workflow will also be archived in the {@link IndexDAO} after removal - * from {@link ExecutionDAO}. - * @param removeTasks if true, the tasks associated with the workflow will be removed + * @param archiveWorkflow if true, the workflow and associated tasks will be archived in the + * {@link IndexDAO} after removal from {@link ExecutionDAO}. */ - public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { - if (!removeTasks) { - LOGGER.info("Not removing tasks of workflow: {}", workflowId); - } - + public void removeWorkflow(String workflowId, boolean archiveWorkflow) { WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); List tasks = workflow.getTasks(); executionDAO.removeWorkflow(workflowId); - if (removeTasks) { - tasks.forEach( - task -> { - executionDAO.removeTask(task.getTaskId()); - }); - } + tasks.forEach( + task -> { + executionDAO.removeTask(task.getTaskId()); + }); try { removeWorkflowIndex(workflow, archiveWorkflow); @@ -361,20 +352,18 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean r throw new TransientException("Workflow can not be serialized to json", e); } - if (removeTasks) { - tasks.forEach( - task -> { - try { - removeTaskIndex(workflow, task, archiveWorkflow); - } catch (JsonProcessingException e) { - throw new TransientException( - String.format( - "Task %s of workflow %s can not be serialized to json", - task.getTaskId(), workflow.getWorkflowId()), - e); - } - }); - } + tasks.forEach( + task -> { + try { + removeTaskIndex(workflow, task, archiveWorkflow); + } catch (JsonProcessingException e) { + throw new TransientException( + String.format( + "Task %s 
of workflow %s can not be serialized to json", + task.getTaskId(), workflow.getWorkflowId()), + e); + } + }); try { queueDAO.remove(DECIDER_QUEUE, workflowId); @@ -382,21 +371,19 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean r LOGGER.info("Error removing workflow: {} from decider queue", workflowId, e); } - if (removeTasks) { - tasks.forEach( - task -> { - try { - queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); - } catch (Exception e) { - LOGGER.info( - "Error removing task: {} of workflow: {} from {} queue", - workflowId, - task.getTaskId(), - QueueUtils.getQueueName(task), - e); - } - }); - } + tasks.forEach( + task -> { + try { + queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); + } catch (Exception e) { + LOGGER.info( + "Error removing task: {} of workflow: {} from {} queue", + workflowId, + task.getTaskId(), + QueueUtils.getQueueName(task), + e); + } + }); } private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow) diff --git a/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java b/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java index f9b9221f3d..e9281de641 100644 --- a/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java +++ b/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java @@ -135,7 +135,7 @@ private String startWorkflow(StartWorkflowInput input) { // It's possible the remove workflow call hits an exception as well, in that case we // want to log both errors to help diagnosis. 
try { - executionDAOFacade.removeWorkflow(workflowId, false, false); + executionDAOFacade.removeWorkflow(workflowId, false); } catch (Exception rwe) { LOGGER.error("Could not remove the workflowId: " + workflowId, rwe); } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 9835b41b97..56cefd255c 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -378,8 +378,8 @@ public List getRunningWorkflows(String workflowName, int version) { return executionDAOFacade.getRunningWorkflowIds(workflowName, version); } - public void removeWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { - executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow, removeTasks); + public void removeWorkflow(String workflowId, boolean archiveWorkflow) { + executionDAOFacade.removeWorkflow(workflowId, archiveWorkflow); } public SearchResult search( diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java index f0883a4e90..bb760ac684 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java @@ -134,14 +134,11 @@ Workflow getExecutionStatus( * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow instead of removing it. Next to this if - * removeTasks is true, the tasks associated with the workflow will be archived as well. - * @param removeTasks Whether to remove the associated tasks from system. + * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. 
*/ void deleteWorkflow( @NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId, - boolean archiveWorkflow, - boolean removeTasks); + boolean archiveWorkflow); /** * Retrieves all the running workflows. diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java index 61d4939cb6..99732a23a2 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java @@ -190,12 +190,10 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) { * Removes the workflow from the system. * * @param workflowId WorkflowID of the workflow you want to remove from system. - * @param archiveWorkflow Archives the workflow instead of removing it. Next to this if - * removeTasks is true, the tasks associated with the workflow will be archived as well. - * @param removeTasks Whether to remove the associated tasks from system. + * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. 
*/ - public void deleteWorkflow(String workflowId, boolean archiveWorkflow, boolean removeTasks) { - executionService.removeWorkflow(workflowId, archiveWorkflow, removeTasks); + public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { + executionService.removeWorkflow(workflowId, archiveWorkflow); } /** diff --git a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java index d251c4eb39..33dfe92e84 100644 --- a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java +++ b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java @@ -137,7 +137,7 @@ public void testGetWorkflowsByCorrelationId() { } @Test - public void testRemoveWorkflowWithoutTasks() { + public void testRemoveWorkflow() { WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("workflowId"); workflow.setStatus(WorkflowModel.Status.COMPLETED); @@ -147,25 +147,7 @@ public void testRemoveWorkflowWithoutTasks() { workflow.setTasks(Collections.singletonList(task)); when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", false, false); - verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); - verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); - verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); - verify(indexDAO, never()).asyncRemoveTask(anyString(), anyString()); - } - - @Test - public void testRemoveWorkflowWithTasks() { - WorkflowModel workflow = new WorkflowModel(); - workflow.setWorkflowId("workflowId"); - workflow.setStatus(WorkflowModel.Status.COMPLETED); - - TaskModel task = new TaskModel(); - task.setTaskId("taskId"); - workflow.setTasks(Collections.singletonList(task)); - - when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", 
false, true); + executionDAOFacade.removeWorkflow("workflowId", false); verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); @@ -173,25 +155,12 @@ public void testRemoveWorkflowWithTasks() { } @Test - public void testArchiveWorkflowWithoutTasks() throws Exception { - InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json"); - WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class); - - when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", true, false); - verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); - verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); - verify(indexDAO, never()).removeWorkflow(anyString()); - verify(indexDAO, never()).removeTask(anyString(), anyString()); - } - - @Test - public void testArchiveWorkflowWithTasks() throws Exception { + public void testArchiveWorkflow() throws Exception { InputStream stream = TestDeciderService.class.getResourceAsStream("/completed.json"); WorkflowModel workflow = objectMapper.readValue(stream, WorkflowModel.class); when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); - executionDAOFacade.removeWorkflow("workflowId", true, true); + executionDAOFacade.removeWorkflow("workflowId", true); verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any()); verify(indexDAO, never()).removeWorkflow(anyString()); diff --git a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java index 47768daad3..dd61089835 100644 --- a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java +++ 
b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java @@ -178,21 +178,15 @@ public void testNotFoundExceptionGetExecutionStatus() { } @Test - public void testDeleteWorkflowWithoutTask() { - workflowService.deleteWorkflow("w123", true, false); - verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(false)); - } - - @Test - public void testDeleteWorkflowWithTask() { - workflowService.deleteWorkflow("w123", true, true); - verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean(), eq(true)); + public void testDeleteWorkflow() { + workflowService.deleteWorkflow("w123", false); + verify(executionService, times(1)).removeWorkflow(anyString(), eq(false)); } @Test(expected = ConstraintViolationException.class) - public void testInvalidDeleteWorkflowWithoutTask() { + public void testInvalidDeleteWorkflow() { try { - workflowService.deleteWorkflow(null, true, false); + workflowService.deleteWorkflow(null, false); } catch (ConstraintViolationException ex) { assertEquals(1, ex.getConstraintViolations().size()); Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); @@ -201,10 +195,16 @@ public void testInvalidDeleteWorkflowWithoutTask() { } } + @Test + public void testArchiveWorkflow() { + workflowService.deleteWorkflow("w123", true); + verify(executionService, times(1)).removeWorkflow(anyString(), eq(true)); + } + @Test(expected = ConstraintViolationException.class) - public void testInvalidDeleteWorkflowWithTask() { + public void testInvalidArchiveWorkflow() { try { - workflowService.deleteWorkflow(null, true, true); + workflowService.deleteWorkflow(null, true); } catch (ConstraintViolationException ex) { assertEquals(1, ex.getConstraintViolations().size()); Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); diff --git a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java 
b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java index 63d27f6cb4..3b10b33106 100644 --- a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java +++ b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java @@ -134,8 +134,7 @@ public void removeWorkflow( WorkflowServicePb.RemoveWorkflowRequest req, StreamObserver response) { try { - // TODO: Follow up with modifying the request grpc - workflowService.deleteWorkflow(req.getWorkflodId(), req.getArchiveWorkflow(), false); + workflowService.deleteWorkflow(req.getWorkflodId(), req.getArchiveWorkflow()); response.onNext(WorkflowServicePb.RemoveWorkflowResponse.getDefaultInstance()); response.onCompleted(); } catch (Exception e) { diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java index 0bab84544e..bb986592e4 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java @@ -111,10 +111,8 @@ public Workflow getExecutionStatus( public void delete( @PathVariable("workflowId") String workflowId, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) - boolean archiveWorkflow, - @RequestParam(value = "removeTask", defaultValue = "false", required = false) - boolean removeTasks) { - workflowService.deleteWorkflow(workflowId, archiveWorkflow, removeTasks); + boolean archiveWorkflow) { + workflowService.deleteWorkflow(workflowId, archiveWorkflow); } @GetMapping("/running/{name}") diff --git a/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java b/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java index ad14771cfd..d128962147 100644 --- 
a/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java +++ b/rest/src/test/java/com/netflix/conductor/rest/controllers/WorkflowResourceTest.java @@ -130,17 +130,9 @@ public void testGetExecutionStatus() { } @Test - public void testDeleteWithoutTask() { - workflowResource.delete("w123", true, false); - verify(mockWorkflowService, times(1)) - .deleteWorkflow(anyString(), anyBoolean(), anyBoolean()); - } - - @Test - public void testDeleteWithTask() { - workflowResource.delete("w123", true, true); - verify(mockWorkflowService, times(1)) - .deleteWorkflow(anyString(), anyBoolean(), anyBoolean()); + public void testDelete() { + workflowResource.delete("w123", true); + verify(mockWorkflowService, times(1)).deleteWorkflow(anyString(), anyBoolean()); } @Test From 0ae8b3e8ab14b2e67db90008cca979293acba6ce Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Tue, 18 Oct 2022 01:41:35 +0200 Subject: [PATCH 5/8] Update QueueResilienceSpec tests to include archive workflow --- .../conductor/test/resiliency/QueueResiliencySpec.groovy | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy index 4a7391f5ba..d25da524ed 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy @@ -239,7 +239,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { notThrown(Exception) } - def "Verify remove workflow without tasks succeeds when QueueDAO is unavailable"() { + def "Verify remove workflow succeeds when QueueDAO is unavailable"() { when: "Start a simple workflow" def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() .withName(SIMPLE_TWO_TASK_WORKFLOW) @@ -254,7 
+254,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { } when: "We get a workflow when QueueDAO is unavailable" - workflowResource.delete(workflowInstanceId, false, false) + workflowResource.delete(workflowInstanceId, false) then: "Verify queueDAO is called to remove from _deciderQueue" 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) @@ -272,7 +272,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { thrown(NotFoundException.class) } - def "Verify remove workflow with tasks succeeds when QueueDAO is unavailable"() { + def "Verify archive workflow succeeds when QueueDAO is unavailable"() { when: "Start a simple workflow" def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() .withName(SIMPLE_TWO_TASK_WORKFLOW) @@ -287,7 +287,7 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { } when: "We get a workflow when QueueDAO is unavailable" - workflowResource.delete(workflowInstanceId, false, true) + workflowResource.delete(workflowInstanceId, true) then: "Verify queueDAO is called to remove from _deciderQueue" 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) From e184068b16e1e283978e6922d9b04855d5a74874 Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Tue, 18 Oct 2022 02:11:27 +0200 Subject: [PATCH 6/8] Update ExecutionDAOFacade to fix removeTask Previously ExecutionDAOFacade called removeTask twice as often as needed; this has been fixed. The only known bug still present is that tasks are not terminated when they are removed, so their index entries may also not be removed. 
--- .../core/dal/ExecutionDAOFacade.java | 19 +++++++------------ .../core/dal/ExecutionDAOFacadeTest.java | 4 ++++ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index 240b334af1..d5571d3170 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -338,20 +338,15 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) { */ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); - List tasks = workflow.getTasks(); executionDAO.removeWorkflow(workflowId); - tasks.forEach( - task -> { - executionDAO.removeTask(task.getTaskId()); - }); + List tasks = workflow.getTasks(); try { removeWorkflowIndex(workflow, archiveWorkflow); } catch (JsonProcessingException e) { throw new TransientException("Workflow can not be serialized to json", e); } - tasks.forEach( task -> { try { @@ -365,12 +360,6 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { } }); - try { - queueDAO.remove(DECIDER_QUEUE, workflowId); - } catch (Exception e) { - LOGGER.info("Error removing workflow: {} from decider queue", workflowId, e); - } - tasks.forEach( task -> { try { @@ -384,6 +373,12 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { e); } }); + + try { + queueDAO.remove(DECIDER_QUEUE, workflowId); + } catch (Exception e) { + LOGGER.info("Error removing workflow: {} from decider queue", workflowId, e); + } } private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow) diff --git a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java 
index 33dfe92e84..aceccf2b28 100644 --- a/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java +++ b/core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java @@ -148,6 +148,8 @@ public void testRemoveWorkflow() { when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); executionDAOFacade.removeWorkflow("workflowId", false); + verify(executionDAO, times(1)).removeWorkflow(anyString()); + verify(executionDAO, never()).removeTask(anyString()); verify(indexDAO, never()).updateWorkflow(anyString(), any(), any()); verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any()); verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString()); @@ -161,6 +163,8 @@ public void testArchiveWorkflow() throws Exception { when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); executionDAOFacade.removeWorkflow("workflowId", true); + verify(executionDAO, times(1)).removeWorkflow(anyString()); + verify(executionDAO, never()).removeTask(anyString()); verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any()); verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any()); verify(indexDAO, never()).removeWorkflow(anyString()); From 19d628797c71e64f389fc540b52cbd336f717a26 Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Tue, 18 Oct 2022 02:24:36 +0200 Subject: [PATCH 7/8] Remove archival of workflow tasks from QueueResiliencySpec --- .../resiliency/QueueResiliencySpec.groovy | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy index d25da524ed..44f7ade064 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy +++ 
b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy @@ -272,39 +272,6 @@ class QueueResiliencySpec extends AbstractResiliencySpecification { thrown(NotFoundException.class) } - def "Verify archive workflow succeeds when QueueDAO is unavailable"() { - when: "Start a simple workflow" - def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() - .withName(SIMPLE_TWO_TASK_WORKFLOW) - .withVersion(1)) - then: "Verify workflow is started" - - with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { - status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 1 - tasks[0].taskType == 'integration_task_1' - tasks[0].status == Task.Status.SCHEDULED - } - - when: "We get a workflow when QueueDAO is unavailable" - workflowResource.delete(workflowInstanceId, true) - - then: "Verify queueDAO is called to remove from _deciderQueue" - 1 * queueDAO.remove(Utils.DECIDER_QUEUE, _) - - when: "We try to get deleted workflow, verify the status and check if tasks are removed from queue" - with(workflowResource.getExecutionStatus(workflowInstanceId, true)) { - status == Workflow.WorkflowStatus.TERMINATED - tasks.size() == 1 - tasks[0].taskType == 'integration_task_1' - tasks[0].status == Task.Status.CANCELED - 1 * queueDAO.remove(QueueUtils.getQueueName(tasks[0]), _) - } - - then: - thrown(NotFoundException.class) - } - def "Verify decide succeeds when QueueDAO is unavailable"() { when: "Start a simple workflow" def workflowInstanceId = workflowResource.startWorkflow(new StartWorkflowRequest() From fa7c2103306391de50c03404d518eba1b1837c1e Mon Sep 17 00:00:00 2001 From: RemcoBuddelmeijer Date: Wed, 19 Oct 2022 00:55:27 +0200 Subject: [PATCH 8/8] Merged loops for removing task index and queue removal (#1505) --- .../core/dal/ExecutionDAOFacade.java | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java 
b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index d5571d3170..c8bb0e6b5d 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -340,39 +340,36 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); executionDAO.removeWorkflow(workflowId); - - List tasks = workflow.getTasks(); try { removeWorkflowIndex(workflow, archiveWorkflow); } catch (JsonProcessingException e) { throw new TransientException("Workflow can not be serialized to json", e); } - tasks.forEach( - task -> { - try { - removeTaskIndex(workflow, task, archiveWorkflow); - } catch (JsonProcessingException e) { - throw new TransientException( - String.format( - "Task %s of workflow %s can not be serialized to json", - task.getTaskId(), workflow.getWorkflowId()), - e); - } - }); - - tasks.forEach( - task -> { - try { - queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); - } catch (Exception e) { - LOGGER.info( - "Error removing task: {} of workflow: {} from {} queue", - workflowId, - task.getTaskId(), - QueueUtils.getQueueName(task), - e); - } - }); + + workflow.getTasks() + .forEach( + task -> { + try { + removeTaskIndex(workflow, task, archiveWorkflow); + } catch (JsonProcessingException e) { + throw new TransientException( + String.format( + "Task %s of workflow %s can not be serialized to json", + task.getTaskId(), workflow.getWorkflowId()), + e); + } + + try { + queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()); + } catch (Exception e) { + LOGGER.info( + "Error removing task: {} of workflow: {} from {} queue", + workflowId, + task.getTaskId(), + QueueUtils.getQueueName(task), + e); + } + }); try { queueDAO.remove(DECIDER_QUEUE, workflowId);