Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](cloud) Accelerate cloud rebalance by batch editlog #37787

Merged
merged 7 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public ShowResultSetMetaData getMetaData() {

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
return RedirectStatus.FORWARD_NO_SYNC;
} else {
return RedirectStatus.NO_FORWARD;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;

Expand Down Expand Up @@ -221,7 +222,10 @@ private long getBackendIdImpl(String cluster) {
return backendId;
}
}

if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends");
return -1;
}
return hashReplicaToBe(clusterId, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;

import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -57,6 +58,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);
Expand Down Expand Up @@ -181,8 +183,13 @@ protected void runAfterCatalogReady() {
// 2 complete route info
replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
completeRouteInfo();
for (UpdateCloudReplicaInfo info : replicaInfos) {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
LOG.info("collect to editlog route {} infos", replicaInfos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

// 3 check whether the inflight preheating task has been completed
Expand Down Expand Up @@ -238,9 +245,20 @@ public void balanceAllPartitions() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInPartition(entry.getValue(), entry.getKey());
balanceInPartition(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -263,9 +281,20 @@ public void balanceAllTables() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceInTable(entry.getValue(), entry.getKey());
balanceInTable(entry.getValue(), entry.getKey(), infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -288,8 +317,19 @@ public void globalBalance() {
entry.getKey(), entry.getValue().size());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL);
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
Expand All @@ -310,6 +350,7 @@ public void checkInflghtWarmUpCacheAsync() {
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
}

List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
Expand All @@ -335,11 +376,21 @@ public void checkInflghtWarmUpCacheAsync() {
if (!result.getValue()) {
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
}
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId);
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos);
tabletToInfightTask.remove(result.getKey());
}
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
// edit log failed, try next time
return;
}

// recalculate inflight beToTablets, just for print the log
beToTabletIds = new HashMap<Long, List<Long>>();
Expand Down Expand Up @@ -550,22 +601,22 @@ public void loopCloudReplica(Operator operator) {
}
}

public void balanceInPartition(List<Long> bes, String clusterId) {
public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all partition
for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> partitionEntry : futurePartitionToTablets.entrySet()) {
Map<Long, Map<Long, List<Tablet>>> indexToTablets = partitionEntry.getValue();
// balance all index of a partition
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : indexToTablets.entrySet()) {
// balance a index
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION);
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
}
}
}

public void balanceInTable(List<Long> bes, String clusterId) {
public void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
// balance all tables
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE);
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
}
}

Expand Down Expand Up @@ -641,7 +692,8 @@ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Bal
partToTablets);
}

private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId) {
private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
cloudReplica.updateClusterToBe(clusterId, destBe);
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
Expand All @@ -663,7 +715,7 @@ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clust
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe);
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
infos.add(info);
} finally {
table.readUnlock();
}
Expand Down Expand Up @@ -765,7 +817,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
}

private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet>> beToTablets,
BalanceType balanceType) {
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) {
return;
}
Expand Down Expand Up @@ -852,7 +904,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
beToTabletsInTable, partitionToTablets);
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
updateClusterToBeMap(pickedTablet, destBe, clusterId);
updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
}
}
}
Expand All @@ -869,17 +921,16 @@ private void migrateTablets(Long srcBe, Long dstBe) {
List<Tablet> tablets = new ArrayList<>();
if (!beToTabletsGlobal.containsKey(srcBe)) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
// TODO(merge-cloud): wait add cloud upgrade mgr
// Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
tablets = beToTabletsGlobal.get(srcBe);
if (tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
// TODO(merge-cloud): wait add cloud upgrade mgr
// Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Tablet tablet : tablets) {
// get replica
CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
Expand Down Expand Up @@ -915,11 +966,21 @@ private void migrateTablets(Long srcBe, Long dstBe) {
UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
infos.add(info);
} finally {
table.readUnlock();
}
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos);
LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("update cloud replicas failed", e);
// edit log failed, try next time
throw new RuntimeException(e);
}

try {
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
Expand All @@ -928,5 +989,64 @@ private void migrateTablets(Long srcBe, Long dstBe) {
throw new RuntimeException(e);
}
}

private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
long start = System.currentTimeMillis();
List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
// clusterId, infos
Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
.collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
// same cluster
String clusterId = entry.getKey();
List<UpdateCloudReplicaInfo> infoList = entry.getValue();
Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
.collect(Collectors.groupingBy(
info -> info.getDbId()
+ info.getTableId() + info.getPartitionId() + info.getIndexId()));
sameLocationInfos.forEach((location, locationInfos) -> {
UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
long dbId = -1;
long tableId = -1;
long partitionId = -1;
long indexId = -1;
for (UpdateCloudReplicaInfo info : locationInfos) {
Preconditions.checkState(clusterId.equals(info.getClusterId()),
"impossible, cluster id not eq outer=" + clusterId + ", inner=" + info.getClusterId());

dbId = info.getDbId();
tableId = info.getTableId();
partitionId = info.getPartitionId();
indexId = info.getIndexId();

StringBuilder sb = new StringBuilder("impossible, some locations do not match location");
sb.append(", location=").append(location).append(", dbId=").append(dbId)
.append(", tableId=").append(tableId).append(", partitionId=").append(partitionId)
.append(", indexId=").append(indexId);
Preconditions.checkState(location == dbId + tableId + partitionId + indexId, sb.toString());

long tabletId = info.getTabletId();
long replicaId = info.getReplicaId();
long beId = info.getBeId();
newInfo.getTabletIds().add(tabletId);
newInfo.getReplicaIds().add(replicaId);
newInfo.getBeIds().add(beId);
}
newInfo.setDbId(dbId);
newInfo.setTableId(tableId);
newInfo.setPartitionId(partitionId);
newInfo.setIndexId(indexId);
newInfo.setClusterId(clusterId);
// APPR: in unprotectUpdateCloudReplica, use batch must set tabletId = -1
newInfo.setTabletId(-1);
rets.add(newInfo);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
infos.size(), rets.size(), System.currentTimeMillis() - start);
}
return rets;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,12 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
List<Long> tabletIds = info.getTabletIds();
for (int i = 0; i < tabletIds.size(); ++i) {
Tablet tablet = materializedIndex.getTablet(tabletIds.get(i));
Replica replica = tablet.getReplicas().get(0);
Replica replica;
if (info.getReplicaIds().isEmpty()) {
replica = tablet.getReplicas().get(0);
} else {
replica = tablet.getReplicaById(info.getReplicaIds().get(i));
}
Preconditions.checkNotNull(replica, info);

String clusterId = info.getClusterId();
Expand Down
Loading
Loading