Skip to content

Commit

Permalink
Add configurable initial position for orphaned stream (#853)
Browse files Browse the repository at this point in the history
* Add default `orphanedStreamInitialPositionInStream` in `MultiStreamTracker`, so a custom initial position can be passed to Scheduler to initialize the orphaned stream's config
* Renamed `getDefaultStreamConfig` to `getOrphanedStreamConfig`
* Refactored `SchedulerTest` setup and implemented `TestMultiStreamTracker` to test the default methods of the `MultiStreamTracker` interface. Note that this is a workaround for testing default interface methods with Mockito 1.x. Mockito >= 2.7.13 supports `@Spy` on an interface directly, which can be used to test default methods without implementing a concrete class. However, Mockito 2.x has a number of breaking changes, so future work will be needed to refactor the unit tests and upgrade to Mockito >= 2.7.13

Signed-off-by: Rex Chen <shuningc@amazon.com>
  • Loading branch information
rexcsn authored Aug 26, 2021
1 parent 36ff9dc commit e73a8a9
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class Scheduler implements Runnable {
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private MultiStreamTracker multiStreamTracker;
private FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy;
private InitialPositionInStreamExtended orphanedStreamInitialPositionInStream;
private final long listShardsBackoffTimeMillis;
private final int maxListShardsRetryAttempts;
private final LeaseRefresher leaseRefresher;
Expand Down Expand Up @@ -231,6 +232,7 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker;
this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy();
this.orphanedStreamInitialPositionInStream = multiStreamTracker.orphanedStreamInitialPositionInStream();
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
Expand Down Expand Up @@ -512,7 +514,7 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
// Worker 2 : BOOTS_UP -> A,B,C (stale)
//
// The following streams transition state among two workers are NOT considered safe, where Worker 2 might
// end up deleting the leases for A and D and loose progress made so far.
// end up deleting the leases for A and D and lose progress made so far.
// Worker 1 : A,B,C -> A,B,C,D (latest)
// Worker 2 : A,B,C -> B,C (stale/partial)
//
Expand Down Expand Up @@ -588,13 +590,13 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
(streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
}

private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
@VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
final Set<StreamIdentifier> streamIdentifiers = leases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.collect(Collectors.toSet());
for (StreamIdentifier streamIdentifier : streamIdentifiers) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
currentStreamConfigMap.put(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
currentStreamConfigMap.put(streamIdentifier, getOrphanedStreamConfig(streamIdentifier));
}
}
}
Expand Down Expand Up @@ -653,9 +655,9 @@ private boolean deleteMultiStreamLeases(List<MultiStreamLease> leases) {
return true;
}

// When a stream is no longer needed to be tracked, return a default StreamConfig with LATEST for faster shard end.
private StreamConfig getDefaultStreamConfig(StreamIdentifier streamIdentifier) {
return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
// Generate default StreamConfig for an "orphaned" stream that is in the lease table but not tracked
private StreamConfig getOrphanedStreamConfig(StreamIdentifier streamIdentifier) {
return new StreamConfig(streamIdentifier, orphanedStreamInitialPositionInStream);
}

/**
Expand Down Expand Up @@ -917,7 +919,7 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
final StreamConfig streamConfig = currentStreamConfigMap.getOrDefault(streamIdentifier, getOrphanedStreamConfig(streamIdentifier));
Validate.notNull(streamConfig, "StreamConfig should not be null");
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.kinesis.processor;

import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;

import java.util.List;
Expand Down Expand Up @@ -42,4 +44,14 @@ public interface MultiStreamTracker {
* @return StreamsLeasesDeletionStrategy
*/
FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy();

/**
 * Returns the position from which records are read for an "orphaned" stream, i.e. a stream that
 * is in the lease table but not tracked.
 *
 * <p>The default assumes that such a stream no longer needs to be tracked, so it uses
 * {@link InitialPositionInStream#LATEST} for a faster shard end.</p>
 *
 * @return the initial position for orphaned streams; {@link InitialPositionInStream#LATEST} by default
 */
default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() {
    final InitialPositionInStream defaultPosition = InitialPositionInStream.LATEST;
    return InitialPositionInStreamExtended.newInitialPosition(defaultPosition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand All @@ -53,12 +54,14 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import io.reactivex.plugins.RxJavaPlugins;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;

import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
Expand All @@ -79,6 +82,7 @@
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
Expand All @@ -97,6 +101,7 @@
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
Expand Down Expand Up @@ -152,8 +157,8 @@ public class SchedulerTest {
private Checkpointer checkpoint;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Mock
private MultiStreamTracker multiStreamTracker;
@Spy
private TestMultiStreamTracker multiStreamTracker;
@Mock
private LeaseCleanupManager leaseCleanupManager;

Expand All @@ -175,25 +180,6 @@ public void setup() {
processorConfig = new ProcessorConfig(shardRecordProcessorFactory);
retrievalConfig = new RetrievalConfig(kinesisClient, streamName, applicationName)
.retrievalFactory(retrievalFactory);

final List<StreamConfig> streamConfigList = new ArrayList<StreamConfig>() {{
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc1:stream2:2"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream1:1"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
add(new StreamConfig(StreamIdentifier.multiStreamInstance("acc2:stream2:3"), InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST)));
}};

when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy())
.thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ZERO;
}
});
when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher);
when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector);
when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer());
Expand Down Expand Up @@ -460,6 +446,75 @@ public final void testMultiStreamOnlyNewStreamsAreSynced()
Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
}

/**
 * Verifies that streams present only in the lease table are added to the scheduler's stream config
 * map with the default orphaned-stream initial position (LATEST), while a stream already tracked by
 * multiStreamTracker keeps its existing config.
 */
@Test
public final void testMultiStreamSyncFromTableDefaultInitPos()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final String shardId = "some_random_shard_id";
    final List<MultiStreamLease> leasesInTable = new LinkedList<>();
    final Map<StreamIdentifier, StreamConfig> expectedConfigMap = new HashMap<>();

    // Streams in the lease table but not tracked by multiStreamTracker.
    // By default they are expected to end up with an initial position of LATEST.
    for (int streamId = 1; streamId < 3; streamId++) {
        final String serializedId = Joiner.on(":").join(
                streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345);
        leasesInTable.add(new MultiStreamLease().streamIdentifier(serializedId).shardId(shardId));

        final StreamIdentifier orphanedStream = StreamIdentifier.multiStreamInstance(serializedId);
        expectedConfigMap.put(orphanedStream, new StreamConfig(orphanedStream,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
    }

    // Also include a stream that multiStreamTracker already tracks, to make sure its config is not touched.
    leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId(shardId));

    // The tracker's own configs are expected to be present unchanged.
    for (StreamConfig trackedConfig : multiStreamTracker.streamConfigList()) {
        expectedConfigMap.put(trackedConfig.streamIdentifier(), trackedConfig);
    }

    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig);
    scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);

    Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
}

/**
 * Verifies that streams present only in the lease table are added to the scheduler's stream config
 * map with the initial position returned by the tracker's orphanedStreamInitialPositionInStream(),
 * while a stream already tracked by multiStreamTracker keeps its existing config.
 */
@Test
public final void testMultiStreamSyncFromTableCustomInitPos()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final Date testTimeStamp = new Date();
    final String shardId = "some_random_shard_id";
    final List<MultiStreamLease> leasesInTable = new LinkedList<>();
    final Map<StreamIdentifier, StreamConfig> expectedConfigMap = new HashMap<>();

    // Streams in the lease table but not tracked by multiStreamTracker.
    // They are expected to pick up the custom AT_TIMESTAMP position stubbed below.
    for (int streamId = 1; streamId < 3; streamId++) {
        final String serializedId = Joiner.on(":").join(
                streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345);
        leasesInTable.add(new MultiStreamLease().streamIdentifier(serializedId).shardId(shardId));

        final StreamIdentifier orphanedStream = StreamIdentifier.multiStreamInstance(serializedId);
        expectedConfigMap.put(orphanedStream, new StreamConfig(orphanedStream,
                InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp)));
    }

    // Also include a stream that multiStreamTracker already tracks, to make sure its config is not touched.
    leasesInTable.add(new MultiStreamLease().streamIdentifier("acc1:stream1:1").shardId(shardId));

    // The tracker's own configs are expected to be present unchanged.
    for (StreamConfig trackedConfig : multiStreamTracker.streamConfigList()) {
        expectedConfigMap.put(trackedConfig.streamIdentifier(), trackedConfig);
    }

    // Stub a specific orphanedStreamInitialPositionInStream on the tracker before the Scheduler reads it.
    when(multiStreamTracker.orphanedStreamInitialPositionInStream()).thenReturn(
            InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp));
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
            metricsConfig, processorConfig, retrievalConfig);
    scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);

    Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
}

@Test
public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyAutoDeletionStrategy()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
Expand Down Expand Up @@ -1111,4 +1166,32 @@ public Checkpointer createCheckpointer(final LeaseCoordinator leaseCoordinator,
}
}

// TODO: Upgrade to mockito >= 2.7.13, and use Spy on MultiStreamTracker to directly access the default methods without implementing TestMultiStreamTracker class
/**
 * Minimal concrete {@link MultiStreamTracker} for these tests. It implements only the abstract
 * methods, so the interface's default methods are inherited unchanged.
 */
@NoArgsConstructor
private class TestMultiStreamTracker implements MultiStreamTracker {
    /**
     * Builds a new list on every call containing four stream configs, each with an initial
     * position of LATEST.
     *
     * @return a freshly created list of the tracked stream configs
     */
    @Override
    public List<StreamConfig> streamConfigList() {
        final String[] serializedStreamIdentifiers = {
                "acc1:stream1:1", "acc1:stream2:2", "acc2:stream1:1", "acc2:stream2:3"};
        // A plain ArrayList avoids double-brace initialization, which would create an extra
        // anonymous subclass that captures the enclosing instance.
        final List<StreamConfig> streamConfigs = new ArrayList<>();
        for (String serializedStreamIdentifier : serializedStreamIdentifiers) {
            streamConfigs.add(new StreamConfig(
                    StreamIdentifier.multiStreamInstance(serializedStreamIdentifier),
                    InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)));
        }
        return streamConfigs;
    }

    /**
     * @return an auto-detection, deferred-deletion strategy whose wait period is {@link Duration#ZERO}
     */
    @Override
    public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
        return new AutoDetectionAndDeferredDeletionStrategy() {
            @Override
            public Duration waitPeriodToDeleteFormerStreams() {
                return Duration.ZERO;
            }
        };
    }
}

}

0 comments on commit e73a8a9

Please sign in to comment.