[improve][broker][PIP-379] Don't replace a consumer when there's a collision #23441

Merged
ConsistentHashingStickyKeyConsumerSelector.java
@@ -27,12 +27,14 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Range;
 
 /**
  * This is a consumer selector using consistent hashing to evenly split
  * the number of keys assigned to each consumer.
  */
+@Slf4j
 public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
     // use NUL character as field separator for hash key calculation
     private static final String KEY_SEPARATOR = "\0";
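The class javadoc above summarizes the selection model. For orientation only, here is a generic consistent-hashing lookup sketch (a plain `TreeMap`, not code from this PR): a key is routed to the consumer owning the first ring point at or after the key's hash, wrapping around to the first point when none follows.

```java
import java.util.Map;
import java.util.TreeMap;

// Generic consistent-hashing lookup sketch; the ring maps hash points to consumer names.
public class ConsistentHashLookupSketch {
    public static void main(String[] args) {
        TreeMap<Integer, String> hashRing = new TreeMap<>();
        hashRing.put(100, "consumer-A");
        hashRing.put(900, "consumer-B");
        hashRing.put(1800, "consumer-A");

        int keyHash = 1000;
        // Pick the first point at or after the key's hash, wrapping around the ring.
        Map.Entry<Integer, String> owner = hashRing.ceilingEntry(keyHash);
        if (owner == null) {
            owner = hashRing.firstEntry();
        }
        System.out.println(keyHash + " -> " + owner.getValue()); // 1000 -> consumer-A
    }
}
```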
@@ -76,18 +78,36 @@ public CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer
 ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
 // Insert multiple points on the hash ring for every consumer
 // The points are deterministically added based on the hash of the consumer name
+int hashPointsAdded = 0;
+int hashPointCollisions = 0;
 for (int i = 0; i < numberOfPoints; i++) {
     int consumerNameIndex =
             consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
     int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i);
-    // When there's a collision, the new consumer will replace the old one.
-    // This is a rare case, and it is acceptable to replace the old consumer since there
-    // are multiple points for each consumer. This won't affect the overall distribution significantly.
-    ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper);
-    if (removed != null) {
-        consumerNameIndexTracker.decreaseConsumerRefCount(removed);
+    // When there's a collision, the entry won't be added to the hash ring.
+    // This isn't a problem with the consumerNameIndexTracker solution since the collisions won't align
+    // for all hash ring points when using the same consumer name. This won't affect the overall
+    // distribution significantly when the number of hash ring points is sufficiently large (>100).
+    ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash, consumerIdentityWrapper);
+    if (existing != null) {
+        hashPointCollisions++;
+        // reduce the ref count which was increased before adding since the consumer was not added
+        consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
+    } else {
+        hashPointsAdded++;
     }
 }
+if (hashPointsAdded == 0) {
+    log.error("Failed to add consumer '{}' to the hash ring. There were {} collisions. Consider increasing "
+            + "the number of points ({}) per consumer by setting "
+            + "subscriptionKeySharedConsistentHashingReplicaPoints={}",
+            consumer, hashPointCollisions, numberOfPoints,
+            Math.max((int) (numberOfPoints * 1.5d), numberOfPoints + 1));
+}
+if (log.isDebugEnabled()) {
+    log.debug("Added consumer '{}' with {} points, {} collisions", consumer, hashPointsAdded,
+            hashPointCollisions);
+}
 if (!addOrRemoveReturnsImpactedConsumersResult) {
     return CompletableFuture.completedFuture(Optional.empty());
 }
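For readers skimming the hunk above, a minimal standalone sketch (plain `TreeMap`, not the Pulsar selector itself) of the behavioral change: the old `put()` displaces the consumer already holding a colliding ring point, while the new `putIfAbsent()` keeps it and lets the caller count the collision and roll back the ref count taken beforehand.

```java
import java.util.TreeMap;

// Minimal sketch of why the hunk above switches from put() to putIfAbsent() on the hash ring.
public class HashRingCollisionSketch {
    public static void main(String[] args) {
        TreeMap<Integer, String> hashRing = new TreeMap<>();
        hashRing.put(42, "consumer-A");                    // existing point on the ring

        // Old behavior: put() replaces consumer-A's point on a hash collision.
        String displaced = hashRing.put(42, "consumer-B");
        System.out.println(displaced + " displaced; 42 -> " + hashRing.get(42));

        // New behavior: putIfAbsent() keeps the existing point untouched.
        hashRing.clear();
        hashRing.put(42, "consumer-A");
        String existing = hashRing.putIfAbsent(42, "consumer-B");
        if (existing != null) {
            // The caller treats this as a collision: it skips the point and rolls back
            // the ref count it increased before attempting the insert.
            System.out.println("collision: 42 stays -> " + hashRing.get(42)); // consumer-A
        }
    }
}
```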
ConsumerHashAssignmentsSnapshot.java
@@ -71,8 +71,8 @@ public static ConsumerHashAssignmentsSnapshot empty() {
     return new ConsumerHashAssignmentsSnapshot(Collections.emptyList());
 }
 
-public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot other) {
-    return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, other.hashRangeAssignments);
+public ImpactedConsumersResult resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
+    return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, assignmentsAfter.hashRangeAssignments);
 }
 
 /**
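The rename from `other` to `assignmentsAfter` only clarifies the intended call direction: the receiver is the snapshot taken before a change and the argument is the snapshot taken after it. A hedged usage fragment, built only from identifiers visible in this diff (the `selector` and `consumer` variables and the surrounding setup are assumed):

```java
// Snapshot the assignments, change the selector's consumers, snapshot again,
// then ask the "before" snapshot which hash ranges were taken away relative
// to the "after" snapshot.
ConsumerHashAssignmentsSnapshot assignmentsBefore = selector.getConsumerHashAssignmentsSnapshot();
selector.addConsumer(consumer); // any add/remove of consumers between the two snapshots
ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
var removedHashRanges = assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges();
```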
ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -475,11 +475,11 @@ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {
 }
 
 @Test
-public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
+public void testShouldNotContainMappingChangesWhenConsumersLeaveAndRejoinInSameOrder() {
     final ConsistentHashingStickyKeyConsumerSelector selector =
-            new ConsistentHashingStickyKeyConsumerSelector(100, true);
+            new ConsistentHashingStickyKeyConsumerSelector(200, true);
     final String consumerName = "consumer";
-    final int numOfInitialConsumers = 10;
+    final int numOfInitialConsumers = 200;
     List<Consumer> consumers = new ArrayList<>();
     for (int i = 0; i < numOfInitialConsumers; i++) {
         final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
@@ -498,14 +498,8 @@ public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins()
     selector.addConsumer(consumers.get(numOfInitialConsumers / 2));
 
     ConsumerHashAssignmentsSnapshot assignmentsAfter = selector.getConsumerHashAssignmentsSnapshot();
-    int removedRangesSize = assignmentsBefore.diffRanges(assignmentsAfter).keySet().stream()
-            .mapToInt(Range::size)
-            .sum();
-    double allowedremovedRangesPercentage = 1; // 1%
-    int hashRangeSize = selector.getKeyHashRange().size();
-    int allowedremovedRanges = (int) (hashRangeSize * (allowedremovedRangesPercentage / 100.0d));
-    assertThat(removedRangesSize).describedAs("Allow up to %d%% of total hash range size to be impacted",
-            allowedremovedRangesPercentage).isLessThan(allowedremovedRanges);
 
+    assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()).isEmpty();
 }
 
 @Test
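The replaced assertion tolerated up to 1% of the hash range being reassigned when a consumer left and rejoined; the new assertion requires that no hash ranges are removed at all, which holds once collisions keep existing ring entries. A standalone sketch of that property using a plain `TreeMap` (not the Pulsar selector; consumer names and hash values are made up):

```java
import java.util.TreeMap;

// Sketch of the property the tightened test asserts: when a collision keeps the
// existing entry instead of replacing it, a consumer that leaves and rejoins with
// the same hash points restores the ring to exactly its previous state.
public class LeaveRejoinSketch {
    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        ring.putIfAbsent(100, "consumer-A");
        ring.putIfAbsent(200, "consumer-B");
        ring.putIfAbsent(100, "consumer-B");          // collision: consumer-A keeps the point
        TreeMap<Integer, String> before = new TreeMap<>(ring);

        // consumer-B leaves: only the points it actually owns are removed
        ring.entrySet().removeIf(e -> e.getValue().equals("consumer-B"));
        // consumer-B rejoins with the same deterministic hash points
        ring.putIfAbsent(200, "consumer-B");
        ring.putIfAbsent(100, "consumer-B");          // collides again, consumer-A still keeps it

        System.out.println(before.equals(ring));      // true -> no reassignments
    }
}
```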