Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Remove tasks from Elasticsearch when workflow is removed #3300

Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ private void populateWorkflowOutput(Workflow workflow) {
* Removes a workflow from the system
*
* @param workflowId the id of the workflow to be deleted
* @param archiveWorkflow flag to indicate if the workflow should be archived before deletion
* @param archiveWorkflow flag to indicate if the workflow and associated tasks should be
* archived before deletion
*/
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
Validate.notBlank(workflowId, "Workflow id cannot be blank");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.*;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
Expand Down Expand Up @@ -332,19 +333,43 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
* Removes the workflow from the data store.
*
* @param workflowId the id of the workflow to be removed
* @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after
* removal from {@link ExecutionDAO}
* @param archiveWorkflow if true, the workflow and associated tasks will be archived in the
* {@link IndexDAO} after removal from {@link ExecutionDAO}.
*/
public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true);

executionDAO.removeWorkflow(workflowId);
try {
removeWorkflowIndex(workflow, archiveWorkflow);
} catch (JsonProcessingException e) {
throw new TransientException("Workflow can not be serialized to json", e);
}

executionDAO.removeWorkflow(workflowId);
// Clean up every task of the workflow: first its index entry (archived or removed, depending
// on archiveWorkflow), then its entry in the task queue.
workflow.getTasks()
        .forEach(
                task -> {
                    try {
                        removeTaskIndex(workflow, task, archiveWorkflow);
                    } catch (JsonProcessingException e) {
                        throw new TransientException(
                                String.format(
                                        "Task %s of workflow %s can not be serialized to json",
                                        task.getTaskId(), workflow.getWorkflowId()),
                                e);
                    }

                    try {
                        queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
                    } catch (Exception e) {
                        // Best effort: a failed queue removal is logged and must not abort
                        // the cleanup of the remaining tasks.
                        // Argument order must follow the placeholders: task id first, then
                        // workflow id, then queue name; the trailing exception is logged as
                        // the throwable.
                        LOGGER.info(
                                "Error removing task: {} of workflow: {} from {} queue",
                                task.getTaskId(),
                                workflowId,
                                QueueUtils.getQueueName(task),
                                e);
                    }
                });

try {
queueDAO.remove(DECIDER_QUEUE, workflowId);
Expand Down Expand Up @@ -509,6 +534,29 @@ public void removeTask(String taskId) {
executionDAO.removeTask(taskId);
}

private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask)
throws JsonProcessingException {
if (archiveTask) {
if (task.getStatus().isTerminal()) {
// Only allow archival if task is in terminal state
// DO NOT archive async, since if archival errors out, task data will be lost
indexDAO.updateTask(
RemcoBuddelmeijer marked this conversation as resolved.
Show resolved Hide resolved
workflow.getWorkflowId(),
task.getTaskId(),
new String[] {ARCHIVED_FIELD},
new Object[] {true});
} else {
throw new IllegalArgumentException(
String.format(
"Cannot archive task: %s of workflow: %s with status: %s",
task.getTaskId(), workflow.getWorkflowId(), task.getStatus()));
}
} else {
// Not archiving, remove task from index
indexDAO.asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId());
}
}

public void extendLease(TaskModel taskModel) {
taskModel.setUpdateTime(System.currentTimeMillis());
executionDAO.updateTask(taskModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ public CompletableFuture<Void> asyncUpdateWorkflow(
return CompletableFuture.completedFuture(null);
}

/** No-op implementation: the body is empty, so nothing is removed for the given ids. */
@Override
public void removeTask(String workflowId, String taskId) {}

/**
* No-op implementation: performs no removal.
*
* @return a future that is already completed with {@code null}
*/
@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
return CompletableFuture.completedFuture(null);
}

/** No-op implementation: the body is empty, so the given keys and values are ignored. */
@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {}

/**
* No-op implementation: performs no update; the given keys and values are ignored.
*
* @return a future that is already completed with {@code null}
*/
@Override
public CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values) {
return CompletableFuture.completedFuture(null);
}

@Override
public String get(String workflowInstanceId, String key) {
return null;
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,45 @@ SearchResult<String> searchTasks(
CompletableFuture<Void> asyncUpdateWorkflow(
String workflowInstanceId, String[] keys, Object[] values);

/**
* Removes the index entry of a task.
*
* @param workflowId id of the workflow containing the task
* @param taskId id of the task whose index entry is to be removed
*/
void removeTask(String workflowId, String taskId);

/**
* Removes the index entry of a task asynchronously.
*
* @param workflowId id of the workflow containing the task
* @param taskId id of the task whose index entry is to be removed
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId);

/**
* Updates fields of a task in the index.
*
* @param workflowId id of the workflow containing the task
* @param taskId id of the task to be updated
* @param keys names of the fields to be updated
* @param values new values, one per key at the same position. Number of keys and values MUST
* match.
*/
void updateTask(String workflowId, String taskId, String[] keys, Object[] values);

/**
* Updates fields of a task in the index asynchronously.
*
* @param workflowId id of the workflow containing the task
* @param taskId id of the task to be updated
* @param keys names of the fields to be updated
* @param values new values, one per key at the same position. Number of keys and values MUST
* match.
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values);

/**
* Retrieves a specific field from the index
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Workflow getExecutionStatus(
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
void deleteWorkflow(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
executionService.removeWorkflow(workflowId, archiveWorkflow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,21 @@ public void testGetWorkflowsByCorrelationId() {
/**
 * Removing a workflow without archiving must delete the workflow and its task from the index
 * and must not issue any archival (update) call.
 */
@Test
public void testRemoveWorkflow() {
    WorkflowModel workflow = new WorkflowModel();
    workflow.setWorkflowId("workflowId");
    workflow.setStatus(WorkflowModel.Status.COMPLETED);

    TaskModel task = new TaskModel();
    task.setTaskId("taskId");
    workflow.setTasks(Collections.singletonList(task));

    when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
    executionDAOFacade.removeWorkflow("workflowId", false);

    // Execution store: the workflow is removed as a whole, tasks are not removed one by one.
    verify(executionDAO, times(1)).removeWorkflow(anyString());
    verify(executionDAO, never()).removeTask(anyString());

    // Index: nothing is archived ...
    verify(indexDAO, never()).updateWorkflow(any(), any(), any());
    verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any());

    // ... and both the workflow and its task are removed. Exact ids are verified so that a
    // swapped workflowId/taskId would be detected.
    verify(indexDAO, times(1)).asyncRemoveWorkflow(workflow.getWorkflowId());
    verify(indexDAO, times(1)).asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId());
}

@Test
Expand All @@ -153,8 +163,12 @@ public void testArchiveWorkflow() throws Exception {

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", true);
verify(indexDAO, times(1)).updateWorkflow(any(), any(), any());
verify(indexDAO, never()).removeWorkflow(any());
verify(executionDAO, times(1)).removeWorkflow(anyString());
verify(executionDAO, never()).removeTask(anyString());
verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any());
verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, never()).removeWorkflow(anyString());
verify(indexDAO, never()).removeTask(anyString(), anyString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@
import static com.netflix.conductor.TestUtils.getConstraintViolationMessages;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -184,12 +179,30 @@ public void testNotFoundExceptionGetExecutionStatus() {

/** Deleting without archiving must be forwarded with the same id and {@code false}. */
@Test
public void testDeleteWorkflow() {
    workflowService.deleteWorkflow("w123", false);
    // Pin the exact arguments so that a flipped archive flag or a wrong id fails the test.
    verify(executionService, times(1)).removeWorkflow("w123", false);
}

/** A null workflow id on the delete path must be rejected with exactly one violation. */
@Test(expected = ConstraintViolationException.class)
public void testInvalidDeleteWorkflow() {
    try {
        workflowService.deleteWorkflow(null, false);
    } catch (ConstraintViolationException violation) {
        // Inspect the violation, then rethrow it so that the expected-exception check passes.
        assertEquals(1, violation.getConstraintViolations().size());
        Set<String> violationMessages =
                getConstraintViolationMessages(violation.getConstraintViolations());
        assertTrue(violationMessages.contains("WorkflowId cannot be null or empty."));
        throw violation;
    }
}

/** Deleting with archiving must be forwarded to the execution service with {@code true}. */
@Test
public void testArchiveWorkflow() {
    final boolean archive = true;
    workflowService.deleteWorkflow("w123", archive);
    // verify(mock) without a mode checks for exactly one invocation.
    verify(executionService).removeWorkflow(anyString(), eq(true));
}

@Test(expected = ConstraintViolationException.class)
public void testInvalidArchiveWorkflow() {
try {
workflowService.deleteWorkflow(null, true);
} catch (ConstraintViolationException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,98 @@ public CompletableFuture<Void> asyncUpdateWorkflow(
() -> updateWorkflow(workflowInstanceId, keys, values), executorService);
}

/**
 * Deletes the document of a task from the task index.
 *
 * <p>The task is first searched by task id and workflow id; when the search has no hit, the
 * task is not deleted. Failures are logged and counted via {@code Monitors.error} and are not
 * propagated to the caller.
 *
 * @param workflowId id of the workflow the task must belong to
 * @param taskId id of the task whose document is deleted
 */
@Override
public void removeTask(String workflowId, String taskId) {
    try {
        final long startTime = Instant.now().toEpochMilli();
        final String docType =
                StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

        // Only delete the document when the task is indexed under the given workflow.
        final String ownershipQuery =
                String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId);
        SearchResult<String> matches = searchTasks(ownershipQuery, "*", 0, 1, null);
        if (matches.getTotalHits() == 0) {
            LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId);
            Monitors.error(CLASS_NAME, "removeTask");
            return;
        }

        DeleteRequest deleteRequest = new DeleteRequest(taskIndexName, docType, taskId);
        DeleteResponse deleteResponse = elasticSearchClient.delete(deleteRequest).actionGet();
        final long elapsedMillis = Instant.now().toEpochMilli() - startTime;

        if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) {
            LOGGER.debug(
                    "Time taken {} for removing task:{} of workflow: {}",
                    elapsedMillis,
                    taskId,
                    workflowId);
            Monitors.recordESIndexTime("remove_task", docType, elapsedMillis);
            Monitors.recordWorkerQueueSize(
                    "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
        } else {
            LOGGER.error(
                    "Index removal failed - task not found by id: {} of workflow: {}",
                    taskId,
                    workflowId);
            Monitors.error(CLASS_NAME, "removeTask");
        }
    } catch (Exception e) {
        LOGGER.error(
                "Failed to remove task: {} of workflow: {} from index", taskId, workflowId, e);
        Monitors.error(CLASS_NAME, "removeTask");
    }
}

/**
 * Runs {@code removeTask(workflowId, taskId)} on {@code executorService}.
 *
 * @param workflowId id of the workflow the task must belong to
 * @param taskId id of the task whose document is deleted
 * @return a future for the submitted removal
 */
@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
    Runnable removal = () -> removeTask(workflowId, taskId);
    return CompletableFuture.runAsync(removal, executorService);
}

/**
 * Partially updates the document of a task in the task index.
 *
 * <p>Each {@code keys[i]} is written with {@code values[i]}. A {@code null} value is passed on
 * to the update request as-is; when a key occurs more than once, its last value is used.
 *
 * @param workflowId id of the workflow the task belongs to (used for logging only)
 * @param taskId id of the task document to update
 * @param keys names of the fields to update
 * @param values new field values, one per key at the same position
 * @throws IllegalArgumentException if the number of keys and values differs
 */
@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {
    if (keys.length != values.length) {
        throw new IllegalArgumentException("Number of keys and values do not match");
    }

    long startTime = Instant.now().toEpochMilli();
    String docType = StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

    // Filled by hand instead of Collectors.toMap, which throws NullPointerException for a
    // null value and IllegalStateException for a duplicate key. The class name is fully
    // qualified so that no additional import is required.
    Map<String, Object> source = new java.util.HashMap<>();
    for (int i = 0; i < keys.length; i++) {
        source.put(keys[i], values[i]);
    }

    UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId);
    request.doc(source);
    LOGGER.debug(
            "Updating task: {} of workflow: {} in elasticsearch index: {}",
            taskId,
            workflowId,
            taskIndexName);
    elasticSearchClient.update(request).actionGet();
    long endTime = Instant.now().toEpochMilli();
    LOGGER.debug(
            "Time taken {} for updating task: {} of workflow: {}",
            endTime - startTime,
            taskId,
            workflowId);
    Monitors.recordESIndexTime("update_task", docType, endTime - startTime);
    Monitors.recordWorkerQueueSize(
            "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
}

/**
 * Runs {@code updateTask(workflowId, taskId, keys, values)} on {@code executorService}.
 *
 * @param workflowId id of the workflow the task belongs to
 * @param taskId id of the task document to update
 * @param keys names of the fields to update
 * @param values new field values, one per key at the same position
 * @return a future for the submitted update
 */
@Override
public CompletableFuture<Void> asyncUpdateTask(
        String workflowId, String taskId, String[] keys, Object[] values) {
    Runnable update = () -> updateTask(workflowId, taskId, keys, values);
    return CompletableFuture.runAsync(update, executorService);
}

@Override
public String get(String workflowInstanceId, String fieldToGet) {
String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride;
Expand Down
Loading