diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 2e606a7487..9a958e035d 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -55,6 +55,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; +import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; @@ -139,6 +140,9 @@ class S3SinkServiceIT { private OutputCodec codec; private KeyGenerator keyGenerator; + @Mock + private CodecFactory codecFactory; + @Mock NdjsonOutputConfig ndjsonOutputConfig; @@ -177,6 +181,8 @@ public void setUp() { @Test void verify_flushed_object_count_into_s3_bucket() { configureNewLineCodec(); + when(codecFactory.provideCodec()).thenReturn(codec); + int s3ObjectCountBeforeIngest = gets3ObjectCount(); S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(setEventQueue()); @@ -192,6 +198,8 @@ void configureNewLineCodec() { @Test void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException { configureNewLineCodec(); + + when(codecFactory.provideCodec()).thenReturn(codec); S3SinkService s3SinkService = createObjectUnderTest(); Collection> recordsData = setEventQueue(); @@ -221,6 +229,8 @@ void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingExcepti @Test void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException { configureNewLineCodec(); + when(codecFactory.provideCodec()).thenReturn(codec); + bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine(), codec); S3SinkService s3SinkService = createObjectUnderTest(); Collection> recordsData = setEventQueue(); @@ -256,9 +266,9 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx private S3SinkService createObjectUnderTest() { OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); - s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, s3Client); + s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3Client); - return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager); + return new S3SinkService(s3SinkConfig, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager); } private int gets3ObjectCount() { @@ -351,6 +361,8 @@ private static List generateRecords(int numberOfRecords) { @Disabled void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { configureParquetCodec(); + when(codecFactory.provideCodec()).thenReturn(codec); + S3SinkService s3SinkService = createObjectUnderTest(); Collection> recordsData = getRecordList(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 73750af754..38db7cba7e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CodecBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec; +import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory; @@ -48,7 +49,6 @@ public class S3Sink extends AbstractSink> { private static final Duration RETRY_FLUSH_BACKOFF = Duration.ofSeconds(5); private final S3SinkConfig s3SinkConfig; - private final OutputCodec codec; private volatile boolean sinkInitialized; private final S3SinkService s3SinkService; private final BufferFactory bufferFactory; @@ -70,24 +70,26 @@ public S3Sink(final PluginSetting pluginSetting, this.s3SinkConfig = s3SinkConfig; this.sinkContext = sinkContext; final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings); + final OutputCodec testCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings); sinkInitialized = Boolean.FALSE; final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory(); - if(codec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) { + if(testCodec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) { throw new InvalidPluginConfigurationException("The Parquet sink codec is an in_memory buffer only."); } - if(codec instanceof BufferedCodec) { - innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) codec); + if(testCodec instanceof BufferedCodec) { + innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) testCodec); } CompressionOption compressionOption = s3SinkConfig.getCompression(); final CompressionEngine compressionEngine = compressionOption.getCompressionEngine(); - bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec); + bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec); - ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption); + ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption); KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator); if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null && @@ -102,13 +104,13 @@ public S3Sink(final PluginSetting pluginSetting, S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); - codec.validateAgainstCodecContext(s3OutputCodecContext); + testCodec.validateAgainstCodecContext(s3OutputCodecContext); final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); - final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client); + final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client); - s3SinkService = new S3SinkService(s3SinkConfig, codec, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager); + s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager); } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 90aa5f4d07..15c8d71177 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -48,7 +48,6 @@ public class S3SinkService { static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes"; private final S3SinkConfig s3SinkConfig; private final Lock reentrantLock; - private final OutputCodec codec; private final S3Client s3Client; private final int maxEvents; private final ByteCount maxBytes; @@ -70,15 +69,13 @@ public class S3SinkService { /** * @param s3SinkConfig s3 sink related configuration. - * @param codec parser. * @param s3Client * @param pluginMetrics metrics. */ - public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec, + public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, final Duration retrySleepTime, final PluginMetrics pluginMetrics, final S3GroupManager s3GroupManager) { this.s3SinkConfig = s3SinkConfig; - this.codec = codec; this.s3Client = s3Client; this.codecContext = codecContext; this.keyGenerator = keyGenerator; @@ -122,6 +119,7 @@ void output(Collection> records) { try { final S3Group s3Group = s3GroupManager.getOrCreateGroupForEvent(event); final Buffer currentBuffer = s3Group.getBuffer(); + final OutputCodec codec = s3Group.getOutputCodec(); if (currentBuffer.getEventCount() == 0) { codec.start(currentBuffer.getOutputStream(), event, codecContext); @@ -178,7 +176,7 @@ private boolean flushToS3IfNeeded(final S3Group s3Group, final boolean forceFlus s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration()); if (forceFlush || ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) { try { - codec.complete(s3Group.getBuffer().getOutputStream()); + s3Group.getOutputCodec().complete(s3Group.getBuffer().getOutputStream()); String s3Key = s3Group.getBuffer().getKey(); LOG.info("Writing {} to S3 with {} events and size of {} bytes.", s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize()); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/codec/CodecFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/codec/CodecFactory.java new file mode 100644 index 0000000000..9b09d85cf0 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/codec/CodecFactory.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.sink.s3.codec; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; + +public class CodecFactory { + + private final PluginFactory pluginFactory; + + private final PluginSetting codecPluginSetting; + + public CodecFactory(final PluginFactory pluginFactory, + final PluginModel codecConfiguration) { + this.pluginFactory = pluginFactory; + this.codecPluginSetting = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + } + + public OutputCodec provideCodec() { + return pluginFactory.loadPlugin(OutputCodec.class, codecPluginSetting); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java index db86df3261..b0656441a8 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.grouping; +import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; @@ -15,14 +16,18 @@ public class S3Group implements Comparable { private final Buffer buffer; + private OutputCodec outputCodec; + private final S3GroupIdentifier s3GroupIdentifier; private final Collection groupEventHandles; public S3Group(final S3GroupIdentifier s3GroupIdentifier, - final Buffer buffer) { + final Buffer buffer, + final OutputCodec outputCodec) { this.buffer = buffer; this.s3GroupIdentifier = s3GroupIdentifier; + this.outputCodec = outputCodec; this.groupEventHandles = new LinkedList<>(); } @@ -30,6 +35,10 @@ public Buffer getBuffer() { return buffer; } + public OutputCodec getOutputCodec() { + return outputCodec; + } + S3GroupIdentifier getS3GroupIdentifier() { return s3GroupIdentifier; } public void addEventHandle(final EventHandle eventHandle) { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java index 5d460f2040..63d6dde8eb 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -6,10 +6,12 @@ package org.opensearch.dataprepper.plugins.sink.s3.grouping; import com.google.common.collect.Maps; +import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -25,6 +27,9 @@ public class S3GroupManager { private final S3SinkConfig s3SinkConfig; private final S3GroupIdentifierFactory s3GroupIdentifierFactory; private final BufferFactory bufferFactory; + + private final CodecFactory codecFactory; + private final S3Client s3Client; private long totalGroupSize; @@ -33,10 +38,12 @@ public class S3GroupManager { public S3GroupManager(final S3SinkConfig s3SinkConfig, final S3GroupIdentifierFactory s3GroupIdentifierFactory, final BufferFactory bufferFactory, + final CodecFactory codecFactory, final S3Client s3Client) { this.s3SinkConfig = s3SinkConfig; this.s3GroupIdentifierFactory = s3GroupIdentifierFactory; this.bufferFactory = bufferFactory; + this.codecFactory = codecFactory; this.s3Client = s3Client; totalGroupSize = 0; } @@ -67,7 +74,8 @@ public S3Group getOrCreateGroupForEvent(final Event event) { return allGroups.get(s3GroupIdentifier); } else { final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey); - final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup); + final OutputCodec outputCodec = codecFactory.provideCodec(); + final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec); allGroups.put(s3GroupIdentifier, s3Group); LOG.debug("Created a new S3 group. Total number of groups: {}", allGroups.size()); return s3Group; diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index c021ab7c2c..4235b86473 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -156,7 +156,7 @@ private DefaultEventHandle castToDefaultHandle(EventHandle eventHandle) { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics, s3GroupManager); + return new S3SinkService(s3SinkConfig, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics, s3GroupManager); } @Test @@ -178,6 +178,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -204,6 +205,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -225,6 +227,7 @@ void test_output_with_uploadedToS3_success() throws IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -249,6 +252,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -274,6 +278,7 @@ void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IO final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -301,6 +306,7 @@ void test_output_with_uploadedToS3_failed() throws IOException { doNothing().when(codec).writeEvent(event, outputStream); final S3Group s3Group = mock(S3Group.class); + when(s3Group.getOutputCodec()).thenReturn(codec); Buffer buffer = mock(Buffer.class); when(s3Group.getBuffer()).thenReturn(buffer); @@ -328,6 +334,7 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -351,6 +358,7 @@ void test_output_with_no_incoming_records_flushes_batch() throws IOException { final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -384,6 +392,7 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -404,6 +413,7 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -427,6 +437,7 @@ void output_will_release_all_handles_since_a_flush() throws IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); @@ -455,6 +466,7 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx final Event event1 = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); @@ -495,6 +507,7 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); @@ -523,6 +536,7 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); @@ -562,6 +576,7 @@ void output_will_skip_and_drop_failed_records() throws IOException { Event event2 = records.get(1).getData(); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); when(s3GroupManager.getS3GroupEntries()).thenReturn(List.of(s3Group)); @@ -600,6 +615,7 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I final OutputStream outputStream = mock(OutputStream.class); final S3Group s3Group = mock(S3Group.class); when(s3Group.getBuffer()).thenReturn(buffer); + when(s3Group.getOutputCodec()).thenReturn(codec); when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); @@ -644,6 +660,7 @@ void output_will_flush_the_largest_group_until_below_aggregate_threshold_when_ag final Event firstGroupEvent = mock(Event.class); final S3Group firstGroup = mock(S3Group.class); final Buffer firstGroupBuffer = mock(Buffer.class); + when(firstGroup.getOutputCodec()).thenReturn(codec); when(firstGroupBuffer.getOutputStream()).thenReturn(mock(OutputStream.class)); when(firstGroupBuffer.getSize()).thenReturn(bufferOneSize); when(firstGroup.getBuffer()).thenReturn(firstGroupBuffer); @@ -652,6 +669,7 @@ void output_will_flush_the_largest_group_until_below_aggregate_threshold_when_ag final Event secondGroupEvent = mock(Event.class); final S3Group secondGroup = mock(S3Group.class); final Buffer secondGroupBuffer = mock(Buffer.class); + when(secondGroup.getOutputCodec()).thenReturn(codec); when(secondGroupBuffer.getSize()).thenReturn(bufferTwoSize); when(secondGroupBuffer.getOutputStream()).thenReturn(mock(OutputStream.class)); when(secondGroup.getBuffer()).thenReturn(secondGroupBuffer); @@ -659,6 +677,8 @@ void output_will_flush_the_largest_group_until_below_aggregate_threshold_when_ag final Event thirdGroupEvent = mock(Event.class); final S3Group thirdGroup = mock(S3Group.class); + when(thirdGroup.getOutputCodec()).thenReturn(codec); + final Buffer thirdGroupBuffer = mock(Buffer.class); when(thirdGroupBuffer.getSize()).thenReturn(bufferThreeSize); when(thirdGroupBuffer.getOutputStream()).thenReturn(mock(OutputStream.class)); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java index a6cf9d122d..ba9dd2af1d 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -9,10 +9,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; @@ -41,11 +43,14 @@ public class S3GroupManagerTest { @Mock private BufferFactory bufferFactory; + @Mock + private CodecFactory codecFactory; + @Mock private S3Client s3Client; private S3GroupManager createObjectUnderTest() { - return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client); + return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client); } @Test @@ -64,6 +69,8 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { final Buffer buffer = mock(Buffer.class); when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) .thenReturn(buffer); + final OutputCodec outputCodec = mock(OutputCodec.class); + when(codecFactory.provideCodec()).thenReturn(outputCodec); final S3GroupManager objectUnderTest = createObjectUnderTest(); @@ -72,6 +79,7 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { assertThat(result, notNullValue()); assertThat(result.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); assertThat(result.getBuffer(), equalTo(buffer)); + assertThat(result.getOutputCodec(), equalTo(outputCodec)); final Collection groups = objectUnderTest.getS3GroupEntries(); assertThat(groups, notNullValue()); @@ -88,8 +96,10 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); final Buffer buffer = mock(Buffer.class); + final OutputCodec outputCodec = mock(OutputCodec.class); when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) .thenReturn(buffer); + when(codecFactory.provideCodec()).thenReturn(outputCodec); final S3GroupManager objectUnderTest = createObjectUnderTest(); @@ -98,6 +108,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { assertThat(result, notNullValue()); assertThat(result.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); assertThat(result.getBuffer(), equalTo(buffer)); + assertThat(result.getOutputCodec(), equalTo(outputCodec)); final Event secondEvent = mock(Event.class); when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(secondEvent)).thenReturn(s3GroupIdentifier); @@ -147,6 +158,9 @@ void recalculateAndGetGroupSize_returns_expected_size() { when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); + final OutputCodec outputCodec = mock(OutputCodec.class); + when(codecFactory.provideCodec()).thenReturn(outputCodec); + final S3GroupManager objectUnderTest = createObjectUnderTest(); objectUnderTest.getOrCreateGroupForEvent(event); @@ -187,6 +201,13 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); + final OutputCodec firstOutputCodec = mock(OutputCodec.class); + final OutputCodec secondOutputCodec = mock(OutputCodec.class); + final OutputCodec thirdOutputCodec = mock(OutputCodec.class); + when(codecFactory.provideCodec()).thenReturn(firstOutputCodec) + .thenReturn(secondOutputCodec) + .thenReturn(thirdOutputCodec); + final S3GroupManager objectUnderTest = createObjectUnderTest(); final S3Group firstGroup = objectUnderTest.getOrCreateGroupForEvent(event); @@ -194,6 +215,9 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { final S3Group thirdGroup = objectUnderTest.getOrCreateGroupForEvent(thirdEvent); assertThat(objectUnderTest.getNumberOfGroups(), equalTo(3)); + assertThat(firstGroup.getOutputCodec(), equalTo(firstOutputCodec)); + assertThat(secondGroup.getOutputCodec(), equalTo(secondOutputCodec)); + assertThat(thirdGroup.getOutputCodec(), equalTo(thirdOutputCodec)); final Collection sortedGroups = objectUnderTest.getS3GroupsSortedBySize(); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java index 7843dadd48..da0cf899b0 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java @@ -3,6 +3,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; @@ -22,7 +23,8 @@ public class S3GroupTest { void releasingEventHandles_releases_all_event_handles(final boolean result) { final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); final Buffer buffer = mock(Buffer.class); - final S3Group objectUnderTest = new S3Group(s3GroupIdentifier, buffer); + final OutputCodec outputCodec = mock(OutputCodec.class); + final S3Group objectUnderTest = new S3Group(s3GroupIdentifier, buffer, outputCodec); final Collection eventHandles = List.of(mock(EventHandle.class), mock(EventHandle.class)); for (final EventHandle eventHandle : eventHandles) { @@ -47,9 +49,9 @@ void comparingS3GroupsReturns_expected_result_based_on_buffer_size() { final Buffer equalBuffer = mock(Buffer.class); when(equalBuffer.getSize()).thenReturn(1000L); - final S3Group smallGroup = new S3Group(mock(S3GroupIdentifier.class), smallBuffer); - final S3Group largeGroup = new S3Group(mock(S3GroupIdentifier.class), largeBuffer); - final S3Group anotherLargeGroup = new S3Group(mock(S3GroupIdentifier.class), equalBuffer); + final S3Group smallGroup = new S3Group(mock(S3GroupIdentifier.class), smallBuffer, mock(OutputCodec.class)); + final S3Group largeGroup = new S3Group(mock(S3GroupIdentifier.class), largeBuffer, mock(OutputCodec.class)); + final S3Group anotherLargeGroup = new S3Group(mock(S3GroupIdentifier.class), equalBuffer, mock(OutputCodec.class)); assertThat(smallGroup.compareTo(largeGroup), equalTo(1)); assertThat(largeGroup.compareTo(smallGroup), equalTo(-1));