Skip to content

Commit

Permalink
Added support for multiple workers in S3 Scan Source (#4439)
Browse files Browse the repository at this point in the history
* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed failing integration tests

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed failing check style

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Apr 23, 2024
1 parent 250531d commit 3cc41b7
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ public interface SourceCoordinator<T> {

/**
* Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions,
* or can be called to give up ownership of its partitions in order to pick up new ones with {@link #getNextPartition(Function)} ()}.
* or can be called to give up ownership of a partition in order to pick up new ones with {@link #getNextPartition(Function)} ()}.
* This is used when source coordinator is shared by multiple threads.
* @param partitionKey partition to giveup ownership
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
* @since 2.2
* @since 2.8
*/
void giveUpPartitions();
void giveUpPartition(String partitionKey);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.locks.ReentrantLock;

public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

Expand Down Expand Up @@ -66,7 +67,6 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

private final SourceCoordinationConfig sourceCoordinationConfig;
private final SourceCoordinationStore sourceCoordinationStore;
private final PartitionManager<T> partitionManager;

private final Class<T> partitionProgressStateClass;
private final String ownerId;
Expand All @@ -89,6 +89,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final Counter saveStatePartitionUpdateErrorCounter;
private final Counter closePartitionUpdateErrorCounter;
private final Counter completePartitionUpdateErrorCounter;
private final ReentrantLock lock;

static {
try {
Expand All @@ -101,13 +102,11 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
final SourceCoordinationStore sourceCoordinationStore,
final SourceCoordinationConfig sourceCoordinationConfig,
final PartitionManager<T> partitionManager,
final String sourceIdentifier,
final PluginMetrics pluginMetrics) {
this.sourceCoordinationConfig = sourceCoordinationConfig;
this.sourceCoordinationStore = sourceCoordinationStore;
this.partitionProgressStateClass = partitionProgressStateClass;
this.partitionManager = partitionManager;
this.sourceIdentifier = Objects.nonNull(sourceCoordinationConfig.getPartitionPrefix()) ?
sourceCoordinationConfig.getPartitionPrefix() + "|" + sourceIdentifier :
sourceIdentifier;
Expand All @@ -128,6 +127,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
this.lock = new ReentrantLock();
}

@Override
Expand All @@ -141,26 +141,25 @@ public void initialize() {
public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier) {
validateIsInitialized();

if (partitionManager.getActivePartition().isPresent()) {
return partitionManager.getActivePartition();
}

Optional<SourcePartitionStoreItem> ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);

if (ownedPartitions.isEmpty()) {

final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();

if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();

giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
try {
if (ownedPartitions.isEmpty() && lock.tryLock()) {
final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();
if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();
giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
}
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
if (ownedPartitions.isEmpty()) {
ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}

ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}

if (ownedPartitions.isEmpty()) {
Expand All @@ -175,7 +174,6 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);

LOG.debug("Partition key {} was acquired by owner {}", sourcePartition.getPartitionKey(), ownerId);
partitionsAcquiredCounter.increment();
Expand Down Expand Up @@ -210,7 +208,7 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
public void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, COMPLETE_ACTION, fromAcknowledgmentsCallback);
final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, COMPLETE_ACTION);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
Expand All @@ -225,10 +223,6 @@ public void completePartition(final String partitionKey, final Boolean fromAckno
throw e;
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId);
partitionsCompletedCounter.increment();
}
Expand All @@ -237,7 +231,7 @@ public void completePartition(final String partitionKey, final Boolean fromAckno
public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, CLOSE_ACTION, fromAcknowledgmentsCallback);
final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, CLOSE_ACTION);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
Expand Down Expand Up @@ -269,18 +263,14 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
partitionsClosedCounter.increment();
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus());
}

@Override
public <S extends T> void saveProgressStateForPartition(final String partitionKey, final S partitionProgressState) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, SAVE_STATE_ACTION);
final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, SAVE_STATE_ACTION);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
Expand All @@ -303,46 +293,40 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe
public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) {
validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "update for ack wait");
final SourcePartitionStoreItem itemToUpdate = getSourcePartitionStoreItem(partitionKey, "update for ack wait");
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout));

sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);

partitionManager.removeActivePartition();
}

@Override
public void giveUpPartitions() {
public void giveUpPartition(String partitionKey) {

if (!initialized) {
return;
}

final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
if (activePartition.isPresent()) {
final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, activePartition.get().getPartitionKey());
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey);
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
partitionManager.removeActivePartition();
partitionsGivenUpCounter.increment();


LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
}
partitionsGivenUpCounter.increment();
}

private T convertStringToPartitionProgressStateClass(final String serializedPartitionProgressState) {
Expand Down Expand Up @@ -380,32 +364,15 @@ private Map<String, Object> convertStringToGlobalStateMap(final String serialize
}
}

private boolean isActivelyOwnedPartition(final String partitionKey) {
final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
return activePartition.isPresent() && activePartition.get().getPartitionKey().equals(partitionKey);
}

private void validatePartitionOwnership(final SourcePartitionStoreItem item) {
if (Objects.isNull(item.getPartitionOwner()) || !ownerId.equals(item.getPartitionOwner())) {
partitionManager.removeActivePartition();
partitionNotOwnedErrorCounter.increment();
throw new PartitionNotOwnedException(String.format("The partition is no longer owned by this instance of Data Prepper. " +
"The partition ownership timeout most likely expired and was grabbed by another instance of Data Prepper for partition owner %s and partition key %s.",
ownerId, item.getSourcePartitionKey()));
}
}

private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final String partitionKey, final String action) {
if (!isActivelyOwnedPartition(partitionKey)) {
partitionNotOwnedErrorCounter.increment();
throw new PartitionNotOwnedException(
String.format("Unable to %s for the partition because partition key %s is not owned by this instance of Data Prepper for owner %s", action, partitionKey, ownerId)
);
}

return getSourcePartitionStoreItem(partitionKey, action);
}

private SourcePartitionStoreItem getSourcePartitionStoreItem(final String partitionKey, final String action) {
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey);

Expand Down Expand Up @@ -469,9 +436,4 @@ private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionS
}
}

private SourcePartitionStoreItem getItemWithAction(final String partitionKey, final String action, final Boolean fromAcknowledgmentsCallback) {
// The validation against activePartition in partition manager needs to be skipped when called from acknowledgments callback
// because otherwise it will fail the validation since it is actively working on a different partition when ack is received
return fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, action) : validateAndGetSourcePartitionStoreItem(partitionKey, action);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public <T> SourceCoordinator<T> provideSourceCoordinator(final Class<T> clazz, f

LOG.info("Creating LeaseBasedSourceCoordinator with coordination store {} for sub-pipeline {}",
sourceCoordinationConfig.getSourceCoordinationStoreConfig().getName(), subPipelineName);
return new LeaseBasedSourceCoordinator<T>(clazz, sourceCoordinationStore, sourceCoordinationConfig, new PartitionManager<>(), subPipelineName, sourceCoordinatorMetrics);
return new LeaseBasedSourceCoordinator<T>(clazz, sourceCoordinationStore, sourceCoordinationConfig, subPipelineName, sourceCoordinatorMetrics);
}

public EnhancedSourceCoordinator provideEnhancedSourceCoordinator(final Function<SourcePartitionStoreItem, EnhancedSourcePartition> partitionFactory,
Expand Down
Loading

0 comments on commit 3cc41b7

Please sign in to comment.