Skip to content

Commit

Permalink
[improvement](cloud) Accelerate cloud rebalance by batch editlog (#37787
Browse files Browse the repository at this point in the history
)

1. use `JournalBatch` to batch editlogs
2. same partition, different tablets use one editlog

env:
in docker cloud mode, 3fe 3be. 
3be expansion to 4be, trigger cloud rebalance
table, 1860 partitions, 48 buckets, every rebalance loop min balance 12
and close pre cache

result:
```
before improvement
2024-07-16 16:51:01,371 INFO (cloud tablet rebalancer|77) [CloudTabletRebalancer.runAfterCatalogReady():228] 
finished to rebalancer. cost: 58471 ms


after imprevement
2024-07-16 17:10:20,699 INFO (cloud tablet rebalancer|77) [CloudTabletRebalancer.runAfterCatalogReady():235]
finished to rebalancer. cost: 28687 ms
```
  • Loading branch information
deardeng authored and dataroaring committed Jul 22, 2024
1 parent 191910e commit 02dff85
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public ShowResultSetMetaData getMetaData() {

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
return RedirectStatus.FORWARD_NO_SYNC;
} else {
return RedirectStatus.NO_FORWARD;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;

Expand Down Expand Up @@ -221,7 +222,10 @@ private long getBackendIdImpl(String cluster) {
return backendId;
}
}

if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends");
return -1;
}
return hashReplicaToBe(clusterId, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;

import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -57,6 +58,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);
Expand Down Expand Up @@ -181,8 +183,13 @@ protected void runAfterCatalogReady() {
// 2 complete route info
replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
completeRouteInfo();
for (UpdateCloudReplicaInfo info : replicaInfos) {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
LOG.info("collect to editlog route {} infos", replicaInfos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

// 3 check whether the inflight preheating task has been completed
Expand Down Expand Up @@ -238,9 +245,20 @@ public void balanceAllPartitions() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInPartition(entry.getValue(), entry.getKey());
balanceInPartition(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -263,9 +281,20 @@ public void balanceAllTables() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInTable(entry.getValue(), entry.getKey());
balanceInTable(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -288,8 +317,19 @@ public void globalBalance() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL);
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -310,6 +350,7 @@ public void checkInflghtWarmUpCacheAsync() {
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
Expand All @@ -335,11 +376,21 @@ public void checkInflghtWarmUpCacheAsync() {
if (!result.getValue()) {
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
}
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId);
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos);
tabletToInfightTask.remove(result.getKey());
}
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

// recalculate inflight beToTablets, just for print the log
beToTabletIds = new HashMap<Long, List<Long>>();
Expand Down Expand Up @@ -550,22 +601,22 @@ public void loopCloudReplica(Operator operator) {
}
}

public void balanceInPartition(List<Long> bes, String clusterId) {
public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all partition
for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> partitionEntry : futurePartitionToTablets.entrySet()) {
Map<Long, Map<Long, List<Tablet>>> indexToTablets = partitionEntry.getValue();
// balance all index of a partition
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : indexToTablets.entrySet()) {
// balance a index
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION);
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
}
}
}

public void balanceInTable(List<Long> bes, String clusterId) {
public void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all tables
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE);
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
}
}

Expand Down Expand Up @@ -641,7 +692,8 @@ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Bal
partToTablets);
}

private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId) {
private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
cloudReplica.updateClusterToBe(clusterId, destBe);
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
Expand All @@ -663,7 +715,7 @@ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clust
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe);
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
infos.add(info);
} finally {
table.readUnlock();
}
Expand Down Expand Up @@ -765,7 +817,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
}

private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet>> beToTablets,
BalanceType balanceType) {
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) {
return;
}
Expand Down Expand Up @@ -852,7 +904,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
beToTabletsInTable, partitionToTablets);
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
updateClusterToBeMap(pickedTablet, destBe, clusterId);
updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
}
}
}
Expand All @@ -869,17 +921,16 @@ private void migrateTablets(Long srcBe, Long dstBe) {
List<Tablet> tablets = new ArrayList<>();
if (!beToTabletsGlobal.containsKey(srcBe)) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
// TODO(merge-cloud): wait add cloud upgrade mgr
// Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
tablets = beToTabletsGlobal.get(srcBe);
if (tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
// TODO(merge-cloud): wait add cloud upgrade mgr
// Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Tablet tablet : tablets) {
// get replica
CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
Expand Down Expand Up @@ -915,11 +966,21 @@ private void migrateTablets(Long srcBe, Long dstBe) {
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
infos.add(info);
} finally {
table.readUnlock();
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("update cloud replicas failed", e);
// edit log failed, try next time
throw new RuntimeException(e);
}

try {
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
Expand All @@ -928,5 +989,64 @@ private void migrateTablets(Long srcBe, Long dstBe) {
throw new RuntimeException(e);
}
}

private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
long start = System.currentTimeMillis();
List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
// clusterId, infos
Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
.collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
// same cluster
String clusterId = entry.getKey();
List<UpdateCloudReplicaInfo> infoList = entry.getValue();
Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
.collect(Collectors.groupingBy(
info -> info.getDbId()
+ info.getTableId() + info.getPartitionId() + info.getIndexId()));
sameLocationInfos.forEach((location, locationInfos) -> {
UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
long dbId = -1;
long tableId = -1;
long partitionId = -1;
long indexId = -1;
for (UpdateCloudReplicaInfo info : locationInfos) {
Preconditions.checkState(clusterId.equals(info.getClusterId()),
"impossible, cluster id not eq outer=" + clusterId + ", inner=" + info.getClusterId());

dbId = info.getDbId();
tableId = info.getTableId();
partitionId = info.getPartitionId();
indexId = info.getIndexId();

StringBuilder sb = new StringBuilder("impossible, some locations do not match location");
sb.append(", location=").append(location).append(", dbId=").append(dbId)
.append(", tableId=").append(tableId).append(", partitionId=").append(partitionId)
.append(", indexId=").append(indexId);
Preconditions.checkState(location == dbId + tableId + partitionId + indexId, sb.toString());

long tabletId = info.getTabletId();
long replicaId = info.getReplicaId();
long beId = info.getBeId();
newInfo.getTabletIds().add(tabletId);
newInfo.getReplicaIds().add(replicaId);
newInfo.getBeIds().add(beId);
}
newInfo.setDbId(dbId);
newInfo.setTableId(tableId);
newInfo.setPartitionId(partitionId);
newInfo.setIndexId(indexId);
newInfo.setClusterId(clusterId);
// APPR: in unprotectUpdateCloudReplica, use batch must set tabletId = -1
newInfo.setTabletId(-1);
rets.add(newInfo);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
infos.size(), rets.size(), System.currentTimeMillis() - start);
}
return rets;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,12 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
List<Long> tabletIds = info.getTabletIds();
for (int i = 0; i < tabletIds.size(); ++i) {
Tablet tablet = materializedIndex.getTablet(tabletIds.get(i));
Replica replica = tablet.getReplicas().get(0);
Replica replica;
if (info.getReplicaIds().isEmpty()) {
replica = tablet.getReplicas().get(0);
} else {
replica = tablet.getReplicaById(info.getReplicaIds().get(i));
}
Preconditions.checkNotNull(replica, info);

String clusterId = info.getClusterId();
Expand Down
Loading

0 comments on commit 02dff85

Please sign in to comment.