diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 1e8b0d03392cc..04f85a56efaed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -18,23 +18,46 @@ */ package org.apache.pulsar.common.naming; -import com.google.common.hash.Hashing; +import com.google.common.hash.HashFunction; import java.nio.charset.StandardCharsets; +import java.util.Optional; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { + private PulsarService pulsar; + + private volatile HashFunction hashFunction; + @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { - long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); - NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); + NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } return bundle; } + @Override + public long calculateBundleHashCode(TopicName topicName) { + if (hashFunction == null) { + synchronized (ConsistentHashingTopicBundleAssigner.class) { + if (hashFunction == null) { + hashFunction = getBundleHashFunc(); + } + } + } + return hashFunction.hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); + } + @Override public void init(PulsarService pulsarService) { + this.pulsar = pulsarService; } + private HashFunction getBundleHashFunc() { + return Optional.ofNullable(pulsar.getNamespaceService()).map(NamespaceService::getNamespaceBundleFactory) + .map(NamespaceBundleFactory::getHashFunc) + .orElseThrow(() -> new RuntimeException("HashFunc not specified")); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 2b285cbb0e2ab..bbace4e16ba43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -29,7 +29,6 @@ import com.google.common.collect.Range; import com.google.common.hash.HashFunction; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -64,6 +63,7 @@ public class NamespaceBundleFactory { private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class); + @Getter private final HashFunction hashFunc; private final AsyncLoadingCache bundlesCache; @@ -292,7 +292,7 @@ public CompletableFuture getFullBundleAsync(NamespaceName fqnn) } public long getLongHashCode(String name) { - return this.hashFunc.hashString(name, StandardCharsets.UTF_8).padToLong(); + return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name)); } public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index b43ca4afa440e..07dd63f73536a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -18,10 +18,16 @@ */ package org.apache.pulsar.common.naming; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; import org.apache.pulsar.broker.PulsarService; public interface TopicBundleAssignmentStrategy { - NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); + NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); + + default long calculateBundleHashCode(TopicName topicName) { + return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); + } void init(PulsarService pulsarService); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java index afa815979951a..cd385c12008b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Range; import com.google.common.hash.Hashing; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -122,7 +123,11 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { MetadataStoreExtended store = mock(MetadataStoreExtended.class); when(pulsar.getLocalMetadataStore()).thenReturn(store); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); - return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + NamespaceService namespaceService = mock(NamespaceService.class); + when(pulsar.getNamespaceService()).thenReturn(namespaceService); + NamespaceBundleFactory namespaceBundleFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + when(namespaceService.getNamespaceBundleFactory()).thenReturn(namespaceBundleFactory); + return namespaceBundleFactory; } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java new file mode 100644 index 0000000000000..7738a7396642c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.google.common.base.Charsets; +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; +import com.google.common.hash.Hashing; +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.PoliciesUtil; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-naming") +public class TopicBundleAssignmentStrategyTest { + @BeforeMethod + public void setUp() { + // clean up the values of static member variables in memory + try { + Field field = TopicBundleAssignmentFactory.class.getDeclaredField("strategy"); + field.setAccessible(true); + field.set(null, null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testStrategyFactory() { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setTopicBundleAssignmentStrategy( + "org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$TestStrategy"); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(conf).when(pulsarService).getConfiguration(); + TopicBundleAssignmentStrategy strategy = TopicBundleAssignmentFactory.create(pulsarService); + NamespaceBundle bundle = strategy.findBundle(null, null); + Range keyRange = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + String range = String.format("0x%08x_0x%08x", keyRange.lowerEndpoint(), keyRange.upperEndpoint()); + Assert.assertEquals(bundle.getBundleRange(), range); + Assert.assertEquals(bundle.getNamespaceObject(), NamespaceName.get("my/test")); + } + + private static class TestStrategy implements TopicBundleAssignmentStrategy { + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + Range range = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + return new NamespaceBundle(NamespaceName.get("my/test"), range, + mock(NamespaceBundleFactory.class)); + } + + @Override + public long calculateBundleHashCode(TopicName topicName) { + return 0; + } + + @Override + public void init(PulsarService pulsarService) { + } + } + + @Test + public void testRoundRobinBundleAssigner() { + int DEFALUT_BUNDLE_NUM = 128; + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setTopicBundleAssignmentStrategy( + "org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$RoundRobinBundleAssigner"); + conf.setDefaultNumberOfNamespaceBundles(DEFALUT_BUNDLE_NUM); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(conf).when(pulsarService).getConfiguration(); + MetadataStoreExtended store = mock(MetadataStoreExtended.class); + when(pulsarService.getLocalMetadataStore()).thenReturn(store); + when(pulsarService.getConfigurationMetadataStore()).thenReturn(store); + NamespaceBundleFactory factoryNew = NamespaceBundleFactory.createFactory(pulsarService, Hashing.sha256()); + NamespaceService namespaceService = mock(NamespaceService.class); + when(namespaceService.getNamespaceBundleFactory()).thenReturn(factoryNew); + when(pulsarService.getNamespaceService()).thenReturn(namespaceService); + BundlesData bundlesData = PoliciesUtil.getBundles(DEFALUT_BUNDLE_NUM); + LocalPolicies localPolicies = new LocalPolicies(bundlesData, null, null); + NamespaceBundles bundles = new NamespaceBundles(NamespaceName.get("pulsar/global/ns1"), + factoryNew, Optional.of(Pair.of(localPolicies, (long) DEFALUT_BUNDLE_NUM))); + Set alreadyAssignNamespaceBundle = new HashSet(); + for (int i = 0; i < DEFALUT_BUNDLE_NUM; i++) { + TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-partition-" + i); + NamespaceBundle bundle = bundles.findBundle(topicName); + assertTrue(bundle.includes(topicName)); + //new hash func will make topic partition assign to different bundle as possible + assertFalse(alreadyAssignNamespaceBundle.contains(bundle)); + alreadyAssignNamespaceBundle.add(bundle); + } + } + + public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStrategy { + PulsarService pulsar; + + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); + if (topicName.getDomain().equals(TopicDomain.non_persistent)) { + bundle.setHasNonPersistentTopic(true); + } + return bundle; + } + + @Override + public long calculateBundleHashCode(TopicName topicName) { + // use topic name without partition id to decide the first hash value + long currentPartitionTopicHash = + pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc() + .hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong(); + + // if the topic is a non partition topic, use topic name to calculate the hashcode + if (!topicName.isPartitioned()) { + return currentPartitionTopicHash; + } + + // a pieces of bundle size with default * partiton id + double targetPartitionRangeSize = + (double) NamespaceBundles.FULL_UPPER_BOUND / (double) pulsar.getConfiguration() + .getDefaultNumberOfNamespaceBundles() * (double) topicName.getPartitionIndex(); + + // new hash func will make topic partition assign to different bundle as possible + return (currentPartitionTopicHash + Math.round(targetPartitionRangeSize)) + % NamespaceBundles.FULL_UPPER_BOUND; + } + + @Override + public void init(PulsarService pulsarService) { + this.pulsar = pulsarService; + } + } +} diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java deleted file mode 100644 index b371106cad85a..0000000000000 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.naming; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - -import com.google.common.collect.BoundType; -import com.google.common.collect.Range; - -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.testng.Assert; -import org.testng.annotations.Test; - -@Test(groups = "broker-naming") -public class TopicBundleAssignmentStrategyTest { - @Test - public void testStrategyFactory() { - ServiceConfiguration conf = new ServiceConfiguration(); - conf.setTopicBundleAssignmentStrategy( - "org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$TestStrategy"); - PulsarService pulsarService = mock(PulsarService.class); - doReturn(conf).when(pulsarService).getConfiguration(); - TopicBundleAssignmentStrategy strategy = TopicBundleAssignmentFactory.create(pulsarService); - NamespaceBundle bundle = strategy.findBundle(null, null); - Range keyRange = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); - String range = String.format("0x%08x_0x%08x", keyRange.lowerEndpoint(), keyRange.upperEndpoint()); - Assert.assertEquals(bundle.getBundleRange(), range); - Assert.assertEquals(bundle.getNamespaceObject(), NamespaceName.get("my/test")); - } - - public static class TestStrategy implements TopicBundleAssignmentStrategy { - @Override - public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { - Range range = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); - return new NamespaceBundle(NamespaceName.get("my/test"), range, - mock(NamespaceBundleFactory.class)); - } - - @Override - public void init(PulsarService pulsarService) { - - } - } -}