From 5fb878307e28630ae9bfabe804886c6f5c8d8ba1 Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Thu, 7 Nov 2024 12:15:35 -0800 Subject: [PATCH 1/4] Address github issues 1. Fix transitive dependencies and add a maven plugin to catch these at build time 2. Remove the redundant shutdown of the leaseCoordinatorThreadPool --- amazon-kinesis-client/pom.xml | 104 ++++++++++++++++++ .../dynamodb/DynamoDBLeaseCoordinator.java | 1 - 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 124a23efb..a7eafd544 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -90,6 +90,37 @@ netty-nio-client ${awssdk.version} + + software.amazon.awssdk + sdk-core + ${awssdk.version} + + + software.amazon.awssdk + aws-core + ${awssdk.version} + + + software.amazon.awssdk + arns + ${awssdk.version} + + + software.amazon.awssdk + regions + ${awssdk.version} + + + software.amazon.awssdk + utils + ${awssdk.version} + + + software.amazon.awssdk + http-client-spi + ${awssdk.version} + + software.amazon.glue schema-registry-serde @@ -127,6 +158,36 @@ commons-collections 3.2.2 + + org.apache.commons + commons-collections4 + 4.4 + + + io.netty + netty-handler + 4.1.108.Final + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.fasterxml.jackson.core + jackson-databind + 2.10.1 + + + org.reactivestreams + reactive-streams + 1.0.4 + + + software.amazon.awssdk + annotations + 2.25.64 + org.slf4j slf4j-api @@ -153,6 +214,18 @@ + + software.amazon.awssdk + sts + ${awssdk.version} + test + + + software.amazon.awssdk + auth + ${awssdk.version} + test + org.junit.jupiter @@ -180,12 +253,24 @@ 3.12.4 test + + org.mockito + mockito-core + 3.12.4 + test + org.hamcrest hamcrest-all 1.3 test + + org.hamcrest + hamcrest-core + 1.3 + test + @@ -464,6 +549,25 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + analyze-dependencies + verify + + analyze-only + + + true + + true + + + + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 7eb4c4f1a..4f4d7886d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -383,7 +383,6 @@ public void stop() { } leaseRenewalThreadpool.shutdownNow(); - leaseCoordinatorThreadPool.shutdownNow(); leaseGracefulShutdownHandler.stop(); synchronized (shutdownLock) { leaseRenewer.clearCurrentlyHeldLeases(); From 037dd7f6c811391d9f23e9b573d589a6bff9bba9 Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Thu, 7 Nov 2024 14:55:30 -0800 Subject: [PATCH 2/4] Fix typo THROUGHOUT_PUT_KBPS --- .../leases/dynamodb/DynamoDBLeaseSerializer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 16d719bf8..0ad34b69e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -54,7 +54,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; private static final String STARTING_HASH_KEY = "startingHashKey"; private static final String ENDING_HASH_KEY = "endingHashKey"; - private static final String THROUGHOUT_PUT_KBPS = "throughputKBps"; + private static final String THROUGHPUT_KBPS = "throughputKBps"; private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint"; static final String CHECKPOINT_OWNER = "checkpointOwner"; static final String LEASE_OWNER_KEY = "leaseOwner"; @@ -113,7 +113,7 @@ public Map toDynamoRecord(final Lease lease) { } if (lease.throughputKBps() != null) { - result.put(THROUGHOUT_PUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps())); + result.put(THROUGHPUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps())); } if (lease.checkpointOwner() != null) { @@ -155,8 +155,8 @@ public Lease fromDynamoRecord(Map dynamoRecord, Lease le leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey)); } - if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS) != null) { - leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS)); + if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS) != null) { + leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS)); } if (DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_OWNER) != null) { @@ -466,7 +466,7 @@ public Map getDynamoLeaseThroughputKbpsUpdate(Leas .value(DynamoUtils.createAttributeValue(lease.throughputKBps())) .action(AttributeAction.PUT) .build(); - result.put(THROUGHOUT_PUT_KBPS, avu); + result.put(THROUGHPUT_KBPS, avu); return result; } From 6acb51d66e9446b62690d367363624641b3c916a Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Fri, 8 Nov 2024 12:10:15 -0800 Subject: [PATCH 3/4] Fix shutdown sequence Fix a few issues in shutdown sequence and make sure scheduler shutdown without invoking run works Inorder for the main thread to shutdown, the scheduler's final shutdown still needs to be invoked, which only happens via the run method. --- amazon-kinesis-client/pom.xml | 5 +++++ .../coordinator/DynamicMigrationComponentsInitializer.java | 6 ++++++ .../MigrationClientVersion3xWithRollbackState.java | 2 +- .../coordinator/migration/MigrationStateMachineImpl.java | 2 +- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index a7eafd544..9e31537bf 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -120,6 +120,11 @@ http-client-spi ${awssdk.version} + + software.amazon.awssdk + dynamodb-enhanced + ${awssdk.version} + software.amazon.glue diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java index c4aecdda2..e9740ccac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java @@ -223,6 +223,9 @@ void shutdown() { workerMetricsThreadPool.shutdown(); try { if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "LamThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); lamThreadPool.shutdownNow(); } } catch (final InterruptedException e) { @@ -232,6 +235,9 @@ void shutdown() { try { if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); workerMetricsThreadPool.shutdownNow(); } } catch (final InterruptedException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java index 6235c5a93..912f0dc9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java @@ -87,7 +87,7 @@ public synchronized void enter(final ClientVersion fromClientVersion) throws Dep } @Override - public void leave() { + public synchronized void leave() { if (entered && !left) { log.info("Leaving {}", this); cancelRollbackMonitor(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java index 96e16a0f5..ad744bfa9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java @@ -126,7 +126,7 @@ public void shutdown() { if (!stateMachineThreadPool.isShutdown()) { stateMachineThreadPool.shutdown(); try { - if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.info( "StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down", THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS); From c53665fc0b1f58cb370c221a502bc98fee1fbf1d Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Tue, 12 Nov 2024 10:23:55 -0800 Subject: [PATCH 4/4] Fix backward compatibility check Avoid flagging methods as deleted if it is marked synchronized. Also mark a few more interfaces as internal. --- .github/scripts/backwards_compatibility_check.sh | 14 +++++++++----- .../assignment/LeaseAssignmentDecider.java | 2 ++ .../migration/MigrationClientVersionState.java | 2 ++ .../migration/MigrationStateMachine.java | 2 ++ .../amazon/kinesis/leases/LeaseDiscoverer.java | 2 ++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/.github/scripts/backwards_compatibility_check.sh b/.github/scripts/backwards_compatibility_check.sh index 070e9c171..03eee014f 100755 --- a/.github/scripts/backwards_compatibility_check.sh +++ b/.github/scripts/backwards_compatibility_check.sh @@ -58,14 +58,18 @@ is_non_public_class() { return $? } -# Ignore methods that change from abstract to non-abstract (and vice versa) if the class is an interface. -ignore_abstract_changes_in_interfaces() { +# Ignore methods that change from abstract to non-abstract (and vice-versa) if the class is an interface.\ +# Ignore methods that change from synchronized to non-synchronized (and vice-versa) +ignore_non_breaking_changes() { local current_class="$1" local class_definition=$(javap -classpath "$LATEST_JAR" "$current_class" | head -2 | tail -1) if [[ $class_definition == *"interface"* ]] then - LATEST_METHODS=${LATEST_METHODS// abstract / } - CURRENT_METHODS=${CURRENT_METHODS// abstract / } + LATEST_METHODS=${LATEST_METHODS//abstract /} + CURRENT_METHODS=${CURRENT_METHODS//abstract /} + else + LATEST_METHODS=${LATEST_METHODS//synchronized /} + CURRENT_METHODS=${CURRENT_METHODS//synchronized /} fi } @@ -103,7 +107,7 @@ find_removed_methods() { LATEST_METHODS=$(javap -classpath "$LATEST_JAR" "$class") - ignore_abstract_changes_in_interfaces "$class" + ignore_non_breaking_changes "$class" local removed_methods=$(diff <(echo "$LATEST_METHODS") <(echo "$CURRENT_METHODS") | grep '^<') diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java index a39866aeb..660426512 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java @@ -17,8 +17,10 @@ import java.util.List; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; +@KinesisClientInternalApi public interface LeaseAssignmentDecider { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java index c1d8507ed..c2e3feac9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java @@ -14,11 +14,13 @@ */ package software.amazon.kinesis.coordinator.migration; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; /** * Interface of a state implementation for the MigrationStateMachine */ +@KinesisClientInternalApi public interface MigrationClientVersionState { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java index 4698feb08..6dff4e0c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.coordinator.migration; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -28,6 +29,7 @@ * 3. Instant roll-forwards - Once any issue has been mitigated, rollfowards are supported instantly * with KCL Migration tool. */ +@KinesisClientInternalApi public interface MigrationStateMachine { /** * Initialize the state machine by identifying the initial state when the KCL worker comes up for the first time. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java index 9f6ce776e..6056d1fd8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java @@ -17,10 +17,12 @@ import java.util.List; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +@KinesisClientInternalApi public interface LeaseDiscoverer { /** * Identifies the leases that are assigned to the current worker but are not being tracked and processed by the