Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: fix data race in the Handle.IsTableLocked #40572

Merged
merged 13 commits into from
Jan 17, 2023
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
tableID = task.idxIncrementalExec.tableID
}
// skip locked tables
if !statsHandle.IsTableLocked(tableID.TableID) {
if !statsHandle.IsTableLocked(tableID.TableID, true) {
tasks = append(tasks, task)
}
// generate warning message
Expand All @@ -115,7 +115,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
//avoid generate duplicate tables
if !dup {
if statsHandle.IsTableLocked(tableID.TableID) {
if statsHandle.IsTableLocked(tableID.TableID, true) {
tbl, ok := is.TableByID(tableID.TableID)
if !ok {
return nil
Expand Down
6 changes: 3 additions & 3 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,19 @@ func (e *ShowExec) fetchShowStatsLocked() error {
if pi != nil {
partitionName = "global"
}
if h.IsTableLocked(tbl.ID) {
if h.IsTableLocked(tbl.ID, true) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, partitionName)
}
if pi != nil {
for _, def := range pi.Definitions {
if h.IsTableLocked(def.ID) {
if h.IsTableLocked(def.ID, true) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, def.Name.O)
}
}
}
} else {
for _, def := range pi.Definitions {
if h.IsTableLocked(def.ID) {
if h.IsTableLocked(def.ID, true) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, def.Name.O)
}
}
Expand Down
2 changes: 2 additions & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//util/memory",
"//util/ranger",
"//util/sqlexec",
"//util/syncutil",
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down Expand Up @@ -72,6 +73,7 @@ go_test(
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
9 changes: 7 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
atomic2 "go.uber.org/atomic"
Expand All @@ -66,7 +67,7 @@ const (
// Handle can update stats info periodically.
type Handle struct {
mu struct {
sync.RWMutex
syncutil.RWMutex
ctx sessionctx.Context
// rateMap contains the error rate delta from feedback.
rateMap errorRateDeltaMap
Expand Down Expand Up @@ -354,7 +355,11 @@ func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.Ta
}

// IsTableLocked check whether table is locked in handle
func (h *Handle) IsTableLocked(tableID int64) bool {
func (h *Handle) IsTableLocked(tableID int64, isLock bool) bool {
if isLock {
h.mu.RLock()
defer h.mu.RUnlock()
}
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
return isTableLocked(h.tableLocked, tableID)
}

Expand Down
7 changes: 4 additions & 3 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}, h *Hand
}

// if table locked, skip
if h.IsTableLocked(q.PhysicalID) {
if h.IsTableLocked(q.PhysicalID, true) {
return nil
}

Expand Down Expand Up @@ -549,7 +549,8 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
startTS := txn.StartTS()
updateStatsMeta := func(id int64) error {
var err error
if h.IsTableLocked(id) {
// This lock is already locked on it so it's isLock is set to false.
if h.IsTableLocked(id, false) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not locking is because it is already locked in previous code and Mutex is not a reentrant lock? If so, could you add some comments here and above the method defination?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutex is not a reentrant lock. I have enabled deadlock in the Handle. so that if I set lock true, we will meet deadlock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add some commnets.

if delta.Delta < 0 {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
} else {
Expand Down Expand Up @@ -1110,7 +1111,7 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
})
for _, tbl := range tbls {
//if table locked, skip analyze
if h.IsTableLocked(tbl.Meta().ID) {
if h.IsTableLocked(tbl.Meta().ID, true) {
continue
}
tblInfo := tbl.Meta()
Expand Down