Skip to content

Commit

Permalink
[improve][broker] Improve the extensibility of the TopicBundleAssignm…
Browse files Browse the repository at this point in the history
…entStrategy interface class (apache#23773)
  • Loading branch information
rayluoluo committed Jan 10, 2025
1 parent 04423ba commit dea39e3
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy {
private PulsarService pulsar;

private volatile HashFunction hashFunction;

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
Expand All @@ -39,7 +41,14 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac

@Override
public long calculateBundleHashCode(TopicName topicName) {
return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
if (hashFunction == null) {
synchronized (ConsistentHashingTopicBundleAssigner.class) {
if (hashFunction == null) {
hashFunction = getBundleHashFunc();
}
}
}
return hashFunction.hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}

@Override
Expand All @@ -49,6 +58,7 @@ public void init(PulsarService pulsarService) {

private HashFunction getBundleHashFunc() {
return Optional.ofNullable(pulsar.getNamespaceService()).map(NamespaceService::getNamespaceBundleFactory)
.map(NamespaceBundleFactory::getHashFunc).orElseThrow(() -> new RuntimeException("HashFunc not specified"));
.map(NamespaceBundleFactory::getHashFunc)
.orElseThrow(() -> new RuntimeException("HashFunc not specified"));
}
}

0 comments on commit dea39e3

Please sign in to comment.