@@ -494,7 +494,10 @@ func buildBatchCopTasksForNonPartitionedTable(
     balanceWithContinuity bool,
     balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
+        if config.GetGlobalConfig().UseAutoScaler {
+            return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
+        }
+        return buildBatchCopTasksConsistentHashForPD(bo, store, []*KeyRanges{ranges}, storeType, ttl)
     }
     return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 }
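
The gate added in both hunks reads two global config switches: `DisaggregatedTiFlash` selects the disaggregated storage/compute deployment, and `UseAutoScaler` then decides whether the tiflash_compute topology comes through the AutoScaler or directly from PD (the fallback this patch adds). A minimal sketch of that three-way routing; `Config` here keeps only the two relevant fields and `dispatchBatchCop` is a hypothetical stand-in, not a function in this patch:

package main

import "fmt"

// Config mirrors just the two switches the diff consults; the real
// config.Config in TiDB carries many more fields.
type Config struct {
    DisaggregatedTiFlash bool
    UseAutoScaler        bool
}

// dispatchBatchCop shows the routing the diff introduces: AutoScaler-managed
// consistent hash, PD-managed consistent hash, or the classic balanced path.
func dispatchBatchCop(cfg Config) string {
    if cfg.DisaggregatedTiFlash {
        if cfg.UseAutoScaler {
            return "buildBatchCopTasksConsistentHash" // topology from AutoScaler
        }
        return "buildBatchCopTasksConsistentHashForPD" // topology from PD
    }
    return "buildBatchCopTasksCore" // classic TiFlash, balanced by region count
}

func main() {
    fmt.Println(dispatchBatchCop(Config{DisaggregatedTiFlash: true}))
    // Prints: buildBatchCopTasksConsistentHashForPD
}
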
@@ -511,7 +514,12 @@ func buildBatchCopTasksForPartitionedTable(
     balanceContinuousRegionCount int64,
     partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        if config.GetGlobalConfig().UseAutoScaler {
+            batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        } else {
+            // TODO: remove this after AutoScaler is stable.
+            batchTasks, err = buildBatchCopTasksConsistentHashForPD(bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        }
     } else {
         batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
     }
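
The partitioned-table variant takes one `KeyRanges` per physical partition and, as the new function below shows, stamps every task with `partitionIndex`, the slice index into `rangesForEachPhysicalTable` (not the partition ID itself); callers map it back through `partitionIDs`. A toy illustration of that indexing, with simplified stand-in types for `KeyRanges` and `copTask`:

package main

import "fmt"

// keyRange and task are simplified stand-ins for the real KeyRanges and
// copTask types; only what the indexing scheme needs is kept.
type keyRange struct{ start, end string }
type task struct {
    partitionIndex int64 // index into rangesForEachPhysicalTable
    ranges         []keyRange
}

func main() {
    // One entry per physical table (partition); partitionIDs[i] names entry i.
    rangesForEachPhysicalTable := [][]keyRange{
        {{"a", "f"}},             // ranges of partition 101
        {{"f", "k"}, {"p", "t"}}, // ranges of partition 102
    }
    partitionIDs := []int64{101, 102}

    var tasks []task
    for i, ranges := range rangesForEachPhysicalTable {
        tasks = append(tasks, task{partitionIndex: int64(i), ranges: ranges})
    }
    for _, t := range tasks {
        fmt.Printf("partition ID %d -> %v\n", partitionIDs[t.partitionIndex], t.ranges)
    }
}
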
@@ -1169,3 +1177,92 @@ func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batch
     }
     resp.detail.CalleeAddress = task.storeAddr
 }
+
+// Only called when UseAutoScaler is false.
+func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
+    kvStore *kvStore,
+    rangesForEachPhysicalTable []*KeyRanges,
+    storeType kv.StoreType,
+    ttl time.Duration) (res []*batchCopTask, err error) {
+    const cmdType = tikvrpc.CmdBatchCop
+    var retryNum int
+    cache := kvStore.GetRegionCache()
+
+    for {
+        retryNum++
+        var rangesLen int
+        tasks := make([]*copTask, 0)
+        regionIDs := make([]tikv.RegionVerID, 0)
+
+        for i, ranges := range rangesForEachPhysicalTable {
+            rangesLen += ranges.Len()
+            locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            for _, lo := range locations {
+                tasks = append(tasks, &copTask{
+                    region:         lo.Location.Region,
+                    ranges:         lo.Ranges,
+                    cmdType:        cmdType,
+                    storeType:      storeType,
+                    partitionIndex: int64(i),
+                })
+                regionIDs = append(regionIDs, lo.Location.Region)
+            }
+        }
+
+        stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
+        if err != nil {
+            return nil, err
+        }
+        stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
+        if len(stores) == 0 {
+            return nil, errors.New("tiflash_compute node is unavailable")
+        }
+
+        rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
+        if err != nil {
+            return nil, err
+        }
+        if rpcCtxs == nil {
+            logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD retry because rpcCtxs is nil", zap.Int("retryNum", retryNum))
+            err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            continue
+        }
+        if len(rpcCtxs) != len(tasks) {
+            return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
+        }
+        taskMap := make(map[string]*batchCopTask)
+        for i, rpcCtx := range rpcCtxs {
+            regionInfo := RegionInfo{
+                // tasks and rpcCtxs correspond to each other by index.
+                Region:         tasks[i].region,
+                Meta:           rpcCtx.Meta,
+                Ranges:         tasks[i].ranges,
+                AllStores:      []uint64{rpcCtx.Store.StoreID()},
+                PartitionIndex: tasks[i].partitionIndex,
+            }
+            if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
+                batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
+            } else {
+                batchTask := &batchCopTask{
+                    storeAddr:   rpcCtx.Addr,
+                    cmdType:     cmdType,
+                    ctx:         rpcCtx,
+                    regionInfos: []RegionInfo{regionInfo},
+                }
+                taskMap[rpcCtx.Addr] = batchTask
+                res = append(res, batchTask)
+            }
+        }
+        logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
+        break
+    }
+
+    failpointCheckForConsistentHash(res)
+    return res, nil
+}
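
Once `GetTiFlashComputeRPCContextByConsistentHash` has pinned each region to one tiflash_compute address, the closing `taskMap` loop only has to fold all regions that share an address into a single `batchCopTask`. A standalone sketch of that grouping step, using simplified stand-ins for `RegionInfo` and `batchCopTask` rather than the real types:

package main

import "fmt"

// regionInfo and batchTask keep only the fields the grouping step touches.
type regionInfo struct{ region string }
type batchTask struct {
    storeAddr   string
    regionInfos []regionInfo
}

// groupByStoreAddr mirrors the taskMap loop: addrs[i] is the store address
// chosen (by consistent hashing, upstream) for regions[i]; regions sharing an
// address are appended to one task, and first-seen order is preserved.
func groupByStoreAddr(addrs []string, regions []regionInfo) []*batchTask {
    taskMap := make(map[string]*batchTask)
    var res []*batchTask
    for i, addr := range addrs {
        if bt, ok := taskMap[addr]; ok {
            bt.regionInfos = append(bt.regionInfos, regions[i])
        } else {
            bt := &batchTask{storeAddr: addr, regionInfos: []regionInfo{regions[i]}}
            taskMap[addr] = bt
            res = append(res, bt)
        }
    }
    return res
}

func main() {
    addrs := []string{"tc-0:3930", "tc-1:3930", "tc-0:3930"} // hypothetical addresses
    regions := []regionInfo{{"r1"}, {"r2"}, {"r3"}}
    for _, bt := range groupByStoreAddr(addrs, regions) {
        fmt.Printf("%s -> %d region(s)\n", bt.storeAddr, len(bt.regionInfos))
    }
    // tc-0:3930 -> 2 region(s); tc-1:3930 -> 1 region(s)
}
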