Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add folder-based partitioning for s3 scan source #4455

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -64,4 +65,8 @@ boolean tryCreatePartitionItem(final String sourceIdentifier,
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateItem);

/**
* Variant of {@link #tryUpdateSourcePartitionItem(SourcePartitionStoreItem)} that also receives a priority timestamp.
* @param updateItem - The partition item to update
* @param priorityForUnassignedPartitions - NOTE(review): presumably controls the order in which UNASSIGNED partitions are acquired
* after they are given up; callers may pass null when no priority is requested. Confirm against the store implementations.
*/
void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateItem, final Instant priorityForUnassignedPartitions);

/**
* Attempts to delete the given partition item from the store.
* NOTE(review): callers catch PartitionUpdateException around this call, so implementations presumably
* throw it when the delete fails - confirm and document with an explicit throws tag.
* @param deleteItem - The partition item to delete
*/
void tryDeletePartitionItem(final SourcePartitionStoreItem deleteItem);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ public PartitionIdentifier build() {
return new PartitionIdentifier(this);
}
}

/**
 * Two identifiers are equal when they are the same instance, or when they are of exactly
 * the same class and carry equal partition keys (null keys compare equal to each other).
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (!getClass().equals(o.getClass())) {
        return false;
    }

    final PartitionIdentifier other = (PartitionIdentifier) o;
    return Objects.equals(this.partitionKey, other.partitionKey);
}

/**
 * Derives the hash code from the partition key alone, the same field that equals compares,
 * so that identifiers with equal partition keys produce equal hash codes.
 */
@Override
public int hashCode() {
return Objects.hash(partitionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.source.coordinator;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -43,6 +44,25 @@ public interface SourceCoordinator<T> {
*/
Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier);


/**
* This should be called by the source when it needs to get the next partition it should process on.
* This method will attempt to acquire a partition for this instance of the source to work on, and if no partition is acquired, the partitionCreationSupplier will be called to potentially create
* new partitions. This method will then attempt to acquire a partition again after creating new partitions from the supplier, and will return the result of that attempt whether a partition was
* acquired successfully or not. It is recommended to backoff and retry at the source level when this method returns an empty Optional.
* @param partitionCreationSupplier - The Function that will provide partitions. This supplier will be called by the SourceCoordinator when no partitions are acquired
* for this instance of Data Prepper. This function will be passed a global state map, which will be empty until it is used and modified in the supplier function passed.
* If the global state map is not needed, then it can be ignored. Updating the global state map will save it, so the next time the supplier function is run,
* it will contain the most recent state from the previous supplier function run.
* @param forceSupplier - If set to true, the SourceCoordinator will check if the supplier has been run in the last X minutes, and if it has not, will force it to run
* @return {@link SourcePartition} with the details about how to process this partition. Will repeatedly return the partition until
* {@link SourceCoordinator#completePartition(String, Boolean)}
* or {@link SourceCoordinator#closePartition(String, Duration, int, Boolean)} are called by the source,
* or until the partition ownership times out.
* @since 2.8
*/
Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier, final boolean forceSupplier);

/**
* Should be called by the source when it has fully processed a given partition
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store
Expand Down Expand Up @@ -99,6 +119,15 @@ public interface SourceCoordinator<T> {
*/
void giveUpPartition(String partitionKey);

/**
* Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions,
* or can be called to give up ownership of its partitions in order to pick up new ones with {@link #getNextPartition(Function)}.
* @param partitionKey - The key of the partition to give up
* @param priorityTimestamp - A timestamp that will determine the order that UNASSIGNED partitions are acquired after they are given up.
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
* @since 2.8
*/
void giveUpPartition(final String partitionKey, final Instant priorityTimestamp);


/**
* Should be called by the source when acknowledgments are enabled to keep ownership of the partition for acknowledgmentTimeout amount of time
Expand All @@ -108,4 +137,6 @@ public interface SourceCoordinator<T> {
* can pick it up for processing
*/
void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout);

/**
* Deletes the partition identified by the given partition key.
* NOTE(review): behavior when the key does not exist, or when the partition is owned by another instance,
* is not specified here - confirm against the implementations and document it.
* @param partitionKey - The key of the partition to delete
*/
void deletePartition(final String partitionKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class PartitionIdentifierTest {

Expand All @@ -22,4 +23,24 @@ void testPartitionIdentifierBuilder() {

assertThat(partitionIdentifier.getPartitionKey(), equalTo(partitionKey));
}

@Test
void testPartitionIdentifierEquals_and_hashCode() {
    final String sharedKey = UUID.randomUUID().toString();
    final String otherKey = UUID.randomUUID().toString();

    final PartitionIdentifier first = PartitionIdentifier.builder()
            .withPartitionKey(sharedKey)
            .build();
    final PartitionIdentifier sameKeyAsFirst = PartitionIdentifier.builder()
            .withPartitionKey(sharedKey)
            .build();

    // Same key: the identifiers must be equal and share a hash code.
    assertThat(first.equals(sameKeyAsFirst), equalTo(true));
    assertThat(first.hashCode(), equalTo(sameKeyAsFirst.hashCode()));

    final PartitionIdentifier differentKey = PartitionIdentifier.builder()
            .withPartitionKey(otherKey)
            .build();

    // Different key: the identifiers must not be equal; the two random keys are also
    // expected to hash differently.
    assertThat(first.equals(differentKey), equalTo(false));
    assertNotEquals(first.hashCode(), differentKey.hashCode());

    // Reflexivity and null handling.
    assertThat(first.equals(first), equalTo(true));
    assertThat(first.equals(null), equalTo(false));
}
}
1 change: 1 addition & 0 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
testImplementation testLibs.mockito.inline
testImplementation libs.commons.lang3
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

Expand All @@ -51,6 +51,8 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
static final String PARTITIONS_ACQUIRED_COUNT = "partitionsAcquired";
static final String PARTITIONS_COMPLETED_COUNT = "partitionsCompleted";
static final String PARTITIONS_CLOSED_COUNT = "partitionsClosed";

static final String PARTITIONS_DELETED = "partitionsDeleted";
static final String SAVE_PROGRESS_STATE_INVOCATION_SUCCESS_COUNT = "savePartitionProgressStateInvocationsSuccess";
static final String PARTITION_OWNERSHIP_GIVEN_UP_COUNT = "partitionsGivenUp";

Expand Down Expand Up @@ -89,8 +91,14 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final Counter saveStatePartitionUpdateErrorCounter;
private final Counter closePartitionUpdateErrorCounter;
private final Counter completePartitionUpdateErrorCounter;

private final Counter partitionsDeleted;
private final ReentrantLock lock;

private Instant lastSupplierRunTime;

static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5);

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
Expand Down Expand Up @@ -127,7 +135,9 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED);
this.lock = new ReentrantLock();
this.lastSupplierRunTime = Instant.now();
}

@Override
Expand All @@ -139,11 +149,25 @@ public void initialize() {

// Delegates to getNextPartitionInternal with forceSupplier set to false, so this overload never
// requests a forced run of the partition creation supplier.
@Override
public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier) {
return getNextPartitionInternal(partitionCreationSupplier, false);
}

// Forces the supplier only when the caller opted in AND more than FORCE_SUPPLIER_AFTER_DURATION
// has elapsed since lastSupplierRunTime; otherwise behaves like the single-argument overload.
@Override
public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier, final boolean forceSupplierEnabled) {
    final boolean supplierRunIsOverdue = forceSupplierEnabled
            && Instant.now().isAfter(lastSupplierRunTime.plus(FORCE_SUPPLIER_AFTER_DURATION));

    return getNextPartitionInternal(partitionCreationSupplier, supplierRunIsOverdue);
}

private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier, final boolean forceSupplier) {
validateIsInitialized();

Optional<SourcePartitionStoreItem> ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
try {
if (ownedPartitions.isEmpty() && lock.tryLock()) {
if ((ownedPartitions.isEmpty() || forceSupplier) && lock.tryLock()) {
lastSupplierRunTime = Instant.now();
final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();
if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
Expand Down Expand Up @@ -302,8 +326,16 @@ public void updatePartitionForAcknowledgmentWait(final String partitionKey, fina
}

@Override
public void giveUpPartition(String partitionKey) {
public void giveUpPartition(final String partitionKey) {
giveUpPartitionInternal(partitionKey, null);
}

// Gives up the partition, forwarding the caller-supplied priority timestamp unchanged to
// giveUpPartitionInternal.
@Override
public void giveUpPartition(final String partitionKey, final Instant priorityTimestamp) {
giveUpPartitionInternal(partitionKey, priorityTimestamp);
}

private void giveUpPartitionInternal(final String partitionKey, final Instant priorityTimestamp) {
if (!initialized) {
return;
}
Expand All @@ -318,7 +350,7 @@ public void giveUpPartition(String partitionKey) {
updateItem.setPartitionOwnershipTimeout(null);

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem, priorityTimestamp);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
Expand All @@ -329,6 +361,25 @@ public void giveUpPartition(String partitionKey) {
partitionsGivenUpCounter.increment();
}

// Looks up the partition by key and, if it exists, runs validatePartitionOwnership on it and asks
// the store to delete it. A missing partition is a no-op. A PartitionUpdateException from the
// store is logged and swallowed, and the deleted counter is only incremented on success.
@Override
public void deletePartition(final String partitionKey) {
    final Optional<SourcePartitionStoreItem> storedPartition =
            sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey);
    if (storedPartition.isEmpty()) {
        // Nothing is stored under this key, so there is nothing to delete.
        return;
    }

    final SourcePartitionStoreItem partitionToDelete = storedPartition.get();
    validatePartitionOwnership(partitionToDelete);

    try {
        sourceCoordinationStore.tryDeletePartitionItem(partitionToDelete);
    } catch (final PartitionUpdateException e) {
        LOG.info("Unable to delete partition {}: {}.", partitionToDelete.getSourcePartitionKey(), e.getMessage());
        return;
    }

    partitionsDeleted.increment();
    LOG.info("Partition key {} was deleted by owner {}", partitionToDelete.getSourcePartitionKey(), ownerId);
}

private T convertStringToPartitionProgressStateClass(final String serializedPartitionProgressState) {
if (Objects.isNull(serializedPartitionProgressState)) {
return null;
Expand Down
Loading
Loading