Create new codec for each s3 group in s3 sink #4410

Merged · 1 commit · Apr 11, 2024
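This change moves codec ownership out of S3Sink and S3SinkService and into each S3Group: a new CodecFactory loads a fresh OutputCodec plugin instance for every group that S3GroupManager creates, so concurrently open groups no longer share one codec.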
File: S3SinkServiceIT.java

@@ -55,6 +55,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;
+import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
@@ -139,6 +140,9 @@ class S3SinkServiceIT {
private OutputCodec codec;
private KeyGenerator keyGenerator;

+@Mock
+private CodecFactory codecFactory;
+
@Mock
NdjsonOutputConfig ndjsonOutputConfig;

@@ -177,6 +181,8 @@ public void setUp() {
@Test
void verify_flushed_object_count_into_s3_bucket() {
configureNewLineCodec();
+when(codecFactory.provideCodec()).thenReturn(codec);
+
int s3ObjectCountBeforeIngest = gets3ObjectCount();
S3SinkService s3SinkService = createObjectUnderTest();
s3SinkService.output(setEventQueue());
@@ -192,6 +198,8 @@ void configureNewLineCodec() {
@Test
void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException {
configureNewLineCodec();
+
+when(codecFactory.provideCodec()).thenReturn(codec);
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = setEventQueue();

@@ -221,6 +229,8 @@ void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException {
@Test
void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException {
configureNewLineCodec();
+when(codecFactory.provideCodec()).thenReturn(codec);
+
bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine(), codec);
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = setEventQueue();
@@ -256,9 +266,9 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException {
private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
-s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, s3Client);
+s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3Client);

-return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
+return new S3SinkService(s3SinkConfig, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
}

private int gets3ObjectCount() {
@@ -351,6 +361,8 @@ private static List<HashMap> generateRecords(int numberOfRecords) {
@Disabled
void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
configureParquetCodec();
+when(codecFactory.provideCodec()).thenReturn(codec);
+
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = getRecordList();

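The integration-test changes mirror the new wiring: S3SinkServiceIT mocks the new CodecFactory and stubs provideCodec() to hand back the configured test codec, because S3SinkService no longer receives a codec in its constructor; the codec now arrives through S3GroupManager.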
File: S3Sink.java

@@ -26,6 +26,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CodecBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
+import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
@@ -48,7 +49,6 @@ public class S3Sink extends AbstractSink<Record<Event>> {

private static final Duration RETRY_FLUSH_BACKOFF = Duration.ofSeconds(5);
private final S3SinkConfig s3SinkConfig;
-private final OutputCodec codec;
private volatile boolean sinkInitialized;
private final S3SinkService s3SinkService;
private final BufferFactory bufferFactory;
@@ -70,24 +70,26 @@ public S3Sink(final PluginSetting pluginSetting,
this.s3SinkConfig = s3SinkConfig;
this.sinkContext = sinkContext;
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
+final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration);
+
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
-codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
+final OutputCodec testCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
sinkInitialized = Boolean.FALSE;

final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory();
-if(codec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) {
+if(testCodec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) {
throw new InvalidPluginConfigurationException("The Parquet sink codec is an in_memory buffer only.");
}
-if(codec instanceof BufferedCodec) {
-innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) codec);
+if(testCodec instanceof BufferedCodec) {
+innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) testCodec);
}
CompressionOption compressionOption = s3SinkConfig.getCompression();
final CompressionEngine compressionEngine = compressionOption.getCompressionEngine();
-bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec);
+bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec);

-ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption);
+ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator);

if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null &&
@@ -102,13 +104,13 @@

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

-codec.validateAgainstCodecContext(s3OutputCodecContext);
+testCodec.validateAgainstCodecContext(s3OutputCodecContext);

final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
-final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client);
+final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client);


-s3SinkService = new S3SinkService(s3SinkConfig, codec, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
+s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
}

@Override
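In S3Sink, the codec loaded directly from the PluginFactory is renamed testCodec and kept only for up-front checks: the Parquet buffer-type validation, BufferedCodec buffer wrapping, the object-key extension provider, and validateAgainstCodecContext. The codecs used for actual writes now come from the CodecFactory that is handed to S3GroupManager.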
File: S3SinkService.java

@@ -48,7 +48,6 @@ public class S3SinkService {
static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes";
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
-private final OutputCodec codec;
private final S3Client s3Client;
private final int maxEvents;
private final ByteCount maxBytes;
@@ -70,15 +69,13 @@ public class S3SinkService {

/**
* @param s3SinkConfig s3 sink related configuration.
-* @param codec parser.
* @param s3Client
* @param pluginMetrics metrics.
*/
-public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec,
+public S3SinkService(final S3SinkConfig s3SinkConfig,
final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator,
final Duration retrySleepTime, final PluginMetrics pluginMetrics, final S3GroupManager s3GroupManager) {
this.s3SinkConfig = s3SinkConfig;
-this.codec = codec;
this.s3Client = s3Client;
this.codecContext = codecContext;
this.keyGenerator = keyGenerator;
@@ -122,6 +119,7 @@ void output(Collection<Record<Event>> records) {
try {
final S3Group s3Group = s3GroupManager.getOrCreateGroupForEvent(event);
final Buffer currentBuffer = s3Group.getBuffer();
+final OutputCodec codec = s3Group.getOutputCodec();

if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, codecContext);
@@ -178,7 +176,7 @@ private boolean flushToS3IfNeeded(final S3Group s3Group, final boolean forceFlush
s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration());
if (forceFlush || ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {
try {
-codec.complete(s3Group.getBuffer().getOutputStream());
+s3Group.getOutputCodec().complete(s3Group.getBuffer().getOutputStream());
String s3Key = s3Group.getBuffer().getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize());
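Condensed from the two hunks above, the write and flush paths now resolve the codec through the group rather than through a sink-level field. This is a sketch, not the verbatim methods; error handling and metrics are omitted:

```java
// Per-event path in S3SinkService.output(): the codec travels with its group,
// so start() is always called on the codec that owns this group's stream.
final S3Group s3Group = s3GroupManager.getOrCreateGroupForEvent(event);
final Buffer currentBuffer = s3Group.getBuffer();
final OutputCodec codec = s3Group.getOutputCodec(); // group-owned, not sink-owned

if (currentBuffer.getEventCount() == 0) {
    codec.start(currentBuffer.getOutputStream(), event, codecContext);
}

// Flush path in flushToS3IfNeeded(): the same group-owned codec finalizes
// the object before the buffer is written to S3.
if (forceFlush || ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {
    s3Group.getOutputCodec().complete(s3Group.getBuffer().getOutputStream());
    // ...upload under s3Group.getBuffer().getKey()...
}
```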
File: CodecFactory.java (new file)

@@ -0,0 +1,23 @@
+package org.opensearch.dataprepper.plugins.sink.s3.codec;
+
+import org.opensearch.dataprepper.model.codec.OutputCodec;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+
+public class CodecFactory {
+
+    private final PluginFactory pluginFactory;
+
+    private final PluginSetting codecPluginSetting;
+
+    public CodecFactory(final PluginFactory pluginFactory,
+                        final PluginModel codecConfiguration) {
+        this.pluginFactory = pluginFactory;
+        this.codecPluginSetting = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
+    }
+
+    public OutputCodec provideCodec() {
+        return pluginFactory.loadPlugin(OutputCodec.class, codecPluginSetting);
+    }
+}
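The factory holds the resolved PluginSetting and defers to the PluginFactory on every call, which is what makes one-codec-per-group possible: the PR relies on loadPlugin constructing a new plugin instance per invocation. A minimal, hypothetical unit test of that contract (not part of this PR; the plugin name and mocks are assumed) might look like:

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import org.junit.jupiter.api.Test;

class CodecFactoryTest {

    @Test
    void provideCodec_returns_a_new_instance_per_call() {
        final PluginFactory pluginFactory = mock(PluginFactory.class);
        final PluginModel codecConfiguration = mock(PluginModel.class);
        when(codecConfiguration.getPluginName()).thenReturn("ndjson"); // assumed codec name
        when(codecConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
        when(pluginFactory.loadPlugin(eq(OutputCodec.class), any(PluginSetting.class)))
                .thenAnswer(invocation -> mock(OutputCodec.class)); // fresh mock per call

        final CodecFactory objectUnderTest = new CodecFactory(pluginFactory, codecConfiguration);

        assertThat(objectUnderTest.provideCodec(),
                not(sameInstance(objectUnderTest.provideCodec())));
    }
}
```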
File: S3Group.java

@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.s3.grouping;

+import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;

@@ -15,21 +16,29 @@ public class S3Group implements Comparable<S3Group> {

private final Buffer buffer;

+private OutputCodec outputCodec;
+
private final S3GroupIdentifier s3GroupIdentifier;

private final Collection<EventHandle> groupEventHandles;

public S3Group(final S3GroupIdentifier s3GroupIdentifier,
-final Buffer buffer) {
+final Buffer buffer,
+final OutputCodec outputCodec) {
this.buffer = buffer;
this.s3GroupIdentifier = s3GroupIdentifier;
+this.outputCodec = outputCodec;
this.groupEventHandles = new LinkedList<>();
}

public Buffer getBuffer() {
return buffer;
}

+public OutputCodec getOutputCodec() {
+return outputCodec;
+}
+
S3GroupIdentifier getS3GroupIdentifier() { return s3GroupIdentifier; }

public void addEventHandle(final EventHandle eventHandle) {
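S3Group stays a simple holder: the codec is passed in at construction and exposed via getOutputCodec(), so a group's buffer and codec are always paired for the group's lifetime.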
File: S3GroupManager.java

@@ -6,10 +6,12 @@
package org.opensearch.dataprepper.plugins.sink.s3.grouping;

import com.google.common.collect.Maps;
+import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
+import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
@@ -25,6 +27,9 @@ public class S3GroupManager {
private final S3SinkConfig s3SinkConfig;
private final S3GroupIdentifierFactory s3GroupIdentifierFactory;
private final BufferFactory bufferFactory;
+
+private final CodecFactory codecFactory;
+
private final S3Client s3Client;

private long totalGroupSize;
@@ -33,10 +38,12 @@ public class S3GroupManager {
public S3GroupManager(final S3SinkConfig s3SinkConfig,
final S3GroupIdentifierFactory s3GroupIdentifierFactory,
final BufferFactory bufferFactory,
+final CodecFactory codecFactory,
final S3Client s3Client) {
this.s3SinkConfig = s3SinkConfig;
this.s3GroupIdentifierFactory = s3GroupIdentifierFactory;
this.bufferFactory = bufferFactory;
+this.codecFactory = codecFactory;
this.s3Client = s3Client;
totalGroupSize = 0;
}
@@ -67,7 +74,8 @@ public S3Group getOrCreateGroupForEvent(final Event event) {
return allGroups.get(s3GroupIdentifier);
} else {
final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey);
-final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup);
+final OutputCodec outputCodec = codecFactory.provideCodec();
+final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec);
allGroups.put(s3GroupIdentifier, s3Group);
LOG.debug("Created a new S3 group. Total number of groups: {}", allGroups.size());
return s3Group;
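With this wiring, getOrCreateGroupForEvent() is the single place a codec is bound to a group: a new group receives a fresh buffer from the BufferFactory and a fresh codec from the CodecFactory. Since an OutputCodec carries per-stream state between start() and complete(), sharing one instance across concurrently open groups could mix state between objects; giving each group its own instance, presumably the motivation for this PR, removes that hazard.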