Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](task-assignment) use consistent hash as default task assigner and cache the consistent hash ring (#28522) #28908

Merged
merged 1 commit into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -46,7 +49,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class FederationBackendPolicy {
Expand All @@ -59,6 +64,53 @@ public class FederationBackendPolicy {
private int nextBe = 0;
private boolean initialized = false;

// Building a ConsistentHash ring may be time-consuming, so built rings are cached
// and reused for keys that compare equal.
private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;

static {
    // Loader invoked by the cache on a miss: builds a new ring from the backends
    // attached to the key, using the configured number of virtual nodes.
    CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> ringLoader =
            new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
                @Override
                public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
                    return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
                            new BackendHash(), key.bes, Config.virtual_node_number);
                }
            };
    // The cache is bounded to a maximum size of 5 entries.
    consistentHashCache = CacheBuilder.newBuilder().maximumSize(5).build(ringLoader);
}

/**
 * Cache key for consistent hash rings.
 *
 * <p>Two keys are equal when their sorted backend id lists are equal; the
 * attached backend list takes no part in {@code equals} or {@code hashCode}.
 */
private static class HashCacheKey {
    // Ids of the backends in ascending order; this is the identity of the key.
    private List<Long> beIds;
    // The backends themselves, carried along as a payload and ignored when comparing keys.
    private List<Backend> bes;

    /**
     * @param backends backends whose ids form the key; the list is also kept
     *                 as-is (not copied) as the attached payload
     */
    HashCacheKey(List<Backend> backends) {
        this.beIds = backends.stream()
                .map(Backend::getId)
                .sorted()
                .collect(Collectors.toList());
        this.bes = backends;
    }

    @Override
    public boolean equals(Object obj) {
        // Same instance, or another key with an equal sorted id list.
        return this == obj
                || (obj instanceof HashCacheKey && Objects.equals(beIds, ((HashCacheKey) obj).beIds));
    }

    @Override
    public int hashCode() {
        // Derived from the id list only, consistent with equals().
        return Objects.hash(beIds);
    }

    @Override
    public String toString() {
        return "HashCache{beIds=" + beIds + '}';
    }
}

public void init() throws UserException {
if (!initialized) {
init(Collections.emptyList());
Expand Down Expand Up @@ -96,8 +148,11 @@ public void init(BeSelectionPolicy policy) throws UserException {
throw new UserException("No available backends");
}
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
new BackendHash(), backends, Config.virtual_node_number);
try {
consistentHash = consistentHashCache.get(new HashCacheKey(backends));
} catch (ExecutionException e) {
throw new UserException("failed to get consistent hash", e);
}
}

public Backend getNextBe() {
Expand All @@ -111,7 +166,7 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
}

// Try to find a local BE; if none is found, fall back to `getNextConsistentBe`
public Backend getNextLocalBe(List<String> hosts) {
public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
for (String host : hosts) {
List<Backend> backends = backendMap.get(host);
Expand All @@ -121,7 +176,7 @@ public Backend getNextLocalBe(List<String> hosts) {
}

return CollectionUtils.isEmpty(candidateBackends)
? getNextBe()
? getNextConsistentBe(scanRangeLocations)
: candidateBackends.get(random.nextInt(candidateBackends.size()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache;
boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
Expand Down Expand Up @@ -346,14 +345,12 @@ public void createScanRangeLocations() throws UserException {
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
Backend selectedBackend;
if (enableSqlCache) {
// Use consistent hash to assign the same scan range into the same backend among different queries
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
} else if (enableShortCircuitRead) {
if (enableShortCircuitRead) {
// Try to find a local BE if enable hdfs short circuit read
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
} else {
selectedBackend = backendPolicy.getNextBe();
// Use consistent hash to assign the same scan range into the same backend among different queries
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
}
setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
location.setBackendId(selectedBackend.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Stopwatch;
import mockit.Mock;
Expand Down Expand Up @@ -93,12 +98,50 @@ public void testGetNextLocalBe() throws UserException {
int invokeTimes = 1000000;
Assertions.assertEquals(policy.numBackends(), backendNum);
List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
}
sw.stop();
System.out.println("Invoke getNextLocalBe() " + invokeTimes
+ " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
}

/**
 * Checks that consistent-hash assignment maps given scan ranges to fixed backends.
 *
 * The expected backend ids below are pinned values; presumably they depend on the
 * backend set supplied by the test fixture and on the hash configuration, so they
 * need updating if either changes (verify against the fixture setup).
 */
@Test
public void testConsistentHash() throws UserException {
    FederationBackendPolicy policy = new FederationBackendPolicy();
    policy.init();
    int backendNum = 200;
    // Expected value goes first so that a failure reports expected/actual correctly.
    Assertions.assertEquals(backendNum, policy.numBackends());

    TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
    Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());

    // A different path with the same offset and size is expected to land on another backend.
    scanRangeLocations = getScanRangeLocations("path2", 0, 100);
    Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
}

/**
 * Builds a TScanRangeLocations wrapping a single file range.
 *
 * @param path        file path stored in the range descriptor
 * @param startOffset start offset stored in the range descriptor
 * @param size        size stored in the range descriptor
 * @return locations whose scan range contains exactly one file range descriptor
 */
private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
    // Generate one file scan range holding the single range descriptor
    TFileScanRange fileRanges = new TFileScanRange();
    fileRanges.addToRanges(createRangeDesc(path, startOffset, size));

    // Wrap it: file scan range -> external scan range -> scan range
    TExternalScanRange extRange = new TExternalScanRange();
    extRange.setFileScanRange(fileRanges);
    TScanRange wrappedRange = new TScanRange();
    wrappedRange.setExtScanRange(extRange);

    // Locations
    TScanRangeLocations result = new TScanRangeLocations();
    result.setScanRange(wrappedRange);
    return result;
}

/**
 * Creates a file range descriptor with the given path, start offset and size.
 *
 * @param path        value passed to {@code setPath}
 * @param startOffset value passed to {@code setStartOffset}
 * @param size        value passed to {@code setSize}
 * @return the populated descriptor
 */
private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
    TFileRangeDesc desc = new TFileRangeDesc();
    desc.setStartOffset(startOffset);
    desc.setSize(size);
    desc.setPath(path);
    return desc;
}
}
Loading