Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: split lock stats handler and rewrite the insert SQL #46403

Merged
merged 10 commits into from
Aug 29, 2023
4 changes: 2 additions & 2 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jsTable)
// only has p0 stats
require.NotNil(t, jsTable.Partitions["p0"])
require.Nil(t, jsTable.Partitions["global"])
require.Nil(t, jsTable.Partitions[handle.TiDBGlobalStats])

// change static to dynamic then assert
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
Expand All @@ -365,7 +365,7 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jsTable)
// has both global and p0 stats
require.NotNil(t, jsTable.Partitions["p0"])
require.NotNil(t, jsTable.Partitions["global"])
require.NotNil(t, jsTable.Partitions[handle.TiDBGlobalStats])
}

func TestDumpHistoricalStatsFallback(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions executor/lockstats/lock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error {
return err
}

msg, err := h.AddLockedTables(tids, pids, e.Tables)
sv := e.Ctx().GetSessionVars()

msg, err := h.AddLockedTables(tids, pids, e.Tables, sv.MaxChunkSize)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
sv.StmtCtx.AppendWarning(errors.New(msg))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion server/handler/optimizor/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func testDumpPartitionTableStats(t *testing.T, client *testserverclient.TestServ
jsonTable := &handle.JSONTable{}
err = json.Unmarshal(b, jsonTable)
require.NoError(t, err)
require.NotNil(t, jsonTable.Partitions["global"])
require.NotNil(t, jsonTable.Partitions[handle.TiDBGlobalStats])
require.Len(t, jsonTable.Partitions, expectedLen)
}
check(false)
Expand Down
4 changes: 3 additions & 1 deletion statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"handle.go",
"handle_hist.go",
"historical_stats_handler.go",
"lock_stats_handler.go",
"update.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle",
Expand Down Expand Up @@ -59,13 +60,14 @@ go_test(
"dump_test.go",
"gc_test.go",
"handle_hist_test.go",
"lock_stats_handler_test.go",
"main_test.go",
"update_list_test.go",
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 26,
shard_count = 27,
deps = [
"//config",
"//domain",
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handle) DumpHistoricalStatsBySnapshot(
}
// dump its global-stats if existed
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
jsonTbl.Partitions[TiDBGlobalStats] = tbl
}
return jsonTbl, fallbackTbls, nil
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table
return nil, errors.Trace(err)
}
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
jsonTbl.Partitions[TiDBGlobalStats] = tbl
}
return jsonTbl, nil
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func (h *Handle) LoadStatsFromJSON(is infoschema.InfoSchema, jsonTbl *JSONTable)
}
}
// load global-stats if existed
if globalStats, ok := jsonTbl.Partitions["global"]; ok {
if globalStats, ok := jsonTbl.Partitions[TiDBGlobalStats]; ok {
if err := h.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ func TestDumpGlobalStats(t *testing.T) {
stats := getStatsJSON(t, dom, "test", "t")
require.NotNil(t, stats.Partitions["p0"])
require.NotNil(t, stats.Partitions["p1"])
require.Nil(t, stats.Partitions["global"])
require.Nil(t, stats.Partitions[handle.TiDBGlobalStats])

// global-stats is existed
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("analyze table t")
stats = getStatsJSON(t, dom, "test", "t")
require.NotNil(t, stats.Partitions["p0"])
require.NotNil(t, stats.Partitions["p1"])
require.NotNil(t, stats.Partitions["global"])
require.NotNil(t, stats.Partitions[handle.TiDBGlobalStats])
}

func TestLoadGlobalStats(t *testing.T) {
Expand Down
269 changes: 0 additions & 269 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
MaxPartitionMergeBatchSize = 256
)
Expand Down Expand Up @@ -127,274 +126,6 @@ type Handle struct {
lease atomic2.Duration
}

// GetTableLockedAndClearForTest for unit test only
func (h *Handle) GetTableLockedAndClearForTest() []int64 {
tableLocked := h.tableLocked
h.tableLocked = make([]int64, 0)
return tableLocked
}

// LoadLockedTables load locked tables from store
func (h *Handle) LoadLockedTables() error {
h.mu.Lock()
defer h.mu.Unlock()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return errors.Trace(err)
}

h.tableLocked = make([]int64, len(rows))
for i, row := range rows {
h.tableLocked[i] = row.GetInt64(0)
}

return nil
}

// AddLockedTables add locked tables id to store
func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
h.mu.Lock()
defer h.mu.Unlock()

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

exec := h.mu.ctx.(sqlexec.SQLExecutor)

_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return "", err
}

//load tables to check duplicate when insert
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return "", err
}

dupTables := make([]string, 0)
tableLocked := make([]int64, 0)
for _, row := range rows {
tableLocked = append(tableLocked, row.GetInt64(0))
}

strTids := fmt.Sprintf("%v", tids)
logutil.BgLogger().Info("lock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids))
for i, tid := range tids {
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update handle
if !isTableLocked(tableLocked, tid) {
tableLocked = append(tableLocked, tid)
} else {
dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L)
}
}

//insert related partitions while don't warning duplicate partitions
for _, tid := range pids {
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
if !isTableLocked(tableLocked, tid) {
tableLocked = append(tableLocked, tid)
}
}

err = finishTransaction(ctx, exec, err)
if err != nil {
return "", err
}
// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
h.tableLocked = tableLocked

if len(dupTables) > 0 {
tables := dupTables[0]
for i, table := range dupTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(dupTables) {
msg = "skip locking locked tables: " + tables + ", other tables locked successfully"
} else {
msg = "skip locking locked tables: " + tables
}
} else {
msg = "skip locking locked table: " + tables
}
return msg, err
}
return "", err
}

// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked.
func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) {
rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID)
if err != nil {
return 0, 0, 0, err
}

if len(rows) == 0 {
return 0, 0, 0, nil
}
count = rows[0].GetInt64(0)
modifyCount = rows[0].GetInt64(1)
version = rows[0].GetUint64(2)
return count, modifyCount, version, nil
}

// RemoveLockedTables remove tables from table locked array
func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
h.mu.Lock()
defer h.mu.Unlock()

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return "", err
}

//load tables to check unlock the unlock table
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return "", err
}

nonlockedTables := make([]string, 0)
tableLocked := make([]int64, 0)
for _, row := range rows {
tableLocked = append(tableLocked, row.GetInt64(0))
}

strTids := fmt.Sprintf("%v", tids)
logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids))
for i, tid := range tids {
// get stats delta during table locked
count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update stats_meta with stats delta
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err))
return "", err
}
cache.TableRowStatsCache.Invalidate(tid)

_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
if err != nil {
logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
var exist bool
exist, tableLocked = removeIfTableLocked(tableLocked, tid)
if !exist {
nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L)
}
}
//delete related partitions while don't warning delete empty partitions
for _, tid := range pids {
// get stats delta during table locked
count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update stats_meta with stats delta
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err))
return "", err
}
cache.TableRowStatsCache.Invalidate(tid)

_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
if err != nil {
logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
_, tableLocked = removeIfTableLocked(tableLocked, tid)
}

err = finishTransaction(ctx, exec, err)
if err != nil {
return "", err
}
// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
h.tableLocked = tableLocked

if len(nonlockedTables) > 0 {
tables := nonlockedTables[0]
for i, table := range nonlockedTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(nonlockedTables) {
msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully"
} else {
msg = "skip unlocking non-locked tables: " + tables
}
} else {
msg = "skip unlocking non-locked table: " + tables
}
return msg, err
}
return "", err
}

// IsTableLocked check whether table is locked in handle with Handle.Mutex
func (h *Handle) IsTableLocked(tableID int64) bool {
h.mu.RLock()
defer h.mu.RUnlock()
return h.isTableLocked(tableID)
}

// IsTableLocked check whether table is locked in handle without Handle.Mutex
func (h *Handle) isTableLocked(tableID int64) bool {
return isTableLocked(h.tableLocked, tableID)
}

// isTableLocked check whether table is locked
func isTableLocked(tableLocked []int64, tableID int64) bool {
return lockTableIndexOf(tableLocked, tableID) > -1
}

// lockTableIndexOf get the locked table's index in the array
func lockTableIndexOf(tableLocked []int64, tableID int64) int {
for idx, id := range tableLocked {
if id == tableID {
return idx
}
}
return -1
}

// removeIfTableLocked try to remove the table from table locked array
func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) {
idx := lockTableIndexOf(tableLocked, tableID)
if idx > -1 {
tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...)
}
return idx > -1, tableLocked
}

func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
se, err := h.pool.Get()
if err != nil {
Expand Down
Loading