Skip to content

Commit

Permalink
Create DynamoDB tables on On-Demand billing mode by default. (#854)
Browse files Browse the repository at this point in the history
* Create DynamoDB tables on On-Demand billing mode by default.

This will enable KCL to create the DynamoDB tables - Lease and
ShardProgress with the On-Demand billing mode instead of provisioned
billing mode.

* Keep previous table creation function for backward compatibility.

* Added unit tests.

* Added more unit tests.
  • Loading branch information
shanmsac authored Oct 6, 2021
1 parent 2447513 commit e81969a
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public class LeaseManagementConfig {

private Duration dynamoDbRequestTimeout = DEFAULT_REQUEST_TIMEOUT;

private BillingMode billingMode = BillingMode.PROVISIONED;
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

/**
* Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,32 @@ public interface LeaseRefresher {

/**
* Creates the table that will store leases. Succeeds if table already exists.
*
* Deprecated. Use {@link #createLeaseTableIfNotExists()}.
*
* @param readCapacity
* @param writeCapacity
*
* @return true if we created a new table (table didn't exist before)
*
* @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
* restrictions.
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
*/
@Deprecated
boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
throws ProvisionedThroughputException, DependencyException;

/**
* Creates the table that will store leases. Table is now created in PayPerRequest billing mode by default.
* Succeeds if table already exists.
*
* @return true if we created a new table (table didn't exist before)
*
* @throws ProvisionedThroughputException if we cannot create the lease table due to per-AWS-account capacity
* restrictions.
* @throws DependencyException if DynamoDB createTable fails in an unexpected way
*/
boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
boolean createLeaseTableIfNotExists()
throws ProvisionedThroughputException, DependencyException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,9 @@ public void run() {
@Override
public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
final boolean newTableCreated =
leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
leaseRefresher.createLeaseTableIfNotExists();
if (newTableCreated) {
log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.",
initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
log.info("Created new lease table for coordinator with pay per request billing mode.");
}
// Need to wait for table in active state.
final long secondsBetweenPolls = 10L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna
public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer, final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) {
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
this(table, dynamoDBClient, serializer, consistentReads, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST);
}

/**
Expand Down Expand Up @@ -162,16 +162,6 @@ public DynamoDBLeaseRefresher(final String table, final DynamoDbAsyncClient dyna
@Override
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
try {
if (tableStatus() != null) {
return newTableCreated;
}
} catch (DependencyException de) {
//
// Something went wrong with DynamoDB
//
log.error("Failed to get table status for {}", table, de);
}
ProvisionedThroughput throughput = ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity).build();
final CreateTableRequest request;
Expand All @@ -185,6 +175,34 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No
.build();
}

return createTableIfNotExists(request);
}

/**
* {@inheritDoc}
*/
@Override
public boolean createLeaseTableIfNotExists()
throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = CreateTableRequest.builder().tableName(table).keySchema(serializer.getKeySchema())
.attributeDefinitions(serializer.getAttributeDefinitions())
.billingMode(billingMode).build();

return createTableIfNotExists(request);
}

private boolean createTableIfNotExists(CreateTableRequest request)
throws ProvisionedThroughputException, DependencyException {
try {
if (tableStatus() != null) {
return newTableCreated;
}
} catch (DependencyException de) {
//
// Something went wrong with DynamoDB
//
log.error("Failed to get table status for {}", table, de);
}

final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ResourceInUseException.class, t -> t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,22 @@ private void throwExceptions(String methodName, ExceptionThrowingLeaseRefresherM

@Override
public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
throws ProvisionedThroughputException, DependencyException {
throwExceptions("createLeaseTableIfNotExists",
ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS);

return leaseRefresher.createLeaseTableIfNotExists(readCapacity, writeCapacity);
}

@Override
public boolean createLeaseTableIfNotExists()
throws ProvisionedThroughputException, DependencyException {
throwExceptions("createLeaseTableIfNotExists",
ExceptionThrowingLeaseRefresherMethods.CREATELEASETABLEIFNOTEXISTS);

return leaseRefresher.createLeaseTableIfNotExists();
}

@Override
public boolean leaseTableExists() throws DependencyException {
throwExceptions("leaseTableExists", ExceptionThrowingLeaseRefresherMethods.LEASETABLEEXISTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public static void main(String[] args) throws InterruptedException, DependencyEx
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
new DynamoDBLeaseSerializer(), true, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);

if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY)) {
if (leaseRefresher.createLeaseTableIfNotExists()) {
log.info("Waiting for newly created lease table");
if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) {
log.error("Table was not created in time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ protected DynamoDBLeaseRefresher getLeaseRefresher() {
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected void starting(Description description) {

protected DynamoDBLeaseRefresher getLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, ddbClient, leaseSerializer, true,
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED);
tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PAY_PER_REQUEST);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package software.amazon.kinesis.leases.dynamodb;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory;

import java.util.UUID;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseCoordinatorTest {

private static final String WORKER_ID = UUID.randomUUID().toString();
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static final long SECONDS_BETWEEN_POLLS = 10L;
private static final long TIMEOUT_SECONDS = 600L;

@Mock
private LeaseRefresher leaseRefresher;
@Mock
private MetricsFactory metricsFactory;

private DynamoDBLeaseCoordinator leaseCoordinator;

@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}

@Test
public void testInitialize_tableCreationSucceeds() throws Exception {
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(true);
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(true);

leaseCoordinator.initialize();

verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
}

@Test
public void testInitialize_tableCreationFails() throws Exception {
when(leaseRefresher.createLeaseTableIfNotExists()).thenReturn(false);
when(leaseRefresher.waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS)).thenReturn(false);

Assert.assertThrows(DependencyException.class, () -> leaseCoordinator.initialize());
verify(leaseRefresher, times(1)).createLeaseTableIfNotExists();
verify(leaseRefresher, times(1)).waitUntilLeaseTableExists(SECONDS_BETWEEN_POLLS, TIMEOUT_SECONDS);
}
}
Loading

0 comments on commit e81969a

Please sign in to comment.