diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
index 6d598be727ae59..58d2ac1052f653 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
@@ -82,6 +82,10 @@ public ShowResultSetMetaData getMetaData() {
 
     @Override
     public RedirectStatus getRedirectStatus() {
-        return RedirectStatus.FORWARD_NO_SYNC;
+        if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
+            return RedirectStatus.FORWARD_NO_SYNC;
+        } else {
+            return RedirectStatus.NO_FORWARD;
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index d2e173dfd5347f..be0c510559eda2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -25,6 +25,7 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
 
@@ -221,7 +222,10 @@ private long getBackendIdImpl(String cluster) {
                 return backendId;
             }
         }
-
+        if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
+            LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends");
+            return -1;
+        }
         return hashReplicaToBe(clusterId, false);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 30149de0d56c1f..73ddbe4c4551ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -45,6 +45,7 @@
 import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
 import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
 
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -57,6 +58,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 public class CloudTabletRebalancer extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class);
@@ -181,8 +183,13 @@ protected void runAfterCatalogReady() {
         // 2 complete route info
         replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
         completeRouteInfo();
-        for (UpdateCloudReplicaInfo info : replicaInfos) {
-            Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+        LOG.info("collect to editlog route {} infos", replicaInfos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         // 3 check whether the inflight preheating task has been completed
@@ -238,9 +245,20 @@ public void balanceAllPartitions() {
                     entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         // balance in partitions/index
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceInPartition(entry.getValue(), entry.getKey());
+            balanceInPartition(entry.getValue(), entry.getKey(), infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog partitions before size={} after size={} infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
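The hunks above and below all follow the same shape: each balance pass now appends `UpdateCloudReplicaInfo` records into a caller-owned list instead of writing one edit log entry per tablet move, and the caller persists the whole list in a single batched write, returning early on failure so the daemon can retry on its next run. A minimal sketch of that shape, with illustrative names standing in for the real Doris types:

```java
import java.util.ArrayList;
import java.util.List;

// Collect-then-flush, reduced to its skeleton. "Journal" and the string
// records stand in for EditLog and UpdateCloudReplicaInfo.
public class CollectThenFlush {
    interface Journal {
        void writeBatch(List<String> records) throws Exception;
    }

    static void balancePass(List<String> collected) {
        // a pass only records decisions; it never touches the journal
        collected.add("tablet-101 -> be-2");
        collected.add("tablet-102 -> be-2");
    }

    static void runOnce(Journal journal) {
        List<String> infos = new ArrayList<>();
        balancePass(infos);            // N decisions, zero journal writes
        try {
            journal.writeBatch(infos); // one batched write for the whole pass
        } catch (Exception e) {
            return;                    // edit log failed; retry next round
        }
    }
}
```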
@@ -263,9 +281,20 @@ public void balanceAllTables() {
                     entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         // balance in partitions/index
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceInTable(entry.getValue(), entry.getKey());
+            balanceInTable(entry.getValue(), entry.getKey(), infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog table before size={} after size={} infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
@@ -288,8 +317,19 @@ public void globalBalance() {
                     entry.getKey(), entry.getValue().size());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
-            balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL);
+            balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
+        }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog global before size={} after size={} infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
         }
 
         for (Map.Entry<Long, List<Tablet>> entry : beToTabletsGlobal.entrySet()) {
@@ -310,6 +350,7 @@ public void checkInflghtWarmUpCacheAsync() {
             beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
         }
 
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
             LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
             Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
@@ -335,11 +376,21 @@ public void checkInflghtWarmUpCacheAsync() {
                     if (!result.getValue()) {
                         LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
                     }
-                    updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId);
+                    updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos);
                     tabletToInfightTask.remove(result.getKey());
                 }
             }
         }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog warmup before size={} after size={} infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return;
+        }
 
         // recalculate inflight beToTablets, just for print the log
         beToTabletIds = new HashMap<Long, List<Long>>();
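The merge-log-flush block above is now repeated nearly verbatim in `runAfterCatalogReady`, `balanceAllPartitions`, `balanceAllTables`, `globalBalance`, and the warm-up check. A sketch of the shape those five sites share, written as a hypothetical helper (the name `flushReplicaInfos` is not in the patch; callers would keep their own early-return or throw policy by checking the result):

```java
// Hypothetical consolidation, not part of this patch: one place for the
// merge -> log -> flush sequence, using only methods the patch introduces.
private boolean flushReplicaInfos(String tag, List<UpdateCloudReplicaInfo> infos) {
    long oldSize = infos.size();
    List<UpdateCloudReplicaInfo> merged = batchUpdateCloudReplicaInfoEditlogs(infos);
    LOG.info("collect to editlog {} before size={} after size={} infos", tag, oldSize, merged.size());
    try {
        Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(merged);
        return true;
    } catch (Exception e) {
        LOG.warn("failed to update cloud replicas", e);
        return false; // edit log failed, try next time
    }
}
```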
@@ -550,22 +601,22 @@ public void loopCloudReplica(Operator operator) {
         }
     }
 
-    public void balanceInPartition(List<Long> bes, String clusterId) {
+    public void balanceInPartition(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
         // balance all partition
         for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> partitionEntry : futurePartitionToTablets.entrySet()) {
             Map<Long, Map<Long, List<Tablet>>> indexToTablets = partitionEntry.getValue();
             // balance all index of a partition
             for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : indexToTablets.entrySet()) {
                 // balance a index
-                balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION);
+                balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos);
             }
         }
     }
 
-    public void balanceInTable(List<Long> bes, String clusterId) {
+    public void balanceInTable(List<Long> bes, String clusterId, List<UpdateCloudReplicaInfo> infos) {
         // balance all tables
         for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : futureBeToTabletsInTable.entrySet()) {
-            balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE);
+            balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos);
         }
     }
 
@@ -641,7 +692,8 @@ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Bal
                 partToTablets);
     }
 
-    private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId) {
+    private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
+            List<UpdateCloudReplicaInfo> infos) {
         CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
         cloudReplica.updateClusterToBe(clusterId, destBe);
         Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
@@ -663,7 +715,7 @@ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clust
             UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(), cloudReplica.getTableId(),
                     cloudReplica.getPartitionId(), cloudReplica.getIndexId(), pickedTablet.getId(),
                     cloudReplica.getId(), clusterId, destBe);
-            Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+            infos.add(info);
         } finally {
             table.readUnlock();
         }
@@ -765,7 +817,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
     }
 
     private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet>> beToTablets,
-            BalanceType balanceType) {
+            BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
         if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) {
             return;
         }
@@ -852,7 +904,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
         List<Tablet> tablets = new ArrayList<>();
         if (!beToTabletsGlobal.containsKey(srcBe)) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
-            // TODO(merge-cloud): wait add cloud upgrade mgr
-            // Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
             return;
         }
         tablets = beToTabletsGlobal.get(srcBe);
         if (tablets.isEmpty()) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
-            // TODO(merge-cloud): wait add cloud upgrade mgr
-            // Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
             return;
         }
+        List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
         for (Tablet tablet : tablets) {
             // get replica
             CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
@@ -915,11 +966,21 @@ private void migrateTablets(Long srcBe, Long dstBe) {
                 UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                         cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                         tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
-                Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+                infos.add(info);
             } finally {
                 table.readUnlock();
             }
         }
+        long oldSize = infos.size();
+        infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+        LOG.info("collect to editlog migrate before size={} after size={} infos", oldSize, infos.size());
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+        } catch (Exception e) {
+            LOG.warn("update cloud replicas failed", e);
+            // edit log failed, try next time
+            throw new RuntimeException(e);
+        }
 
         try {
             ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
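All of these call sites feed `batchUpdateCloudReplicaInfoEditlogs`, defined in the next hunk, which merges per-tablet records into one record per (cluster, db/table/partition/index) location. A sketch of what a merged record looks like, with made-up ids; the setters and list getters come from the Lombok annotations this patch adds to `UpdateCloudReplicaInfo`:

```java
// A merged, batch-form UpdateCloudReplicaInfo: parallel tabletIds/replicaIds/
// beIds lists describe many moves for one location, and tabletId == -1 tells
// the replay path to treat the record as a batch. All ids are illustrative.
UpdateCloudReplicaInfo merged = new UpdateCloudReplicaInfo();
merged.setDbId(10001L);
merged.setTableId(10002L);
merged.setPartitionId(10003L);
merged.setIndexId(10004L);
merged.setClusterId("cluster0");
merged.setTabletId(-1);            // batch sentinel for replay
merged.getTabletIds().add(20001L); // slot i of each list is one replica move
merged.getReplicaIds().add(30001L);
merged.getBeIds().add(40001L);
```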
@@ -928,5 +989,64 @@ private void migrateTablets(Long srcBe, Long dstBe) {
             throw new RuntimeException(e);
         }
     }
+
+    private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
+        long start = System.currentTimeMillis();
+        List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
+        // group the infos by clusterId
+        Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
+                .collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
+        for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
+            // same cluster
+            String clusterId = entry.getKey();
+            List<UpdateCloudReplicaInfo> infoList = entry.getValue();
+            Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
+                    .collect(Collectors.groupingBy(
+                            info -> info.getDbId()
+                            + info.getTableId() + info.getPartitionId() + info.getIndexId()));
+            sameLocationInfos.forEach((location, locationInfos) -> {
+                UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
+                long dbId = -1;
+                long tableId = -1;
+                long partitionId = -1;
+                long indexId = -1;
+                for (UpdateCloudReplicaInfo info : locationInfos) {
+                    Preconditions.checkState(clusterId.equals(info.getClusterId()),
+                            "impossible, cluster id not eq outer=" + clusterId + ", inner=" + info.getClusterId());
+
+                    dbId = info.getDbId();
+                    tableId = info.getTableId();
+                    partitionId = info.getPartitionId();
+                    indexId = info.getIndexId();
+
+                    StringBuilder sb = new StringBuilder("impossible, some locations do not match location");
+                    sb.append(", location=").append(location).append(", dbId=").append(dbId)
+                            .append(", tableId=").append(tableId).append(", partitionId=").append(partitionId)
+                            .append(", indexId=").append(indexId);
+                    Preconditions.checkState(location == dbId + tableId + partitionId + indexId, sb.toString());
+
+                    long tabletId = info.getTabletId();
+                    long replicaId = info.getReplicaId();
+                    long beId = info.getBeId();
+                    newInfo.getTabletIds().add(tabletId);
+                    newInfo.getReplicaIds().add(replicaId);
+                    newInfo.getBeIds().add(beId);
+                }
+                newInfo.setDbId(dbId);
+                newInfo.setTableId(tableId);
+                newInfo.setPartitionId(partitionId);
+                newInfo.setIndexId(indexId);
+                newInfo.setClusterId(clusterId);
+                // NOTE: when unprotectUpdateCloudReplica replays a batched info, tabletId must be set to -1
+                newInfo.setTabletId(-1);
+                rets.add(newInfo);
+            });
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
+                    infos.size(), rets.size(), System.currentTimeMillis() - start);
+        }
+        return rets;
+    }
 }
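One caveat worth noting in `batchUpdateCloudReplicaInfoEditlogs`: the location key is the arithmetic sum of the four ids, so two distinct locations whose ids happen to add up to the same value would land in one group, and the `Preconditions` check cannot catch that because it compares against the same sum. If that ever matters in practice, a collision-free key is cheap; a sketch of an alternative (not what the patch does) using a composite string key:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Alternative grouping key: join the ids with a separator instead of adding
// them, so (1, 2, 3, 4) and (4, 3, 2, 1) map to different groups even though
// both sum to 10. infoList is the per-cluster list from the method above.
Map<String, List<UpdateCloudReplicaInfo>> sameLocationInfos = infoList.stream()
        .collect(Collectors.groupingBy(info -> info.getDbId() + "_" + info.getTableId()
                + "_" + info.getPartitionId() + "_" + info.getIndexId()));
```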
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 721ec7899f41f7..20b1e17f5291e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -939,7 +939,12 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
             List<Long> tabletIds = info.getTabletIds();
             for (int i = 0; i < tabletIds.size(); ++i) {
                 Tablet tablet = materializedIndex.getTablet(tabletIds.get(i));
-                Replica replica = tablet.getReplicas().get(0);
+                Replica replica;
+                if (info.getReplicaIds().isEmpty()) {
+                    replica = tablet.getReplicas().get(0);
+                } else {
+                    replica = tablet.getReplicaById(info.getReplicaIds().get(i));
+                }
                 Preconditions.checkNotNull(replica, info);
 
                 String clusterId = info.getClusterId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
index 1ff2912a3971df..c5d6fc0cd6466a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
@@ -17,11 +17,14 @@
 
 package org.apache.doris.cloud.persist;
 
+
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -29,6 +32,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+@Getter
+@Setter
 public class UpdateCloudReplicaInfo implements Writable {
     @SerializedName(value = "dbId")
     private long dbId;
@@ -54,6 +59,9 @@ public class UpdateCloudReplicaInfo implements Writable {
     @SerializedName(value = "beIds")
     private List<Long> beIds = new ArrayList<Long>();
 
+    @SerializedName(value = "rids")
+    private List<Long> replicaIds = new ArrayList<>();
+
     public UpdateCloudReplicaInfo() {
     }
 
@@ -97,46 +105,6 @@ public static UpdateCloudReplicaInfo read(DataInput in) throws IOException {
         return GsonUtils.GSON.fromJson(json, UpdateCloudReplicaInfo.class);
     }
 
-    public long getDbId() {
-        return dbId;
-    }
-
-    public long getTableId() {
-        return tableId;
-    }
-
-    public long getPartitionId() {
-        return partitionId;
-    }
-
-    public long getIndexId() {
-        return indexId;
-    }
-
-    public long getTabletId() {
-        return tabletId;
-    }
-
-    public long getReplicaId() {
-        return replicaId;
-    }
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public long getBeId() {
-        return beId;
-    }
-
-    public List<Long> getBeIds() {
-        return beIds;
-    }
-
-    public List<Long> getTabletIds() {
-        return tabletIds;
-    }
-
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("database id: ").append(dbId);
@@ -159,6 +127,11 @@ public String toString() {
             for (long id : tabletIds) {
                 sb.append(" ").append(id);
             }
+
+            sb.append(" replica id list: ");
+            for (long id : replicaIds) {
+                sb.append(" ").append(id);
+            }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
index 12d62b687172e4..e56bf34dfe57bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -21,6 +21,8 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.OperationType;
 
+import lombok.Getter;
+
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -29,6 +31,9 @@ public class JournalBatch {
     private ArrayList<Entity> entities;
 
+    @Getter
+    private long size = 0;
+
     public JournalBatch() {
         entities = new ArrayList<>();
     }
@@ -56,6 +61,7 @@ public void addJournal(short op, Writable data) throws IOException {
         DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
         entity.write(buffer);
+        size += buffer.size();
         entities.add(new Entity(op, buffer));
     }
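The new "rids" field and the `getReplicaById` fallback above are what keep old journals replayable: an entry serialized before this patch has no "rids" key, Gson instantiates the class through its no-arg constructor so the field initializer leaves `replicaIds` as an empty list, and `unprotectUpdateCloudReplica` then takes the legacy `replicas.get(0)` path. A self-contained sketch of that deserialization behavior, using a minimal stand-in class:

```java
import com.google.gson.Gson;

import java.util.ArrayList;
import java.util.List;

public class OldEntryCompat {
    // Minimal stand-in for UpdateCloudReplicaInfo's new field.
    static class Info {
        long dbId;
        List<Long> replicaIds = new ArrayList<>();
    }

    public static void main(String[] args) {
        // JSON written before the patch carries no replica id list at all.
        Info info = new Gson().fromJson("{\"dbId\": 1}", Info.class);
        // The field initializer survives, so the replay code sees an empty
        // list and falls back to tablet.getReplicas().get(0).
        System.out.println(info.replicaIds.isEmpty()); // true
    }
}
```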
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index eb26bbc04f02ac..c3a326f6d484cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -59,6 +59,7 @@
 import org.apache.doris.insertoverwrite.InsertOverwriteLog;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.BDBJEJournal;
@@ -1264,6 +1265,21 @@ public void rollEditLog() {
         journal.rollJournal();
     }
 
+    private synchronized <T extends Writable> void logEdit(short op, List<T> entries) throws IOException {
+        JournalBatch batch = new JournalBatch(35);
+        for (T entry : entries) {
+            // keep the number of batch entities less than 32 and the batch data size less than 640KB
+            if (batch.getJournalEntities().size() >= 32 || batch.getSize() >= 640 * 1024) {
+                journal.write(batch);
+                batch = new JournalBatch(35);
+            }
+            batch.addJournal(op, entry);
+        }
+        if (!batch.getJournalEntities().isEmpty()) {
+            journal.write(batch);
+        }
+    }
+
     /**
      * Write an operation to the edit log. Do not sync to persistent store yet.
      */
@@ -1576,8 +1592,12 @@ public void logExportCreate(ExportJob job) {
         logEdit(OperationType.OP_EXPORT_CREATE, job);
     }
 
-    public void logUpdateCloudReplica(UpdateCloudReplicaInfo info) {
-        logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, info);
+    public void logUpdateCloudReplicas(List<UpdateCloudReplicaInfo> infos) throws IOException {
+        long start = System.currentTimeMillis();
+        logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, infos);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("log update {} cloud replicas. cost: {} ms", infos.size(), (System.currentTimeMillis() - start));
+        }
     }
 
     public void logExportUpdateState(long jobId, ExportJobState newState) {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 8397a638c364da..a964996c2366b6 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -17,6 +17,8 @@
 
 package org.apache.doris.regression.suite
 
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
 import groovy.json.JsonOutput
 import com.google.common.collect.Maps
 import com.google.common.util.concurrent.Futures
@@ -260,6 +262,15 @@ class Suite implements GroovyInterceptable {
         return context.connect(user, password, url, actionSupplier)
     }
 
+    public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, Closure actionSupplier) {
+        def connInfo = context.threadLocalConn.get()
+        Awaitility.await().atMost(atMostSeconds, SECONDS).pollInterval(intervalSecond, SECONDS).until(
+            {
+                connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier)
+            }
+        )
+    }
+
     public void docker(ClusterOptions options = new ClusterOptions(), Closure actionSupplier) throws Exception {
         if (context.config.excludeDockerTest) {
             return
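`dockerAwaitUntil`, added above, is a thin wrapper over Awaitility: it re-resolves the suite's current connection and polls the supplied closure until it returns true. Stripped of the connection plumbing, the polling itself is plain Awaitility API, as in this sketch (`backendCount()` is a hypothetical stand-in for whatever condition the caller's closure checks):

```java
import static java.util.concurrent.TimeUnit.SECONDS;

import org.awaitility.Awaitility;

// Poll once per second, for at most 30 seconds, until the condition holds;
// Awaitility throws ConditionTimeoutException if the deadline passes first.
Awaitility.await()
        .atMost(30, SECONDS)
        .pollInterval(1, SECONDS)
        .until(() -> backendCount() == 2); // hypothetical condition
```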
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
new file mode 100644
index 00000000000000..e1735a4acd486d
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite('test_rebalance_in_cloud', 'multi_cluster') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_cloud_warm_up_for_rebalance=false',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'cloud_balance_tablet_percent_per_run=0.5',
+        'cloud_pre_heating_time_limit_sec=1',
+        'sys_log_verbose_modules=org',
+    ]
+    options.setFeNum(3)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.connectToFollower = true
+    options.enableDebugPoints()
+
+    docker(options) {
+        sql """
+            CREATE TABLE table100 (
+                class INT,
+                id INT,
+                score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """
+
+        sql """
+            CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+            AGGREGATE KEY(k1, k2)
+            PARTITION BY RANGE(k1) (
+                PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+                PARTITION p1993 VALUES [("19930101"), ("19940101")),
+                PARTITION p1994 VALUES [("19940101"), ("19950101")),
+                PARTITION p1995 VALUES [("19950101"), ("19960101")),
+                PARTITION p1996 VALUES [("19960101"), ("19970101")),
+                PARTITION p1997 VALUES [("19970101"), ("19980101")),
+                PARTITION p1998 VALUES [("19980101"), ("19990101")))
+            DISTRIBUTED BY HASH(k1) BUCKETS 3
+        """
+        GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends");
+        sql """set global forward_to_master=false"""
+
+        // add a be
+        cluster.addBackend(1, null)
+
+        dockerAwaitUntil(30) {
+            def bes = sql """show backends"""
+            log.info("bes: {}", bes)
+            bes.size() == 2
+        }
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
+            log.info("replica distribution table100: {}", ret)
+            ret.size() == 2
+        }
+
+        def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
+        assertEquals(2, result.size())
+        int replicaNum = 0
+
+        for (def row : result) {
+            log.info("replica distribution: ${row} ".toString())
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            if (replicaNum == 0) {
+                // due to the debug point, the observer does not hash replicas to backends
+            } else {
+                assertTrue(replicaNum <= 25 && replicaNum >= 23)
+            }
+        }
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)"""
+            log.info("replica distribution table_p2: {}", ret)
+            ret.size() == 2
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1993) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1994) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1995) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1996) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+
+        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1997) """
+        assertEquals(2, result.size())
+        for (def row : result) {
+            replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            log.info("replica distribution: ${row} ".toString())
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            }
+        }
+    }
+}
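For reference on the assertion windows used throughout the test: table100 is created with 48 single-replica buckets, so once the second backend joins, an even spread is 24 tablets per BE; the 23-25 window presumably leaves room for moves that are still in flight when the check runs, and table_p2's 3-bucket partitions get the analogous 1-2 window. The arithmetic, spelled out:

```java
// Expected replica counts behind the test's assertTrue bounds.
public class AssertionWindows {
    public static void main(String[] args) {
        int tablets = 48;  // table100: 48 buckets, 1 replica each
        int backends = 2;  // the original BE plus the one the test adds
        int ideal = tablets / backends;  // 24
        int slack = 1;     // presumed allowance for in-flight rebalancing
        System.out.printf("accept [%d, %d]%n", ideal - slack, ideal + slack); // [23, 25]
    }
}
```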