Skip to content

Commit

Permalink
[improve][broker] Improve the extensibility of the TopicBundleAssignm…
Browse files Browse the repository at this point in the history
…entStrategy interface class (apache#23773)
  • Loading branch information
rayluoluo committed Jan 13, 2025
1 parent ba04a43 commit bf72d86
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,46 @@
*/
package org.apache.pulsar.common.naming;

import com.google.common.hash.Hashing;
import com.google.common.hash.HashFunction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;

public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy {
private PulsarService pulsar;

private volatile HashFunction hashFunction;

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
public long calculateBundleHashCode(TopicName topicName) {
if (hashFunction == null) {
synchronized (ConsistentHashingTopicBundleAssigner.class) {
if (hashFunction == null) {
hashFunction = getBundleHashFunc();
}
}
}
return hashFunction.hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
}

private HashFunction getBundleHashFunc() {
return Optional.ofNullable(pulsar.getNamespaceService()).map(NamespaceService::getNamespaceBundleFactory)
.map(NamespaceBundleFactory::getHashFunc)
.orElseThrow(() -> new RuntimeException("HashFunc not specified"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -64,6 +63,7 @@
public class NamespaceBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);

@Getter
private final HashFunction hashFunc;

private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
Expand Down Expand Up @@ -292,7 +292,7 @@ public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn)
}

public long getLongHashCode(String name) {
return this.hashFunc.hashString(name, StandardCharsets.UTF_8).padToLong();
return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name));
}

public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
*/
package org.apache.pulsar.common.naming;

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.broker.PulsarService;

public interface TopicBundleAssignmentStrategy {
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles);
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles);

default long calculateBundleHashCode(TopicName topicName) {
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}

void init(PulsarService pulsarService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -122,7 +123,11 @@ private NamespaceBundleFactory getNamespaceBundleFactory() {
MetadataStoreExtended store = mock(MetadataStoreExtended.class);
when(pulsar.getLocalMetadataStore()).thenReturn(store);
when(pulsar.getConfigurationMetadataStore()).thenReturn(store);
return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
NamespaceService namespaceService = mock(NamespaceService.class);
when(pulsar.getNamespaceService()).thenReturn(namespaceService);
NamespaceBundleFactory namespaceBundleFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
when(namespaceService.getNamespaceBundleFactory()).thenReturn(namespaceBundleFactory);
return namespaceBundleFactory;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.naming;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.base.Charsets;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PoliciesUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-naming")
public class TopicBundleAssignmentStrategyTest {
@BeforeMethod
public void setUp() {
// clean up the values of static member variables in memory
try {
Field field = TopicBundleAssignmentFactory.class.getDeclaredField("strategy");
field.setAccessible(true);
field.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

@Test
public void testStrategyFactory() {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setTopicBundleAssignmentStrategy(
"org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$TestStrategy");
PulsarService pulsarService = mock(PulsarService.class);
doReturn(conf).when(pulsarService).getConfiguration();
TopicBundleAssignmentStrategy strategy = TopicBundleAssignmentFactory.create(pulsarService);
NamespaceBundle bundle = strategy.findBundle(null, null);
Range<Long> keyRange = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED);
String range = String.format("0x%08x_0x%08x", keyRange.lowerEndpoint(), keyRange.upperEndpoint());
Assert.assertEquals(bundle.getBundleRange(), range);
Assert.assertEquals(bundle.getNamespaceObject(), NamespaceName.get("my/test"));
}

private static class TestStrategy implements TopicBundleAssignmentStrategy {
@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
Range<Long> range = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED);
return new NamespaceBundle(NamespaceName.get("my/test"), range,
mock(NamespaceBundleFactory.class));
}

@Override
public long calculateBundleHashCode(TopicName topicName) {
return 0;
}

@Override
public void init(PulsarService pulsarService) {
}
}

@Test
public void testRoundRobinBundleAssigner() {
int DEFALUT_BUNDLE_NUM = 128;

ServiceConfiguration conf = new ServiceConfiguration();
conf.setTopicBundleAssignmentStrategy(
"org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$RoundRobinBundleAssigner");
conf.setDefaultNumberOfNamespaceBundles(DEFALUT_BUNDLE_NUM);
PulsarService pulsarService = mock(PulsarService.class);
doReturn(conf).when(pulsarService).getConfiguration();
MetadataStoreExtended store = mock(MetadataStoreExtended.class);
when(pulsarService.getLocalMetadataStore()).thenReturn(store);
when(pulsarService.getConfigurationMetadataStore()).thenReturn(store);
NamespaceBundleFactory factoryNew = NamespaceBundleFactory.createFactory(pulsarService, Hashing.sha256());
NamespaceService namespaceService = mock(NamespaceService.class);
when(namespaceService.getNamespaceBundleFactory()).thenReturn(factoryNew);
when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
BundlesData bundlesData = PoliciesUtil.getBundles(DEFALUT_BUNDLE_NUM);
LocalPolicies localPolicies = new LocalPolicies(bundlesData, null, null);
NamespaceBundles bundles = new NamespaceBundles(NamespaceName.get("pulsar/global/ns1"),
factoryNew, Optional.of(Pair.of(localPolicies, (long) DEFALUT_BUNDLE_NUM)));
Set<NamespaceBundle> alreadyAssignNamespaceBundle = new HashSet<NamespaceBundle>();
for (int i = 0; i < DEFALUT_BUNDLE_NUM; i++) {
TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-partition-" + i);
NamespaceBundle bundle = bundles.findBundle(topicName);
assertTrue(bundle.includes(topicName));
//new hash func will make topic partition assign to different bundle as possible
assertFalse(alreadyAssignNamespaceBundle.contains(bundle));
alreadyAssignNamespaceBundle.add(bundle);
}
}

public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStrategy {
PulsarService pulsar;

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
public long calculateBundleHashCode(TopicName topicName) {
// use topic name without partition id to decide the first hash value
long currentPartitionTopicHash =
pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong();

// if the topic is a non partition topic, use topic name to calculate the hashcode
if (!topicName.isPartitioned()) {
return currentPartitionTopicHash;
}

// a pieces of bundle size with default * partiton id
double targetPartitionRangeSize =
(double) NamespaceBundles.FULL_UPPER_BOUND / (double) pulsar.getConfiguration()
.getDefaultNumberOfNamespaceBundles() * (double) topicName.getPartitionIndex();

// new hash func will make topic partition assign to different bundle as possible
return (currentPartitionTopicHash + Math.round(targetPartitionRangeSize))
% NamespaceBundles.FULL_UPPER_BOUND;
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
}
}
}

This file was deleted.

0 comments on commit bf72d86

Please sign in to comment.