From 576aabfe776661b1b637e378154732fe1097d920 Mon Sep 17 00:00:00 2001
From: Dinu John <86094133+dinujoh@users.noreply.github.com>
Date: Mon, 29 Apr 2024 09:17:18 -0500
Subject: [PATCH] Fix DocumentDB source S3PathPrefix null or empty

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
---
 .../mongo/documentdb/DocumentDBService.java   | 24 ++++++++++++--
 .../mongo/documentdb/MongoTasksRefresher.java | 13 +++++---
 .../plugins/mongo/export/ExportWorker.java    | 16 ++++-----
 .../plugins/mongo/leader/LeaderScheduler.java | 33 ++++++++-----------
 .../plugins/mongo/stream/StreamScheduler.java | 13 +++----
 .../documentdb/MongoTasksRefresherTest.java   |  5 ++-
 .../mongo/export/ExportWorkerTest.java        |  5 ++-
 .../mongo/leader/LeaderSchedulerTest.java     | 23 +++++--------
 .../mongo/stream/StreamSchedulerTest.java     |  4 +--
 9 files changed, 72 insertions(+), 64 deletions(-)

diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
index 0bac0fb631..233c45d10e 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
@@ -23,6 +23,7 @@ public class DocumentDBService {
     private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class);
+    private static final String S3_PATH_DELIMITER = "/";
     private final EnhancedSourceCoordinator sourceCoordinator;
     private final PluginMetrics pluginMetrics;
     private final MongoDBSourceConfig sourceConfig;
@@ -51,7 +52,8 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
     public void start(Buffer<Record<Event>> buffer) {
         final List<Runnable> runnableList = new ArrayList<>();
-        final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
+        final String s3PathPrefix = getS3PathPrefix();
+        final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig, s3PathPrefix);
         runnableList.add(leaderScheduler);
         final List<String> collections = sourceConfig.getCollections().stream().map(CollectionConfig::getCollection).collect(Collectors.toList());
         if (!collections.isEmpty()) {
@@ -65,12 +67,30 @@ public void start(Buffer<Record<Event>> buffer) {
         final MongoTasksRefresher mongoTasksRefresher = new MongoTasksRefresher(
                 buffer, sourceCoordinator, pluginMetrics, acknowledgementSetManager,
                 numThread -> Executors.newFixedThreadPool(
-                        numThread, BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source")));
+                        numThread, BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source")),
+                s3PathPrefix);
         mongoTasksRefresher.initialize(sourceConfig);
         pluginConfigObservable.addPluginConfigObserver(
                 pluginConfig -> mongoTasksRefresher.update((MongoDBSourceConfig) pluginConfig));
     }
 
+    private String getS3PathPrefix() {
+        final String s3UserPathPrefix;
+        if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
+            s3UserPathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER;
+        } else {
+            s3UserPathPrefix = "";
+        }
+
+        final String s3PathPrefix;
+        if (sourceCoordinator.getPartitionPrefix() != null ) {
+            s3PathPrefix = s3UserPathPrefix + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER;
+        } else {
+            s3PathPrefix = s3UserPathPrefix;
+        }
+        return s3PathPrefix;
+    }
+
     /**
      * Interrupt the running of schedulers.
      * Each scheduler must implement logic for gracefully shutdown.
      */
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresher.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresher.java
index e19359d8f9..ba636f798f 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresher.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresher.java
@@ -39,6 +39,7 @@ public class MongoTasksRefresher implements PluginConfigObserver<MongoDBSourceConfig> {
     private final Function<Integer, ExecutorService> executorServiceFunction;
     private final Counter credentialsChangeCounter;
     private final Counter executorRefreshErrorsCounter;
+    private final String s3PathPrefix;
     private MongoDBExportPartitionSupplier currentMongoDBExportPartitionSupplier;
     private MongoDBSourceConfig currentMongoDBSourceConfig;
     private ExecutorService currentExecutor;
@@ -47,14 +48,16 @@ public MongoTasksRefresher(final Buffer<Record<Event>> buffer,
                                final EnhancedSourceCoordinator sourceCoordinator,
                                final PluginMetrics pluginMetrics,
                                final AcknowledgementSetManager acknowledgementSetManager,
-                               final Function<Integer, ExecutorService> executorServiceFunction) {
+                               final Function<Integer, ExecutorService> executorServiceFunction,
+                               final String s3PathPrefix) {
         this.sourceCoordinator = sourceCoordinator;
         this.pluginMetrics = pluginMetrics;
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.buffer = buffer;
         this.executorServiceFunction = executorServiceFunction;
-        credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
-        executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
+        this.credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
+        this.executorRefreshErrorsCounter = pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS);
+        this.s3PathPrefix = s3PathPrefix;
     }
 
     public void initialize(final MongoDBSourceConfig sourceConfig) {
@@ -84,11 +87,11 @@ private void refreshJobs(MongoDBSourceConfig pluginConfig) {
             currentMongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(pluginConfig);
             runnables.add(new ExportScheduler(sourceCoordinator, currentMongoDBExportPartitionSupplier, pluginMetrics));
             runnables.add(new ExportWorker(
-                    sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig));
+                    sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, pluginConfig, s3PathPrefix));
         }
         if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
             runnables.add(new StreamScheduler(
-                    sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, pluginMetrics));
+                    sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, s3PathPrefix, pluginMetrics));
         }
         this.currentExecutor = executorServiceFunction.apply(runnables.size());
         runnables.forEach(currentExecutor::submit);
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java
index 8b0b143d25..774c2c8dc9 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorker.java
@@ -57,8 +57,6 @@ public class ExportWorker implements Runnable {
      */
     private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;
 
-    private static final String S3_PATH_DELIMITER = "/";
-
     /**
      * Start Line is the checkpoint
      */
@@ -77,13 +75,15 @@ public class ExportWorker implements Runnable {
     private final EnhancedSourceCoordinator sourceCoordinator;
     private final ExecutorService executor;
     private final PluginMetrics pluginMetrics;
+    private final String s3PathPrefix;
 
     public ExportWorker(final EnhancedSourceCoordinator sourceCoordinator,
                         final Buffer<Record<Event>> buffer,
                         final PluginMetrics pluginMetrics,
                         final AcknowledgementSetManager acknowledgementSetManager,
-                        final MongoDBSourceConfig sourceConfig) {
+                        final MongoDBSourceConfig sourceConfig,
+                        final String s3PathPrefix) {
         this.sourceCoordinator = sourceCoordinator;
         executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
@@ -92,6 +92,7 @@ public ExportWorker(final EnhancedSourceCoordinator sourceCoordinator,
         this.sourceConfig = sourceConfig;
         this.startLine = 0;// replace it with checkpoint line
         this.pluginMetrics = pluginMetrics;
+        this.s3PathPrefix = s3PathPrefix;
         this.successPartitionCounter = pluginMetrics.counter(SUCCESS_PARTITION_COUNTER_NAME);
         this.failureParitionCounter = pluginMetrics.counter(FAILURE_PARTITION_COUNTER_NAME);
         this.activeExportPartitionConsumerGauge = pluginMetrics.gauge(ACTIVE_EXPORT_PARTITION_CONSUMERS_GAUGE, numOfWorkers);
@@ -110,15 +111,10 @@ public void run() {
                 if (sourcePartition.isPresent()) {
                     dataQueryPartition = (DataQueryPartition) sourcePartition.get();
                     final AcknowledgementSet acknowledgementSet = createAcknowledgementSet(dataQueryPartition).orElse(null);
-                    final String s3PathPrefix;
-                    if (sourceCoordinator.getPartitionPrefix() != null ) {
-                        s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
-                    } else {
-                        s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + dataQueryPartition.getCollection();
-                    }
+                    final String s3Prefix = s3PathPrefix + dataQueryPartition.getCollection();
                     final DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(sourceCoordinator, dataQueryPartition);
                     final PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(),
-                            ExportPartition.PARTITION_TYPE, s3PathPrefix);
+                            ExportPartition.PARTITION_TYPE, s3Prefix);
                     final ExportPartitionWorker exportPartitionWorker = new ExportPartitionWorker(recordBufferWriter, recordConverter,
                             dataQueryPartition, acknowledgementSet, sourceConfig, partitionCheckpoint, Instant.now().toEpochMilli(), pluginMetrics);
                     final CompletableFuture runLoader = CompletableFuture.runAsync(exportPartitionWorker, executor);
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java
index 6e088f426a..aa8af0e6d9 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java
@@ -34,7 +34,6 @@ public class LeaderScheduler implements Runnable {
      * Default duration to extend the timeout of lease
      */
     static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
-    private static final String S3_PATH_DELIMITER = "/";
 
     /**
      * Default interval to run lease check and shard discovery
@@ -44,20 +43,23 @@ public class LeaderScheduler implements Runnable {
 
     private final MongoDBSourceConfig sourceConfig;
     private final EnhancedSourceCoordinator coordinator;
+    private final String s3PathPrefix;
 
     private final Duration leaseInterval;
     private LeaderPartition leaderPartition;
 
-    public LeaderScheduler(final EnhancedSourceCoordinator coordinator, final MongoDBSourceConfig sourceConfig) {
-        this(coordinator, sourceConfig, DEFAULT_LEASE_INTERVAL);
+    public LeaderScheduler(final EnhancedSourceCoordinator coordinator, final MongoDBSourceConfig sourceConfig, final String s3PathPrefix) {
+        this(coordinator, sourceConfig, s3PathPrefix, DEFAULT_LEASE_INTERVAL);
     }
 
-    LeaderScheduler(EnhancedSourceCoordinator coordinator,
-                    MongoDBSourceConfig sourceConfig,
-                    Duration leaseInterval) {
+    LeaderScheduler(final EnhancedSourceCoordinator coordinator,
+                    final MongoDBSourceConfig sourceConfig,
+                    final String s3PathPrefix,
+                    final Duration leaseInterval) {
         this.sourceConfig = sourceConfig;
         this.coordinator = coordinator;
+        this.s3PathPrefix = s3PathPrefix;
         this.leaseInterval = leaseInterval;
     }
 
@@ -128,8 +130,8 @@ private void init() {
                 createExportGlobalState(collectionConfig);
             }
 
-            final String s3PathPrefix = getS3PathPrefix(collectionConfig);
-            createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3PathPrefix, collectionConfig);
+            final String s3Prefix = getS3PathPrefix(collectionConfig);
+            createS3Partition(sourceConfig.getS3Bucket(), sourceConfig.getS3Region(), s3Prefix, collectionConfig);
 
             if (collectionConfig.isStream()) {
                 createStreamPartition(collectionConfig, startTime, exportRequired);
@@ -143,20 +145,11 @@ private void init() {
     }
 
     private String getS3PathPrefix(final CollectionConfig collectionConfig) {
-        final String s3UserPathPrefix;
-        if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
-            s3UserPathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER;
+        if (s3PathPrefix == null || s3PathPrefix.isBlank()) {
+            return collectionConfig.getCollection();
         } else {
-            s3UserPathPrefix = "";
+            return s3PathPrefix + collectionConfig.getCollection();
        }
-
-        final String s3PathPrefix;
-        if (coordinator.getPartitionPrefix() != null ) {
-            s3PathPrefix = s3UserPathPrefix + coordinator.getPartitionPrefix() + S3_PATH_DELIMITER + collectionConfig.getCollection();
-        } else {
-            s3PathPrefix = s3UserPathPrefix + collectionConfig.getCollection();
-        }
-        return s3PathPrefix;
     }
 
     /**
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
index 7ead6fb836..2159d39a17 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java
@@ -26,7 +26,6 @@ public class StreamScheduler implements Runnable {
     static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000;
     static final int DEFAULT_BUFFER_WRITE_INTERVAL_MILLS = 15_000;
     private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000;
-    private static final String S3_PATH_DELIMITER = "/";
     /**
      * Number of records to accumulate before flushing to buffer
      */
@@ -40,17 +39,20 @@ public class StreamScheduler implements Runnable {
     private final RecordBufferWriter recordBufferWriter;
     private final AcknowledgementSetManager acknowledgementSetManager;
     private final MongoDBSourceConfig sourceConfig;
+    private final String s3PathPrefix;
     private final PluginMetrics pluginMetrics;
 
     public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                            final Buffer<Record<Event>> buffer,
                            final AcknowledgementSetManager acknowledgementSetManager,
                            final MongoDBSourceConfig sourceConfig,
+                           final String s3PathPrefix,
                            final PluginMetrics pluginMetrics) {
         this.sourceCoordinator = sourceCoordinator;
         final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
         recordBufferWriter = ExportRecordBufferWriter.create(bufferAccumulator, pluginMetrics);
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.sourceConfig = sourceConfig;
+        this.s3PathPrefix = s3PathPrefix;
         this.pluginMetrics = pluginMetrics;
     }
 
@@ -102,13 +104,8 @@ private StreamWorker getStreamWorker (final StreamPartition streamPartition) {
     }
 
     private PartitionKeyRecordConverter getPartitionKeyRecordConverter(final StreamPartition streamPartition) {
-        final String s3PathPrefix;
-        if (sourceCoordinator.getPartitionPrefix() != null ) {
-            s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + streamPartition.getCollection();
-        } else {
-            s3PathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER + streamPartition.getCollection();
-        }
+        final String s3Prefix = s3PathPrefix + streamPartition.getCollection();
         return new PartitionKeyRecordConverter(streamPartition.getCollection(),
-                StreamPartition.PARTITION_TYPE, s3PathPrefix);
+                StreamPartition.PARTITION_TYPE, s3Prefix);
     }
 }
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresherTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresherTest.java
index 694bf86e0d..1b66760344 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresherTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresherTest.java
@@ -24,6 +24,7 @@ import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
@@ -43,6 +44,7 @@ class MongoTasksRefresherTest {
     private static final String TEST_USERNAME = "test_user";
     private static final String TEST_PASSWORD = "test_password";
+    private final String S3_PATH_PREFIX = UUID.randomUUID().toString();
 
     @Mock
     private EnhancedSourceCoordinator enhancedSourceCoordinator;
@@ -79,7 +81,8 @@ class MongoTasksRefresherTest {
 
     private MongoTasksRefresher createObjectUnderTest() {
         return new MongoTasksRefresher(
-                buffer, enhancedSourceCoordinator, pluginMetrics, acknowledgementSetManager, executorServiceFunction);
+                buffer, enhancedSourceCoordinator, pluginMetrics, acknowledgementSetManager,
+                executorServiceFunction, S3_PATH_PREFIX);
     }
 
     @BeforeEach
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java
index cb65f74887..1d31505eff 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java
@@ -16,6 +16,7 @@ import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
 
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -24,6 +25,8 @@
 @ExtendWith(MockitoExtension.class)
 public class ExportWorkerTest {
+    private final String S3_PATH_PREFIX = UUID.randomUUID().toString();
+
     @Mock
     private EnhancedSourceCoordinator sourceCoordinator;
 
@@ -46,7 +49,7 @@ public class ExportWorkerTest {
 
     @BeforeEach
     public void setup() throws Exception {
-        exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
+        exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig, S3_PATH_PREFIX);
     }
 
     @Test
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java
index eedda1c654..028f145b39 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java
@@ -65,7 +65,7 @@ public class LeaderSchedulerTest {
     @Test
     void test_non_leader_run() {
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(() -> leaderScheduler.run());
@@ -78,7 +78,7 @@ void test_non_leader_run() {
     @Test
     void test_should_init() {
         given(mongoDBSourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
         leaderPartition = new LeaderPartition();
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
         given(collectionConfig.isExport()).willReturn(true);
@@ -86,7 +86,6 @@ void test_should_init() {
         given(collectionConfig.getExportBatchSize()).willReturn(Math.abs(new Random().nextInt()));
         given(collectionConfig.getCollection()).willReturn(TEST_COLLECTION);
         given(mongoDBSourceConfig.getS3Bucket()).willReturn(TEST_S3_BUCKET_NAME);
-        given(mongoDBSourceConfig.getS3Prefix()).willReturn(TEST_S3_PATH_PREFIX);
         given(mongoDBSourceConfig.getS3Region()).willReturn(TEST_S3_REGION);
         final int partitionCount = Math.abs(new Random().nextInt(10));
         given(collectionConfig.getPartitionCount()).willReturn(partitionCount);
@@ -139,13 +138,12 @@ void test_should_init() {
     @Test
     void test_should_init_export() {
         given(mongoDBSourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
         leaderPartition = new LeaderPartition();
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
         given(collectionConfig.isExport()).willReturn(true);
         given(collectionConfig.getExportBatchSize()).willReturn(Math.abs(new Random().nextInt()));
         given(collectionConfig.getCollection()).willReturn(TEST_COLLECTION);
-        given(mongoDBSourceConfig.getS3Prefix()).willReturn(TEST_S3_PATH_PREFIX);
         given(mongoDBSourceConfig.getS3Bucket()).willReturn(TEST_S3_BUCKET_NAME);
         given(mongoDBSourceConfig.getS3Region()).willReturn(TEST_S3_REGION);
         final int partitionCount = Math.abs(new Random().nextInt(10));
@@ -198,12 +196,11 @@ void test_should_init_export() {
     @Test
     void test_should_init_stream() {
         given(mongoDBSourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
         leaderPartition = new LeaderPartition();
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
         given(collectionConfig.isStream()).willReturn(true);
         given(collectionConfig.getCollection()).willReturn(TEST_COLLECTION);
-        given(mongoDBSourceConfig.getS3Prefix()).willReturn(TEST_S3_PATH_PREFIX);
         given(mongoDBSourceConfig.getS3Bucket()).willReturn(TEST_S3_BUCKET_NAME);
         given(mongoDBSourceConfig.getS3Region()).willReturn(TEST_S3_REGION);
         final int partitionCount = Math.abs(new Random().nextInt(10));
@@ -245,12 +242,11 @@ void test_should_init_stream() {
     @Test
     void test_shouldInitStream_withEmptyS3PathPrefix() {
         given(mongoDBSourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, "", Duration.ofMillis(100));
         leaderPartition = new LeaderPartition();
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
         given(collectionConfig.isStream()).willReturn(true);
         given(collectionConfig.getCollection()).willReturn(TEST_COLLECTION);
-        given(mongoDBSourceConfig.getS3Prefix()).willReturn(null);
         given(mongoDBSourceConfig.getS3Bucket()).willReturn(TEST_S3_BUCKET_NAME);
         given(mongoDBSourceConfig.getS3Region()).willReturn(TEST_S3_REGION);
         final int partitionCount = Math.abs(new Random().nextInt(10));
@@ -290,16 +286,13 @@ void test_shouldInitStream_withEmptyS3PathPrefix() {
     }
 
     @Test
-    void test_shouldInitStream_withEmptyS3PathPrefixWithCoordinatorPartitionPrefix() {
+    void test_shouldInitStream_withNullS3PathPrefix() {
         given(mongoDBSourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, Duration.ofMillis(100));
+        leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, null, Duration.ofMillis(100));
         leaderPartition = new LeaderPartition();
        given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
-        final String coordinatorPartitionPrefix = UUID.randomUUID().toString();
-        given(coordinator.getPartitionPrefix()).willReturn(coordinatorPartitionPrefix);
         given(collectionConfig.isStream()).willReturn(true);
         given(collectionConfig.getCollection()).willReturn(TEST_COLLECTION);
-        given(mongoDBSourceConfig.getS3Prefix()).willReturn(null);
         given(mongoDBSourceConfig.getS3Bucket()).willReturn(TEST_S3_BUCKET_NAME);
         given(mongoDBSourceConfig.getS3Region()).willReturn(TEST_S3_REGION);
         final int partitionCount = Math.abs(new Random().nextInt(10));
@@ -331,7 +324,7 @@ void test_shouldInitStream_withEmptyS3PathPrefixWithCoordinatorPartitionPrefix()
         final String[] partitionKeys = s3FolderPartition.getPartitionKey().split("\\|");
         assertThat(partitionKeys[0], is(TEST_COLLECTION));
         assertThat(partitionKeys[1], is(TEST_S3_BUCKET_NAME));
-        assertThat(partitionKeys[2], startsWith(coordinatorPartitionPrefix));
+        assertThat(partitionKeys[2], startsWith(TEST_COLLECTION));
         assertThat(partitionKeys[3], is(String.valueOf(partitionCount)));
         assertThat(partitionKeys[4], is(TEST_S3_REGION));
         assertThat(allEnhancedSourcePartitions.get(2), instanceOf(StreamPartition.class));
diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java
index e3a3263f21..724b882810 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java
@@ -42,6 +42,7 @@
 @ExtendWith(MockitoExtension.class)
 public class StreamSchedulerTest {
+    private final String S3_PATH_PREFIX = UUID.randomUUID().toString();
 
     @Mock
     private EnhancedSourceCoordinator sourceCoordinator;
@@ -69,8 +70,7 @@ public class StreamSchedulerTest {
     @BeforeEach
     void setup() {
         lenient().when(sourceConfig.getCollections()).thenReturn(List.of(collectionConfig));
-        //given(sourceConfig.getCollections()).willReturn(List.of(collectionConfig));
-        streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
+        streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, S3_PATH_PREFIX, pluginMetrics);
     }
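
A minimal standalone sketch (not part of the patch) of how the consolidated prefix resolution behaves after this change. The helper below mirrors the new DocumentDBService#getS3PathPrefix() plus the per-collection concatenation now done in LeaderScheduler, ExportWorker, and StreamScheduler; the class name, sample prefix values, and the "orders" collection are hypothetical.

public class S3PathPrefixSketch {
    private static final String S3_PATH_DELIMITER = "/";

    // Mirrors DocumentDBService#getS3PathPrefix(): the user-configured S3 prefix
    // (sourceConfig.getS3Prefix()) and the coordinator partition prefix each contribute a
    // trailing "/" only when present, so a null or blank prefix no longer concatenates a
    // literal "null/" or a stray "/" into the S3 key prefix.
    static String resolvePathPrefix(final String s3Prefix, final String partitionPrefix) {
        final String userPrefix = (s3Prefix != null && !s3Prefix.isBlank()) ? s3Prefix + S3_PATH_DELIMITER : "";
        return (partitionPrefix != null) ? userPrefix + partitionPrefix + S3_PATH_DELIMITER : userPrefix;
    }

    public static void main(final String[] args) {
        // The schedulers then append the collection name to the shared prefix, as in the hunks above.
        System.out.println(resolvePathPrefix(null, null) + "orders");              // orders
        System.out.println(resolvePathPrefix("", "coord-1") + "orders");           // coord-1/orders
        System.out.println(resolvePathPrefix("docdb/raw", "coord-1") + "orders");  // docdb/raw/coord-1/orders
    }
}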