From e81969af599e7dbdbdcb5b5a9618b2fac57a0fef Mon Sep 17 00:00:00 2001 From: Sachin Sundar Pungampalayam Shanmugasundaram <89495050+shanmsac@users.noreply.github.com> Date: Wed, 6 Oct 2021 15:54:23 -0700 Subject: [PATCH] Create DynamoDB tables on On-Demand billing mode by default. (#854) * Create DynamoDB tables on On-Demand billing mode by default. This will enable KCL to create the DynamoDB tables - Lease and ShardProgress with the On-Demand billing mode instead of provisioned billing mode. * Keep previous table creation function for backward compatibility. * Added unit tests. * Added more unit tests. --- .../kinesis/leases/LeaseManagementConfig.java | 2 +- .../amazon/kinesis/leases/LeaseRefresher.java | 19 +- .../dynamodb/DynamoDBLeaseCoordinator.java | 5 +- .../DynamoDBLeaseManagementFactory.java | 2 +- .../dynamodb/DynamoDBLeaseRefresher.java | 40 ++-- .../ExceptionThrowingLeaseRefresher.java | 11 +- .../leases/LeaseCoordinatorExerciser.java | 3 +- ...tegrationBillingModePayPerRequestTest.java | 1 - .../kinesis/leases/LeaseIntegrationTest.java | 2 +- .../DynamoDBLeaseCoordinatorTest.java | 67 +++++++ .../dynamodb/DynamoDBLeaseRefresherTest.java | 171 +++++++++++++++++- 11 files changed, 294 insertions(+), 29 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index dd18ec2f5..89e6a3bfb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -196,7 +196,7 @@ public class LeaseManagementConfig { private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT; - private BillingMode billingMode = BillingMode.PROVISIONED; + private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; /** * Frequency (in millis) of the auditor job to scan for partial leases in the lease table. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index b7f38a4eb..2fca59c76 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -29,9 +29,24 @@ public interface LeaseRefresher { /** * Creates the table that will store leases. Succeeds if table already exists. - * + * Deprecated. Use {@link #createLeaseTableIfNotExists()}. + * * @param readCapacity * @param writeCapacity + * + * @return true if we created a new table (table didn't exist before) + * + * @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity + * restrictions. + * @throws DependencyException if DynamoDB createTable fails in an unexpected way + */ + @Deprecated + boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + throws ProvisionedThroughputException, DependencyException; + + /** + * Creates the table that will store leases. Table is now created in PayPerRequest billing mode by default. + * Succeeds if table already exists. * * @return true if we created a new table (table didn't exist before) * @@ -39,7 +54,7 @@ public interface LeaseRefresher { * restrictions. * @throws DependencyException if DynamoDB createTable fails in an unexpected way */ - boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) + boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 78673f669..76f918006 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -211,10 +211,9 @@ public void run() { @Override public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException { final boolean newTableCreated = - leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity); + leaseRefresher.createLeaseTableIfNotExists(); if (newTableCreated) { - log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.", - initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity); + log.info("Created new lease table for coordinator with pay per request billing mode."); } // Need to wait for table in active state. final long secondsBetweenPolls = 10L; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 5102bc5ec..ad1a23005 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -285,7 +285,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 8002eacc7..361db9f92 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -130,7 +130,7 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient, final LeaseSerializer serializer, final boolean consistentReads, @NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { - this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST); } /** @@ -162,16 +162,6 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna @Override public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity) throws ProvisionedThroughputException, DependencyException { - try { - if (tableStatus() != null) { - return newTableCreated; - } - } catch (DependencyException de) { - // - // Something went wrong with DynamoDB - // - log.error("Failed to get table status for {}", table, de); - } ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity) .writeCapacityUnits(writeCapacity).build(); final CreateTableRequest request; @@ -185,6 +175,34 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No .build(); } + return createTableIfNotExists(request); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean createLeaseTableIfNotExists() + throws ProvisionedThroughputException, DependencyException { + final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema()) + .attributeDefinitions(serializer.getAttributeDefinitions()) + .billingMode(billingMode).build(); + + return createTableIfNotExists(request); + } + + private boolean createTableIfNotExists(CreateTableRequest request) + throws ProvisionedThroughputException, DependencyException { + try { + if (tableStatus() != null) { + return newTableCreated; + } + } catch (DependencyException de) { + // + // Something went wrong with DynamoDB + // + log.error("Failed to get table status for {}", table, de); + } final AWSExceptionManager exceptionManager = createExceptionManager(); exceptionManager.add(ResourceInUseException.class, t -> t); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 81a49839d..5e612adee 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -110,13 +110,22 @@ private void throwExceptions(String methodName, ExceptionThrowingLeaseRefresherM @Override public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) - throws ProvisionedThroughputException, DependencyException { + throws ProvisionedThroughputException, DependencyException { throwExceptions("createLeaseTableIfNotExists", ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS); return leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity); } + @Override + public boolean createLeaseTableIfNotExists() + throws ProvisionedThroughputException, DependencyException { + throwExceptions("createLeaseTableIfNotExists", + ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS); + + return leaseRefresher.createLeaseTableIfNotExists(); + } + @Override public boolean leaseTableExists() throws DependencyException { throwExceptions("leaseTableExists", ExceptionThrowingLeaseRefresherMethods.LEASETABLEEXISTS); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 2de34649b..186fe290b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -68,8 +68,7 @@ public static void main(String[] args) throws InterruptedException, DependencyEx LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); - if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY, - INITIAL_LEASE_TABLE_WRITE_CAPACITY)) { + if (leaseRefresher.createLeaseTableIfNotExists()) { log.info("Waiting for newly created lease table"); if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) { log.error("Table was not created in time"); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java index dd069b191..9f7735f9e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationBillingModePayPerRequestTest.java @@ -34,4 +34,3 @@ protected DynamoDBLeaseRefresher getLeaseRefresher() { tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); } } - diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java index 21d4e9323..fd5106e40 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseIntegrationTest.java @@ -74,7 +74,7 @@ protected void starting(Description description) { protected DynamoDBLeaseRefresher getLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true, - tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java new file mode 100644 index 000000000..caa7a6c7d --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -0,0 +1,67 @@ +package software.amazon.kinesis.leases.dynamodb; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.LeaseRefresher; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.metrics.MetricsFactory; + +import java.util.UUID; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBLeaseCoordinatorTest { + + private static final String WORKER_ID = UUID.randomUUID().toString(); + private static final long LEASE_DURATION_MILLIS = 5000L; + private static final long EPSILON_MILLIS = 25L; + private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; + private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; + private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; + private static final long SECONDS_BETWEEN_POLLS = 10L; + private static final long TIMEOUT_SECONDS = 600L; + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private MetricsFactory metricsFactory; + + private DynamoDBLeaseCoordinator leaseCoordinator; + + @Before + public void setup() { + this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, + EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, + INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + } + + @Test + public void testInitialize_tableCreationSucceeds() throws Exception { + when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(true); + when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(true); + + leaseCoordinator.initialize(); + + verify(leaseRefresher, times(1)).createLeaseTableIfNotExists(); + verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS); + } + + @Test + public void testInitialize_tableCreationFails() throws Exception { + when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false); + when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false); + + Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize()); + verify(leaseRefresher, times(1)).createLeaseTableIfNotExists(); + verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index 5188c41b3..beed73f2c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -21,6 +21,8 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -33,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -49,10 +52,14 @@ import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; @@ -66,6 +73,8 @@ import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; + @RunWith(MockitoJUnitRunner.class) public class DynamoDBLeaseRefresherTest { @@ -99,6 +108,8 @@ public class DynamoDBLeaseRefresherTest { public ExpectedException expectedException = ExpectedException.none(); private DynamoDBLeaseRefresher leaseRefresher; + private DescribeTableRequest describeTableRequest; + private CreateTableRequest createTableRequest; private Map serializedLease; @@ -108,6 +119,13 @@ public void setup() throws Exception { tableCreatorCallback); serializedLease = new HashMap<>(); + describeTableRequest = DescribeTableRequest.builder().tableName(TABLE_NAME).build(); + createTableRequest = CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(leaseSerializer.getKeySchema()) + .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build(); } @Test @@ -265,7 +283,151 @@ public void testLeaseTableExistsTimesOut() throws Exception { } @Test - public void testCreateLeaseTableTimesOut() throws Exception { + public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); + + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + + final ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(10L) + .writeCapacityUnits(10L).build(); + final CreateTableRequest createTableRequest = CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(leaseSerializer.getKeySchema()) + .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) + .provisionedThroughput(throughput) + .build(); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); + + final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + Assert.assertTrue(result); + } + + @Test + public void testCreateLeaseTableIfNotExists() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); + + final boolean result = leaseRefresher.createLeaseTableIfNotExists(); + + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + Assert.assertTrue(result); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new InterruptedException()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceInUseException.builder().message("Table already exists").build()); + + Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_tableAlreadyExists_throwsResourceInUseException_expectFalse() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceInUseException.builder().message("Table already exists").build()); + + Assert.assertFalse(leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsLimitExceededException_expectProvisionedThroughputException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(LimitExceededException.builder().build()); + + Assert.assertThrows(ProvisionedThroughputException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsDynamoDbException_expectDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(DynamoDbException.builder().build()); + + Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableIfNotExists_throwsTimeoutException_expectDependencyException() throws Exception { + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder().message("Table doesn't exist").build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new TimeoutException()); + + Assert.assertThrows(DependencyException.class, () -> leaseRefresher.createLeaseTableIfNotExists()); + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testCreateLeaseTableProvisionedBillingModeTimesOut() throws Exception { + leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, + tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED); TimeoutException te = setRuleForDependencyTimeout(); when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); @@ -279,10 +441,7 @@ public void testCreateLeaseTableTimesOut() throws Exception { } @Test - public void testCreateLeaseTableBillingMode() throws Exception { - leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDbClient, leaseSerializer, CONSISTENT_READS, - tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST); - + public void testCreateLeaseTableTimesOut() throws Exception { TimeoutException te = setRuleForDependencyTimeout(); when(dynamoDbClient.describeTable(any(DescribeTableRequest.class))).thenReturn(mockDescribeTableFuture); @@ -292,7 +451,7 @@ public void testCreateLeaseTableBillingMode() throws Exception { when(dynamoDbClient.createTable(any(CreateTableRequest.class))).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(anyLong(), any())).thenThrow(te); - verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists(10L, 10L)); + verifyCancel(mockCreateTableFuture, () -> leaseRefresher.createLeaseTableIfNotExists()); } @FunctionalInterface