diff --git a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java index 815ec338f8..fdcb1a0468 100644 --- a/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java +++ b/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java @@ -277,15 +277,18 @@ private void finalizeTask(Task task, Throwable throwable) { private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) { try { - TaskResult finalResult = + // upload if necessary + Optional optionalExternalStorageLocation = retryOperation( - (TaskResult taskResult) -> { - taskClient.evaluateAndUploadLargePayload( - taskResult, task.getTaskType()); - return null; - }, + (TaskResult taskResult) -> upload(taskResult, task.getTaskType()), count, - result); + result, + "evaluateAndUploadLargePayload"); + + if (optionalExternalStorageLocation.isPresent()) { + result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get()); + result.setOutputData(null); + } retryOperation( (TaskResult taskResult) -> { @@ -293,7 +296,8 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo return null; }, count, - finalResult); + result, + "updateTask"); } catch (Exception e) { worker.onErrorUpdate(task); MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e); @@ -305,14 +309,22 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo } } - private TaskResult retryOperation( - Function operation, int count, TaskResult result) { + private Optional upload(TaskResult result, String taskType) { + try { + return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType); + } catch (IllegalArgumentException iae) { + result.setReasonForIncompletion(iae.getMessage()); + result.setOutputData(null); + result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR); + return Optional.empty(); + } + } + + private R retryOperation(Function operation, int count, T input, String opName) { int index = 0; while (index < count) { try { - TaskResult taskResult = result.copy(); - operation.apply(taskResult); - return taskResult; + return operation.apply(input); } catch (Exception e) { index++; try { @@ -322,8 +334,7 @@ private TaskResult retryOperation( } } } - throw new RuntimeException( - String.format("Exhausted retries for updating task: %s", result.getTaskId())); + throw new RuntimeException("Exhausted retries performing " + opName); } private void handleException(Throwable t, TaskResult result, Worker worker, Task task) { diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 935fc45bd4..a21c434546 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -238,14 +238,10 @@ public void updateTask(TaskResult taskResult) { postForEntityWithRequestOnly("tasks", taskResult); } - public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType) { - Preconditions.checkNotNull(taskResult, "Task result cannot be null"); - Preconditions.checkArgument( - StringUtils.isBlank(taskResult.getExternalOutputPayloadStoragePath()), - "External Storage Path must not be set"); - + public Optional evaluateAndUploadLargePayload( + Map taskOutputData, String taskType) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { - objectMapper.writeValue(byteArrayOutputStream, taskResult.getOutputData()); + objectMapper.writeValue(byteArrayOutputStream, taskOutputData); byte[] taskOutputBytes = byteArrayOutputStream.toByteArray(); long taskResultSize = taskOutputBytes.length; MetricsContainer.recordTaskResultPayloadSize(taskType, taskResultSize); @@ -257,30 +253,22 @@ public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType || taskResultSize > conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024L) { - taskResult.setReasonForIncompletion( + throw new IllegalArgumentException( String.format( "The TaskResult payload size: %d is greater than the permissible %d bytes", taskResultSize, payloadSizeThreshold)); - taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR); - taskResult.setOutputData(null); - } else { - MetricsContainer.incrementExternalPayloadUsedCount( - taskType, - ExternalPayloadStorage.Operation.WRITE.name(), - ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name()); - String externalStoragePath = - uploadToExternalPayloadStorage( - ExternalPayloadStorage.PayloadType.TASK_OUTPUT, - taskOutputBytes, - taskResultSize); - taskResult.setExternalOutputPayloadStoragePath(externalStoragePath); - taskResult.setOutputData(null); } + MetricsContainer.incrementExternalPayloadUsedCount( + taskType, + ExternalPayloadStorage.Operation.WRITE.name(), + ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name()); + return Optional.of( + uploadToExternalPayloadStorage( + PayloadType.TASK_OUTPUT, taskOutputBytes, taskResultSize)); } + return Optional.empty(); } catch (IOException e) { - String errorMsg = - String.format( - "Unable to update task: %s with task result", taskResult.getTaskId()); + String errorMsg = String.format("Unable to update task: %s with task result", taskType); LOGGER.error(errorMsg, e); throw new ConductorClientException(errorMsg, e); } diff --git a/client/src/test/java/com/netflix/conductor/client/automator/TaskPollExecutorTest.java b/client/src/test/java/com/netflix/conductor/client/automator/TaskPollExecutorTest.java index d4029d2e2b..19791e3855 100644 --- a/client/src/test/java/com/netflix/conductor/client/automator/TaskPollExecutorTest.java +++ b/client/src/test/java/com/netflix/conductor/client/automator/TaskPollExecutorTest.java @@ -14,6 +14,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.*; @@ -136,6 +137,7 @@ public TaskResult answer(InvocationOnMock invocation) { verify(taskClient, times(3)).updateTask(any()); } + @SuppressWarnings("unchecked") @Test public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException { Task task = testTask(); @@ -158,7 +160,7 @@ public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException throw new ConductorClientException(); }) .when(taskClient) - .evaluateAndUploadLargePayload(any(TaskResult.class), any()); + .evaluateAndUploadLargePayload(any(Map.class), any()); TaskPollExecutor taskPollExecutor = new TaskPollExecutor( @@ -181,6 +183,50 @@ public void testLargePayloadCanFailUpdateWithRetry() throws InterruptedException verify(taskClient, times(0)).updateTask(any()); } + @Test + public void testLargePayloadLocationUpdate() throws InterruptedException { + Task task = testTask(); + String largePayloadLocation = "large_payload_location"; + + Worker worker = mock(Worker.class); + when(worker.getPollingInterval()).thenReturn(3000); + when(worker.getTaskDefName()).thenReturn(TEST_TASK_DEF_NAME); + when(worker.execute(any())).thenReturn(new TaskResult(task)); + + TaskClient taskClient = Mockito.mock(TaskClient.class); + when(taskClient.pollTask(any(), any(), any())).thenReturn(task); + when(taskClient.ack(any(), any())).thenReturn(true); + //noinspection unchecked + when(taskClient.evaluateAndUploadLargePayload(any(Map.class), any())) + .thenReturn(Optional.of(largePayloadLocation)); + + TaskPollExecutor taskPollExecutor = + new TaskPollExecutor( + null, taskClient, 1, 3, new HashMap<>(), "test-worker-", new HashMap<>()); + CountDownLatch latch = new CountDownLatch(1); + + doAnswer( + invocation -> { + Object[] args = invocation.getArguments(); + TaskResult result = (TaskResult) args[0]; + assertNull(result.getOutputData()); + assertEquals( + largePayloadLocation, + result.getExternalOutputPayloadStoragePath()); + latch.countDown(); + return null; + }) + .when(taskClient) + .updateTask(any()); + + Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate( + () -> taskPollExecutor.pollAndExecute(worker), 0, 1, TimeUnit.SECONDS); + latch.await(); + + verify(taskClient, times(1)).updateTask(any()); + } + @Test public void testTaskPollException() throws InterruptedException { Task task = testTask(); diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java index 9d902ffe82..d1628ea616 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java @@ -311,25 +311,4 @@ public static TaskResult newTaskResult(Status status) { result.setStatus(status); return result; } - - /** - * Copy the given task result object - * - * @return a deep copy of the task result object except the externalOutputPayloadStoragePath - * field - */ - public TaskResult copy() { - TaskResult taskResult = new TaskResult(); - taskResult.setWorkflowInstanceId(workflowInstanceId); - taskResult.setTaskId(taskId); - taskResult.setReasonForIncompletion(reasonForIncompletion); - taskResult.setCallbackAfterSeconds(callbackAfterSeconds); - taskResult.setWorkerId(workerId); - taskResult.setStatus(status); - taskResult.setOutputData(outputData); - taskResult.setOutputMessage(outputMessage); - taskResult.setLogs(logs); - taskResult.setSubWorkflowId(subWorkflowId); - return taskResult; - } }