From 976cd4c320385427c198c2bf29badabf42db7551 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 11 Oct 2024 17:58:39 +0300 Subject: [PATCH 1/2] [improve][broker][PIP-379] Don't replace a consumer when there's a collision --- ...stentHashingStickyKeyConsumerSelector.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index fde140a299c27..2559a02f87df0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -27,12 +27,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Range; /** * This is a consumer selector using consistent hashing to evenly split * the number of keys assigned to each consumer. */ +@Slf4j public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector { // use NUL character as field separator for hash key calculation private static final String KEY_SEPARATOR = "\0"; @@ -76,18 +78,36 @@ public CompletableFuture> addConsumer(Consumer ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer); // Insert multiple points on the hash ring for every consumer // The points are deterministically added based on the hash of the consumer name + int hashPointsAdded = 0; + int hashPointCollisions = 0; for (int i = 0; i < numberOfPoints; i++) { int consumerNameIndex = consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper); int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i); - // When there's a collision, the new consumer will replace the old one. - // This is a rare case, and it is acceptable to replace the old consumer since there - // are multiple points for each consumer. This won't affect the overall distribution significantly. - ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper); - if (removed != null) { - consumerNameIndexTracker.decreaseConsumerRefCount(removed); + // When there's a collision, the entry won't be added to the hash ring. + // This isn't a problem with the consumerNameIndexTracker solution since the collisions won't align + // for all hash ring points when using the same consumer name. This won't affect the overall + // distribution significantly when the number of hash ring points is sufficiently large (>100). + ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash, consumerIdentityWrapper); + if (existing != null) { + hashPointCollisions++; + // reduce the ref count which was increased before adding since the consumer was not added + consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper); + } else { + hashPointsAdded++; } } + if (hashPointsAdded == 0) { + log.error("Failed to add consumer '{}' to the hash ring. There were {} collisions. Consider increasing " + + "the number of points ({}) per consumer by setting " + + "subscriptionKeySharedConsistentHashingReplicaPoints={}", + consumer, hashPointCollisions, numberOfPoints, + Math.max((int) (numberOfPoints * 1.5d), numberOfPoints + 1)); + } + if (log.isDebugEnabled()) { + log.debug("Added consumer '{}' with {} points, {} collisions", consumer, hashPointsAdded, + hashPointCollisions); + } if (!addOrRemoveReturnsImpactedConsumersResult) { return CompletableFuture.completedFuture(Optional.empty()); } From cc54bf8983d9a2cb25df359e9d0ea5ca13d9f818 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 11 Oct 2024 18:03:32 +0300 Subject: [PATCH 2/2] Add test --- .../service/ConsumerHashAssignmentsSnapshot.java | 4 ++-- ...tentHashingStickyKeyConsumerSelectorTest.java | 16 +++++----------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java index d2bd113e69d1e..b4add79294cc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java @@ -71,8 +71,8 @@ public static ConsumerHashAssignmentsSnapshot empty() { return new ConsumerHashAssignmentsSnapshot(Collections.emptyList()); } - public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot other) { - return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, other.hashRangeAssignments); + public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) { + return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index 2b01256611b01..6752cd7cfab45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -475,11 +475,11 @@ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() { } @Test - public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() { + public void testShouldNotContainMappingChangesWhenConsumersLeaveAndRejoinInSameOrder() { final ConsistentHashingStickyKeyConsumerSelector selector = - new ConsistentHashingStickyKeyConsumerSelector(100, true); + new ConsistentHashingStickyKeyConsumerSelector(200, true); final String consumerName = "consumer"; - final int numOfInitialConsumers = 10; + final int numOfInitialConsumers = 200; List consumers = new ArrayList<>(); for (int i = 0; i < numOfInitialConsumers; i++) { final Consumer consumer = createMockConsumer(consumerName, "index " + i, i); @@ -498,14 +498,8 @@ public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() selector.addConsumer(consumers.get(numOfInitialConsumers / 2)); ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot(); - int removedRangesSize = assignmentsBefore.diffRanges(assignmentsAfter).keySet().stream() - .mapToInt(Range::size) - .sum(); - double allowedremovedRangesPercentage = 1; // 1% - int hashRangeSize = selector.getKeyHashRange().size(); - int allowedremovedRanges = (int) (hashRangeSize * (allowedremovedRangesPercentage / 100.0d)); - assertThat(removedRangesSize).describedAs("Allow up to %d%% of total hash range size to be impacted", - allowedremovedRangesPercentage).isLessThan(allowedremovedRanges); + + assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()).isEmpty(); } @Test