Skip to content

Commit

Permalink
Fix DocumentDB source S3PathPrefix null or empty
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh committed Apr 29, 2024
1 parent d3b0e78 commit 576aabf
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

public class DocumentDBService {
private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class);
private static final String S3_PATH_DELIMITER = "/";
private final EnhancedSourceCoordinator sourceCoordinator;
private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
Expand Down Expand Up @@ -51,7 +52,8 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
public void start(Buffer<Record<Event>> buffer) {
final List<Runnable> runnableList = new ArrayList<>();

final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
final String s3PathPrefix = getS3PathPrefix();
final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig, s3PathPrefix);
runnableList.add(leaderScheduler);
final List<String> collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
if (!collections.isEmpty()) {
Expand All @@ -65,12 +67,30 @@ public void start(Buffer<Record<Event>> buffer) {
final MongoTasksRefresher mongoTasksRefresher = new MongoTasksRefresher(
buffer, sourceCoordinator, pluginMetrics, acknowledgementSetManager,
numThread -> Executors.newFixedThreadPool(
numThread, BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source")));
numThread, BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source")),
s3PathPrefix);
mongoTasksRefresher.initialize(sourceConfig);
pluginConfigObservable.addPluginConfigObserver(
pluginConfig -> mongoTasksRefresher.update((MongoDBSourceConfig) pluginConfig));
}

private String getS3PathPrefix() {
final String s3UserPathPrefix;
if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
s3UserPathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER;
} else {
s3UserPathPrefix = "";
}

final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = s3UserPathPrefix + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER;
} else {
s3PathPrefix = s3UserPathPrefix;
}
return s3PathPrefix;
}

/**
* Interrupt the running of schedulers.
* Each scheduler must implement logic for gracefully shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MongoTasksRefresher implements PluginConfigObserver<MongoDBSourceCo
private final Function<Integer, ExecutorService> executorServiceFunction;
private final Counter credentialsChangeCounter;
private final Counter executorRefreshErrorsCounter;
private final String s3PathPrefix;
private MongoDBExportPartitionSupplier currentMongoDBExportPartitionSupplier;
private MongoDBSourceConfig currentMongoDBSourceConfig;
private ExecutorService currentExecutor;
Expand All @@ -47,14 +48,16 @@ public MongoTasksRefresher(final Buffer<Record<Event>> buffer,
final EnhancedSourceCoordinator sourceCoordinator,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final Function<Integer, ExecutorService> executorServiceFunction) {
final Function<Integer, ExecutorService> executorServiceFunction,
final String s3PathPrefix) {
this.sourceCoordinator = sourceCoordinator;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.buffer = buffer;
this.executorServiceFunction = executorServiceFunction;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
this.credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
this.executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
this.s3PathPrefix = s3PathPrefix;
}

public void initialize(final MongoDBSourceConfig sourceConfig) {
Expand Down Expand Up @@ -84,11 +87,11 @@ private void refreshJobs(MongoDBSourceConfig pluginConfig) {
currentMongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(pluginConfig);
runnables.add(new ExportScheduler(sourceCoordinator, currentMongoDBExportPartitionSupplier, pluginMetrics));
runnables.add(new ExportWorker(
sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig));
sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig, s3PathPrefix));
}
if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
runnables.add(new StreamScheduler(
sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, pluginMetrics));
sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, s3PathPrefix, pluginMetrics));
}
this.currentExecutor = executorServiceFunction.apply(runnables.size());
runnables.forEach(currentExecutor::submit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class ExportWorker implements Runnable {
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;

private static final String S3_PATH_DELIMITER = "/";

/**
* Start Line is the checkpoint
*/
Expand All @@ -77,13 +75,15 @@ public class ExportWorker implements Runnable {
private final EnhancedSourceCoordinator sourceCoordinator;
private final ExecutorService executor;
private final PluginMetrics pluginMetrics;
private final String s3PathPrefix;


public ExportWorker(final EnhancedSourceCoordinator sourceCoordinator,
final Buffer<Record<Event>> buffer,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final MongoDBSourceConfig sourceConfig) {
final MongoDBSourceConfig sourceConfig,
final String s3PathPrefix) {
this.sourceCoordinator = sourceCoordinator;
executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
Expand All @@ -92,6 +92,7 @@ public ExportWorker(final EnhancedSourceCoordinator sourceCoordinator,
this.sourceConfig = sourceConfig;
this.startLine = 0;// replace it with checkpoint line
this.pluginMetrics = pluginMetrics;
this.s3PathPrefix = s3PathPrefix;
this.successPartitionCounter = pluginMetrics.counter(SUCCESS_PARTITION_COUNTER_NAME);
this.failureParitionCounter = pluginMetrics.counter(FAILURE_PARTITION_COUNTER_NAME);
this.activeExportPartitionConsumerGauge = pluginMetrics.gauge(ACTIVE_EXPORT_PARTITION_CONSUMERS_GAUGE, numOfWorkers);
Expand All @@ -110,15 +111,10 @@ public void run() {
if (sourcePartition.isPresent()) {
dataQueryPartition = (DataQueryPartition) sourcePartition.get();
final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null);
final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
} else {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
}
final String s3Prefix = s3PathPrefix + dataQueryPartition.getCollection();
final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition);
final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(),
ExportPartition.PARTITION_TYPE, s3PathPrefix);
ExportPartition.PARTITION_TYPE, s3Prefix);
final ExportPartitionWorker exportPartitionWorker = new ExportPartitionWorker(recordBufferWriter, recordConverter,
dataQueryPartition, acknowledgementSet, sourceConfig, partitionCheckpoint, Instant.now().toEpochMilli(), pluginMetrics);
final CompletableFuture<Void> runLoader = CompletableFuture.runAsync(exportPartitionWorker, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class LeaderScheduler implements Runnable {
* Default duration to extend the timeout of lease
*/
static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
private static final String S3_PATH_DELIMITER = "/";

/**
* Default interval to run lease check and shard discovery
Expand All @@ -44,20 +43,23 @@ public class LeaderScheduler implements Runnable {
private final MongoDBSourceConfig sourceConfig;

private final EnhancedSourceCoordinator coordinator;
private final String s3PathPrefix;

private final Duration leaseInterval;

private LeaderPartition leaderPartition;

public LeaderScheduler(final EnhancedSourceCoordinator coordinator, final MongoDBSourceConfig sourceConfig) {
this(coordinator, sourceConfig, DEFAULT_LEASE_INTERVAL);
public LeaderScheduler(final EnhancedSourceCoordinator coordinator, final MongoDBSourceConfig sourceConfig, final String s3PathPrefix) {
this(coordinator, sourceConfig, s3PathPrefix, DEFAULT_LEASE_INTERVAL);
}

LeaderScheduler(EnhancedSourceCoordinator coordinator,
MongoDBSourceConfig sourceConfig,
Duration leaseInterval) {
LeaderScheduler(final EnhancedSourceCoordinator coordinator,
final MongoDBSourceConfig sourceConfig,
final String s3PathPrefix,
final Duration leaseInterval) {
this.sourceConfig = sourceConfig;
this.coordinator = coordinator;
this.s3PathPrefix = s3PathPrefix;
this.leaseInterval = leaseInterval;
}

Expand Down Expand Up @@ -128,8 +130,8 @@ private void init() {
createExportGlobalState(collectionConfig);
}

final String s3PathPrefix = getS3PathPrefix(collectionConfig);
createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3PathPrefix, collectionConfig);
final String s3Prefix = getS3PathPrefix(collectionConfig);
createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3Prefix, collectionConfig);

if (collectionConfig.isStream()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
Expand All @@ -143,20 +145,11 @@ private void init() {
}

private String getS3PathPrefix(final CollectionConfig collectionConfig) {
final String s3UserPathPrefix;
if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
s3UserPathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER;
if (s3PathPrefix == null || s3PathPrefix.isBlank()) {
return collectionConfig.getCollection();
} else {
s3UserPathPrefix = "";
return s3PathPrefix + collectionConfig.getCollection();
}

final String s3PathPrefix;
if (coordinator.getPartitionPrefix() != null ) {
s3PathPrefix = s3UserPathPrefix + coordinator.getPartitionPrefix() + S3_PATH_DELIMITER + collectionConfig.getCollection();
} else {
s3PathPrefix = s3UserPathPrefix + collectionConfig.getCollection();
}
return s3PathPrefix;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class StreamScheduler implements Runnable {
static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000;
static final int DEFAULT_BUFFER_WRITE_INTERVAL_MILLS = 15_000;
private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000;
private static final String S3_PATH_DELIMITER = "/";
/**
* Number of records to accumulate before flushing to buffer
*/
Expand All @@ -40,17 +39,20 @@ public class StreamScheduler implements Runnable {
private final RecordBufferWriter recordBufferWriter;
private final AcknowledgementSetManager acknowledgementSetManager;
private final MongoDBSourceConfig sourceConfig;
private final String s3PathPrefix;
private final PluginMetrics pluginMetrics;
public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager,
final MongoDBSourceConfig sourceConfig,
final String s3PathPrefix,
final PluginMetrics pluginMetrics) {
this.sourceCoordinator = sourceCoordinator;
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator, pluginMetrics);
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceConfig = sourceConfig;
this.s3PathPrefix = s3PathPrefix;
this.pluginMetrics = pluginMetrics;
}

Expand Down Expand Up @@ -102,13 +104,8 @@ private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
}

private PartitionKeyRecordConverter getPartitionKeyRecordConverter(final StreamPartition streamPartition) {
final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + streamPartition.getCollection();
} else {
s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + streamPartition.getCollection();
}
final String s3Prefix = s3PathPrefix + streamPartition.getCollection();
return new PartitionKeyRecordConverter(streamPartition.getCollection(),
StreamPartition.PARTITION_TYPE, s3PathPrefix);
StreamPartition.PARTITION_TYPE, s3Prefix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

Expand All @@ -43,6 +44,7 @@
class MongoTasksRefresherTest {
private static final String TEST_USERNAME = "test_user";
private static final String TEST_PASSWORD = "test_password";
private final String S3_PATH_PREFIX = UUID.randomUUID().toString();

@Mock
private EnhancedSourceCoordinator enhancedSourceCoordinator;
Expand Down Expand Up @@ -79,7 +81,8 @@ class MongoTasksRefresherTest {

private MongoTasksRefresher createObjectUnderTest() {
return new MongoTasksRefresher(
buffer, enhancedSourceCoordinator, pluginMetrics, acknowledgementSetManager, executorServiceFunction);
buffer, enhancedSourceCoordinator, pluginMetrics, acknowledgementSetManager,
executorServiceFunction, S3_PATH_PREFIX);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -24,6 +25,8 @@

@ExtendWith(MockitoExtension.class)
public class ExportWorkerTest {
private final String S3_PATH_PREFIX = UUID.randomUUID().toString();

@Mock
private EnhancedSourceCoordinator sourceCoordinator;

Expand All @@ -46,7 +49,7 @@ public class ExportWorkerTest {

@BeforeEach
public void setup() throws Exception {
exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig, S3_PATH_PREFIX);
}

@Test
Expand Down
Loading

0 comments on commit 576aabf

Please sign in to comment.