diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 7e27db0afd..141eee1829 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -9,12 +9,12 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.processor.Processor; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.pipeline.common.FutureHelper; import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; import org.slf4j.Logger; @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.ArrayList; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -129,7 +128,7 @@ private void doRun() { List inputEvents = null; if (acknowledgementsEnabled) { - inputEvents = ((ArrayList>) records).stream().map(Record::getData).collect(Collectors.toList()); + inputEvents = ((List>) records).stream().map(Record::getData).collect(Collectors.toList()); } try { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java index 9b31b20691..3d13c0d49f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java @@ -208,4 +208,47 @@ void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsH verify(skippedProcessor, never()).execute(any()); } + + @Test + void testProcessWorkerWithProcessorDroppingAllRecordsAndAcknowledgmentsEnabledIsHandledProperly() { + + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final List> records = new ArrayList<>(); + final Record mockRecord = mock(Record.class); + final Event mockEvent = mock(Event.class); + final EventHandle eventHandle = mock(DefaultEventHandle.class); + when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); + doNothing().when(eventHandle).release(true); + when(mockRecord.getData()).thenReturn(mockEvent); + when(mockEvent.getEventHandle()).thenReturn(eventHandle); + + records.add(mockRecord); + + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenReturn(Collections.emptyList()); + when(processor.isReadyForShutdown()).thenReturn(true); + + final Processor secondProcessor = mock(Processor.class); + when(secondProcessor.isReadyForShutdown()).thenReturn(true); + when(secondProcessor.execute(Collections.emptyList())).thenReturn(Collections.emptyList()); + processors = List.of(processor, secondProcessor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + } }