From 1256bbea2338d0bb97682a1f493b4c5c1fd7661b Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 3 Sep 2024 13:38:29 +0800 Subject: [PATCH] *: Refactor runaway related code (#55202) ref pingcap/tidb#54434 --- pkg/ddl/BUILD.bazel | 2 +- pkg/ddl/executor.go | 2 +- pkg/ddl/resource_group.go | 2 +- pkg/distsql/BUILD.bazel | 2 +- pkg/distsql/context/BUILD.bazel | 2 +- pkg/distsql/context/context.go | 4 +- pkg/distsql/request_builder_test.go | 2 +- pkg/domain/BUILD.bazel | 5 +- pkg/domain/domain.go | 7 +- pkg/domain/infosync/BUILD.bazel | 2 +- pkg/domain/infosync/info.go | 2 +- .../infosync/resource_manager_client.go | 2 +- pkg/domain/resourcegroup/runaway.go | 671 ------------------ pkg/domain/ru_stats.go | 9 +- pkg/domain/runaway.go | 612 +--------------- pkg/executor/BUILD.bazel | 3 +- pkg/executor/infoschema_reader.go | 8 +- pkg/executor/internal/querywatch/BUILD.bazel | 3 +- .../internal/querywatch/query_watch.go | 20 +- pkg/executor/simple.go | 2 +- pkg/kv/BUILD.bazel | 2 +- pkg/kv/kv.go | 4 +- pkg/meta/BUILD.bazel | 2 +- pkg/meta/meta.go | 2 +- pkg/resourcegroup/BUILD.bazel | 12 + pkg/resourcegroup/checker.go | 40 ++ .../runaway}/BUILD.bazel | 25 +- pkg/resourcegroup/runaway/checker.go | 299 ++++++++ pkg/resourcegroup/runaway/manager.go | 440 ++++++++++++ pkg/resourcegroup/runaway/record.go | 407 +++++++++++ pkg/resourcegroup/runaway/syncer.go | 189 +++++ .../tests}/BUILD.bazel | 2 +- .../tests}/resource_group_test.go | 18 +- pkg/server/BUILD.bazel | 2 +- pkg/server/conn.go | 2 +- pkg/server/metrics/BUILD.bazel | 2 +- pkg/server/metrics/metrics.go | 2 +- pkg/server/server.go | 2 +- pkg/sessionctx/stmtctx/BUILD.bazel | 2 +- pkg/sessionctx/stmtctx/stmtctx.go | 4 +- pkg/sessionctx/variable/BUILD.bazel | 2 +- pkg/sessionctx/variable/session.go | 2 +- pkg/store/copr/BUILD.bazel | 2 +- pkg/store/copr/copr_test/BUILD.bazel | 2 +- pkg/store/copr/copr_test/coprocessor_test.go | 6 +- pkg/store/copr/coprocessor.go | 12 +- pkg/util/BUILD.bazel | 2 +- pkg/util/processinfo.go | 4 +- 
48 files changed, 1506 insertions(+), 1346 deletions(-) delete mode 100644 pkg/domain/resourcegroup/runaway.go create mode 100644 pkg/resourcegroup/BUILD.bazel create mode 100644 pkg/resourcegroup/checker.go rename pkg/{domain/resourcegroup => resourcegroup/runaway}/BUILD.bazel (50%) create mode 100644 pkg/resourcegroup/runaway/checker.go create mode 100644 pkg/resourcegroup/runaway/manager.go create mode 100644 pkg/resourcegroup/runaway/record.go create mode 100644 pkg/resourcegroup/runaway/syncer.go rename pkg/{ddl/tests/resourcegroup => resourcegroup/tests}/BUILD.bazel (95%) rename pkg/{ddl/tests/resourcegroup => resourcegroup/tests}/resource_group_test.go (98%) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index ca5cc149de3ef..b49e72df37bbf 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -93,7 +93,6 @@ go_library( "//pkg/disttask/framework/taskexecutor/execute", "//pkg/disttask/operator", "//pkg/domain/infosync", - "//pkg/domain/resourcegroup", "//pkg/errctx", "//pkg/expression", "//pkg/expression/context", @@ -119,6 +118,7 @@ go_library( "//pkg/parser/terror", "//pkg/parser/types", "//pkg/privilege", + "//pkg/resourcegroup", "//pkg/resourcemanager/pool/workerpool", "//pkg/resourcemanager/util", "//pkg/sessionctx", diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 475ebf94838c4..9c6599dc1fb67 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/ddl/resourcegroup" sess "github.com/pingcap/tidb/pkg/ddl/session" ddlutil "github.com/pingcap/tidb/pkg/ddl/util" - rg "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" @@ -50,6 +49,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/privilege" + rg "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx" 
"github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" diff --git a/pkg/ddl/resource_group.go b/pkg/ddl/resource_group.go index 4a4d6914e0ded..20849d6ca520e 100644 --- a/pkg/ddl/resource_group.go +++ b/pkg/ddl/resource_group.go @@ -25,11 +25,11 @@ import ( "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/ddl/resourcegroup" "github.com/pingcap/tidb/pkg/domain/infosync" - rg "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" + rg "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/util/dbterror" kvutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" diff --git a/pkg/distsql/BUILD.bazel b/pkg/distsql/BUILD.bazel index fe5073763908d..8ea4d56d2b0c7 100644 --- a/pkg/distsql/BUILD.bazel +++ b/pkg/distsql/BUILD.bazel @@ -69,12 +69,12 @@ go_test( shard_count = 27, deps = [ "//pkg/distsql/context", - "//pkg/domain/resourcegroup", "//pkg/errctx", "//pkg/kv", "//pkg/parser/charset", "//pkg/parser/model", "//pkg/parser/mysql", + "//pkg/resourcegroup", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel index 2034ee3754381..fbe8a3a94fc74 100644 --- a/pkg/distsql/context/BUILD.bazel +++ b/pkg/distsql/context/BUILD.bazel @@ -6,10 +6,10 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/distsql/context", visibility = ["//visibility:public"], deps = [ - "//pkg/domain/resourcegroup", "//pkg/errctx", "//pkg/kv", "//pkg/parser/mysql", + "//pkg/resourcegroup", "//pkg/util/context", "//pkg/util/execdetails", "//pkg/util/memory", diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go index e113bc2024f04..38a0a83973eeb 100644 --- a/pkg/distsql/context/context.go +++ b/pkg/distsql/context/context.go @@ -17,10 +17,10 @@ 
package context import ( "time" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/resourcegroup" contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/memory" @@ -75,7 +75,7 @@ type DistSQLContext struct { StoreBatchSize int ResourceGroupName string LoadBasedReplicaReadThreshold time.Duration - RunawayChecker *resourcegroup.RunawayChecker + RunawayChecker resourcegroup.RunawayChecker TiKVClientReadTimeout uint64 ReplicaClosestReadThreshold int64 diff --git a/pkg/distsql/request_builder_test.go b/pkg/distsql/request_builder_test.go index f9753fef54a66..05b5672fff475 100644 --- a/pkg/distsql/request_builder_test.go +++ b/pkg/distsql/request_builder_test.go @@ -19,9 +19,9 @@ import ( "testing" "time" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index d369511566fd9..a2a9ecc8950ac 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "//pkg/domain/globalconfigsync", "//pkg/domain/infosync", "//pkg/domain/metrics", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/infoschema", "//pkg/infoschema/metrics", @@ -55,6 +54,7 @@ go_library( "//pkg/parser/terror", "//pkg/planner/core/metrics", "//pkg/privilege/privileges", + "//pkg/resourcegroup/runaway", "//pkg/sessionctx", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/sysproctrack", @@ -66,8 +66,6 @@ go_library( "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/util", "//pkg/store/helper", - "//pkg/ttl/cache", - 
"//pkg/ttl/sqlbuilder", "//pkg/ttl/ttlworker", "//pkg/types", "//pkg/util", @@ -99,7 +97,6 @@ go_library( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/pdpb", - "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 239bab3ff1be8..73752d559960b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -46,7 +46,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" "github.com/pingcap/tidb/pkg/domain/globalconfigsync" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/infoschema" infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics" @@ -64,6 +63,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" metrics2 "github.com/pingcap/tidb/pkg/planner/core/metrics" "github.com/pingcap/tidb/pkg/privilege/privileges" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" @@ -194,8 +194,7 @@ type Domain struct { logBackupAdvancer *daemon.OwnerDaemon historicalStatsWorker *HistoricalStatsWorker ttlJobManager atomic.Pointer[ttlworker.JobManager] - runawayManager *resourcegroup.RunawayManager - runawaySyncer *runawaySyncer + runawayManager *runaway.Manager resourceGroupsController *rmclient.ResourceGroupsController serverID uint64 @@ -2073,7 +2072,7 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work } // RunawayManager returns the runaway manager. 
-func (do *Domain) RunawayManager() *resourcegroup.RunawayManager { +func (do *Domain) RunawayManager() *runaway.Manager { return do.runawayManager } diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index 6dae000bc368b..8cd43a9dd6567 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -20,13 +20,13 @@ go_library( "//pkg/ddl/label", "//pkg/ddl/placement", "//pkg/ddl/util", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/kv", "//pkg/metrics", "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", + "//pkg/resourcegroup", "//pkg/session/cursor", "//pkg/sessionctx/binloginfo", "//pkg/sessionctx/variable", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 1e5e0cb51e6e6..06dd1703ad12a 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -37,13 +37,13 @@ import ( "github.com/pingcap/tidb/pkg/ddl/label" "github.com/pingcap/tidb/pkg/ddl/placement" "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/session/cursor" "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" "github.com/pingcap/tidb/pkg/sessionctx/variable" diff --git a/pkg/domain/infosync/resource_manager_client.go b/pkg/domain/infosync/resource_manager_client.go index c9afef835730d..4af1f78e61299 100644 --- a/pkg/domain/infosync/resource_manager_client.go +++ b/pkg/domain/infosync/resource_manager_client.go @@ -24,7 +24,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" + 
"github.com/pingcap/tidb/pkg/resourcegroup" pd "github.com/tikv/pd/client" ) diff --git a/pkg/domain/resourcegroup/runaway.go b/pkg/domain/resourcegroup/runaway.go deleted file mode 100644 index b7260bd2725c0..0000000000000 --- a/pkg/domain/resourcegroup/runaway.go +++ /dev/null @@ -1,671 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resourcegroup - -import ( - "context" - "fmt" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/jellydator/ttlcache/v3" - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/pingcap/tidb/pkg/util/generic" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/tikvrpc" - rmclient "github.com/tikv/pd/client/resource_group/controller" - "go.uber.org/zap" -) - -const ( - // DefaultResourceGroupName is the default resource group name. - DefaultResourceGroupName = "default" - // ManualSource shows the item added manually. - ManualSource = "manual" - // RunawayWatchTableName is the name of system table which save runaway watch items. - RunawayWatchTableName = "mysql.tidb_runaway_watch" - // RunawayWatchDoneTableName is the name of system table which save done runaway watch items. 
- RunawayWatchDoneTableName = "mysql.tidb_runaway_watch_done" - - // MaxWaitDuration is the max duration to wait for acquiring token buckets. - MaxWaitDuration = time.Second * 30 - maxWatchListCap = 10000 - maxWatchRecordChannelSize = 1024 -) - -// NullTime is a zero time.Time. -var NullTime time.Time - -// RunawayRecord is used to save records which will be insert into mysql.tidb_runaway_queries. -type RunawayRecord struct { - ResourceGroupName string - Time time.Time - Match string - Action string - SQLText string - PlanDigest string - Source string -} - -// GenRunawayQueriesStmt generates statement with given RunawayRecords. -func GenRunawayQueriesStmt(records []*RunawayRecord) (string, []any) { - var builder strings.Builder - params := make([]any, 0, len(records)*7) - builder.WriteString("insert into mysql.tidb_runaway_queries VALUES ") - for count, r := range records { - if count > 0 { - builder.WriteByte(',') - } - builder.WriteString("(%?, %?, %?, %?, %?, %?, %?)") - params = append(params, r.ResourceGroupName) - params = append(params, r.Time) - params = append(params, r.Match) - params = append(params, r.Action) - params = append(params, r.SQLText) - params = append(params, r.PlanDigest) - params = append(params, r.Source) - } - return builder.String(), params -} - -// QuarantineRecord is used to save records which will be insert into mysql.tidb_runaway_watch. -type QuarantineRecord struct { - ID int64 - ResourceGroupName string - // startTime and endTime are in UTC. - StartTime time.Time - EndTime time.Time - Watch rmpb.RunawayWatchType - WatchText string - Source string - Action rmpb.RunawayAction -} - -// GetRecordKey is used to get the key in ttl cache. 
-func (r *QuarantineRecord) GetRecordKey() string { - return r.ResourceGroupName + "/" + r.WatchText -} - -func writeInsert(builder *strings.Builder, tableName string) { - builder.WriteString("insert into ") - builder.WriteString(tableName) - builder.WriteString(" VALUES ") -} - -// GenInsertionStmt is used to generate insertion sql. -func (r *QuarantineRecord) GenInsertionStmt() (string, []any) { - var builder strings.Builder - params := make([]any, 0, 6) - writeInsert(&builder, RunawayWatchTableName) - builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?)") - params = append(params, r.ResourceGroupName) - params = append(params, r.StartTime) - if r.EndTime.Equal(NullTime) { - params = append(params, nil) - } else { - params = append(params, r.EndTime) - } - params = append(params, r.Watch) - params = append(params, r.WatchText) - params = append(params, r.Source) - params = append(params, r.Action) - return builder.String(), params -} - -// GenInsertionDoneStmt is used to generate insertion sql for runaway watch done record. -func (r *QuarantineRecord) GenInsertionDoneStmt() (string, []any) { - var builder strings.Builder - params := make([]any, 0, 9) - writeInsert(&builder, RunawayWatchDoneTableName) - builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?)") - params = append(params, r.ID) - params = append(params, r.ResourceGroupName) - params = append(params, r.StartTime) - if r.EndTime.Equal(NullTime) { - params = append(params, nil) - } else { - params = append(params, r.EndTime) - } - params = append(params, r.Watch) - params = append(params, r.WatchText) - params = append(params, r.Source) - params = append(params, r.Action) - params = append(params, time.Now().UTC()) - return builder.String(), params -} - -// GenDeletionStmt is used to generate deletion sql. 
-func (r *QuarantineRecord) GenDeletionStmt() (string, []any) { - var builder strings.Builder - params := make([]any, 0, 1) - builder.WriteString("delete from ") - builder.WriteString(RunawayWatchTableName) - builder.WriteString(" where id = %?") - params = append(params, r.ID) - return builder.String(), params -} - -// RunawayManager is used to detect and record runaway queries. -type RunawayManager struct { - syncerInitialized atomic.Bool - logOnce sync.Once - - // queryLock is used to avoid repeated additions. Since we will add new items to the system table, - // in order to avoid repeated additions, we need a lock to ensure that - // action "judging whether there is this record in the current watch list and adding records" have atomicity. - queryLock sync.Mutex - watchList *ttlcache.Cache[string, *QuarantineRecord] - // activeGroup is used to manage the active runaway watches of resource group - activeGroup map[string]int64 - activeLock sync.RWMutex - metricsMap generic.SyncMap[string, prometheus.Counter] - - resourceGroupCtl *rmclient.ResourceGroupsController - serverID string - runawayQueriesChan chan *RunawayRecord - quarantineChan chan *QuarantineRecord - // staleQuarantineRecord is used to clean outdated record. There are three scenarios: - // 1. Record is expired in watch list. - // 2. The record that will be added is itself out of date. - // Like that tidb cluster is paused, and record is expired when restarting. - // 3. Duplicate added records. - // It replaces clean up loop. - staleQuarantineRecord chan *QuarantineRecord - evictionCancel func() - insertionCancel func() -} - -// NewRunawayManager creates a new RunawayManager. 
-func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serverAddr string) *RunawayManager { - watchList := ttlcache.New[string, *QuarantineRecord]( - ttlcache.WithTTL[string, *QuarantineRecord](ttlcache.NoTTL), - ttlcache.WithCapacity[string, *QuarantineRecord](maxWatchListCap), - ttlcache.WithDisableTouchOnHit[string, *QuarantineRecord](), - ) - go watchList.Start() - staleQuarantineChan := make(chan *QuarantineRecord, maxWatchRecordChannelSize) - m := &RunawayManager{ - syncerInitialized: atomic.Bool{}, - resourceGroupCtl: resourceGroupCtl, - watchList: watchList, - serverID: serverAddr, - runawayQueriesChan: make(chan *RunawayRecord, maxWatchRecordChannelSize), - quarantineChan: make(chan *QuarantineRecord, maxWatchRecordChannelSize), - staleQuarantineRecord: staleQuarantineChan, - activeGroup: make(map[string]int64), - metricsMap: generic.NewSyncMap[string, prometheus.Counter](8), - } - m.insertionCancel = watchList.OnInsertion(func(_ context.Context, i *ttlcache.Item[string, *QuarantineRecord]) { - m.activeLock.Lock() - m.activeGroup[i.Value().ResourceGroupName]++ - m.activeLock.Unlock() - }) - m.evictionCancel = watchList.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, *QuarantineRecord]) { - m.activeLock.Lock() - m.activeGroup[i.Value().ResourceGroupName]-- - m.activeLock.Unlock() - if i.Value().ID == 0 { - return - } - staleQuarantineChan <- i.Value() - }) - return m -} - -// MarkSyncerInitialized is used to mark the syncer is initialized. 
-func (rm *RunawayManager) MarkSyncerInitialized() { - rm.syncerInitialized.Store(true) -} - -// DeriveChecker derives a RunawayChecker from the given resource group -func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string, startTime time.Time) *RunawayChecker { - group, err := rm.resourceGroupCtl.GetResourceGroup(resourceGroupName) - if err != nil || group == nil { - logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err)) - return nil - } - rm.activeLock.RLock() - defer rm.activeLock.RUnlock() - if group.RunawaySettings == nil && rm.activeGroup[resourceGroupName] == 0 { - return nil - } - counter, ok := rm.metricsMap.Load(resourceGroupName) - if !ok { - counter = metrics.RunawayCheckerCounter.WithLabelValues(resourceGroupName, "hit", "") - rm.metricsMap.Store(resourceGroupName, counter) - } - counter.Inc() - return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest, startTime) -} - -func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, ttl time.Duration, now *time.Time) { - var endTime time.Time - if ttl > 0 { - endTime = now.UTC().Add(ttl) - } - record := &QuarantineRecord{ - ResourceGroupName: resourceGroupName, - StartTime: now.UTC(), - EndTime: endTime, - Watch: watchType, - WatchText: convict, - Source: rm.serverID, - Action: action, - } - // Add record without ID into watch list in this TiDB right now. - rm.addWatchList(record, ttl, false) - if !rm.syncerInitialized.Load() { - rm.logOnce.Do(func() { - logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") - }) - return - } - select { - case rm.quarantineChan <- record: - default: - // TODO: add warning for discard flush records - } -} - -// IsSyncerInitialized is only used for test. 
-func (rm *RunawayManager) IsSyncerInitialized() bool { - return rm.syncerInitialized.Load() -} - -func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Duration, force bool) { - key := record.GetRecordKey() - // This is a pre-check, because we generally believe that in most cases, we will not add a watch list to a key repeatedly. - item := rm.getWatchFromWatchList(key) - if force { - rm.queryLock.Lock() - defer rm.queryLock.Unlock() - if item != nil { - // check the ID because of the earlier scan. - if item.ID == record.ID { - return - } - rm.watchList.Delete(key) - } - rm.watchList.Set(key, record, ttl) - } else { - if item == nil { - rm.queryLock.Lock() - // When watchList get record, it will check whether the record is stale, so add new record if returns nil. - if rm.watchList.Get(key) == nil { - rm.watchList.Set(key, record, ttl) - } else { - rm.staleQuarantineRecord <- record - } - rm.queryLock.Unlock() - } else if item.ID == 0 { - // to replace the record without ID. - rm.queryLock.Lock() - defer rm.queryLock.Unlock() - rm.watchList.Set(key, record, ttl) - } else if item.ID != record.ID { - // check the ID because of the earlier scan. - rm.staleQuarantineRecord <- record - } - } -} - -// GetWatchByKey is used to get a watch item by given key. -func (rm *RunawayManager) GetWatchByKey(key string) *QuarantineRecord { - return rm.getWatchFromWatchList(key) -} - -// GetWatchList is used to get all watch items. -func (rm *RunawayManager) GetWatchList() []*QuarantineRecord { - items := rm.watchList.Items() - ret := make([]*QuarantineRecord, 0, len(items)) - for _, item := range items { - ret = append(ret, item.Value()) - } - return ret -} - -// AddWatch is used to add watch items from system table. 
-func (rm *RunawayManager) AddWatch(record *QuarantineRecord) { - ttl := time.Until(record.EndTime) - if record.EndTime.Equal(NullTime) { - ttl = 0 - } else if ttl <= 0 { - rm.staleQuarantineRecord <- record - return - } - - force := false - // The manual record replaces the old record. - force = record.Source == ManualSource - rm.addWatchList(record, ttl, force) -} - -// RemoveWatch is used to remove watch item, and this action is triggered by reading done watch system table. -func (rm *RunawayManager) RemoveWatch(record *QuarantineRecord) { - // we should check whether the cached record is not the same as the removing record. - rm.queryLock.Lock() - defer rm.queryLock.Unlock() - item := rm.getWatchFromWatchList(record.GetRecordKey()) - if item == nil { - return - } - if item.ID == record.ID { - rm.watchList.Delete(record.GetRecordKey()) - } -} -func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord { - item := rm.watchList.Get(key) - if item != nil { - return item.Value() - } - return nil -} - -func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) { - source := rm.serverID - if !rm.syncerInitialized.Load() { - rm.logOnce.Do(func() { - logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") - }) - return - } - select { - case rm.runawayQueriesChan <- &RunawayRecord{ - ResourceGroupName: resourceGroupName, - Time: *now, - Match: matchType, - Action: action, - SQLText: originalSQL, - PlanDigest: planDigest, - Source: source, - }: - default: - // TODO: add warning for discard flush records - } -} - -// FlushThreshold specifies the threshold for the number of records in trigger flush -func (*RunawayManager) FlushThreshold() int { - return maxWatchRecordChannelSize / 2 -} - -// RunawayRecordChan returns the channel of RunawayRecord -func (rm *RunawayManager) RunawayRecordChan() <-chan *RunawayRecord { - return rm.runawayQueriesChan -} - -// 
QuarantineRecordChan returns the channel of QuarantineRecord -func (rm *RunawayManager) QuarantineRecordChan() <-chan *QuarantineRecord { - return rm.quarantineChan -} - -// StaleQuarantineRecordChan returns the channel of staleQuarantineRecord -func (rm *RunawayManager) StaleQuarantineRecordChan() <-chan *QuarantineRecord { - return rm.staleQuarantineRecord -} - -// examineWatchList check whether the query is in watch list. -func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict string) (bool, rmpb.RunawayAction) { - item := rm.getWatchFromWatchList(resourceGroupName + "/" + convict) - if item == nil { - return false, 0 - } - return true, item.Action -} - -// Stop stops the watchList which is a ttlcache. -func (rm *RunawayManager) Stop() { - if rm == nil { - return - } - if rm.watchList != nil { - rm.watchList.Stop() - } -} - -// RunawayChecker is used to check if the query is runaway. -type RunawayChecker struct { - manager *RunawayManager - resourceGroupName string - originalSQL string - sqlDigest string - planDigest string - - deadline time.Time - // From the group runaway settings, which will be applied when a query lacks a specified watch rule. - settings *rmpb.RunawaySettings - - // markedByRule is set to true when the query matches the group runaway settings. - markedByRule atomic.Bool - // markedByWatch is set to true when the query matches the specified watch rules. 
- markedByWatch bool - watchAction rmpb.RunawayAction -} - -func newRunawayChecker( - manager *RunawayManager, - resourceGroupName string, settings *rmpb.RunawaySettings, - originalSQL, sqlDigest, planDigest string, startTime time.Time, -) *RunawayChecker { - c := &RunawayChecker{ - manager: manager, - resourceGroupName: resourceGroupName, - originalSQL: originalSQL, - sqlDigest: sqlDigest, - planDigest: planDigest, - settings: settings, - } - if settings != nil { - c.deadline = startTime.Add(time.Duration(settings.Rule.ExecElapsedTimeMs) * time.Millisecond) - } - return c -} - -// BeforeExecutor checks whether query is in watch list before executing and after compiling. -func (r *RunawayChecker) BeforeExecutor() error { - if r == nil { - return nil - } - // Check if the query matches any specified watch rules. - for _, convict := range r.getConvictIdentifiers() { - watched, action := r.manager.examineWatchList(r.resourceGroupName, convict) - if !watched { - continue - } - // Use the group runaway settings if none are provided. - if action == rmpb.RunawayAction_NoneAction && r.settings != nil { - action = r.settings.Action - } - // Mark it if this is the first time being watched. - r.markRunawayByWatch(action) - // Take action if needed. - switch action { - case rmpb.RunawayAction_Kill: - // Return an error to interrupt the query. - return exeerrors.ErrResourceGroupQueryRunawayQuarantine - case rmpb.RunawayAction_CoolDown: - // This action will be handled in `BeforeCopRequest`. - return nil - case rmpb.RunawayAction_DryRun: - // Noop. - return nil - default: - // Continue to examine other convicts. - } - } - return nil -} - -// CheckAction is used to check current action of the query. -// It's safe to call this method concurrently. 
-func (r *RunawayChecker) CheckAction() rmpb.RunawayAction { - if r == nil { - return rmpb.RunawayAction_NoneAction - } - if r.markedByWatch { - return r.watchAction - } - if r.markedByRule.Load() { - return r.settings.Action - } - return rmpb.RunawayAction_NoneAction -} - -// CheckRuleKillAction checks whether the query should be killed according to the group settings. -func (r *RunawayChecker) CheckRuleKillAction() bool { - // If the group settings are not available and it's not marked by watch, skip this part. - if r.settings == nil && !r.markedByWatch { - return false - } - // If the group settings are available and it's not marked by rule, check the execution time. - if r.settings != nil && !r.markedByRule.Load() { - now := time.Now() - until := r.deadline.Sub(now) - if until > 0 { - return false - } - r.markRunawayByIdentify(r.settings.Action, &now) - return r.settings.Action == rmpb.RunawayAction_Kill - } - return false -} - -// Rule returns the rule of the runaway checker. -func (r *RunawayChecker) Rule() string { - var execElapsedTime time.Duration - if r.settings != nil { - execElapsedTime = time.Duration(r.settings.Rule.ExecElapsedTimeMs) * time.Millisecond - } - return fmt.Sprintf("execElapsedTime:%s", execElapsedTime) -} - -// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request. -func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { - // If the group settings are not available and it's not marked by watch, skip this part. - if r.settings == nil && !r.markedByWatch { - return nil - } - // If it's marked by watch and the action is cooldown, override the priority, - if r.markedByWatch && r.watchAction == rmpb.RunawayAction_CoolDown { - req.ResourceControlContext.OverridePriority = 1 // set priority to lowest - } - // If group settings are available and the query is not marked by a rule, - // verify if it matches any rules in the settings. 
- if r.settings != nil && !r.markedByRule.Load() { - now := time.Now() - until := r.deadline.Sub(now) - if until > 0 { - if r.settings.Action == rmpb.RunawayAction_Kill { - // if the execution time is close to the threshold, set a timeout - if until < tikv.ReadTimeoutMedium { - req.Context.MaxExecutionDurationMs = uint64(until.Milliseconds()) - } - } - return nil - } - // execution time exceeds the threshold, mark the query as runaway - r.markRunawayByIdentify(r.settings.Action, &now) - // Take action if needed. - switch r.settings.Action { - case rmpb.RunawayAction_Kill: - return exeerrors.ErrResourceGroupQueryRunawayInterrupted - case rmpb.RunawayAction_CoolDown: - req.ResourceControlContext.OverridePriority = 1 // set priority to lowest - return nil - default: - return nil - } - } - return nil -} - -// CheckCopRespError checks TiKV error after receiving coprocessor response. -func (r *RunawayChecker) CheckCopRespError(err error) error { - if err == nil || r.settings == nil || r.settings.Action != rmpb.RunawayAction_Kill { - return err - } - if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") { - if !r.markedByRule.Load() { - now := time.Now() - if r.deadline.Before(now) && r.markRunawayByIdentify(r.settings.Action, &now) { - return exeerrors.ErrResourceGroupQueryRunawayInterrupted - } - } - // Due to concurrency, check again. 
- if r.markedByRule.Load() { - return exeerrors.ErrResourceGroupQueryRunawayInterrupted - } - } - return err -} - -func (r *RunawayChecker) markQuarantine(now *time.Time) { - if r.settings == nil || r.settings.Watch == nil { - return - } - ttl := time.Duration(r.settings.Watch.LastingDurationMs) * time.Millisecond - - r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.settings.Watch.Type, r.settings.Action, ttl, now) -} - -func (r *RunawayChecker) markRunawayByIdentify(action rmpb.RunawayAction, now *time.Time) bool { - swapped := r.markedByRule.CompareAndSwap(false, true) - if swapped { - r.markRunaway("identify", action, now) - if !r.markedByWatch { - r.markQuarantine(now) - } - } - return swapped -} - -func (r *RunawayChecker) markRunawayByWatch(action rmpb.RunawayAction) { - r.markedByWatch = true - r.watchAction = action - now := time.Now() - r.markRunaway("watch", action, &now) -} - -func (r *RunawayChecker) markRunaway(matchType string, action rmpb.RunawayAction, now *time.Time) { - actionStr := strings.ToLower(action.String()) - metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc() - r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now) -} - -func (r *RunawayChecker) getSettingConvictIdentifier() string { - if r == nil || r.settings == nil || r.settings.Watch == nil { - return "" - } - switch r.settings.Watch.Type { - case rmpb.RunawayWatchType_Plan: - return r.planDigest - case rmpb.RunawayWatchType_Similar: - return r.sqlDigest - case rmpb.RunawayWatchType_Exact: - return r.originalSQL - default: - return "" - } -} - -func (r *RunawayChecker) getConvictIdentifiers() []string { - return []string{r.originalSQL, r.sqlDigest, r.planDigest} -} diff --git a/pkg/domain/ru_stats.go b/pkg/domain/ru_stats.go index 1dd64e969b68b..e31fb03306625 100644 --- a/pkg/domain/ru_stats.go +++ b/pkg/domain/ru_stats.go @@ -25,6 +25,7 @@ import ( 
"github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" @@ -216,7 +217,7 @@ func (r *RUStatsWriter) persistLatestRUStats(stats *meta.RUStats) error { func (r *RUStatsWriter) isLatestDataInserted(lastEndTime time.Time) (bool, error) { end := lastEndTime.Format(time.DateTime) start := lastEndTime.Add(-ruStatsInterval).Format(time.DateTime) - rows, sqlErr := execRestrictedSQL(r.sessPool, "SELECT 1 from mysql.request_unit_by_group where start_time = %? and end_time = %? limit 1", []any{start, end}) + rows, sqlErr := runaway.ExecRCRestrictedSQL(r.sessPool, "SELECT 1 from mysql.request_unit_by_group where start_time = %? and end_time = %? limit 1", []any{start, end}) if sqlErr != nil { return false, errors.Trace(sqlErr) } @@ -229,7 +230,7 @@ func (r *RUStatsWriter) insertRUStats(stats *meta.RUStats) error { return nil } - _, err := execRestrictedSQL(r.sessPool, sql, nil) + _, err := runaway.ExecRCRestrictedSQL(r.sessPool, sql, nil) return err } @@ -237,7 +238,7 @@ func (r *RUStatsWriter) insertRUStats(stats *meta.RUStats) error { func (r *RUStatsWriter) GCOutdatedRecords(lastEndTime time.Time) error { gcEndDate := lastEndTime.Add(-ruStatsGCDuration).Format(time.DateTime) countSQL := fmt.Sprintf("SELECT count(*) FROM mysql.request_unit_by_group where end_time <= '%s'", gcEndDate) - rows, err := execRestrictedSQL(r.sessPool, countSQL, nil) + rows, err := runaway.ExecRCRestrictedSQL(r.sessPool, countSQL, nil) if err != nil { return errors.Trace(err) } @@ -246,7 +247,7 @@ func (r *RUStatsWriter) GCOutdatedRecords(lastEndTime time.Time) error { loopCount := (totalCount + gcBatchSize - 1) / gcBatchSize for i := int64(0); i < loopCount; i++ { sql := fmt.Sprintf("DELETE FROM mysql.request_unit_by_group where end_time <= '%s' order by end_time limit %d", 
gcEndDate, gcBatchSize) - _, err = execRestrictedSQL(r.sessPool, sql, nil) + _, err = runaway.ExecRCRestrictedSQL(r.sessPool, sql, nil) if err != nil { return errors.Trace(err) } diff --git a/pkg/domain/runaway.go b/pkg/domain/runaway.go index ae5d50724a047..fb23a327e1da6 100644 --- a/pkg/domain/runaway.go +++ b/pkg/domain/runaway.go @@ -18,27 +18,13 @@ import ( "context" "net" "strconv" - "strings" - "sync" "time" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" - "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/ttl/cache" - "github.com/pingcap/tidb/pkg/ttl/sqlbuilder" - "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" rmclient "github.com/tikv/pd/client/resource_group/controller" @@ -46,99 +32,32 @@ import ( ) const ( - runawayRecordFlushInterval = time.Second - runawayRecordGCInterval = time.Hour * 24 - runawayRecordExpiredDuration = time.Hour * 24 * 7 - runawayWatchSyncInterval = time.Second - - runawayRecordGCBatchSize = 100 - runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5 - - maxIDRetries = 3 + runawayWatchSyncInterval = time.Second runawayLoopLogErrorIntervalCount = 1800 ) -var systemSchemaCIStr = model.NewCIStr("mysql") - -func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration time.Duration) { - if !do.DDL().OwnerManager().IsOwner() { - return - } - failpoint.Inject("FastRunawayGC", func() { - 
expiredDuration = time.Second * 1 - }) - expiredTime := time.Now().Add(-expiredDuration) - tbCIStr := model.NewCIStr(tableName) - tbl, err := do.InfoSchema().TableByName(context.Background(), systemSchemaCIStr, tbCIStr) - if err != nil { - logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) - return - } - tbInfo := tbl.Meta() - col := tbInfo.FindPublicColumnByName(colName) - if col == nil { - logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName)) - return +func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client, uniqueID uint64) error { + if pdClient == nil { + logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage") + // return nil as unistore doesn't support it + return nil } - tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col) + + control, err := rmclient.NewResourceGroupController(ctx, uniqueID, pdClient, nil, rmclient.WithMaxWaitDuration(runaway.MaxWaitDuration)) if err != nil { - logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) - return + return err } - generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil) + control.Start(ctx) + serverInfo, err := infosync.GetServerInfo() if err != nil { - logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) - return - } - var leftRows [][]types.Datum - for { - sql := "" - if sql, err = generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize); err != nil { - logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err)) - return - } - // to remove - if len(sql) == 0 { - return - } - - rows, sqlErr := execRestrictedSQL(do.sysSessionPool, sql, nil) - if sqlErr != nil { - logutil.BgLogger().Error("delete system table failed", 
zap.String("table", tableName), zap.Error(err)) - return - } - leftRows = make([][]types.Datum, len(rows)) - for i, row := range rows { - leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes) - } - - for len(leftRows) > 0 { - var delBatch [][]types.Datum - if len(leftRows) < runawayRecordGCBatchSize { - delBatch = leftRows - leftRows = nil - } else { - delBatch = leftRows[0:runawayRecordGCBatchSize] - leftRows = leftRows[runawayRecordGCBatchSize:] - } - sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime) - if err != nil { - logutil.BgLogger().Error( - "build delete SQL failed when deleting system table", - zap.Error(err), - zap.String("table", tb.Schema.O+"."+tb.Name.O), - ) - return - } - - _, err = execRestrictedSQL(do.sysSessionPool, sql, nil) - if err != nil { - logutil.BgLogger().Error( - "delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql), - ) - } - } + return err } + serverAddr := net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port))) + do.runawayManager = runaway.NewRunawayManager(control, serverAddr, + do.sysSessionPool, do.exit, do.infoCache, do.ddl) + do.resourceGroupsController = control + tikv.SetResourceControlInterceptor(control) + return nil } func (do *Domain) runawayStartLoop() { @@ -153,11 +72,11 @@ func (do *Domain) runawayStartLoop() { return case <-runawayWatchSyncTicker.C: // Due to the watch and watch done tables is created later than runaway queries table - err = do.updateNewAndDoneWatch() + err = do.runawayManager.UpdateNewAndDoneWatch() if err == nil { logutil.BgLogger().Info("preparations for the runaway manager are finished and start runaway manager loop") - do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop") - do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop") + do.wg.Run(do.runawayManager.RunawayRecordFlushLoop, "runawayRecordFlushLoop") + do.wg.Run(do.runawayManager.RunawayWatchSyncLoop, "runawayWatchSyncLoop") do.runawayManager.MarkSyncerInitialized() return 
} @@ -170,490 +89,3 @@ func (do *Domain) runawayStartLoop() { count++ } } - -func (do *Domain) updateNewAndDoneWatch() error { - do.runawaySyncer.mu.Lock() - defer do.runawaySyncer.mu.Unlock() - records, err := do.runawaySyncer.getNewWatchRecords() - if err != nil { - return err - } - for _, r := range records { - do.runawayManager.AddWatch(r) - } - doneRecords, err := do.runawaySyncer.getNewWatchDoneRecords() - if err != nil { - return err - } - for _, r := range doneRecords { - do.runawayManager.RemoveWatch(r) - } - return nil -} - -func (do *Domain) runawayWatchSyncLoop() { - defer util.Recover(metrics.LabelDomain, "runawayWatchSyncLoop", nil, false) - runawayWatchSyncTicker := time.NewTicker(runawayWatchSyncInterval) - count := 0 - for { - select { - case <-do.exit: - return - case <-runawayWatchSyncTicker.C: - err := do.updateNewAndDoneWatch() - if err != nil { - if count %= runawayLoopLogErrorIntervalCount; count == 0 { - logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err)) - } - count++ - } - } - } -} - -// GetRunawayWatchList is used to get all items from runaway watch list. -func (do *Domain) GetRunawayWatchList() []*resourcegroup.QuarantineRecord { - return do.runawayManager.GetWatchList() -} - -// TryToUpdateRunawayWatch is used to update watch list including -// creation and deletion by manual trigger. -func (do *Domain) TryToUpdateRunawayWatch() error { - return do.updateNewAndDoneWatch() -} - -// RemoveRunawayWatch is used to remove runaway watch item manually. 
-func (do *Domain) RemoveRunawayWatch(recordID int64) error { - do.runawaySyncer.mu.Lock() - defer do.runawaySyncer.mu.Unlock() - records, err := do.runawaySyncer.getWatchRecordByID(recordID) - if err != nil { - return err - } - if len(records) != 1 { - return errors.Errorf("no runaway watch with the specific ID") - } - err = do.handleRunawayWatchDone(records[0]) - return err -} - -func (do *Domain) runawayRecordFlushLoop() { - defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) - - // this times is used to batch flushing records, with 1s duration, - // we can guarantee a watch record can be seen by the user within 1s. - runawayRecordFlushTimer := time.NewTimer(runawayRecordFlushInterval) - runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval) - failpoint.Inject("FastRunawayGC", func() { - runawayRecordFlushTimer.Stop() - runawayRecordGCTicker.Stop() - runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50) - runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200) - }) - - fired := false - recordCh := do.runawayManager.RunawayRecordChan() - quarantineRecordCh := do.runawayManager.QuarantineRecordChan() - staleQuarantineRecordCh := do.runawayManager.StaleQuarantineRecordChan() - flushThreshold := do.runawayManager.FlushThreshold() - records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold) - - flushRunawayRecords := func() { - if len(records) == 0 { - return - } - sql, params := resourcegroup.GenRunawayQueriesStmt(records) - if _, err := execRestrictedSQL(do.sysSessionPool, sql, params); err != nil { - logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) - } - records = records[:0] - } - - for { - select { - case <-do.exit: - return - case <-runawayRecordFlushTimer.C: - flushRunawayRecords() - fired = true - case r := <-recordCh: - records = append(records, r) - failpoint.Inject("FastRunawayGC", func() { - flushRunawayRecords() - }) - if len(records) >= 
flushThreshold { - flushRunawayRecords() - } else if fired { - fired = false - // meet a new record, reset the timer. - runawayRecordFlushTimer.Reset(runawayRecordFlushInterval) - } - case <-runawayRecordGCTicker.C: - go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration) - case r := <-quarantineRecordCh: - go func() { - _, err := do.AddRunawayWatch(r) - if err != nil { - logutil.BgLogger().Error("add runaway watch", zap.Error(err)) - } - }() - case r := <-staleQuarantineRecordCh: - go func() { - for i := 0; i < 3; i++ { - err := do.handleRemoveStaleRunawayWatch(r) - if err == nil { - break - } - logutil.BgLogger().Error("remove stale runaway watch", zap.Error(err)) - time.Sleep(time.Second) - } - }() - } - } -} - -// AddRunawayWatch is used to add runaway watch item manually. -func (do *Domain) AddRunawayWatch(record *resourcegroup.QuarantineRecord) (uint64, error) { - se, err := do.sysSessionPool.Get() - defer func() { - do.sysSessionPool.Put(se) - }() - if err != nil { - return 0, errors.Annotate(err, "get session failed") - } - exec := se.(sessionctx.Context).GetSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - _, err = exec.ExecuteInternal(ctx, "BEGIN") - if err != nil { - return 0, errors.Trace(err) - } - defer func() { - if err != nil { - _, err1 := exec.ExecuteInternal(ctx, "ROLLBACK") - terror.Log(err1) - return - } - _, err = exec.ExecuteInternal(ctx, "COMMIT") - if err != nil { - return - } - }() - sql, params := record.GenInsertionStmt() - _, err = exec.ExecuteInternal(ctx, sql, params...) 
- if err != nil { - return 0, err - } - for retry := 0; retry < maxIDRetries; retry++ { - if retry > 0 { - select { - case <-do.exit: - return 0, err - case <-time.After(time.Millisecond * time.Duration(retry*100)): - logutil.BgLogger().Warn("failed to get last insert id when adding runaway watch", zap.Error(err)) - } - } - var rs sqlexec.RecordSet - rs, err = exec.ExecuteInternal(ctx, `SELECT LAST_INSERT_ID();`) - if err != nil { - continue - } - var rows []chunk.Row - rows, err = sqlexec.DrainRecordSet(ctx, rs, 1) - //nolint: errcheck - rs.Close() - if err != nil { - continue - } - if len(rows) != 1 { - err = errors.Errorf("unexpected result length: %d", len(rows)) - continue - } - return rows[0].GetUint64(0), nil - } - return 0, errors.Errorf("An error: %v occurred while getting the ID of the newly added watch record. Try querying information_schema.runaway_watches later", err) -} - -func (do *Domain) handleRunawayWatchDone(record *resourcegroup.QuarantineRecord) error { - se, err := do.sysSessionPool.Get() - defer func() { - do.sysSessionPool.Put(se) - }() - if err != nil { - return errors.Annotate(err, "get session failed") - } - sctx, _ := se.(sessionctx.Context) - exec := sctx.GetSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - _, err = exec.ExecuteInternal(ctx, "BEGIN") - if err != nil { - return errors.Trace(err) - } - defer func() { - if err != nil { - _, err1 := exec.ExecuteInternal(ctx, "ROLLBACK") - terror.Log(err1) - return - } - _, err = exec.ExecuteInternal(ctx, "COMMIT") - if err != nil { - return - } - }() - sql, params := record.GenInsertionDoneStmt() - _, err = exec.ExecuteInternal(ctx, sql, params...) - if err != nil { - return err - } - sql, params = record.GenDeletionStmt() - _, err = exec.ExecuteInternal(ctx, sql, params...) 
- return err -} - -func (do *Domain) handleRemoveStaleRunawayWatch(record *resourcegroup.QuarantineRecord) error { - se, err := do.sysSessionPool.Get() - defer func() { - do.sysSessionPool.Put(se) - }() - if err != nil { - return errors.Annotate(err, "get session failed") - } - sctx, _ := se.(sessionctx.Context) - exec := sctx.GetSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - _, err = exec.ExecuteInternal(ctx, "BEGIN") - if err != nil { - return errors.Trace(err) - } - defer func() { - if err != nil { - _, err1 := exec.ExecuteInternal(ctx, "ROLLBACK") - terror.Log(err1) - return - } - _, err = exec.ExecuteInternal(ctx, "COMMIT") - if err != nil { - return - } - }() - sql, params := record.GenDeletionStmt() - _, err = exec.ExecuteInternal(ctx, sql, params...) - return err -} - -func execRestrictedSQL(sessPool util.SessionPool, sql string, params []any) ([]chunk.Row, error) { - se, err := sessPool.Get() - defer func() { - sessPool.Put(se) - }() - if err != nil { - return nil, errors.Annotate(err, "get session failed") - } - sctx := se.(sessionctx.Context) - exec := sctx.GetRestrictedSQLExecutor() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, - sql, params..., - ) - return r, err -} - -func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client, uniqueID uint64) error { - if pdClient == nil { - logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage") - // return nil as unistore doesn't support it - return nil - } - - control, err := rmclient.NewResourceGroupController(ctx, uniqueID, pdClient, nil, rmclient.WithMaxWaitDuration(resourcegroup.MaxWaitDuration)) - if err != nil { - return err - } - control.Start(ctx) - serverInfo, err := infosync.GetServerInfo() - if err != nil { - return err - } - serverAddr := 
net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port))) - do.runawayManager = resourcegroup.NewRunawayManager(control, serverAddr) - do.runawaySyncer = newRunawaySyncer(do.sysSessionPool) - do.resourceGroupsController = control - tikv.SetResourceControlInterceptor(control) - return nil -} - -type runawaySyncer struct { - newWatchReader *SystemTableReader - deletionWatchReader *SystemTableReader - sysSessionPool util.SessionPool - mu sync.Mutex -} - -func newRunawaySyncer(sysSessionPool util.SessionPool) *runawaySyncer { - return &runawaySyncer{ - sysSessionPool: sysSessionPool, - newWatchReader: &SystemTableReader{ - resourcegroup.RunawayWatchTableName, - "start_time", - resourcegroup.NullTime}, - deletionWatchReader: &SystemTableReader{resourcegroup.RunawayWatchDoneTableName, - "done_time", - resourcegroup.NullTime}, - } -} - -func (s *runawaySyncer) getWatchRecordByID(id int64) ([]*resourcegroup.QuarantineRecord, error) { - return s.getWatchRecord(s.newWatchReader, s.newWatchReader.genSelectByIDStmt(id), false) -} - -func (s *runawaySyncer) getNewWatchRecords() ([]*resourcegroup.QuarantineRecord, error) { - return s.getWatchRecord(s.newWatchReader, s.newWatchReader.genSelectStmt, true) -} - -func (s *runawaySyncer) getNewWatchDoneRecords() ([]*resourcegroup.QuarantineRecord, error) { - return s.getWatchDoneRecord(s.deletionWatchReader, s.deletionWatchReader.genSelectStmt, true) -} - -func (s *runawaySyncer) getWatchRecord(reader *SystemTableReader, sqlGenFn func() (string, []any), push bool) ([]*resourcegroup.QuarantineRecord, error) { - se, err := s.sysSessionPool.Get() - defer func() { - s.sysSessionPool.Put(se) - }() - if err != nil { - return nil, errors.Annotate(err, "get session failed") - } - sctx := se.(sessionctx.Context) - exec := sctx.GetRestrictedSQLExecutor() - return getRunawayWatchRecord(exec, reader, sqlGenFn, push) -} - -func (s *runawaySyncer) getWatchDoneRecord(reader *SystemTableReader, sqlGenFn func() (string, []any), push bool) 
([]*resourcegroup.QuarantineRecord, error) { - se, err := s.sysSessionPool.Get() - defer func() { - s.sysSessionPool.Put(se) - }() - if err != nil { - return nil, errors.Annotate(err, "get session failed") - } - sctx := se.(sessionctx.Context) - exec := sctx.GetRestrictedSQLExecutor() - return getRunawayWatchDoneRecord(exec, reader, sqlGenFn, push) -} - -func getRunawayWatchRecord(exec sqlexec.RestrictedSQLExecutor, reader *SystemTableReader, sqlGenFn func() (string, []any), push bool) ([]*resourcegroup.QuarantineRecord, error) { - rs, err := reader.Read(exec, sqlGenFn) - if err != nil { - return nil, err - } - ret := make([]*resourcegroup.QuarantineRecord, 0, len(rs)) - now := time.Now().UTC() - for _, r := range rs { - startTime, err := r.GetTime(2).GoTime(time.UTC) - if err != nil { - continue - } - var endTime time.Time - if !r.IsNull(3) { - endTime, err = r.GetTime(3).GoTime(time.UTC) - if err != nil { - continue - } - } - qr := &resourcegroup.QuarantineRecord{ - ID: r.GetInt64(0), - ResourceGroupName: r.GetString(1), - StartTime: startTime, - EndTime: endTime, - Watch: rmpb.RunawayWatchType(r.GetInt64(4)), - WatchText: r.GetString(5), - Source: r.GetString(6), - Action: rmpb.RunawayAction(r.GetInt64(7)), - } - // If a TiDB write record slow, it will occur that the record which has earlier start time is inserted later than others. - // So we start the scan a little earlier. 
- if push { - reader.CheckPoint = now.Add(-3 * runawayWatchSyncInterval) - } - ret = append(ret, qr) - } - return ret, nil -} - -func getRunawayWatchDoneRecord(exec sqlexec.RestrictedSQLExecutor, reader *SystemTableReader, sqlGenFn func() (string, []any), push bool) ([]*resourcegroup.QuarantineRecord, error) { - rs, err := reader.Read(exec, sqlGenFn) - if err != nil { - return nil, err - } - length := len(rs) - ret := make([]*resourcegroup.QuarantineRecord, 0, length) - now := time.Now().UTC() - for _, r := range rs { - startTime, err := r.GetTime(3).GoTime(time.UTC) - if err != nil { - continue - } - var endTime time.Time - if !r.IsNull(4) { - endTime, err = r.GetTime(4).GoTime(time.UTC) - if err != nil { - continue - } - } - qr := &resourcegroup.QuarantineRecord{ - ID: r.GetInt64(1), - ResourceGroupName: r.GetString(2), - StartTime: startTime, - EndTime: endTime, - Watch: rmpb.RunawayWatchType(r.GetInt64(5)), - WatchText: r.GetString(6), - Source: r.GetString(7), - Action: rmpb.RunawayAction(r.GetInt64(8)), - } - // Ditto as getRunawayWatchRecord. - if push { - reader.CheckPoint = now.Add(-3 * runawayWatchSyncInterval) - } - ret = append(ret, qr) - } - return ret, nil -} - -// SystemTableReader is used to read table `runaway_watch` and `runaway_watch_done`. 
-type SystemTableReader struct { - TableName string - KeyCol string - CheckPoint time.Time -} - -func (r *SystemTableReader) genSelectByIDStmt(id int64) func() (string, []any) { - return func() (string, []any) { - var builder strings.Builder - params := make([]any, 0, 1) - builder.WriteString("select * from ") - builder.WriteString(r.TableName) - builder.WriteString(" where id = %?") - params = append(params, id) - return builder.String(), params - } -} - -func (r *SystemTableReader) genSelectStmt() (string, []any) { - var builder strings.Builder - params := make([]any, 0, 1) - builder.WriteString("select * from ") - builder.WriteString(r.TableName) - builder.WriteString(" where ") - builder.WriteString(r.KeyCol) - builder.WriteString(" > %? order by ") - builder.WriteString(r.KeyCol) - params = append(params, r.CheckPoint) - return builder.String(), params -} - -func (*SystemTableReader) Read(exec sqlexec.RestrictedSQLExecutor, genFn func() (string, []any)) ([]chunk.Row, error) { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) - sql, params := genFn() - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, - sql, params..., - ) - return rows, err -} diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 6a3ef72b8979d..537657b36ec49 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -103,7 +103,6 @@ go_library( "//pkg/disttask/importinto", "//pkg/domain", "//pkg/domain/infosync", - "//pkg/domain/resourcegroup", "//pkg/errctx", "//pkg/errno", "//pkg/executor/aggfuncs", @@ -162,6 +161,8 @@ go_library( "//pkg/plugin", "//pkg/privilege", "//pkg/privilege/privileges", + "//pkg/resourcegroup", + "//pkg/resourcegroup/runaway", "//pkg/resourcemanager/pool/workerpool", "//pkg/resourcemanager/util", "//pkg/session/txninfo", diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index 06fdc12139969..69dadf7149897 100644 --- 
a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/ddl/placement" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/pdhelper" @@ -51,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/privilege" "github.com/pingcap/tidb/pkg/privilege/privileges" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/session/txninfo" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -3540,11 +3540,11 @@ func (e *memtableRetriever) setDataFromPlacementPolicies(sctx sessionctx.Context func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) error { do := domain.GetDomain(sctx) - err := do.TryToUpdateRunawayWatch() + err := do.RunawayManager().UpdateNewAndDoneWatch() if err != nil { logutil.BgLogger().Warn("read runaway watch list", zap.Error(err)) } - watches := do.GetRunawayWatchList() + watches := do.RunawayManager().GetWatchList() rows := make([][]types.Datum, 0, len(watches)) for _, watch := range watches { action := watch.Action @@ -3558,7 +3558,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e watch.Source, action.String(), ) - if watch.EndTime.Equal(resourcegroup.NullTime) { + if watch.EndTime.Equal(runaway.NullTime) { row[3].SetString("UNLIMITED", mysql.DefaultCollationName) } rows = append(rows, row) diff --git a/pkg/executor/internal/querywatch/BUILD.bazel b/pkg/executor/internal/querywatch/BUILD.bazel index 5845085acce25..bbd1c16906d26 100644 --- a/pkg/executor/internal/querywatch/BUILD.bazel +++ b/pkg/executor/internal/querywatch/BUILD.bazel @@ -7,13 +7,14 @@ go_library( visibility = 
["//pkg/executor:__subpackages__"], deps = [ "//pkg/domain", - "//pkg/domain/resourcegroup", "//pkg/executor/internal/exec", "//pkg/infoschema", "//pkg/parser", "//pkg/parser/ast", "//pkg/parser/model", "//pkg/planner/util", + "//pkg/resourcegroup", + "//pkg/resourcegroup/runaway", "//pkg/sessionctx", "//pkg/util/chunk", "@com_github_pingcap_errors//:errors", diff --git a/pkg/executor/internal/querywatch/query_watch.go b/pkg/executor/internal/querywatch/query_watch.go index 07b19104399e5..7f9c0087ba464 100644 --- a/pkg/executor/internal/querywatch/query_watch.go +++ b/pkg/executor/internal/querywatch/query_watch.go @@ -22,13 +22,14 @@ import ( "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" plannerutil "github.com/pingcap/tidb/pkg/planner/util" + "github.com/pingcap/tidb/pkg/resourcegroup" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/chunk" rmclient "github.com/tikv/pd/client/resource_group/controller" @@ -37,7 +38,7 @@ import ( // setWatchOption is used to set the QuarantineRecord with specific QueryWatchOption ast node. func setWatchOption(ctx context.Context, sctx, newSctx sessionctx.Context, - record *resourcegroup.QuarantineRecord, + record *runaway.QuarantineRecord, op *ast.QueryWatchOption, ) error { switch op.Tp { @@ -116,11 +117,12 @@ func setWatchOption(ctx context.Context, } // fromQueryWatchOptionList is used to create a QuarantineRecord with some QueryWatchOption ast nodes. 
-func fromQueryWatchOptionList(ctx context.Context, sctx, newSctx sessionctx.Context, optionList []*ast.QueryWatchOption) (*resourcegroup.QuarantineRecord, error) { - record := &resourcegroup.QuarantineRecord{ - Source: resourcegroup.ManualSource, +func fromQueryWatchOptionList(ctx context.Context, sctx, newSctx sessionctx.Context, + optionList []*ast.QueryWatchOption) (*runaway.QuarantineRecord, error) { + record := &runaway.QuarantineRecord{ + Source: runaway.ManualSource, StartTime: time.Now(), - EndTime: resourcegroup.NullTime, + EndTime: runaway.NullTime, } for _, op := range optionList { if err := setWatchOption(ctx, sctx, newSctx, record, op); err != nil { @@ -133,7 +135,7 @@ func fromQueryWatchOptionList(ctx context.Context, sctx, newSctx sessionctx.Cont // validateWatchRecord follows several designs: // 1. If no resource group is set, the default resource group is used // 2. If no action is specified, the action of the resource group is used. If no, an error message displayed. -func validateWatchRecord(record *resourcegroup.QuarantineRecord, client *rmclient.ResourceGroupsController) error { +func validateWatchRecord(record *runaway.QuarantineRecord, client *rmclient.ResourceGroupsController) error { if len(record.ResourceGroupName) == 0 { record.ResourceGroupName = resourcegroup.DefaultResourceGroupName } @@ -182,7 +184,7 @@ func (e *AddExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err := validateWatchRecord(record, do.ResourceGroupsController()); err != nil { return err } - id, err := do.AddRunawayWatch(record) + id, err := do.RunawayManager().AddRunawayWatch(record) if err != nil { return err } @@ -193,6 +195,6 @@ func (e *AddExecutor) Next(ctx context.Context, req *chunk.Chunk) error { // ExecDropQueryWatch is use to exec DropQueryWatchStmt. 
func ExecDropQueryWatch(sctx sessionctx.Context, id int64) error { do := domain.GetDomain(sctx) - err := do.RemoveRunawayWatch(id) + err := do.RunawayManager().RemoveRunawayWatch(id) return err } diff --git a/pkg/executor/simple.go b/pkg/executor/simple.go index 48108a83bd636..43887a1599613 100644 --- a/pkg/executor/simple.go +++ b/pkg/executor/simple.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/pkg/distsql" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/querywatch" @@ -51,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/plugin" "github.com/pingcap/tidb/pkg/privilege" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/variable" diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index 62a5c6f210694..d82e1da2996fb 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -23,11 +23,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", + "//pkg/resourcegroup", "//pkg/types", "//pkg/util/codec", "//pkg/util/dbterror", diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 8dc08c8adc1fb..d2df9824862c7 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -26,8 +26,8 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/tiflash" 
"github.com/pingcap/tidb/pkg/util/trxevents" @@ -598,7 +598,7 @@ type Request struct { // TiKVClientReadTimeout is the timeout of kv read request TiKVClientReadTimeout uint64 - RunawayChecker *resourcegroup.RunawayChecker + RunawayChecker resourcegroup.RunawayChecker // ConnID stores the session connection id. ConnID uint64 diff --git a/pkg/meta/BUILD.bazel b/pkg/meta/BUILD.bazel index f69b0a57b92fe..ffd98f9583563 100644 --- a/pkg/meta/BUILD.bazel +++ b/pkg/meta/BUILD.bazel @@ -9,12 +9,12 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/meta", visibility = ["//visibility:public"], deps = [ - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/kv", "//pkg/metrics", "//pkg/parser/model", "//pkg/parser/mysql", + "//pkg/resourcegroup", "//pkg/structure", "//pkg/util/dbterror", "//pkg/util/hack", diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go index 79dee2297e3bf..8286975b9abc6 100644 --- a/pkg/meta/meta.go +++ b/pkg/meta/meta.go @@ -29,12 +29,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/structure" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/hack" diff --git a/pkg/resourcegroup/BUILD.bazel b/pkg/resourcegroup/BUILD.bazel new file mode 100644 index 0000000000000..90e26f5befb5b --- /dev/null +++ b/pkg/resourcegroup/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "resourcegroup", + srcs = ["checker.go"], + importpath = "github.com/pingcap/tidb/pkg/resourcegroup", + visibility = ["//visibility:public"], + deps = [ + 
"@com_github_pingcap_kvproto//pkg/resource_manager", + "@com_github_tikv_client_go_v2//tikvrpc", + ], +) diff --git a/pkg/resourcegroup/checker.go b/pkg/resourcegroup/checker.go new file mode 100644 index 0000000000000..4e51319ae9a44 --- /dev/null +++ b/pkg/resourcegroup/checker.go @@ -0,0 +1,40 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcegroup + +import ( + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/tikv/client-go/v2/tikvrpc" +) + +// DefaultResourceGroupName is the default resource group name. +const DefaultResourceGroupName = "default" + +// RunawayChecker is used to check runaway queries. +type RunawayChecker interface { + // BeforeExecutor checks whether query is in watch list before executing and after compiling. + BeforeExecutor() error + // BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request. + BeforeCopRequest(req *tikvrpc.Request) error + // CheckCopRespError checks TiKV error after receiving coprocessor response. + CheckCopRespError(err error) error + // CheckAction is used to check current action of the query. + // It's safe to call this method concurrently. + CheckAction() rmpb.RunawayAction + // CheckRuleKillAction checks whether the query should be killed according to the group settings. + CheckRuleKillAction() bool + // Rule returns the rule of the runaway checker. 
+ Rule() string +} diff --git a/pkg/domain/resourcegroup/BUILD.bazel b/pkg/resourcegroup/runaway/BUILD.bazel similarity index 50% rename from pkg/domain/resourcegroup/BUILD.bazel rename to pkg/resourcegroup/runaway/BUILD.bazel index 44648b1ed4606..9dcbbe78c898e 100644 --- a/pkg/domain/resourcegroup/BUILD.bazel +++ b/pkg/resourcegroup/runaway/BUILD.bazel @@ -1,16 +1,35 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "resourcegroup", - srcs = ["runaway.go"], - importpath = "github.com/pingcap/tidb/pkg/domain/resourcegroup", + name = "runaway", + srcs = [ + "checker.go", + "manager.go", + "record.go", + "syncer.go", + ], + importpath = "github.com/pingcap/tidb/pkg/resourcegroup/runaway", visibility = ["//visibility:public"], deps = [ + "//pkg/ddl", + "//pkg/infoschema", + "//pkg/kv", "//pkg/metrics", + "//pkg/parser/model", + "//pkg/parser/terror", + "//pkg/sessionctx", + "//pkg/ttl/cache", + "//pkg/ttl/sqlbuilder", + "//pkg/types", + "//pkg/util", + "//pkg/util/chunk", "//pkg/util/dbterror/exeerrors", "//pkg/util/generic", "//pkg/util/logutil", + "//pkg/util/sqlexec", "@com_github_jellydator_ttlcache_v3//:ttlcache", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_prometheus_client_golang//prometheus", "@com_github_tikv_client_go_v2//tikv", diff --git a/pkg/resourcegroup/runaway/checker.go b/pkg/resourcegroup/runaway/checker.go new file mode 100644 index 0000000000000..6c06b157990d1 --- /dev/null +++ b/pkg/resourcegroup/runaway/checker.go @@ -0,0 +1,299 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runaway + +import ( + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" +) + +// Checker is used to check if the query is runaway. +type Checker struct { + manager *Manager + resourceGroupName string + originalSQL string + sqlDigest string + planDigest string + + deadline time.Time + // From the group runaway settings, which will be applied when a query lacks a specified watch rule. + settings *rmpb.RunawaySettings + + // markedByRule is set to true when the query matches the group runaway settings. + markedByRule atomic.Bool + // markedByWatch is set to true when the query matches the specified watch rules. + markedByWatch bool + watchAction rmpb.RunawayAction +} + +// NewChecker creates a new RunawayChecker. 
+func NewChecker( + manager *Manager, + resourceGroupName string, settings *rmpb.RunawaySettings, + originalSQL, sqlDigest, planDigest string, startTime time.Time, +) *Checker { + c := &Checker{ + manager: manager, + resourceGroupName: resourceGroupName, + originalSQL: originalSQL, + sqlDigest: sqlDigest, + planDigest: planDigest, + settings: settings, + markedByRule: atomic.Bool{}, + markedByWatch: false, + } + if settings != nil { + c.deadline = startTime.Add(time.Duration(settings.Rule.ExecElapsedTimeMs) * time.Millisecond) + } + return c +} + +// DeriveChecker derives a RunawayChecker from the given resource group +func (rm *Manager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string, startTime time.Time) *Checker { + group, err := rm.ResourceGroupCtl.GetResourceGroup(resourceGroupName) + if err != nil || group == nil { + logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err)) + return nil + } + rm.ActiveLock.RLock() + defer rm.ActiveLock.RUnlock() + if group.RunawaySettings == nil && rm.ActiveGroup[resourceGroupName] == 0 { + return nil + } + counter, ok := rm.MetricsMap.Load(resourceGroupName) + if !ok { + counter = metrics.RunawayCheckerCounter.WithLabelValues(resourceGroupName, "hit", "") + rm.MetricsMap.Store(resourceGroupName, counter) + } + counter.Inc() + return NewChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest, startTime) +} + +// BeforeExecutor checks whether query is in watch list before executing and after compiling. +func (r *Checker) BeforeExecutor() error { + if r == nil { + return nil + } + // Check if the query matches any specified watch rules. + for _, convict := range r.getConvictIdentifiers() { + watched, action := r.manager.examineWatchList(r.resourceGroupName, convict) + if !watched { + continue + } + // Use the group runaway settings if none are provided. 
+ if action == rmpb.RunawayAction_NoneAction && r.settings != nil { + action = r.settings.Action + } + // Mark it if this is the first time being watched. + r.markRunawayByWatch(action) + // Take action if needed. + switch action { + case rmpb.RunawayAction_Kill: + // Return an error to interrupt the query. + return exeerrors.ErrResourceGroupQueryRunawayQuarantine + case rmpb.RunawayAction_CoolDown: + // This action will be handled in `BeforeCopRequest`. + return nil + case rmpb.RunawayAction_DryRun: + // Noop. + return nil + default: + // Continue to examine other convicts. + } + } + return nil +} + +// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request. +func (r *Checker) BeforeCopRequest(req *tikvrpc.Request) error { + if r == nil { + return nil + } + // If the group settings are not available, and it's not marked by watch, skip this part. + if r.settings == nil && !r.markedByWatch { + return nil + } + // If it's marked by watch and the action is cooldown, override the priority, + if r.markedByWatch && r.watchAction == rmpb.RunawayAction_CoolDown { + req.ResourceControlContext.OverridePriority = 1 // set priority to lowest + } + // If group settings are available and the query is not marked by a rule, + // verify if it matches any rules in the settings. + if r.settings != nil && !r.markedByRule.Load() { + now := time.Now() + until := r.deadline.Sub(now) + if until > 0 { + if r.settings.Action == rmpb.RunawayAction_Kill { + // if the execution time is close to the threshold, set a timeout + if until < tikv.ReadTimeoutMedium { + req.Context.MaxExecutionDurationMs = uint64(until.Milliseconds()) + } + } + return nil + } + // execution time exceeds the threshold, mark the query as runaway + r.markRunawayByIdentify(r.settings.Action, &now) + // Take action if needed. 
+ switch r.settings.Action { + case rmpb.RunawayAction_Kill: + return exeerrors.ErrResourceGroupQueryRunawayInterrupted + case rmpb.RunawayAction_CoolDown: + req.ResourceControlContext.OverridePriority = 1 // set priority to lowest + return nil + default: + return nil + } + } + return nil +} + +// CheckCopRespError checks TiKV error after receiving coprocessor response. +func (r *Checker) CheckCopRespError(err error) error { + if r == nil { + return err + } + failpoint.Inject("sleepCoprAfterReq", func(v failpoint.Value) { + //nolint:durationcheck + value := v.(int) + time.Sleep(time.Millisecond * time.Duration(value)) + if value > 50 { + err = errors.Errorf("Coprocessor task terminated due to exceeding the deadline") + } + }) + if err == nil || r.settings == nil || r.settings.Action != rmpb.RunawayAction_Kill { + return err + } + if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") { + if !r.markedByRule.Load() { + now := time.Now() + if r.deadline.Before(now) && r.markRunawayByIdentify(r.settings.Action, &now) { + return exeerrors.ErrResourceGroupQueryRunawayInterrupted + } + } + // Due to concurrency, check again. + if r.markedByRule.Load() { + return exeerrors.ErrResourceGroupQueryRunawayInterrupted + } + } + return err +} + +// CheckAction is used to check current action of the query. +// It's safe to call this method concurrently. +func (r *Checker) CheckAction() rmpb.RunawayAction { + if r == nil { + return rmpb.RunawayAction_NoneAction + } + if r.markedByWatch { + return r.watchAction + } + if r.markedByRule.Load() { + return r.settings.Action + } + return rmpb.RunawayAction_NoneAction +} + +// CheckRuleKillAction checks whether the query should be killed according to the group settings. +func (r *Checker) CheckRuleKillAction() bool { + // If the group settings are not available, and it's not marked by watch, skip this part. 
+ if r == nil || r.settings == nil && !r.markedByWatch { + return false + } + // If the group settings are available, and it's not marked by rule, check the execution time. + if r.settings != nil && !r.markedByRule.Load() { + now := time.Now() + until := r.deadline.Sub(now) + if until > 0 { + return false + } + r.markRunawayByIdentify(r.settings.Action, &now) + return r.settings.Action == rmpb.RunawayAction_Kill + } + return false +} + +// Rule returns the rule of the runaway checker. +func (r *Checker) Rule() string { + var execElapsedTime time.Duration + if r.settings != nil { + execElapsedTime = time.Duration(r.settings.Rule.ExecElapsedTimeMs) * time.Millisecond + } + return fmt.Sprintf("execElapsedTime:%s", execElapsedTime) +} + +func (r *Checker) markQuarantine(now *time.Time) { + if r.settings == nil || r.settings.Watch == nil { + return + } + ttl := time.Duration(r.settings.Watch.LastingDurationMs) * time.Millisecond + + r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.settings.Watch.Type, r.settings.Action, ttl, now) +} + +func (r *Checker) markRunawayByIdentify(action rmpb.RunawayAction, now *time.Time) bool { + swapped := r.markedByRule.CompareAndSwap(false, true) + if swapped { + r.markRunaway("identify", action, now) + if !r.markedByWatch { + r.markQuarantine(now) + } + } + return swapped +} + +func (r *Checker) markRunawayByWatch(action rmpb.RunawayAction) { + r.markedByWatch = true + r.watchAction = action + now := time.Now() + r.markRunaway("watch", action, &now) +} + +func (r *Checker) markRunaway(matchType string, action rmpb.RunawayAction, now *time.Time) { + actionStr := strings.ToLower(action.String()) + metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc() + r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now) +} + +func (r *Checker) getSettingConvictIdentifier() string { + if r == nil || r.settings == nil || r.settings.Watch 
== nil { + return "" + } + switch r.settings.Watch.Type { + case rmpb.RunawayWatchType_Plan: + return r.planDigest + case rmpb.RunawayWatchType_Similar: + return r.sqlDigest + case rmpb.RunawayWatchType_Exact: + return r.originalSQL + default: + return "" + } +} + +func (r *Checker) getConvictIdentifiers() []string { + return []string{r.originalSQL, r.sqlDigest, r.planDigest} +} diff --git a/pkg/resourcegroup/runaway/manager.go b/pkg/resourcegroup/runaway/manager.go new file mode 100644 index 0000000000000..b91af5eca8c3e --- /dev/null +++ b/pkg/resourcegroup/runaway/manager.go @@ -0,0 +1,440 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runaway + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/pingcap/failpoint" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/generic" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/prometheus/client_golang/prometheus" + rmclient "github.com/tikv/pd/client/resource_group/controller" + "go.uber.org/zap" +) + +const ( + // ManualSource shows the item added manually. + ManualSource = "manual" + + // MaxWaitDuration is the max duration to wait for acquiring token buckets. 
+ MaxWaitDuration = time.Second * 30 + maxWatchListCap = 10000 + maxWatchRecordChannelSize = 1024 + + runawayRecordFlushInterval = time.Second + runawayRecordGCInterval = time.Hour * 24 + runawayRecordExpiredDuration = time.Hour * 24 * 7 + + runawayRecordGCBatchSize = 100 + runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5 + runawayLoopLogErrorIntervalCount = 1800 +) + +// Manager is used to detect and record runaway queries. +type Manager struct { + logOnce sync.Once + exit chan struct{} + // queryLock is used to avoid repeated additions. Since we will add new items to the system table, + // in order to avoid repeated additions, we need a lock to ensure that + // the action "check whether the record is already in the current watch list, then add it" is atomic. + queryLock sync.Mutex + watchList *ttlcache.Cache[string, *QuarantineRecord] + // ActiveGroup tracks the number of active runaway watches of each resource group. + ActiveGroup map[string]int64 + ActiveLock sync.RWMutex + MetricsMap generic.SyncMap[string, prometheus.Counter] + + ResourceGroupCtl *rmclient.ResourceGroupsController + serverID string + runawayQueriesChan chan *Record + quarantineChan chan *QuarantineRecord + // staleQuarantineRecord is used to clean up outdated records. There are three scenarios: + // 1. The record has expired in the watch list. + // 2. The record to be added is itself out of date, + // e.g. the TiDB cluster was paused and the record had expired by the time it restarted. + // 3. The record is a duplicate of one already added. + // It replaces the clean-up loop. + staleQuarantineRecord chan *QuarantineRecord + evictionCancel func() + insertionCancel func() + + syncerInitialized atomic.Bool + + // domain related fields + infoCache *infoschema.InfoCache + ddl ddl.DDL + + // runawaySyncer is used to sync runaway watch records. + runawaySyncer *syncer + sysSessionPool util.SessionPool +} + +// NewRunawayManager creates a new Manager.
+func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serverAddr string, + pool util.SessionPool, exit chan struct{}, infoCache *infoschema.InfoCache, ddl ddl.DDL) *Manager { + watchList := ttlcache.New[string, *QuarantineRecord]( + ttlcache.WithTTL[string, *QuarantineRecord](ttlcache.NoTTL), + ttlcache.WithCapacity[string, *QuarantineRecord](maxWatchListCap), + ttlcache.WithDisableTouchOnHit[string, *QuarantineRecord](), + ) + go watchList.Start() + staleQuarantineChan := make(chan *QuarantineRecord, maxWatchRecordChannelSize) + m := &Manager{ + syncerInitialized: atomic.Bool{}, + ResourceGroupCtl: resourceGroupCtl, + watchList: watchList, + serverID: serverAddr, + runawayQueriesChan: make(chan *Record, maxWatchRecordChannelSize), + quarantineChan: make(chan *QuarantineRecord, maxWatchRecordChannelSize), + staleQuarantineRecord: staleQuarantineChan, + ActiveGroup: make(map[string]int64), + MetricsMap: generic.NewSyncMap[string, prometheus.Counter](8), + sysSessionPool: pool, + exit: exit, + infoCache: infoCache, + ddl: ddl, + } + m.insertionCancel = watchList.OnInsertion(func(_ context.Context, i *ttlcache.Item[string, *QuarantineRecord]) { + m.ActiveLock.Lock() + m.ActiveGroup[i.Value().ResourceGroupName]++ + m.ActiveLock.Unlock() + }) + m.evictionCancel = watchList.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, *QuarantineRecord]) { + m.ActiveLock.Lock() + m.ActiveGroup[i.Value().ResourceGroupName]-- + m.ActiveLock.Unlock() + if i.Value().ID == 0 { + return + } + staleQuarantineChan <- i.Value() + }) + m.runawaySyncer = newSyncer(pool) + + return m +} + +// RunawayRecordFlushLoop is used to flush runaway records. +func (rm *Manager) RunawayRecordFlushLoop() { + defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false) + + // this timer is used to batch the flushing of records; with a 1s interval, + // we can guarantee a watch record can be seen by the user within 1s.
+ runawayRecordFlushTimer := time.NewTimer(runawayRecordFlushInterval) + runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval) + failpoint.Inject("FastRunawayGC", func() { + runawayRecordFlushTimer.Stop() + runawayRecordGCTicker.Stop() + runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50) + runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200) + }) + + fired := false + recordCh := rm.runawayRecordChan() + quarantineRecordCh := rm.quarantineRecordChan() + staleQuarantineRecordCh := rm.staleQuarantineRecordChan() + flushThreshold := flushThreshold() + records := make([]*Record, 0, flushThreshold) + + flushRunawayRecords := func() { + if len(records) == 0 { + return + } + sql, params := genRunawayQueriesStmt(records) + if _, err := ExecRCRestrictedSQL(rm.sysSessionPool, sql, params); err != nil { + logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) + } + records = records[:0] + } + + for { + select { + case <-rm.exit: + return + case <-runawayRecordFlushTimer.C: + flushRunawayRecords() + fired = true + case r := <-recordCh: + records = append(records, r) + failpoint.Inject("FastRunawayGC", func() { + flushRunawayRecords() + }) + if len(records) >= flushThreshold { + flushRunawayRecords() + } else if fired { + fired = false + // meet a new record, reset the timer. 
+ runawayRecordFlushTimer.Reset(runawayRecordFlushInterval) + } + case <-runawayRecordGCTicker.C: + go rm.deleteExpiredRows(runawayRecordExpiredDuration) + case r := <-quarantineRecordCh: + go func() { + _, err := rm.AddRunawayWatch(r) + if err != nil { + logutil.BgLogger().Error("add runaway watch", zap.Error(err)) + } + }() + case r := <-staleQuarantineRecordCh: + go func() { + for i := 0; i < 3; i++ { + err := handleRemoveStaleRunawayWatch(rm.sysSessionPool, r) + if err == nil { + break + } + logutil.BgLogger().Error("remove stale runaway watch", zap.Error(err)) + time.Sleep(time.Second) + } + }() + } + } +} + +// RunawayWatchSyncLoop is used to sync runaway watch records. +func (rm *Manager) RunawayWatchSyncLoop() { + defer util.Recover(metrics.LabelDomain, "runawayWatchSyncLoop", nil, false) + runawayWatchSyncTicker := time.NewTicker(watchSyncInterval) + count := 0 + for { + select { + case <-rm.exit: + return + case <-runawayWatchSyncTicker.C: + err := rm.UpdateNewAndDoneWatch() + if err != nil { + if count %= runawayLoopLogErrorIntervalCount; count == 0 { + logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err)) + } + count++ + } + } + } +} + +func (rm *Manager) markQuarantine(resourceGroupName, convict string, watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, ttl time.Duration, now *time.Time) { + var endTime time.Time + if ttl > 0 { + endTime = now.UTC().Add(ttl) + } + record := &QuarantineRecord{ + ResourceGroupName: resourceGroupName, + StartTime: now.UTC(), + EndTime: endTime, + Watch: watchType, + WatchText: convict, + Source: rm.serverID, + Action: action, + } + // Add record without ID into watch list in this TiDB right now. 
+ rm.addWatchList(record, ttl, false) + if !rm.syncerInitialized.Load() { + rm.logOnce.Do(func() { + logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") + }) + return + } + select { + case rm.quarantineChan <- record: + default: + // TODO: add warning for discard flush records + } +} + +func (rm *Manager) addWatchList(record *QuarantineRecord, ttl time.Duration, force bool) { + key := record.getRecordKey() + // This is a pre-check, because we generally believe that in most cases, we will not add a watch list to a key repeatedly. + item := rm.getWatchFromWatchList(key) + if force { + rm.queryLock.Lock() + defer rm.queryLock.Unlock() + if item != nil { + // check the ID because of the earlier scan. + if item.ID == record.ID { + return + } + rm.watchList.Delete(key) + } + rm.watchList.Set(key, record, ttl) + } else { + if item == nil { + rm.queryLock.Lock() + // When watchList get record, it will check whether the record is stale, so add new record if returns nil. + if rm.watchList.Get(key) == nil { + rm.watchList.Set(key, record, ttl) + } else { + rm.staleQuarantineRecord <- record + } + rm.queryLock.Unlock() + } else if item.ID == 0 { + // to replace the record without ID. + rm.queryLock.Lock() + defer rm.queryLock.Unlock() + rm.watchList.Set(key, record, ttl) + } else if item.ID != record.ID { + // check the ID because of the earlier scan. + rm.staleQuarantineRecord <- record + } + } +} + +// GetWatchList is used to get all watch items. 
+func (rm *Manager) GetWatchList() []*QuarantineRecord { + items := rm.watchList.Items() + ret := make([]*QuarantineRecord, 0, len(items)) + for _, item := range items { + ret = append(ret, item.Value()) + } + return ret +} + +func (rm *Manager) getWatchFromWatchList(key string) *QuarantineRecord { + item := rm.watchList.Get(key) + if item != nil { + return item.Value() + } + return nil +} + +func (rm *Manager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) { + source := rm.serverID + if !rm.syncerInitialized.Load() { + rm.logOnce.Do(func() { + logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway") + }) + return + } + select { + case rm.runawayQueriesChan <- &Record{ + ResourceGroupName: resourceGroupName, + Time: *now, + Match: matchType, + Action: action, + SQLText: originalSQL, + PlanDigest: planDigest, + Source: source, + }: + default: + // TODO: add warning for discard flush records + } +} + +// runawayRecordChan returns the channel of Record +func (rm *Manager) runawayRecordChan() <-chan *Record { + return rm.runawayQueriesChan +} + +// quarantineRecordChan returns the channel of QuarantineRecord +func (rm *Manager) quarantineRecordChan() <-chan *QuarantineRecord { + return rm.quarantineChan +} + +// staleQuarantineRecordChan returns the channel of staleQuarantineRecord +func (rm *Manager) staleQuarantineRecordChan() <-chan *QuarantineRecord { + return rm.staleQuarantineRecord +} + +// examineWatchList check whether the query is in watch list. +func (rm *Manager) examineWatchList(resourceGroupName string, convict string) (bool, rmpb.RunawayAction) { + item := rm.getWatchFromWatchList(resourceGroupName + "/" + convict) + if item == nil { + return false, 0 + } + return true, item.Action +} + +// Stop stops the watchList which is a ttlCache. 
+func (rm *Manager) Stop() { + if rm == nil { + return + } + if rm.watchList != nil { + rm.watchList.Stop() + } +} + +// UpdateNewAndDoneWatch is used to update new and done watch items. +func (rm *Manager) UpdateNewAndDoneWatch() error { + rm.runawaySyncer.mu.Lock() + defer rm.runawaySyncer.mu.Unlock() + records, err := rm.runawaySyncer.getNewWatchRecords() + if err != nil { + return err + } + for _, r := range records { + rm.AddWatch(r) + } + doneRecords, err := rm.runawaySyncer.getNewWatchDoneRecords() + if err != nil { + return err + } + for _, r := range doneRecords { + rm.removeWatch(r) + } + return nil +} + +// AddWatch is used to add watch items from system table. +func (rm *Manager) AddWatch(record *QuarantineRecord) { + ttl := time.Until(record.EndTime) + if record.EndTime.Equal(NullTime) { + ttl = 0 + } else if ttl <= 0 { + rm.staleQuarantineRecord <- record + return + } + + force := false + // The manual record replaces the old record. + force = record.Source == ManualSource + rm.addWatchList(record, ttl, force) +} + +// removeWatch is used to remove a watch item; this action is triggered by reading the done watch system table. +func (rm *Manager) removeWatch(record *QuarantineRecord) { + // only delete the cached record when it has the same ID as the record being removed. + rm.queryLock.Lock() + defer rm.queryLock.Unlock() + item := rm.getWatchFromWatchList(record.getRecordKey()) + if item == nil { + return + } + if item.ID == record.ID { + rm.watchList.Delete(record.getRecordKey()) + } +} + +// flushThreshold specifies the number of buffered records that triggers a flush. +func flushThreshold() int { + return maxWatchRecordChannelSize / 2 +} + +// MarkSyncerInitialized is used to mark the syncer as initialized. +func (rm *Manager) MarkSyncerInitialized() { + rm.syncerInitialized.Store(true) +} + +// IsSyncerInitialized is only used for test.
+func (rm *Manager) IsSyncerInitialized() bool { + return rm.syncerInitialized.Load() +} diff --git a/pkg/resourcegroup/runaway/record.go b/pkg/resourcegroup/runaway/record.go new file mode 100644 index 0000000000000..fb828686b576b --- /dev/null +++ b/pkg/resourcegroup/runaway/record.go @@ -0,0 +1,407 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runaway + +import ( + "context" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/ttl/cache" + "github.com/pingcap/tidb/pkg/ttl/sqlbuilder" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +const ( + // watchTableName is the name of system table which save runaway watch items. + watchTableName = "mysql.tidb_runaway_watch" + // watchDoneTableName is the name of system table which save done runaway watch items. + watchDoneTableName = "mysql.tidb_runaway_watch_done" + + maxIDRetries = 3 +) + +// NullTime is a zero time.Time. 
+var NullTime time.Time
+
+// Record is one row to be inserted into mysql.tidb_runaway_queries.
+type Record struct {
+	ResourceGroupName string
+	Time              time.Time
+	Match             string
+	Action            string
+	SQLText           string
+	PlanDigest        string
+	Source            string
+}
+
+// genRunawayQueriesStmt builds a single multi-row insert into
+// mysql.tidb_runaway_queries for the given records. It returns the statement
+// text, with one seven-placeholder tuple per record, and the matching
+// parameters in record order.
+func genRunawayQueriesStmt(records []*Record) (string, []any) {
+	const valuesTuple = "(%?, %?, %?, %?, %?, %?, %?)"
+
+	var sb strings.Builder
+	sb.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
+	args := make([]any, 0, len(records)*7)
+	for i, rec := range records {
+		if i != 0 {
+			sb.WriteByte(',')
+		}
+		sb.WriteString(valuesTuple)
+		args = append(args,
+			rec.ResourceGroupName,
+			rec.Time,
+			rec.Match,
+			rec.Action,
+			rec.SQLText,
+			rec.PlanDigest,
+			rec.Source,
+		)
+	}
+	return sb.String(), args
+}
+
+// QuarantineRecord is one row of mysql.tidb_runaway_watch.
+type QuarantineRecord struct {
+	ID                int64
+	ResourceGroupName string
+	// StartTime and EndTime are in UTC.
+	StartTime time.Time
+	EndTime   time.Time
+	Watch     rmpb.RunawayWatchType
+	WatchText string
+	Source    string
+	Action    rmpb.RunawayAction
+}
+
+// getRecordKey returns the ttl-cache key of the record:
+// "<resource group name>/<watch text>".
+func (r *QuarantineRecord) getRecordKey() string {
+	return r.ResourceGroupName + "/" + r.WatchText
+}
+
+// writeInsert appends "insert into <tableName> VALUES " to builder.
+func writeInsert(builder *strings.Builder, tableName string) {
+	builder.WriteString("insert into " + tableName + " VALUES ")
+}
+
+// genInsertionStmt is used to generate insertion sql.
+func (r *QuarantineRecord) genInsertionStmt() (string, []any) {
+	var builder strings.Builder
+	writeInsert(&builder, watchTableName)
+	// The leading null fills the first column; the seven placeholders that
+	// follow are bound to the seven parameters below, in order.
+	builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?)")
+	params := make([]any, 0, 7)
+	params = append(params,
+		r.ResourceGroupName,
+		r.StartTime,
+		r.endTimeParam(),
+		r.Watch,
+		r.WatchText,
+		r.Source,
+		r.Action,
+	)
+	return builder.String(), params
+}
+
+// endTimeParam returns the SQL parameter for EndTime: nil (SQL NULL) when the
+// record has no end time, the time value otherwise.
+func (r *QuarantineRecord) endTimeParam() any {
+	if r.EndTime.Equal(NullTime) {
+		return nil
+	}
+	return r.EndTime
+}
+
+// genInsertionDoneStmt generates the insertion sql for a runaway watch done
+// record. Compared with genInsertionStmt it also stores the record ID and the
+// current UTC time as the last parameter.
+func (r *QuarantineRecord) genInsertionDoneStmt() (string, []any) {
+	var builder strings.Builder
+	writeInsert(&builder, watchDoneTableName)
+	builder.WriteString("(null, %?, %?, %?, %?, %?, %?, %?, %?, %?)")
+	params := make([]any, 0, 9)
+	params = append(params,
+		r.ID,
+		r.ResourceGroupName,
+		r.StartTime,
+		r.endTimeParam(),
+		r.Watch,
+		r.WatchText,
+		r.Source,
+		r.Action,
+		time.Now().UTC(),
+	)
+	return builder.String(), params
+}
+
+// genDeletionStmt is used to generate deletion sql.
+func (r *QuarantineRecord) genDeletionStmt() (string, []any) {
+	return "delete from " + watchTableName + " where id = %?", []any{r.ID}
+}
+
+// deleteExpiredRows deletes rows of mysql.tidb_runaway_queries whose time
+// column is older than expiredDuration. It only does work on the DDL owner,
+// selects expired keys in batches and deletes them in smaller batches. Every
+// failure is logged; a failed delete batch does not stop the remaining ones.
+func (rm *Manager) deleteExpiredRows(expiredDuration time.Duration) {
+	const (
+		tableName = "tidb_runaway_queries"
+		colName   = "time"
+	)
+	var systemSchemaCIStr = model.NewCIStr("mysql")
+
+	if !rm.ddl.OwnerManager().IsOwner() {
+		return
+	}
+	failpoint.Inject("FastRunawayGC", func() {
+		expiredDuration = time.Second * 1
+	})
+	expiredTime := time.Now().Add(-expiredDuration)
+	tbCIStr := model.NewCIStr(tableName)
+	tbl, err := rm.infoCache.GetLatest().TableByName(context.Background(), systemSchemaCIStr, tbCIStr)
+	if err != nil {
+		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
+		return
+	}
+	tbInfo := tbl.Meta()
+	col := tbInfo.FindPublicColumnByName(colName)
+	if col == nil {
+		logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName))
+		return
+	}
+	tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col)
+	if err != nil {
+		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
+		return
+	}
+	generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil)
+	if err != nil {
+		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
+		return
+	}
+	var leftRows [][]types.Datum
+	for {
+		// NOTE(review): the delete loop below drains leftRows completely, so
+		// from the second round on NextSQL always receives an empty slice —
+		// confirm this matches the generator's paging contract.
+		sql := ""
+		if sql, err = generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize); err != nil {
+			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
+			return
+		}
+		// An empty statement means there is nothing left to scan.
+		if len(sql) == 0 {
+			return
+		}
+
+		rows, sqlErr := ExecRCRestrictedSQL(rm.sysSessionPool, sql, nil)
+		if sqlErr != nil {
+			// Log the error of this query; err is nil at this point.
+			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(sqlErr))
+			return
+		}
+		leftRows = make([][]types.Datum, len(rows))
+		for i, row := range rows {
+			leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes)
+		}
+
+		for len(leftRows) > 0 {
+			var delBatch [][]types.Datum
+			if len(leftRows) < runawayRecordGCBatchSize {
+				delBatch = leftRows
+				leftRows = nil
+			} else {
+				delBatch = leftRows[0:runawayRecordGCBatchSize]
+				leftRows = leftRows[runawayRecordGCBatchSize:]
+			}
+			sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime)
+			if err != nil {
+				logutil.BgLogger().Error(
+					"build delete SQL failed when deleting system table",
+					zap.Error(err),
+					zap.String("table", tb.Schema.O+"."+tb.Name.O),
+				)
+				return
+			}
+
+			// Best effort: a failed batch is logged and the next batch is still tried.
+			_, err = ExecRCRestrictedSQL(rm.sysSessionPool, sql, nil)
+			if err != nil {
+				logutil.BgLogger().Error(
+					"delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql),
+				)
+			}
+		}
+	}
+}
+
+// handleRemoveStaleRunawayWatch deletes the given record from the watch table
+// inside an internal transaction. The transaction is rolled back when the
+// delete fails and committed otherwise; a COMMIT failure is returned to the
+// caller.
+func handleRemoveStaleRunawayWatch(sysSessionPool util.SessionPool, record *QuarantineRecord) (err error) {
+	se, err := sysSessionPool.Get()
+	if err != nil {
+		return errors.Annotate(err, "get session failed")
+	}
+	// Only a session that was actually obtained goes back to the pool.
+	defer sysSessionPool.Put(se)
+
+	sctx := se.(sessionctx.Context)
+	exec := sctx.GetSQLExecutor()
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
+	if _, err = exec.ExecuteInternal(ctx, "BEGIN"); err != nil {
+		return errors.Trace(err)
+	}
+	// err is a named result, so the COMMIT error assigned here reaches the caller.
+	defer func() {
+		if err != nil {
+			_, rollbackErr := exec.ExecuteInternal(ctx, "ROLLBACK")
+			terror.Log(rollbackErr)
+			return
+		}
+		_, err = exec.ExecuteInternal(ctx, "COMMIT")
+	}()
+	sql, params := record.genDeletionStmt()
+	_, err = exec.ExecuteInternal(ctx, sql, params...)
+	return err
+}
+
+// handleRunawayWatchDone moves the given record from the watch table to the
+// watch-done table: it inserts the done row and deletes the watch row in one
+// internal transaction. The transaction is rolled back when either statement
+// fails and committed otherwise; a COMMIT failure is returned to the caller.
+func handleRunawayWatchDone(sysSessionPool util.SessionPool, record *QuarantineRecord) (err error) {
+	se, err := sysSessionPool.Get()
+	if err != nil {
+		return errors.Annotate(err, "get session failed")
+	}
+	// Only a session that was actually obtained goes back to the pool.
+	defer sysSessionPool.Put(se)
+
+	sctx := se.(sessionctx.Context)
+	exec := sctx.GetSQLExecutor()
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
+	if _, err = exec.ExecuteInternal(ctx, "BEGIN"); err != nil {
+		return errors.Trace(err)
+	}
+	// err is a named result, so the COMMIT error assigned here reaches the caller.
+	defer func() {
+		if err != nil {
+			_, rollbackErr := exec.ExecuteInternal(ctx, "ROLLBACK")
+			terror.Log(rollbackErr)
+			return
+		}
+		_, err = exec.ExecuteInternal(ctx, "COMMIT")
+	}()
+	sql, params := record.genInsertionDoneStmt()
+	if _, err = exec.ExecuteInternal(ctx, sql, params...); err != nil {
+		return err
+	}
+	sql, params = record.genDeletionStmt()
+	_, err = exec.ExecuteInternal(ctx, sql, params...)
+	return err
+}
+
+// ExecRCRestrictedSQL executes a restricted SQL statement related to resource
+// control on a session taken from sysSessionPool and returns the result rows.
+func ExecRCRestrictedSQL(sysSessionPool util.SessionPool, sql string, params []any) ([]chunk.Row, error) {
+	se, err := sysSessionPool.Get()
+	if err != nil {
+		return nil, errors.Annotate(err, "get session failed")
+	}
+	// Only a session that was actually obtained goes back to the pool.
+	defer sysSessionPool.Put(se)
+
+	sctx := se.(sessionctx.Context)
+	exec := sctx.GetRestrictedSQLExecutor()
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
+	r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
+		sql, params...,
+	)
+	return r, err
+}
+
+// AddRunawayWatch is used to add runaway watch item manually.
+func (rm *Manager) AddRunawayWatch(record *QuarantineRecord) (id uint64, err error) {
+	se, err := rm.sysSessionPool.Get()
+	if err != nil {
+		return 0, errors.Annotate(err, "get session failed")
+	}
+	// Only a session that was actually obtained goes back to the pool.
+	defer rm.sysSessionPool.Put(se)
+
+	exec := se.(sessionctx.Context).GetSQLExecutor()
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
+	if _, err = exec.ExecuteInternal(ctx, "BEGIN"); err != nil {
+		return 0, errors.Trace(err)
+	}
+	// id and err are named results: the transaction is rolled back on any
+	// error, and a failed COMMIT is reported to the caller with a zero ID.
+	defer func() {
+		if err != nil {
+			_, rollbackErr := exec.ExecuteInternal(ctx, "ROLLBACK")
+			terror.Log(rollbackErr)
+			return
+		}
+		if _, err = exec.ExecuteInternal(ctx, "COMMIT"); err != nil {
+			id = 0
+		}
+	}()
+	sql, params := record.genInsertionStmt()
+	if _, err = exec.ExecuteInternal(ctx, sql, params...); err != nil {
+		return 0, err
+	}
+	// Read back the ID of the inserted row, retrying up to maxIDRetries times
+	// with a growing pause (100ms, 200ms, ...) between attempts.
+	for retry := 0; retry < maxIDRetries; retry++ {
+		if retry > 0 {
+			select {
+			case <-rm.exit:
+				return 0, err
+			case <-time.After(time.Millisecond * time.Duration(retry*100)):
+				logutil.BgLogger().Warn("failed to get last insert id when adding runaway watch", zap.Error(err))
+			}
+		}
+		var rs sqlexec.RecordSet
+		rs, err = exec.ExecuteInternal(ctx, `SELECT LAST_INSERT_ID();`)
+		if err != nil {
+			continue
+		}
+		var rows []chunk.Row
+		rows, err = sqlexec.DrainRecordSet(ctx, rs, 1)
+		//nolint: errcheck
+		rs.Close()
+		if err != nil {
+			continue
+		}
+		if len(rows) != 1 {
+			err = errors.Errorf("unexpected result length: %d", len(rows))
+			continue
+		}
+		return rows[0].GetUint64(0), nil
+	}
+	return 0, errors.Errorf("An error: %v occurred while getting the ID of the newly added watch record. Try querying information_schema.runaway_watches later", err)
+}
+
+// RemoveRunawayWatch is used to remove runaway watch item manually.
+func (rm *Manager) RemoveRunawayWatch(recordID int64) error {
+	// The syncer mutex is held across the lookup and the removal.
+	rm.runawaySyncer.mu.Lock()
+	defer rm.runawaySyncer.mu.Unlock()
+	records, err := rm.runawaySyncer.getWatchRecordByID(recordID)
+	if err != nil {
+		return err
+	}
+	// Exactly one row must match the ID; anything else is reported as "not found".
+	if len(records) != 1 {
+		return errors.Errorf("no runaway watch with the specific ID")
+	}
+
+	// Move the row from the watch table to the watch-done table.
+	err = handleRunawayWatchDone(rm.sysSessionPool, records[0])
+	return err
+}
diff --git a/pkg/resourcegroup/runaway/syncer.go b/pkg/resourcegroup/runaway/syncer.go
new file mode 100644
index 0000000000000..dcb68269519f2
--- /dev/null
+++ b/pkg/resourcegroup/runaway/syncer.go
@@ -0,0 +1,189 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runaway
+
+import (
+	"strings"
+	"sync"
+	"time"
+
+	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/chunk"
+)
+
+const (
+	// watchSyncInterval is the interval to sync the watch record.
+	watchSyncInterval = time.Second
+)
+
+// syncer is used to sync the runaway records.
+type syncer struct {
+	// newWatchReader reads the watch table, deletionWatchReader the watch-done table.
+	newWatchReader      *systemTableReader
+	deletionWatchReader *systemTableReader
+	sysSessionPool      util.SessionPool
+
+	mu sync.Mutex
+}
+
+// newSyncer creates a syncer whose readers start from the zero checkpoint.
+func newSyncer(sysSessionPool util.SessionPool) *syncer {
+	return &syncer{
+		sysSessionPool: sysSessionPool,
+		newWatchReader: &systemTableReader{
+			TableName:  watchTableName,
+			KeyCol:     "start_time",
+			CheckPoint: NullTime,
+		},
+		deletionWatchReader: &systemTableReader{
+			TableName:  watchDoneTableName,
+			KeyCol:     "done_time",
+			CheckPoint: NullTime,
+		},
+	}
+}
+
+// getWatchRecordByID reads the watch records with the given ID without moving the checkpoint.
+func (s *syncer) getWatchRecordByID(id int64) ([]*QuarantineRecord, error) {
+	return s.getWatchRecord(s.newWatchReader, s.newWatchReader.genSelectByIDStmt(id), false)
+}
+
+// getNewWatchRecords reads the watch records newer than the checkpoint and advances it.
+func (s *syncer) getNewWatchRecords() ([]*QuarantineRecord, error) {
+	return s.getWatchRecord(s.newWatchReader, s.newWatchReader.genSelectStmt, true)
+}
+
+// getNewWatchDoneRecords reads the watch-done records newer than the checkpoint and advances it.
+func (s *syncer) getNewWatchDoneRecords() ([]*QuarantineRecord, error) {
+	return s.getWatchDoneRecord(s.deletionWatchReader, s.deletionWatchReader.genSelectStmt, true)
+}
+
+func (s *syncer) getWatchRecord(reader *systemTableReader, sqlGenFn func() (string, []any), push bool) ([]*QuarantineRecord, error) {
+	return getRunawayWatchRecord(s.sysSessionPool, reader, sqlGenFn, push)
+}
+
+func (s *syncer) getWatchDoneRecord(reader *systemTableReader, sqlGenFn func() (string, []any), push bool) ([]*QuarantineRecord, error) {
+	return getRunawayWatchDoneRecord(s.sysSessionPool, reader, sqlGenFn, push)
+}
+
+// getRunawayWatchRecord runs the statement produced by sqlGenFn against the
+// watch table and decodes the rows into QuarantineRecords. When push is true
+// and at least one row was decoded, the reader checkpoint is advanced.
+func getRunawayWatchRecord(sysSessionPool util.SessionPool, reader *systemTableReader,
+	sqlGenFn func() (string, []any), push bool) ([]*QuarantineRecord, error) {
+	rs, err := reader.Read(sysSessionPool, sqlGenFn)
+	if err != nil {
+		return nil, err
+	}
+	// Watch rows start with the record ID at column 0.
+	return decodeQuarantineRecords(rs, 0, reader, push), nil
+}
+
+// getRunawayWatchDoneRecord is the watch-done counterpart of
+// getRunawayWatchRecord; it reads the record fields one column to the right.
+func getRunawayWatchDoneRecord(sysSessionPool util.SessionPool, reader *systemTableReader,
+	sqlGenFn func() (string, []any), push bool) ([]*QuarantineRecord, error) {
+	rs, err := reader.Read(sysSessionPool, sqlGenFn)
+	if err != nil {
+		return nil, err
+	}
+	// Watch-done rows carry one extra leading column, so the record ID is at column 1.
+	return decodeQuarantineRecords(rs, 1, reader, push), nil
+}
+
+// decodeQuarantineRecords converts rows into QuarantineRecords. idCol is the
+// index of the record ID column; the remaining fields follow it in the order
+// resource group name, start time, end time, watch type, watch text, source,
+// action. Rows whose start or end time cannot be converted are skipped.
+func decodeQuarantineRecords(rs []chunk.Row, idCol int, reader *systemTableReader, push bool) []*QuarantineRecord {
+	ret := make([]*QuarantineRecord, 0, len(rs))
+	now := time.Now().UTC()
+	for _, r := range rs {
+		startTime, err := r.GetTime(idCol + 2).GoTime(time.UTC)
+		if err != nil {
+			continue
+		}
+		// A NULL end time stays the zero time.Time (NullTime).
+		var endTime time.Time
+		if !r.IsNull(idCol + 3) {
+			if endTime, err = r.GetTime(idCol + 3).GoTime(time.UTC); err != nil {
+				continue
+			}
+		}
+		ret = append(ret, &QuarantineRecord{
+			ID:                r.GetInt64(idCol),
+			ResourceGroupName: r.GetString(idCol + 1),
+			StartTime:         startTime,
+			EndTime:           endTime,
+			Watch:             rmpb.RunawayWatchType(r.GetInt64(idCol + 4)),
+			WatchText:         r.GetString(idCol + 5),
+			Source:            r.GetString(idCol + 6),
+			Action:            rmpb.RunawayAction(r.GetInt64(idCol + 7)),
+		})
+	}
+	// If a TiDB writes a record slowly, a record with an earlier start time can
+	// be inserted later than others, so the next scan starts a little earlier.
+	// The checkpoint only moves when at least one row was decoded.
+	if push && len(ret) > 0 {
+		reader.CheckPoint = now.Add(-3 * watchSyncInterval)
+	}
+	return ret
+}
+
+// systemTableReader is used to read table `runaway_watch` and `runaway_watch_done`.
+type systemTableReader struct {
+	// TableName is the table the generated statements select from.
+	TableName string
+	// KeyCol is the column genSelectStmt filters and orders by.
+	KeyCol string
+	// CheckPoint is the exclusive lower bound genSelectStmt binds to KeyCol.
+	CheckPoint time.Time
+}
+
+// genSelectByIDStmt returns a generator for "select * from <TableName> where id = %?"
+// with id as its only parameter.
+func (r *systemTableReader) genSelectByIDStmt(id int64) func() (string, []any) {
+	return func() (string, []any) {
+		var builder strings.Builder
+		params := make([]any, 0, 1)
+		builder.WriteString("select * from ")
+		builder.WriteString(r.TableName)
+		builder.WriteString(" where id = %?")
+		params = append(params, id)
+		return builder.String(), params
+	}
+}
+
+// genSelectStmt builds "select * from <TableName> where <KeyCol> > %? order by <KeyCol>"
+// with the current CheckPoint as its only parameter.
+func (r *systemTableReader) genSelectStmt() (string, []any) {
+	var builder strings.Builder
+	params := make([]any, 0, 1)
+	builder.WriteString("select * from ")
+	builder.WriteString(r.TableName)
+	builder.WriteString(" where ")
+	builder.WriteString(r.KeyCol)
+	builder.WriteString(" > %? order by ")
+	builder.WriteString(r.KeyCol)
+	params = append(params, r.CheckPoint)
+	return builder.String(), params
+}
+
+// Read executes the statement produced by genFn through ExecRCRestrictedSQL
+// and returns the resulting rows.
+func (*systemTableReader) Read(sysSessionPool util.SessionPool, genFn func() (string, []any)) ([]chunk.Row, error) {
+	sql, params := genFn()
+	return ExecRCRestrictedSQL(sysSessionPool, sql, params)
+}
diff --git a/pkg/ddl/tests/resourcegroup/BUILD.bazel b/pkg/resourcegroup/tests/BUILD.bazel similarity index 95% rename from pkg/ddl/tests/resourcegroup/BUILD.bazel rename to pkg/resourcegroup/tests/BUILD.bazel index 647c6e100598c..f48f0aa7e9b6c 100644 --- a/pkg/ddl/tests/resourcegroup/BUILD.bazel +++ b/pkg/resourcegroup/tests/BUILD.bazel @@ -1,7 +1,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_test( - name = "resourcegroup_test", + name = "tests_test", timeout = "short", srcs = ["resource_group_test.go"], flaky = True, diff --git a/pkg/ddl/tests/resourcegroup/resource_group_test.go b/pkg/resourcegroup/tests/resource_group_test.go similarity index 98% rename from pkg/ddl/tests/resourcegroup/resource_group_test.go rename to pkg/resourcegroup/tests/resource_group_test.go index 653bed499cce7..85ef7c7ddbc95 100644 --- a/pkg/ddl/tests/resourcegroup/resource_group_test.go +++
b/pkg/resourcegroup/tests/resource_group_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package resourcegrouptest_test +package tests import ( "context" @@ -288,9 +288,9 @@ func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name stri } func TestResourceGroupRunaway(t *testing.T) { - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`)) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC")) }() store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -361,23 +361,23 @@ func TestResourceGroupRunaway(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest")) tk.MustExec("create resource group rg4 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT)") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 50))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 50))) tk.MustQuery("select /*+ resource_group(rg4) */ * from t").Check(testkit.Rows("1")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/sleepCoprAfterReq")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 60))) + require.NoError(t, 
failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 60))) err = tk.QueryToErr("select /*+ resource_group(rg4) */ * from t") require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") tk.MustGetErrCode("select /*+ resource_group(rg4) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine) tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil, testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t", "rg4 select /*+ resource_group(rg4) */ * from t"), maxWaitDuration, tryInterval) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/sleepCoprAfterReq")) } func TestResourceGroupRunawayExceedTiDBSide(t *testing.T) { - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`)) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC")) }() store, dom := testkit.CreateMockStoreAndDomain(t) sv := server.CreateMockServer(t, store) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 91f3218c3ab71..4571f5564fbd1 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -24,7 +24,6 @@ go_library( "//pkg/config", "//pkg/domain", "//pkg/domain/infosync", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/executor", "//pkg/executor/mppcoordmanager", @@ -49,6 +48,7 @@ go_library( "//pkg/privilege/conn", "//pkg/privilege/privileges", "//pkg/privilege/privileges/ldap", + "//pkg/resourcegroup", "//pkg/server/err", "//pkg/server/handler", 
"//pkg/server/handler/extractorhandler", diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 71cbc5ca27545..1c30a851c2868 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -60,7 +60,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/extension" @@ -80,6 +79,7 @@ import ( "github.com/pingcap/tidb/pkg/privilege" "github.com/pingcap/tidb/pkg/privilege/conn" "github.com/pingcap/tidb/pkg/privilege/privileges/ldap" + "github.com/pingcap/tidb/pkg/resourcegroup" servererr "github.com/pingcap/tidb/pkg/server/err" "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/server/internal" diff --git a/pkg/server/metrics/BUILD.bazel b/pkg/server/metrics/BUILD.bazel index f3bb2fb5e6f03..8d5716ccb0e1d 100644 --- a/pkg/server/metrics/BUILD.bazel +++ b/pkg/server/metrics/BUILD.bazel @@ -6,9 +6,9 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/server/metrics", visibility = ["//visibility:public"], deps = [ - "//pkg/domain/resourcegroup", "//pkg/metrics", "//pkg/parser/mysql", + "//pkg/resourcegroup", "@com_github_prometheus_client_golang//prometheus", ], ) diff --git a/pkg/server/metrics/metrics.go b/pkg/server/metrics/metrics.go index 24cd44828341d..b0aaf83f42abd 100644 --- a/pkg/server/metrics/metrics.go +++ b/pkg/server/metrics/metrics.go @@ -17,9 +17,9 @@ package metrics import ( "strconv" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/prometheus/client_golang/prometheus" ) diff --git a/pkg/server/server.go b/pkg/server/server.go index d745b0dcd93a9..2c20135c4a659 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -53,7 +53,6 @@ import ( 
autoid "github.com/pingcap/tidb/pkg/autoid_service" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/executor/mppcoordmanager" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/kv" @@ -65,6 +64,7 @@ import ( "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/plugin" "github.com/pingcap/tidb/pkg/privilege/privileges" + "github.com/pingcap/tidb/pkg/resourcegroup" servererr "github.com/pingcap/tidb/pkg/server/err" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/session/txninfo" diff --git a/pkg/sessionctx/stmtctx/BUILD.bazel b/pkg/sessionctx/stmtctx/BUILD.bazel index 8de5e9824d24e..c13132fa87838 100644 --- a/pkg/sessionctx/stmtctx/BUILD.bazel +++ b/pkg/sessionctx/stmtctx/BUILD.bazel @@ -7,12 +7,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/distsql/context", - "//pkg/domain/resourcegroup", "//pkg/errctx", "//pkg/parser", "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", + "//pkg/resourcegroup", "//pkg/statistics/handle/usage/indexusage", "//pkg/types", "//pkg/util/context", diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index 4cefdd7f33752..04f0e0819922a 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -26,12 +26,12 @@ import ( "time" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/types" contextutil "github.com/pingcap/tidb/pkg/util/context" @@ -260,7 +260,7 @@ type StatementContext struct 
{ // per statement resource group name // hint /* +ResourceGroup(name) */ can change the statement group name ResourceGroupName string - RunawayChecker *resourcegroup.RunawayChecker + RunawayChecker resourcegroup.RunawayChecker IsTiFlash atomic2.Bool RuntimeStatsColl *execdetails.RuntimeStatsColl IndexUsageCollector *indexusage.StmtIndexUsageCollector diff --git a/pkg/sessionctx/variable/BUILD.bazel b/pkg/sessionctx/variable/BUILD.bazel index 6b0ad94c47c8c..d877052715466 100644 --- a/pkg/sessionctx/variable/BUILD.bazel +++ b/pkg/sessionctx/variable/BUILD.bazel @@ -20,7 +20,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/keyspace", "//pkg/kv", @@ -34,6 +33,7 @@ go_library( "//pkg/parser/types", "//pkg/planner/util/fixcontrol", "//pkg/privilege/privileges/ldap", + "//pkg/resourcegroup", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/stmtctx", "//pkg/tidb-binlog/pump_client", diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 405e3c12c4c6b..2d1f3579b9fd9 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser" @@ -45,6 +44,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" ptypes "github.com/pingcap/tidb/pkg/parser/types" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client" diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 1ebcfbcff1251..a130fc1f3a303 100644 --- a/pkg/store/copr/BUILD.bazel 
+++ b/pkg/store/copr/BUILD.bazel @@ -19,11 +19,11 @@ go_library( "//pkg/config", "//pkg/ddl/placement", "//pkg/domain/infosync", - "//pkg/domain/resourcegroup", "//pkg/errno", "//pkg/kv", "//pkg/metrics", "//pkg/parser/terror", + "//pkg/resourcegroup", "//pkg/sessionctx/variable", "//pkg/store/copr/metrics", "//pkg/store/driver/backoff", diff --git a/pkg/store/copr/copr_test/BUILD.bazel b/pkg/store/copr/copr_test/BUILD.bazel index 10f2a807b2623..a0fccf46853c9 100644 --- a/pkg/store/copr/copr_test/BUILD.bazel +++ b/pkg/store/copr/copr_test/BUILD.bazel @@ -11,8 +11,8 @@ go_test( shard_count = 3, deps = [ "//pkg/config", - "//pkg/domain/resourcegroup", "//pkg/kv", + "//pkg/resourcegroup/runaway", "//pkg/store/copr", "//pkg/store/mockstore", "//pkg/testkit/testmain", diff --git a/pkg/store/copr/copr_test/coprocessor_test.go b/pkg/store/copr/copr_test/coprocessor_test.go index 3414bae8bd329..97897c36eb883 100644 --- a/pkg/store/copr/copr_test/coprocessor_test.go +++ b/pkg/store/copr/copr_test/coprocessor_test.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/resourcegroup/runaway" "github.com/pingcap/tidb/pkg/store/copr" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/stretchr/testify/require" @@ -260,13 +260,13 @@ func TestBuildCopIteratorWithRunawayChecker(t *testing.T) { ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") resourceCtl, err := rmclient.NewResourceGroupController(context.Background(), 1, mockPrivider, nil) require.NoError(t, err) - manager := resourcegroup.NewRunawayManager(resourceCtl, "mock://test") + manager := runaway.NewRunawayManager(resourceCtl, "mock://test", nil, nil, nil, nil) defer manager.Stop() sql := "select * from t" group1 := "rg1" checker := manager.DeriveChecker(group1, sql, "", "", time.Now()) - 
manager.AddWatch(&resourcegroup.QuarantineRecord{ + manager.AddWatch(&runaway.QuarantineRecord{ ID: 1, ResourceGroupName: group1, Watch: rmpb.RunawayWatchType_Exact, diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 16a75bf3f82a4..8e0c81fba348d 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -37,11 +37,11 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" tidbmetrics "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx/variable" copr_metrics "github.com/pingcap/tidb/pkg/store/copr/metrics" "github.com/pingcap/tidb/pkg/store/driver/backoff" @@ -693,7 +693,7 @@ type copIterator struct { storeBatchedNum atomic.Uint64 storeBatchedFallbackNum atomic.Uint64 - runawayChecker *resourcegroup.RunawayChecker + runawayChecker resourcegroup.RunawayChecker unconsumedStats *unconsumedCopRuntimeStats } @@ -1266,14 +1266,6 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch timeout, getEndPointType(task.storeType), task.storeAddr, ops...) 
err = derr.ToTiDBErr(err) if worker.req.RunawayChecker != nil { - failpoint.Inject("sleepCoprAfterReq", func(v failpoint.Value) { - //nolint:durationcheck - value := v.(int) - time.Sleep(time.Millisecond * time.Duration(value)) - if value > 50 { - err = errors.Errorf("Coprocessor task terminated due to exceeding the deadline") - } - }) err = worker.req.RunawayChecker.CheckCopRespError(err) } if err != nil { diff --git a/pkg/util/BUILD.bazel b/pkg/util/BUILD.bazel index c45eb7e1a5798..1b20bcd29ea1f 100644 --- a/pkg/util/BUILD.bazel +++ b/pkg/util/BUILD.bazel @@ -27,7 +27,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", - "//pkg/domain/resourcegroup", "//pkg/infoschema/context", "//pkg/kv", "//pkg/metrics", @@ -36,6 +35,7 @@ go_library( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", + "//pkg/resourcegroup", "//pkg/session/cursor", "//pkg/session/txninfo", "//pkg/sessionctx/stmtctx", diff --git a/pkg/util/processinfo.go b/pkg/util/processinfo.go index 6bbe066b0e6ba..7b58618ae72ea 100644 --- a/pkg/util/processinfo.go +++ b/pkg/util/processinfo.go @@ -21,9 +21,9 @@ import ( "strings" "time" - "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/session/cursor" "github.com/pingcap/tidb/pkg/session/txninfo" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" @@ -52,7 +52,7 @@ type ProcessInfo struct { RefCountOfStmtCtx *stmtctx.ReferenceCount MemTracker *memory.Tracker DiskTracker *disk.Tracker - RunawayChecker *resourcegroup.RunawayChecker + RunawayChecker resourcegroup.RunawayChecker StatsInfo func(any) map[string]uint64 RuntimeStatsColl *execdetails.RuntimeStatsColl User string