Skip to content

Commit

Permalink
[fix][broker] Fix thread unsafe access on the bundle range cache for …
Browse files Browse the repository at this point in the history
…load manager (#23217)
  • Loading branch information
BewareMyPower authored Aug 28, 2024
1 parent 9a97c84 commit 325c6a5
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 173 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/**
* The cache for the bundle ranges.
* The first key is the broker id and the second key is the namespace name, the value is the set of bundle ranges of
* that namespace. When the broker key is accessed if the associated value is not present, an empty map will be created
* as the initial value that will never be removed.
* Therefore, for each broker, there could only be one internal map during the whole lifetime. Then it will be safe
* to apply the synchronized key word on the value for thread safe operations.
*/
public class BundleRangeCache {

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final Map<String, Map<String, Set<String>>> data = new ConcurrentHashMap<>();

public void reloadFromBundles(String broker, Stream<String> bundles) {
final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> new HashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
bundles.forEach(bundleName -> {
final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
namespaceToBundleRange.computeIfAbsent(namespace, __ -> new HashSet<>()).add(bundleRange);
});
}
}

public void add(String broker, String namespace, String bundleRange) {
final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> new HashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespace, __ -> new HashSet<>()).add(bundleRange);
}
}

public int getBundleRangeCount(String broker, String namespace) {
final var namespaceToBundleRange = data.computeIfAbsent(broker, __ -> new HashMap<>());
synchronized (namespaceToBundleRange) {
final var bundleRangeSet = namespaceToBundleRange.get(namespace);
return bundleRangeSet != null ? bundleRangeSet.size() : 0;
}
}

/**
* Get the map whose key is the broker and value is the namespace that has at least 1 cached bundle range.
*/
public Map<String, List<String>> getBrokerToNamespacesMap() {
final var brokerToNamespaces = new HashMap<String, List<String>>();
for (var entry : data.entrySet()) {
final var broker = entry.getKey();
final var namespaceToBundleRange = entry.getValue();
synchronized (namespaceToBundleRange) {
brokerToNamespaces.put(broker, namespaceToBundleRange.keySet().stream().toList());
}
}
return brokerToNamespaces;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand Down Expand Up @@ -282,24 +280,6 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
return brokerCandidateCache;
});
}
/**
* Using the given bundles, populate the namespace to bundle range map.
*
* @param bundles
* Bundles with which to populate.
* @param target
* Map to fill.
*/
public static void fillNamespaceToBundlesMap(final Set<String> bundles,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
});
}

// From a full bundle name, extract the bundle range.
public static String getBundleRangeFromBundleName(String bundleName) {
Expand Down Expand Up @@ -359,8 +339,7 @@ public static boolean isLoadSheddingEnabled(final PulsarService pulsar) {
public static void removeMostServicingBrokersForNamespace(
final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange) {
final BundleRangeCache brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
}
Expand All @@ -369,13 +348,7 @@ public static void removeMostServicingBrokersForNamespace(
int leastBundles = Integer.MAX_VALUE;

for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size();
int bundles = brokerToNamespaceToBundleRange.getBundleRangeCount(broker, namespaceName);
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
Expand All @@ -386,13 +359,8 @@ public static void removeMostServicingBrokersForNamespace(
// `leastBundles` may differ from the actual value.

final int finalLeastBundles = leastBundles;
candidates.removeIf(
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size() > finalLeastBundles);
candidates.removeIf(broker ->
brokerToNamespaceToBundleRange.getBundleRangeCount(broker, namespaceName) > finalLeastBundles);
}

/**
Expand Down Expand Up @@ -426,8 +394,7 @@ public static void removeMostServicingBrokersForNamespace(
public static void filterAntiAffinityGroupOwnedBrokers(
final PulsarService pulsar, final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange,
final BundleRangeCache brokerToNamespaceToBundleRange,
Map<String, String> brokerToDomainMap) {
if (candidates.isEmpty()) {
return;
Expand Down Expand Up @@ -572,8 +539,7 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
*/
public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
final PulsarService pulsar, final String namespaceName,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange) {
final BundleRangeCache brokerToNamespaceToBundleRange) {

CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
getNamespaceAntiAffinityGroupAsync(pulsar, namespaceName)
Expand All @@ -584,21 +550,16 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw
}
final String antiAffinityGroup = antiAffinityGroupOptional.get();
final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
final List<CompletableFuture<Void>> futures = new ArrayList<>();
brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
nsToBundleRange.forEach((ns, bundleRange) -> {
if (bundleRange.isEmpty()) {
return;
}

CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
countAntiAffinityNamespaceOwnedBrokers(broker, ns, future,
final var brokerToNamespaces = brokerToNamespaceToBundleRange.getBrokerToNamespacesMap();
FutureUtil.waitForAll(brokerToNamespaces.entrySet().stream().flatMap(e -> {
final var broker = e.getKey();
return e.getValue().stream().map(namespace -> {
final var future = new CompletableFuture<Void>();
countAntiAffinityNamespaceOwnedBrokers(broker, namespace, future,
pulsar, antiAffinityGroup, brokerToAntiAffinityNamespaceCount);
return future;
});
});
FutureUtil.waitForAll(futures)
.thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
}).toList()).thenAccept(__ -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
}).exceptionally(ex -> {
// namespace-policies has not been created yet
antiAffinityNsBrokersResult.complete(null);
Expand Down Expand Up @@ -698,7 +659,6 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* by different broker.
*
* @param namespace
* @param bundle
* @param currentBroker
* @param pulsar
* @param brokerToNamespaceToBundleRange
Expand All @@ -707,10 +667,9 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(
String namespace, String bundle, String currentBroker,
String namespace, String currentBroker,
final PulsarService pulsar,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange,
final BundleRangeCache brokerToNamespaceToBundleRange,
Set<String> candidateBrokers) throws Exception {

Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
Expand Down Expand Up @@ -72,8 +73,6 @@
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -116,10 +115,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange;
private final BundleRangeCache brokerToNamespaceToBundleRange = new BundleRangeCache();

// Path to the ZNode containing the LocalBrokerData json for this broker.
private String brokerZnodePath;
Expand Down Expand Up @@ -199,10 +195,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
Expand Down Expand Up @@ -582,17 +574,9 @@ private void updateBundleData() {
TimeAverageBrokerData timeAverageData = new TimeAverageBrokerData();
timeAverageData.reset(statsMap.keySet(), bundleData, defaultStats);
brokerData.setTimeAverageData(timeAverageData);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k ->
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}

brokerToNamespaceToBundleRange.reloadFromBundles(broker,
Stream.of(statsMap.keySet(), preallocatedBundleData.keySet()).flatMap(Collection::stream));
}

// Remove not active bundle from loadData
Expand Down Expand Up @@ -736,7 +720,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker, pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}

Expand Down Expand Up @@ -873,17 +857,7 @@ private void preallocateBundle(String bundle, String broker) {

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
brokerToNamespaceToBundleRange.add(broker, namespaceName, bundleRange);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 325c6a5

Please sign in to comment.