Skip to content

Commit

Permalink
Add multiple worker support for S3 Scan
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
Krishna Kondaka committed Apr 18, 2024
1 parent e3a5701 commit da4f0a5
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ public interface SourceCoordinator<T> {
*/
void giveUpPartitions();

/**
* Gives up ownership of one specific partition.
* <p>
* Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on the partition,
* or can be called to give up ownership of the partition in order to pick up a new one with {@link #getNextPartition(Function)}.
* Same as {@link #giveUpPartitions()} but with the specific partition to be given up passed explicitly.
* This can be used when the source coordinator is shared by multiple threads.
*
* @param sourcePartition the partition to give up
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
* @since 2.8
*/
void giveUpPartitions(SourcePartition<T> sourcePartition);


/**
* Should be called by the source when acknowledgments are enabled to keep ownership of the partition for acknowledgmentTimeout amount of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.locks.ReentrantLock;

public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

Expand Down Expand Up @@ -89,6 +90,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final Counter saveStatePartitionUpdateErrorCounter;
private final Counter closePartitionUpdateErrorCounter;
private final Counter completePartitionUpdateErrorCounter;
private final ReentrantLock lock;

static {
try {
Expand Down Expand Up @@ -128,6 +130,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
this.lock = new ReentrantLock();
}

@Override
Expand All @@ -141,26 +144,31 @@ public void initialize() {
public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier) {
validateIsInitialized();

if (partitionManager.getActivePartition().isPresent()) {
return partitionManager.getActivePartition();
}
Optional<SourcePartitionStoreItem> ownedPartitions = Optional.empty();
try {
if (lock.tryLock()) {
ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);

Optional<SourcePartitionStoreItem> ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
if (ownedPartitions.isEmpty()) {

if (ownedPartitions.isEmpty()) {
final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();

final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();
if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();

if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();
giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
}

giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}

ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}

if (ownedPartitions.isEmpty()) {
Expand All @@ -175,7 +183,6 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);

LOG.debug("Partition key {} was acquired by owner {}", sourcePartition.getPartitionKey(), ownerId);
partitionsAcquiredCounter.increment();
Expand Down Expand Up @@ -225,10 +232,6 @@ public void completePartition(final String partitionKey, final Boolean fromAckno
throw e;
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId);
partitionsCompletedCounter.increment();
}
Expand Down Expand Up @@ -269,10 +272,6 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
partitionsClosedCounter.increment();
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus());
}

Expand Down Expand Up @@ -309,40 +308,46 @@ public void updatePartitionForAcknowledgmentWait(final String partitionKey, fina
itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout));

sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);

partitionManager.removeActivePartition();
}

@Override
public void giveUpPartitions() {

if (!initialized) {
return;
}

final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
if (activePartition.isPresent()) {
final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, activePartition.get().getPartitionKey());
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
giveUpPartitions(activePartition.get());
}

}

@Override
public void giveUpPartitions(SourcePartition<T> sourcePartition) {

LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
if (!initialized) {
return;
}

final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, sourcePartition.getPartitionKey());
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
partitionManager.removeActivePartition();
partitionsGivenUpCounter.increment();


LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
}
partitionsGivenUpCounter.increment();
}

private T convertStringToPartitionProgressStateClass(final String serializedPartitionProgressState) {
Expand Down Expand Up @@ -387,7 +392,6 @@ private boolean isActivelyOwnedPartition(final String partitionKey) {

private void validatePartitionOwnership(final SourcePartitionStoreItem item) {
if (Objects.isNull(item.getPartitionOwner()) || !ownerId.equals(item.getPartitionOwner())) {
partitionManager.removeActivePartition();
partitionNotOwnedErrorCounter.increment();
throw new PartitionNotOwnedException(String.format("The partition is no longer owned by this instance of Data Prepper. " +
"The partition ownership timeout most likely expired and was grabbed by another instance of Data Prepper for partition owner %s and partition key %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@
package org.opensearch.dataprepper.plugins.source.s3;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions;
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

/**
* Class responsible for taking an {@link S3SourceConfig}, creating all the necessary {@link ScanOptions}
* objects, and spawning the {@link ScanObjectWorker} threads that perform the scan
*/
public class S3ScanService {
static final long SHUTDOWN_TIMEOUT = 30L;

private final S3SourceConfig s3SourceConfig;
private final List<S3ScanBucketOptions> s3ScanBucketOptions;
private final S3ClientBuilderFactory s3ClientBuilderFactory;
Expand All @@ -36,7 +42,10 @@ public class S3ScanService {
private final AcknowledgementSetManager acknowledgementSetManager;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
private ScanObjectWorker scanObjectWorker;
//private ScanObjectWorker scanObjectWorker;
private final ExecutorService executorService;
static final int STANDARD_BACKOFF_MILLIS = 30_000;
static final int FAST_BACKOFF_MILLIS = 100;

public S3ScanService(final S3SourceConfig s3SourceConfig,
final S3ClientBuilderFactory s3ClientBuilderFactory,
Expand All @@ -58,17 +67,30 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
this.acknowledgementSetManager = acknowledgementSetManager;
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
this.pluginMetrics = pluginMetrics;
this.executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-scan"));
}

/**
* Starts the S3 scan workers.
* <p>
* As written, one {@link ScanObjectWorker} is created and started on a dedicated thread, and then one
* worker per {@code s3SourceConfig.getNumWorkers()} is created and submitted to the executor service.
* The back-off passed to the pooled workers is {@code FAST_BACKOFF_MILLIS} when more than one worker
* is configured and {@code STANDARD_BACKOFF_MILLIS} otherwise.
*/
public void start() {
// NOTE(review): this constructor call passes no back-off argument while the call inside the loop
// does - confirm that both constructor forms exist and which startup path is intended.
scanObjectWorker = new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
scanObjectWorkerThread = new Thread(scanObjectWorker);
scanObjectWorkerThread.start();
// With several workers a short back-off is selected; a single worker uses the standard back-off.
int backOffMs = s3SourceConfig.getNumWorkers() > 1 ? FAST_BACKOFF_MILLIS : STANDARD_BACKOFF_MILLIS;
for (int i = 0; i < s3SourceConfig.getNumWorkers(); i++) {
// getS3Client() and getScanOptions() are invoked once per worker; every worker is handed the
// same sourceCoordinator instance. The local variable shadows the field of the same name.
ScanObjectWorker scanObjectWorker = new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, backOffMs, pluginMetrics);
// NOTE(review): the Thread wrapper is never started; it is submitted only as a Runnable and is
// executed by one of the executor's own threads. Submitting the worker directly looks equivalent - confirm.
executorService.submit(new Thread(scanObjectWorker));
}
}

/**
 * Stops the scan by calling {@code stop()} on the scan worker and shutting down the executor service.
 * <p>
 * Waits up to {@code SHUTDOWN_TIMEOUT} seconds for submitted workers to terminate. If they do not
 * terminate in time, or if the calling thread is interrupted while waiting, the remaining workers are
 * cancelled with {@link ExecutorService#shutdownNow()}. On interruption the calling thread's
 * interrupt status is restored so that callers further up the stack can observe it.
 */
public void stop() {
    scanObjectWorker.stop();
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    } catch (final InterruptedException e) {
        // An InterruptedException raised by awaitTermination carries no cause, so the former
        // "e.getCause() instanceof InterruptedException" guard never matched: the workers were
        // left running and the interrupt was silently swallowed. Handle it unconditionally.
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class ScanObjectWorker implements Runnable{

private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class);

private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;

static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
Expand Down Expand Up @@ -67,6 +66,7 @@ public class ScanObjectWorker implements Runnable{
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
private final Counter acknowledgementSetCallbackCounter;
private final int backOffMs;

public ScanObjectWorker(final S3Client s3Client,
final List<ScanOptions> scanOptionsBuilderList,
Expand All @@ -76,8 +76,10 @@ public ScanObjectWorker(final S3Client s3Client,
final S3SourceConfig s3SourceConfig,
final AcknowledgementSetManager acknowledgementSetManager,
final S3ObjectDeleteWorker s3ObjectDeleteWorker,
final int backOffMs,
final PluginMetrics pluginMetrics){
this.s3Client = s3Client;
this.backOffMs = backOffMs;
this.scanOptionsBuilderList = scanOptionsBuilderList;
this.s3ObjectHandler= s3ObjectHandler;
this.bucketOwnerProvider = bucketOwnerProvider;
Expand All @@ -98,7 +100,7 @@ public ScanObjectWorker(final S3Client s3Client,
public void run() {
while (!isStopped) {
try {
startProcessingObject(STANDARD_BACKOFF_MILLIS);
startProcessingObject(backOffMs);
} catch (final Exception e) {
LOG.error("Received an exception while processing S3 objects, backing off and retrying", e);
try {
Expand Down Expand Up @@ -166,7 +168,7 @@ private void startProcessingObject(final int waitTimeMillis) {
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), false);
} catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) {
LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage());
sourceCoordinator.giveUpPartitions();
sourceCoordinator.giveUpPartitions(objectToProcess.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private ScanObjectWorker createObjectUnderTest() {
when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions);
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter);
final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider,
sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, S3ScanService.STANDARD_BACKOFF_MILLIS, pluginMetrics);
verify(sourceCoordinator).initialize();
return objectUnderTest;
}
Expand All @@ -129,7 +129,7 @@ void giveUpPartitions_is_called_when_a_PartitionException_is_thrown_from_parseS3

final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
doThrow(exception).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(sourceCoordinator).giveUpPartitions();
doNothing().when(sourceCoordinator).giveUpPartitions(any());

createObjectUnderTest().runWithoutInfiniteLoop();

Expand Down
Loading

0 comments on commit da4f0a5

Please sign in to comment.