diff --git a/config/config.go b/config/config.go
index b0a7b4498a05b..a2230f5d73c35 100644
--- a/config/config.go
+++ b/config/config.go
@@ -295,6 +295,8 @@ type Config struct {
     TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
     IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
     AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`
+    // TODO: remove this after AutoScaler is stable.
+    UseAutoScaler bool `toml:"use-autoscaler" json:"use-autoscaler"`

     // TiDBMaxReuseChunk indicates max cached chunk num
     TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
@@ -1012,6 +1014,7 @@ var defaultConf = Config{
     TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
     IsTiFlashComputeFixedPool: false,
     AutoScalerClusterID: "",
+    UseAutoScaler: true,
     TiDBMaxReuseChunk: 64,
     TiDBMaxReuseColumn: 256,
     TiDBEnableExitCheck: false,
@@ -1348,7 +1351,7 @@ func (c *Config) Valid() error {
     }

     // Check tiflash_compute topo fetch is valid.
-    if c.DisaggregatedTiFlash {
+    if c.DisaggregatedTiFlash && c.UseAutoScaler {
         if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
             return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
                 tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
diff --git a/config/config.toml.example b/config/config.toml.example
index d153f56fe7690..74cbc9413a496 100644
--- a/config/config.toml.example
+++ b/config/config.toml.example
@@ -129,6 +129,10 @@ autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081
 # Only meaningful when disaggregated-tiflash is true.
 autoscaler-cluster-id = ""

+# use-autoscaler indicates whether to use AutoScaler or PD for tiflash_compute nodes. Only meaningful when disaggregated-tiflash is true.
+# This option will be removed once AutoScaler is stable.
+use-autoscaler = true
+
 [log]
 # Log level: debug, info, warn, error, fatal.
 level = "info"
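Taken together, the config changes mean AutoScaler is consulted only when both disaggregated-tiflash and use-autoscaler are enabled; with use-autoscaler = false, the tiflash_compute topology comes from PD instead. A minimal standalone sketch of that decision (illustrative names only, not TiDB code):

```go
package main

import "fmt"

// topoSource mirrors the gating the patch introduces: AutoScaler is only
// used when both flags are on; otherwise tiflash_compute nodes come from PD,
// and in a non-disaggregated deployment neither source is needed.
func topoSource(disaggregated, useAutoScaler bool) string {
	if !disaggregated {
		return "none (classic TiFlash deployment)"
	}
	if useAutoScaler {
		return "AutoScaler"
	}
	return "PD"
}

func main() {
	fmt.Println(topoSource(true, true))  // AutoScaler (the default)
	fmt.Println(topoSource(true, false)) // PD
	fmt.Println(topoSource(false, true)) // none (classic TiFlash deployment)
}
```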
diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go
index f2bb7232bbbfa..bc759b5b27452 100644
--- a/executor/tiflashtest/tiflash_test.go
+++ b/executor/tiflashtest/tiflash_test.go
@@ -1319,6 +1319,37 @@ func TestDisaggregatedTiFlash(t *testing.T) {
     require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed")
 }

+// TODO: remove this after AutoScaler is stable.
+func TestDisaggregatedTiFlashNonAutoScaler(t *testing.T) {
+    config.UpdateGlobal(func(conf *config.Config) {
+        conf.DisaggregatedTiFlash = true
+        conf.UseAutoScaler = false
+    })
+    defer config.UpdateGlobal(func(conf *config.Config) {
+        conf.DisaggregatedTiFlash = false
+        conf.UseAutoScaler = true
+    })
+
+    // Set globalTopoFetcher to nil so that the topology cannot be fetched from AutoScaler.
+    err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.InvalidASStr, "", "", false)
+    require.Contains(t, err.Error(), "unexpected topo fetch type. expect: mock or aws or gcp, got invalid")
+
+    store := testkit.CreateMockStore(t, withMockTiFlash(2))
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(c1 int)")
+    tk.MustExec("alter table t set tiflash replica 1")
+    tb := external.GetTableByName(t, tk, "test", "t")
+    err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+
+    err = tk.ExecToErr("select * from t;")
+    // This error message means we used PD instead of AutoScaler.
+    require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
+}
+
 func TestDisaggregatedTiFlashQuery(t *testing.T) {
     config.UpdateGlobal(func(conf *config.Config) {
         conf.DisaggregatedTiFlash = true
diff --git a/session/session.go b/session/session.go
index 624315fe8fc72..5b7fa5a8875aa 100644
--- a/session/session.go
+++ b/session/session.go
@@ -3339,6 +3339,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
         return nil, err
     }

+    if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler {
+        // Invalidate the client-go tiflash_compute store cache if necessary.
+        err = dom.WatchTiFlashComputeNodeChange()
+        if err != nil {
+            return nil, err
+        }
+    }
+
     if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
         return nil, err
     }
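The new test depends on overriding the process-global config for its duration and restoring it afterwards through the paired config.UpdateGlobal calls. A stripped-down sketch of that override/restore pattern, using an invented Settings type rather than TiDB's config package:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Settings is a stand-in for a process-global configuration; the real patch
// uses config.UpdateGlobal from TiDB, this is only an illustrative sketch.
type Settings struct {
	DisaggregatedTiFlash bool
	UseAutoScaler        bool
}

var global atomic.Pointer[Settings]

// update copies the current settings, lets the caller mutate the copy, and
// atomically publishes it, similar in spirit to config.UpdateGlobal.
func update(fn func(*Settings)) {
	cur := *global.Load()
	fn(&cur)
	global.Store(&cur)
}

func main() {
	global.Store(&Settings{DisaggregatedTiFlash: false, UseAutoScaler: true})

	// Override for the duration of a "test", then restore, mirroring the
	// UpdateGlobal / defer UpdateGlobal pairing in the new test.
	update(func(s *Settings) { s.DisaggregatedTiFlash = true; s.UseAutoScaler = false })
	defer update(func(s *Settings) { s.DisaggregatedTiFlash = false; s.UseAutoScaler = true })

	fmt.Printf("%+v\n", *global.Load())
}
```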
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index 568b4dc940c3c..99b7a807d52e2 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -494,7 +494,10 @@ func buildBatchCopTasksForNonPartitionedTable(
     balanceWithContinuity bool,
     balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
+        if config.GetGlobalConfig().UseAutoScaler {
+            return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
+        }
+        return buildBatchCopTasksConsistentHashForPD(bo, store, []*KeyRanges{ranges}, storeType, ttl)
     }
     return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 }
@@ -511,7 +514,12 @@ func buildBatchCopTasksForPartitionedTable(
     balanceContinuousRegionCount int64,
     partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        if config.GetGlobalConfig().UseAutoScaler {
+            batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        } else {
+            // TODO: remove this after AutoScaler is stable.
+            batchTasks, err = buildBatchCopTasksConsistentHashForPD(bo, store, rangesForEachPhysicalTable, storeType, ttl)
+        }
     } else {
         batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
     }
@@ -1169,3 +1177,92 @@ func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batch
     }
     resp.detail.CalleeAddress = task.storeAddr
 }
+
+// buildBatchCopTasksConsistentHashForPD is only called when UseAutoScaler is false.
+func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
+    kvStore *kvStore,
+    rangesForEachPhysicalTable []*KeyRanges,
+    storeType kv.StoreType,
+    ttl time.Duration) (res []*batchCopTask, err error) {
+    const cmdType = tikvrpc.CmdBatchCop
+    var retryNum int
+    cache := kvStore.GetRegionCache()
+
+    for {
+        retryNum++
+        var rangesLen int
+        tasks := make([]*copTask, 0)
+        regionIDs := make([]tikv.RegionVerID, 0)
+
+        for i, ranges := range rangesForEachPhysicalTable {
+            rangesLen += ranges.Len()
+            locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            for _, lo := range locations {
+                tasks = append(tasks, &copTask{
+                    region:         lo.Location.Region,
+                    ranges:         lo.Ranges,
+                    cmdType:        cmdType,
+                    storeType:      storeType,
+                    partitionIndex: int64(i),
+                })
+                regionIDs = append(regionIDs, lo.Location.Region)
+            }
+        }
+
+        stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
+        if err != nil {
+            return nil, err
+        }
+        stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
+        if len(stores) == 0 {
+            return nil, errors.New("tiflash_compute node is unavailable")
+        }
+
+        rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
+        if err != nil {
+            return nil, err
+        }
+        if rpcCtxs == nil {
+            logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD retry because rpcCtxs is nil", zap.Int("retryNum", retryNum))
+            err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            continue
+        }
+        if len(rpcCtxs) != len(tasks) {
+            return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
+        }
+        taskMap := make(map[string]*batchCopTask)
+        for i, rpcCtx := range rpcCtxs {
+            regionInfo := RegionInfo{
+                // tasks and rpcCtxs correspond to each other.
+                Region:         tasks[i].region,
+                Meta:           rpcCtx.Meta,
+                Ranges:         tasks[i].ranges,
+                AllStores:      []uint64{rpcCtx.Store.StoreID()},
+                PartitionIndex: tasks[i].partitionIndex,
+            }
+            if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
+                batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
+            } else {
+                batchTask := &batchCopTask{
+                    storeAddr:   rpcCtx.Addr,
+                    cmdType:     cmdType,
+                    ctx:         rpcCtx,
+                    regionInfos: []RegionInfo{regionInfo},
+                }
+                taskMap[rpcCtx.Addr] = batchTask
+                res = append(res, batchTask)
+            }
+        }
+        logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
+        break
+    }
+
+    failpointCheckForConsistentHash(res)
+    return res, nil
+}
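The heart of buildBatchCopTasksConsistentHashForPD is the taskMap loop: every region is assigned (via consistent hashing) to one alive tiflash_compute store, and all regions that land on the same store address are folded into a single batch task. A simplified, self-contained sketch of that grouping step, with the hashing replaced by a plain lookup table and invented stand-in types for illustration:

```go
package main

import "fmt"

// regionInfo and batchTask are simplified stand-ins for copr.RegionInfo and
// copr.batchCopTask; only the grouping logic below mirrors the patch.
type regionInfo struct {
	regionID uint64
}

type batchTask struct {
	storeAddr string
	regions   []regionInfo
}

// groupByStore folds all regions that map to the same store address into one
// batch task, just like the taskMap loop in the new function. In TiDB the
// region-to-store mapping comes from consistent hashing over alive stores.
func groupByStore(regionIDs []uint64, storeForRegion map[uint64]string) []*batchTask {
	taskMap := make(map[string]*batchTask)
	var res []*batchTask
	for _, id := range regionIDs {
		addr := storeForRegion[id]
		if t, ok := taskMap[addr]; ok {
			t.regions = append(t.regions, regionInfo{regionID: id})
		} else {
			t := &batchTask{storeAddr: addr, regions: []regionInfo{{regionID: id}}}
			taskMap[addr] = t
			res = append(res, t)
		}
	}
	return res
}

func main() {
	stores := map[uint64]string{1: "tc-0:3930", 2: "tc-1:3930", 3: "tc-0:3930"}
	for _, t := range groupByStore([]uint64{1, 2, 3}, stores) {
		fmt.Printf("%s -> %d regions\n", t.storeAddr, len(t.regions))
	}
}
```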
diff --git a/store/copr/mpp.go b/store/copr/mpp.go
index 49b44922aa2ce..03c09b37950a0 100644
--- a/store/copr/mpp.go
+++ b/store/copr/mpp.go
@@ -250,6 +250,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
     var rpcResp *tikvrpc.Response
     var err error
     var retry bool
+    invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler

     // If copTasks is not empty, we should send request according to region distribution.
     // Or else it's the task without region, which always happens in high layer task without table.
@@ -262,6 +263,9 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
         // That's a hard job but we can try it in the future.
         if sender.GetRPCError() != nil {
             logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
+            if invalidPDCache {
+                m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+            }
             // if needTriggerFallback is true, we return timeout to trigger tikv's fallback
             if m.needTriggerFallback {
                 err = derr.ErrTiFlashServerTimeout
@@ -274,6 +278,9 @@
         if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
             retry = false
         } else if err != nil {
+            if invalidPDCache {
+                m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+            }
             if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
                 retry = true
             }
@@ -351,7 +358,9 @@ func (m *mppIterator) cancelMppTasks() {
     }

     // send cancel cmd to all stores where tasks run
+    invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler
     wg := util.WaitGroupWrapper{}
+    gotErr := atomic.Bool{}
     for addr := range usedStoreAddrs {
         storeAddr := addr
         wg.Run(func() {
@@ -359,10 +368,16 @@
             logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64()))
             if err != nil {
                 logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64()))
+                if invalidPDCache {
+                    gotErr.CompareAndSwap(false, true)
+                }
             }
         })
     }
     wg.Wait()
+    if invalidPDCache && gotErr.Load() {
+        m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+    }
 }

 func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
@@ -389,6 +404,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
     if err != nil {
         logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
+        if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler {
+            m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+        }
         // if needTriggerFallback is true, we return timeout to trigger tikv's fallback
         if m.needTriggerFallback {
             m.sendError(derr.ErrTiFlashServerTimeout)
diff --git a/tidb-server/main.go b/tidb-server/main.go
index f3ca17cf7c4c6..3d73fa111ab8c 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -204,7 +204,7 @@ func main() {
     err := cpuprofile.StartCPUProfiler()
     terror.MustNil(err)

-    if config.GetGlobalConfig().DisaggregatedTiFlash {
+    if config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler {
         clusterID, err := config.GetAutoScalerClusterID()
         terror.MustNil(err)
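In cancelMppTasks the cancellations still run concurrently; the change only records failures in a shared atomic.Bool and invalidates the PD-backed store cache at most once after wg.Wait. A self-contained sketch of that pattern, with hypothetical cancel/invalidate callbacks standing in for the TiDB RPC and region-cache calls:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cancelAll sketches the cancelMppTasks change: cancellations run in parallel,
// a shared atomic.Bool records whether any of them failed, and the invalidate
// callback runs at most once after all goroutines finish. Names are
// illustrative, not TiDB's.
func cancelAll(addrs []string, cancel func(string) error, invalidate func()) {
	var wg sync.WaitGroup
	var gotErr atomic.Bool
	for _, addr := range addrs {
		addr := addr
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := cancel(addr); err != nil {
				gotErr.CompareAndSwap(false, true)
			}
		}()
	}
	wg.Wait()
	if gotErr.Load() {
		invalidate()
	}
}

func main() {
	cancelAll(
		[]string{"tc-0:3930", "tc-1:3930"},
		func(addr string) error {
			if addr == "tc-1:3930" {
				return fmt.Errorf("store %s unreachable", addr)
			}
			return nil
		},
		func() { fmt.Println("invalidating tiflash_compute store cache") },
	)
}
```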
diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go
index f8d7e7b123e63..877d3a0519448 100644
--- a/util/tiflashcompute/topo_fetcher.go
+++ b/util/tiflashcompute/topo_fetcher.go
@@ -44,6 +44,8 @@ const (
     GCPASStr = "gcp"
     // TestASStr is string value for test AutoScaler.
     TestASStr = "test"
+    // InvalidASStr is the string value for an invalid AutoScaler.
+    InvalidASStr = "invalid"
 )

 const (
@@ -127,6 +129,7 @@ func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPoo
     case TestASType:
         globalTopoFetcher = NewTestAutoScalerFetcher()
     default:
+        globalTopoFetcher = nil
         err = errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s",
             MockASStr, AWSASStr, GCPASStr, typ)
     }
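The default branch of InitGlobalTopoFetcher now resets the package-level fetcher to nil instead of leaving a stale value behind, which is what lets the new test force the PD path with InvalidASStr. A minimal sketch of that reset-on-failure behaviour, with invented names rather than the real tiflashcompute API:

```go
package main

import (
	"errors"
	"fmt"
)

// fetcher and initFetcher sketch the topo_fetcher change: on an unknown type
// the package-level fetcher is explicitly reset to nil instead of keeping a
// stale value, so later callers cannot accidentally use AutoScaler.
// Illustrative only; the interface and method are not TiDB's.
type fetcher interface{ assureAndGet() ([]string, error) }

type mockFetcher struct{}

func (mockFetcher) assureAndGet() ([]string, error) { return []string{"tc-0:3930"}, nil }

var globalFetcher fetcher

func initFetcher(typ string) error {
	switch typ {
	case "mock":
		globalFetcher = mockFetcher{}
		return nil
	default:
		globalFetcher = nil
		return errors.New("unexpected topo fetch type: " + typ)
	}
}

func main() {
	err := initFetcher("invalid")
	fmt.Println(err, globalFetcher == nil) // prints the error and true
	err = initFetcher("mock")
	fmt.Println(err, globalFetcher == nil) // prints <nil> and false
}
```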