-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Improve the extensibility of the TopicBundleAssignmentStrategy interface class (#23773) #23774
Conversation
9567b1a
to
43e0699
Compare
@@ -23,5 +23,7 @@ | |||
public interface TopicBundleAssignmentStrategy { | |||
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); | |||
|
|||
long getHashCode(String name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to make the name of this method more specific. It's better to use the type TopicName
as well and provide a default implementation. Please also add javadoc with proper description of the method.
long getHashCode(String name); | |
default long calculateBundleHashCode(TopicName topicName) { | |
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change as suggested, thank you. @lhotari
if (topicName.getDomain().equals(TopicDomain.non_persistent)) { | ||
bundle.setHasNonPersistentTopic(true); | ||
} | ||
return bundle; | ||
} | ||
|
||
@Override | ||
public long getHashCode(String name) { | ||
return pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't look great since it's a "train wreck" anti-pattern to have long call chains to reach out to a dependency. could the hash function be passed in the constructor instead?
@Override | ||
public void init(PulsarService pulsarService) { | ||
this.pulsar = pulsarService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can hold a reference to HashFunc here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashFunc is initialized in the NamespaceBundleFactory. When the ConsistentHashingTopicBundleAssigner#init() method needs to be called to initialize the bundle allocation algorithm instance, pulsar.getNamespaceService() is not ready, pulsar.getNamespaceService().getNamespaceBundleFactory() will cause NPE. Therefore, HashFunc is more suitable for obtaining on first use.
if (topicName.getDomain().equals(TopicDomain.non_persistent)) { | ||
bundle.setHasNonPersistentTopic(true); | ||
} | ||
return bundle; | ||
} | ||
|
||
@Override | ||
public long calculateBundleHashCode(TopicName topicName) { | ||
if (hashFunction == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems not thread safety. Why not use double check lock
if (hashFunction == null) {
synchronized(ConsistentHashingTopicBundleAssigner.class) {
if (hashFunction == null) {
hashFunction = getBundleHashFunc();
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change as suggested, thank you. @315157973
[improve][broker] Improve the extensibility of the TopicBundleAssignmentStrategy interface class (#23773)
Fixes #23773
PIP: #19806
Motivation
For details, see the description of issue #23773
Modifications
The implementation of the
NamespaceBundleFactory#getLongHashCode
method is moved to the implementation class of the interfaceTopicBundleAssignmentStrategy
. Therefore, a new methodlong getHashCode(String name)
is added to theTopicBundleAssignmentStrategy
interface class. The implementation of the hash algorithm is no longer fixed in theNamespaceBundleFactory#getLongHashCode
method. Instead, thegetHashCode
method implemented by different algorithms is invoked.Verifying this change
This change is already covered by existing tests, such as TopicBundleAssignmentStrategyTest#testStrategyFactory, and added tests and can be verified as follows:
TopicBundleAssignmentStrategyTest
RoundRobinBundleAssigner
is customized and theTopicBundleAssignmentStrategy
interface is implemented. Only the interfaces inTopicBundleAssignmentStrategy
need to be implemented. It's not needed to modify the interface implementation of other classes. The open and closed principles are met.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: