diff --git a/config/config.go b/config/config.go
index 68108267540b1..9030c4ac388f8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -292,6 +292,8 @@ type Config struct {
 	TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
 	// TiDBMaxReuseColumn indicates max cached column num
 	TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
+	// TiDBEnableExitCheck indicates whether exit-checking in domain for background process
+	TiDBEnableExitCheck bool `toml:"tidb-enable-exit-check" json:"tidb-enable-exit-check"`
 }
 
 // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
@@ -1000,6 +1002,7 @@ var defaultConf = Config{
 	DisaggregatedTiFlash: false,
 	TiDBMaxReuseChunk:    64,
 	TiDBMaxReuseColumn:   256,
+	TiDBEnableExitCheck:  false,
 }
 
 var (
diff --git a/domain/domain.go b/domain/domain.go
index 977694d773998..2b6337d23cdec 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -116,16 +116,17 @@ type Domain struct {
 	expensiveQueryHandle    *expensivequery.Handle
 	memoryUsageAlarmHandle  *memoryusagealarm.Handle
 	serverMemoryLimitHandle *servermemorylimit.Handle
-	wg                      util.WaitGroupWrapper
-	statsUpdating           atomicutil.Int32
-	cancel                  context.CancelFunc
-	indexUsageSyncLease     time.Duration
-	dumpFileGcChecker       *dumpFileGcChecker
-	planReplayerHandle      *planReplayerHandle
-	expiredTimeStamp4PC     types.Time
-	logBackupAdvancer       *daemon.OwnerDaemon
-	historicalStatsWorker   *HistoricalStatsWorker
-	ttlJobManager           *ttlworker.JobManager
+	// TODO: use Run for each process in future pr
+	wg                    *util.WaitGroupEnhancedWrapper
+	statsUpdating         atomicutil.Int32
+	cancel                context.CancelFunc
+	indexUsageSyncLease   time.Duration
+	dumpFileGcChecker     *dumpFileGcChecker
+	planReplayerHandle    *planReplayerHandle
+	expiredTimeStamp4PC   types.Time
+	logBackupAdvancer     *daemon.OwnerDaemon
+	historicalStatsWorker *HistoricalStatsWorker
+	ttlJobManager         *ttlworker.JobManager
 
 	serverID        uint64
 	serverIDSession *concurrency.Session
@@ -918,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
 			jobsIdsMap: make(map[int64]string),
 		},
 	}
-
+	do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitCheck)
 	do.SchemaValidator = NewSchemaValidator(ddlLease, do)
 	do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
 	do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit)
@@ -1065,7 +1066,7 @@ func (do *Domain) Init(
 		// Local store needs to get the change information for every DDL state in each session.
 		go do.loadSchemaInLoop(ctx, ddlLease)
 	}
-	do.wg.Run(do.mdlCheckLoop)
+	do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
 	do.wg.Add(3)
 	go do.topNSlowQueryLoop()
 	go do.infoSyncerKeeper()
@@ -1107,7 +1108,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
 	if err != nil {
 		return err
 	}
-	do.wg.Run(loop)
+	do.wg.Run(loop, "logBackupAdvancer")
 	return nil
 }
 
@@ -1921,7 +1922,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
 	do.ddl.RegisterStatsHandle(statsHandle)
 	// Negative stats lease indicates that it is in test, it does not need update.
 	if do.statsLease >= 0 {
-		do.wg.Run(do.loadStatsWorker)
+		do.wg.Run(do.loadStatsWorker, "loadStatsWorker")
 	}
 	owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
 	if do.indexUsageSyncLease > 0 {
@@ -1932,9 +1933,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
 		return nil
 	}
 	do.SetStatsUpdating(true)
-	do.wg.Run(func() { do.updateStatsWorker(ctx, owner) })
-	do.wg.Run(func() { do.autoAnalyzeWorker(owner) })
-	do.wg.Run(func() { do.gcAnalyzeHistory(owner) })
+	do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker")
+	do.wg.Run(func() { do.autoAnalyzeWorker(owner) }, "autoAnalyzeWorker")
+	do.wg.Run(func() { do.gcAnalyzeHistory(owner) }, "gcAnalyzeHistory")
 	return nil
 }
 
@@ -1944,7 +1945,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
 	for i, ctx := range ctxList {
 		statsHandle.StatsLoad.SubCtxs[i] = ctx
 		do.wg.Add(1)
-		go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg)
+		go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
 	}
 }
 
@@ -2529,7 +2530,7 @@ func (do *Domain) StartTTLJobManager() {
 		if err != nil {
 			logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
 		}
-	})
+	}, "ttlJobManager")
 }
 
 // TTLJobManager returns the ttl job manager on this domain
diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go
index ddfd88dd32ffb..ad04d946e3f22 100644
--- a/statistics/handle/handle_hist.go
+++ b/statistics/handle/handle_hist.go
@@ -182,7 +182,7 @@ type StatsReaderContext struct {
 }
 
 // SubLoadWorker loads hist data for each column
-func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupWrapper) {
+func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
 	readerCtx := &StatsReaderContext{}
 	defer func() {
 		exitWg.Done()
diff --git a/util/wait_group_wrapper.go b/util/wait_group_wrapper.go
index 3f3b139990b81..21f808934f8d7 100644
--- a/util/wait_group_wrapper.go
+++ b/util/wait_group_wrapper.go
@@ -19,7 +19,6 @@ import (
 	"time"
 
 	"github.com/pingcap/tidb/util/logutil"
-	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -27,42 +26,46 @@ import (
 // if the `exited` signal is true by print them on log.
 type WaitGroupEnhancedWrapper struct {
 	sync.WaitGroup
-	exited          *atomic.Bool
 	source          string
 	registerProcess sync.Map
 }
 
 // NewWaitGroupEnhancedWrapper returns WaitGroupEnhancedWrapper, the empty source indicates the unit test, then
 // the `checkUnExitedProcess` won't be executed.
-func NewWaitGroupEnhancedWrapper(source string, exited *atomic.Bool) *WaitGroupEnhancedWrapper {
+func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}, exitedCheck bool) *WaitGroupEnhancedWrapper {
 	wgew := &WaitGroupEnhancedWrapper{
-		exited:          exited,
 		source:          source,
 		registerProcess: sync.Map{},
 	}
-	if len(source) > 0 {
-		go wgew.checkUnExitedProcess()
+	if exitedCheck {
+		wgew.Add(1)
+		go wgew.checkUnExitedProcess(exit)
 	}
 	return wgew
 }
 
-func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess() {
-	logutil.BgLogger().Info("waitGroupWrapper ready to check unexited process", zap.String("source", w.source))
-	ticker := time.NewTimer(10 * time.Second)
-	defer ticker.Stop()
-	for {
-		<-ticker.C
-		continueCheck := w.check()
-		if !continueCheck {
-			return
+func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) {
+	defer func() {
+		logutil.BgLogger().Info("waitGroupWrapper exit-checking exited", zap.String("source", w.source))
+		w.Done()
+	}()
+	logutil.BgLogger().Info("waitGroupWrapper enable exit-checking", zap.String("source", w.source))
+	<-exit
+	logutil.BgLogger().Info("waitGroupWrapper start exit-checking", zap.String("source", w.source))
+	if w.check() {
+		ticker := time.NewTimer(2 * time.Second)
+		defer ticker.Stop()
+		for {
+			<-ticker.C
+			continueCheck := w.check()
+			if !continueCheck {
+				return
+			}
 		}
 	}
 }
 
 func (w *WaitGroupEnhancedWrapper) check() bool {
-	if !w.exited.Load() {
-		return true
-	}
 	unexitedProcess := make([]string, 0)
 	w.registerProcess.Range(func(key, value any) bool {
 		unexitedProcess = append(unexitedProcess, key.(string))
diff --git a/util/wait_group_wrapper_test.go b/util/wait_group_wrapper_test.go
index 1ad9ada053ebb..2d06abdd689df 100644
--- a/util/wait_group_wrapper_test.go
+++ b/util/wait_group_wrapper_test.go
@@ -36,8 +36,7 @@ func TestWaitGroupWrapperRun(t *testing.T) {
 	require.Equal(t, expect, val.Load())
 
 	val.Store(0)
-	exited := atomic.NewBool(false)
-	wg2 := NewWaitGroupEnhancedWrapper("", exited)
+	wg2 := NewWaitGroupEnhancedWrapper("", nil, false)
 	for i := int32(0); i < expect; i++ {
 		wg2.Run(func() {
 			val.Inc()
@@ -62,8 +61,7 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
 	require.Equal(t, expect, val.Load())
 
 	val.Store(0)
-	exited := atomic.NewBool(false)
-	wg2 := NewWaitGroupEnhancedWrapper("", exited)
+	wg2 := NewWaitGroupEnhancedWrapper("", nil, false)
 	for i := int32(0); i < expect; i++ {
 		wg2.RunWithRecover(func() {
 			panic("test1")
@@ -76,18 +74,15 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
 }
 
 func TestWaitGroupWrapperCheck(t *testing.T) {
-	exited := atomic.NewBool(false)
-	wg := NewWaitGroupEnhancedWrapper("", exited)
+	exit := make(chan struct{})
+	wg := NewWaitGroupEnhancedWrapper("", exit, false)
 	quit := make(chan struct{})
 	wg.Run(func() {
 		<-quit
 	}, "test")
 
-	// directly skip check if exited is false
-	require.True(t, wg.check())
-	// need continue check as existed unexited process
-	exited.Store(true)
+	close(exit)
 	require.True(t, wg.check())
 
 	// no need to continue check as all process exited
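Usage note (not part of the patch): after this change the wrapper is driven by an exit channel instead of an *atomic.Bool, and the checker goroutine is only started when the new tidb-enable-exit-check option (default false) is on. Below is a minimal sketch of how a caller outside domain might exercise the API shown in the diff; the "example" source label, "exampleWorker" name, and the sleep are hypothetical placeholders, not code from this PR.

package main

import (
	"time"

	"github.com/pingcap/tidb/util"
)

func main() {
	// exit stands in for Domain.exit: closing it tells the wrapper's
	// checkUnExitedProcess goroutine to start scanning registered names.
	exit := make(chan struct{})

	// The last argument mirrors config.GetGlobalConfig().TiDBEnableExitCheck;
	// only when it is true does the wrapper spawn the checker goroutine.
	wg := util.NewWaitGroupEnhancedWrapper("example", exit, true)

	// Each background process is registered under a name, so anything still
	// running after shutdown begins can be reported in the log by name.
	wg.Run(func() {
		time.Sleep(100 * time.Millisecond) // hypothetical background work
	}, "exampleWorker")

	close(exit) // begin shutdown: the checker logs any names still registered
	wg.Wait()   // waits for exampleWorker and for the checker itself (it did Add(1)/Done())
}

In the Domain itself the same wiring happens in NewDomain: the exit channel is do.exit, the source label is "domain", and loops such as mdlCheckLoop, loadStatsWorker, and ttlJobManager are registered under the names added in this diff.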