From f795f295e405c1a011743936cb4dee831f8fe546 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 24 Aug 2023 17:25:15 +0800 Subject: [PATCH 01/10] statistics: split lock stats handler and rewrite the insert SQL statistics: update SQL statistics: add tests --- statistics/handle/BUILD.bazel | 4 +- statistics/handle/handle.go | 271 ----------------- statistics/handle/lock_stats_handler.go | 304 +++++++++++++++++++ statistics/handle/lock_stats_handler_test.go | 61 ++++ 4 files changed, 368 insertions(+), 272 deletions(-) create mode 100644 statistics/handle/lock_stats_handler.go create mode 100644 statistics/handle/lock_stats_handler_test.go diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index a6c84ae37647d..5dd26fd1ea5f8 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "handle.go", "handle_hist.go", "historical_stats_handler.go", + "lock_stats_handler.go", "update.go", ], importpath = "github.com/pingcap/tidb/statistics/handle", @@ -59,13 +60,14 @@ go_test( "dump_test.go", "gc_test.go", "handle_hist_test.go", + "lock_stats_handler_test.go", "main_test.go", "update_list_test.go", ], embed = [":handle"], flaky = True, race = "on", - shard_count = 26, + shard_count = 27, deps = [ "//config", "//domain", diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 5bce85c55ccb6..a96045b65f696 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -54,9 +54,6 @@ import ( ) const ( - // TiDBGlobalStats represents the global-stats for a partitioned table. - TiDBGlobalStats = "global" - // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats MaxPartitionMergeBatchSize = 256 ) @@ -127,274 +124,6 @@ type Handle struct { lease atomic2.Duration } -// GetTableLockedAndClearForTest for unit test only -func (h *Handle) GetTableLockedAndClearForTest() []int64 { - tableLocked := h.tableLocked - h.tableLocked = make([]int64, 0) - return tableLocked -} - -// LoadLockedTables load locked tables from store -func (h *Handle) LoadLockedTables() error { - h.mu.Lock() - defer h.mu.Unlock() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return errors.Trace(err) - } - - h.tableLocked = make([]int64, len(rows)) - for i, row := range rows { - h.tableLocked[i] = row.GetInt64(0) - } - - return nil -} - -// AddLockedTables add locked tables id to store -func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() - - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - - exec := h.mu.ctx.(sqlexec.SQLExecutor) - - _, err := exec.ExecuteInternal(ctx, "begin pessimistic") - if err != nil { - return "", err - } - - //load tables to check duplicate when insert - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return "", err - } - - dupTables := make([]string, 0) - tableLocked := make([]int64, 0) - for _, row := range rows { - tableLocked = append(tableLocked, row.GetInt64(0)) - } - - strTids := fmt.Sprintf("%v", tids) - logutil.BgLogger().Info("lock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) - for i, tid := range tids { - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update handle - if !isTableLocked(tableLocked, tid) { - tableLocked = append(tableLocked, tid) - } else { - dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L) - } - } - - //insert related partitions while don't warning duplicate partitions - for _, tid := range pids { - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - if !isTableLocked(tableLocked, tid) { - tableLocked = append(tableLocked, tid) - } - } - - err = finishTransaction(ctx, exec, err) - if err != nil { - return "", err - } - // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated - h.tableLocked = tableLocked - - if len(dupTables) > 0 { - tables := dupTables[0] - for i, table := range dupTables { - if i == 0 { - continue - } - tables += ", " + table - } - var msg string - if len(tids) > 1 { - if len(tids) > len(dupTables) { - msg = "skip locking locked tables: " + tables + ", other tables locked successfully" - } else { - msg = "skip locking locked tables: " + tables - } - } else { - msg = "skip locking locked table: " + tables - } - return msg, err - } - return "", err -} - -// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked. -func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) { - rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID) - if err != nil { - return 0, 0, 0, err - } - - if len(rows) == 0 { - return 0, 0, 0, nil - } - count = rows[0].GetInt64(0) - modifyCount = rows[0].GetInt64(1) - version = rows[0].GetUint64(2) - return count, modifyCount, version, nil -} - -// RemoveLockedTables remove tables from table locked array -func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() - - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - - exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err := exec.ExecuteInternal(ctx, "begin pessimistic") - if err != nil { - return "", err - } - - //load tables to check unlock the unlock table - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return "", err - } - - nonlockedTables := make([]string, 0) - tableLocked := make([]int64, 0) - for _, row := range rows { - tableLocked = append(tableLocked, row.GetInt64(0)) - } - - strTids := fmt.Sprintf("%v", tids) - logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) - for i, tid := range tids { - // get stats delta during table locked - count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update stats_meta with stats delta - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - cache.TableRowStatsCache.Invalidate(tid) - - _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) - if err != nil { - logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - var exist bool - exist, tableLocked = removeIfTableLocked(tableLocked, tid) - if !exist { - nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L) - } - } - //delete related partitions while don't warning delete empty partitions - for _, tid := range pids { - // get stats delta during table locked - count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update stats_meta with stats delta - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - cache.TableRowStatsCache.Invalidate(tid) - - _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) - if err != nil { - logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - _, tableLocked = removeIfTableLocked(tableLocked, tid) - } - - err = finishTransaction(ctx, exec, err) - if err != nil { - return "", err - } - // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated - h.tableLocked = tableLocked - - if len(nonlockedTables) > 0 { - tables := nonlockedTables[0] - for i, table := range nonlockedTables { - if i == 0 { - continue - } - tables += ", " + table - } - var msg string - if len(tids) > 1 { - if len(tids) > len(nonlockedTables) { - msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully" - } else { - msg = "skip unlocking non-locked tables: " + tables - } - } else { - msg = "skip unlocking non-locked table: " + tables - } - return msg, err - } - return "", err -} - -// IsTableLocked check whether table is locked in handle with Handle.Mutex -func (h *Handle) IsTableLocked(tableID int64) bool { - h.mu.RLock() - defer h.mu.RUnlock() - return h.isTableLocked(tableID) -} - -// IsTableLocked check whether table is locked in handle without Handle.Mutex -func (h *Handle) isTableLocked(tableID int64) bool { - return isTableLocked(h.tableLocked, tableID) -} - -// isTableLocked check whether table is locked -func isTableLocked(tableLocked []int64, tableID int64) bool { - return lockTableIndexOf(tableLocked, tableID) > -1 -} - -// lockTableIndexOf get the locked table's index in the array -func lockTableIndexOf(tableLocked []int64, tableID int64) int { - for idx, id := range tableLocked { - if id == tableID { - return idx - } - } - return -1 -} - -// removeIfTableLocked try to remove the table from table locked array -func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) { - idx := lockTableIndexOf(tableLocked, tableID) - if idx > -1 { - tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...) - } - return idx > -1, tableLocked -} - func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { se, err := h.pool.Get() if err != nil { diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go new file mode 100644 index 0000000000000..869cac63486c2 --- /dev/null +++ b/statistics/handle/lock_stats_handler.go @@ -0,0 +1,304 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "context" + "fmt" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/statistics/handle/cache" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" +) + +// AddLockedTables add locked tables id to store +func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { + h.mu.Lock() + defer h.mu.Unlock() + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + exec := h.mu.ctx.(sqlexec.SQLExecutor) + + _, err := exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC") + if err != nil { + return "", err + } + + // Load tables to check duplicate when inserting. + rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id FROM mysql.stats_table_locked") + if err != nil { + return "", err + } + + dupTables := make([]string, 0) + tableLocked := make([]int64, 0) + for _, row := range rows { + tableLocked = append(tableLocked, row.GetInt64(0)) + } + + log := logutil.BgLogger().With(zap.String("category", "stats")) + log.Info("lock table", zap.Int64s("tableIDs", tids)) + + // Insert locked tables. + for i, tid := range tids { + if !isTableLocked(tableLocked, tid) { + if err := insertIntoStatsTableLocked(ctx, exec, tid); err != nil { + return "", err + } + tableLocked = append(tableLocked, tid) + } else { + dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L) + } + } + + // Insert related partitions while don't warning duplicate partitions. + for _, tid := range pids { + if !isTableLocked(tableLocked, tid) { + if err := insertIntoStatsTableLocked(ctx, exec, tid); err != nil { + return "", err + } + tableLocked = append(tableLocked, tid) + } + } + + // Commit transaction. + err = finishTransaction(ctx, exec, err) + if err != nil { + return "", err + } + + // Update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated. + h.tableLocked = tableLocked + + mag := generateDuplicateTablesMessage(tids, dupTables) + return mag, nil +} + +func generateDuplicateTablesMessage(tids []int64, dupTables []string) string { + if len(dupTables) > 0 { + tables := strings.Join(dupTables, ", ") + var msg string + if len(tids) > 1 { + if len(tids) > len(dupTables) { + msg = "skip locking locked tables: " + tables + ", other tables locked successfully" + } else { + msg = "skip locking locked tables: " + tables + } + } else { + msg = "skip locking locked table: " + tables + } + return msg + } + + return "" +} + +func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.SQLExecutor, tid int64) error { + _, err := exec.ExecuteInternal(ctx, "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?", tid, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked", zap.String("category", "stats"), zap.Error(err)) + return err + } + return nil +} + +// RemoveLockedTables remove tables from table locked array +func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { + h.mu.Lock() + defer h.mu.Unlock() + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return "", err + } + + //load tables to check unlock the unlock table + rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") + if err != nil { + return "", err + } + + nonlockedTables := make([]string, 0) + tableLocked := make([]int64, 0) + for _, row := range rows { + tableLocked = append(tableLocked, row.GetInt64(0)) + } + + strTids := fmt.Sprintf("%v", tids) + logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) + for i, tid := range tids { + // get stats delta during table locked + count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + // update stats_meta with stats delta + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + cache.TableRowStatsCache.Invalidate(tid) + + _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) + if err != nil { + logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + var exist bool + exist, tableLocked = removeIfTableLocked(tableLocked, tid) + if !exist { + nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L) + } + } + //delete related partitions while don't warning delete empty partitions + for _, tid := range pids { + // get stats delta during table locked + count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + // update stats_meta with stats delta + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + cache.TableRowStatsCache.Invalidate(tid) + + _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) + if err != nil { + logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + _, tableLocked = removeIfTableLocked(tableLocked, tid) + } + + err = finishTransaction(ctx, exec, err) + if err != nil { + return "", err + } + // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated + h.tableLocked = tableLocked + + if len(nonlockedTables) > 0 { + tables := nonlockedTables[0] + for i, table := range nonlockedTables { + if i == 0 { + continue + } + tables += ", " + table + } + var msg string + if len(tids) > 1 { + if len(tids) > len(nonlockedTables) { + msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully" + } else { + msg = "skip unlocking non-locked tables: " + tables + } + } else { + msg = "skip unlocking non-locked table: " + tables + } + return msg, err + } + return "", err +} + +// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked. +func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) { + rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID) + if err != nil { + return 0, 0, 0, err + } + + if len(rows) == 0 { + return 0, 0, 0, nil + } + count = rows[0].GetInt64(0) + modifyCount = rows[0].GetInt64(1) + version = rows[0].GetUint64(2) + return count, modifyCount, version, nil +} + +// LoadLockedTables load locked tables from store +func (h *Handle) LoadLockedTables() error { + h.mu.Lock() + defer h.mu.Unlock() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") + if err != nil { + return errors.Trace(err) + } + + h.tableLocked = make([]int64, len(rows)) + for i, row := range rows { + h.tableLocked[i] = row.GetInt64(0) + } + + return nil +} + +// removeIfTableLocked try to remove the table from table locked array +func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) { + idx := lockTableIndexOf(tableLocked, tableID) + if idx > -1 { + tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...) + } + return idx > -1, tableLocked +} + +// IsTableLocked check whether table is locked in handle with Handle.Mutex +func (h *Handle) IsTableLocked(tableID int64) bool { + h.mu.RLock() + defer h.mu.RUnlock() + return h.isTableLocked(tableID) +} + +// IsTableLocked check whether table is locked in handle without Handle.Mutex +func (h *Handle) isTableLocked(tableID int64) bool { + return isTableLocked(h.tableLocked, tableID) +} + +// isTableLocked check whether table is locked +func isTableLocked(tableLocked []int64, tableID int64) bool { + return lockTableIndexOf(tableLocked, tableID) > -1 +} + +// lockTableIndexOf get the locked table's index in the array +func lockTableIndexOf(tableLocked []int64, tableID int64) int { + for idx, id := range tableLocked { + if id == tableID { + return idx + } + } + return -1 +} + +// GetTableLockedAndClearForTest for unit test only +func (h *Handle) GetTableLockedAndClearForTest() []int64 { + tableLocked := h.tableLocked + h.tableLocked = make([]int64, 0) + return tableLocked +} diff --git a/statistics/handle/lock_stats_handler_test.go b/statistics/handle/lock_stats_handler_test.go new file mode 100644 index 0000000000000..a943ada718856 --- /dev/null +++ b/statistics/handle/lock_stats_handler_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGenerateDuplicateTablesMessage(t *testing.T) { + tests := []struct { + name string + totalTableIDs []int64 + dupTables []string + expectedMsg string + }{ + { + name: "no duplicate tables", + totalTableIDs: []int64{1, 2, 3}, + expectedMsg: "", + }, + { + name: "one duplicate table", + totalTableIDs: []int64{1}, + dupTables: []string{"t1"}, + expectedMsg: "skip locking locked table: t1", + }, + { + name: "multiple duplicate tables", + totalTableIDs: []int64{1, 2, 3, 4}, + dupTables: []string{"t1", "t2", "t3"}, + expectedMsg: "skip locking locked tables: t1, t2, t3, other tables locked successfully", + }, + { + name: "all tables are duplicate", + totalTableIDs: []int64{1, 2, 3, 4}, + dupTables: []string{"t1", "t2", "t3", "t4"}, + expectedMsg: "skip locking locked tables: t1, t2, t3, t4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msg := generateDuplicateTablesMessage(tt.totalTableIDs, tt.dupTables) + require.Equal(t, tt.expectedMsg, msg) + }) + } +} From e434f07a213a6773d02db3c388a56e3f0d5dd3ce Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 15:23:18 +0800 Subject: [PATCH 02/10] statistics: add tests --- .../handletest/statslock/stats_lcok_test.go | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/statistics/handle/handletest/statslock/stats_lcok_test.go b/statistics/handle/handletest/statslock/stats_lcok_test.go index 366f9d1ad3779..328727ff69277 100644 --- a/statistics/handle/handletest/statslock/stats_lcok_test.go +++ b/statistics/handle/handletest/statslock/stats_lcok_test.go @@ -76,6 +76,53 @@ func TestStatsLockAndUnlockTable(t *testing.T) { require.Equal(t, int64(2), tblStats2.RealtimeCount) } +func TestStatsLockAndUnlockTableRepeatedly(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.EnableStatsCacheMemQuota = true + }) + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10), index idx_b (b))") + tk.MustExec("analyze table test.t") + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.Nil(t, err) + + handle := domain.GetDomain(tk.Session()).StatsHandle() + tblStats := handle.GetTableStats(tbl.Meta()) + for _, col := range tblStats.Columns { + require.True(t, col.IsStatsInitialized()) + } + tk.MustExec("lock stats t") + + rows := tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows() + num, _ := strconv.Atoi(rows[0][0].(string)) + require.Equal(t, num, 1) + + tk.MustExec("insert into t(a, b) values(1,'a')") + tk.MustExec("insert into t(a, b) values(2,'b')") + + tk.MustExec("analyze table test.t") + tblStats1 := handle.GetTableStats(tbl.Meta()) + require.Equal(t, tblStats, tblStats1) + + // Lock the table again and check the warning. + tk.MustExec("lock stats t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 skip locking locked table: test.t", + )) + + tableLocked1 := handle.GetTableLockedAndClearForTest() + err = handle.LoadLockedTables() + require.Nil(t, err) + tableLocked2 := handle.GetTableLockedAndClearForTest() + require.Equal(t, tableLocked1, tableLocked2) +} + func TestStatsLockAndUnlockTables(t *testing.T) { restore := config.RestoreFunc() defer restore() From e6e64fc1ee126696fb24f96e16e73aabc1b76881 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 15:26:45 +0800 Subject: [PATCH 03/10] statistics: better tests --- statistics/handle/handletest/statslock/BUILD.bazel | 1 + statistics/handle/handletest/statslock/stats_lcok_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/statistics/handle/handletest/statslock/BUILD.bazel b/statistics/handle/handletest/statslock/BUILD.bazel index 78bea0e87a71c..92cdb776a0634 100644 --- a/statistics/handle/handletest/statslock/BUILD.bazel +++ b/statistics/handle/handletest/statslock/BUILD.bazel @@ -8,6 +8,7 @@ go_test( "stats_lcok_test.go", ], flaky = True, + shard_count = 3, deps = [ "//config", "//domain", diff --git a/statistics/handle/handletest/statslock/stats_lcok_test.go b/statistics/handle/handletest/statslock/stats_lcok_test.go index 328727ff69277..72e8ee05161db 100644 --- a/statistics/handle/handletest/statslock/stats_lcok_test.go +++ b/statistics/handle/handletest/statslock/stats_lcok_test.go @@ -111,12 +111,12 @@ func TestStatsLockAndUnlockTableRepeatedly(t *testing.T) { require.Equal(t, tblStats, tblStats1) // Lock the table again and check the warning. + tableLocked1 := handle.GetTableLockedAndClearForTest() tk.MustExec("lock stats t") tk.MustQuery("show warnings").Sort().Check(testkit.Rows( "Warning 1105 skip locking locked table: test.t", )) - tableLocked1 := handle.GetTableLockedAndClearForTest() err = handle.LoadLockedTables() require.Nil(t, err) tableLocked2 := handle.GetTableLockedAndClearForTest() From 9fb3780582a4935d56390dd12b924a3b12944148 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 15:28:46 +0800 Subject: [PATCH 04/10] statistics: better tests --- statistics/handle/handletest/statslock/stats_lcok_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statistics/handle/handletest/statslock/stats_lcok_test.go b/statistics/handle/handletest/statslock/stats_lcok_test.go index 72e8ee05161db..1774b125e176f 100644 --- a/statistics/handle/handletest/statslock/stats_lcok_test.go +++ b/statistics/handle/handletest/statslock/stats_lcok_test.go @@ -76,7 +76,7 @@ func TestStatsLockAndUnlockTable(t *testing.T) { require.Equal(t, int64(2), tblStats2.RealtimeCount) } -func TestStatsLockAndUnlockTableRepeatedly(t *testing.T) { +func TestStatsLockTableRepeatedly(t *testing.T) { restore := config.RestoreFunc() defer restore() config.UpdateGlobal(func(conf *config.Config) { From 19b61b4ac417a93afe4de7b439805112d546f073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Mon, 28 Aug 2023 16:18:42 +0800 Subject: [PATCH 05/10] Update statistics/handle/lock_stats_handler.go Co-authored-by: Yuanjia Zhang --- statistics/handle/lock_stats_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index 869cac63486c2..c4522bc457b13 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -// AddLockedTables add locked tables id to store +// AddLockedTables add locked tables id to store func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { h.mu.Lock() defer h.mu.Unlock() From ccb67d3edde7aafd84996a8659ad677a479e3c64 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 17:38:31 +0800 Subject: [PATCH 06/10] statistics: use `exec.ExecuteInternal` for AddLockedTables --- executor/lockstats/lock_stats_executor.go | 6 ++++-- statistics/handle/lock_stats_handler.go | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/executor/lockstats/lock_stats_executor.go b/executor/lockstats/lock_stats_executor.go index 33a71f4d5eb3f..465fbea4e01e6 100644 --- a/executor/lockstats/lock_stats_executor.go +++ b/executor/lockstats/lock_stats_executor.go @@ -50,12 +50,14 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error { return err } - msg, err := h.AddLockedTables(tids, pids, e.Tables) + sv := e.Ctx().GetSessionVars() + + msg, err := h.AddLockedTables(tids, pids, e.Tables, sv.MaxChunkSize) if err != nil { return err } if msg != "" { - e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg)) + sv.StmtCtx.AppendWarning(errors.New(msg)) } return nil diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index c4522bc457b13..1d6a3385e1331 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -28,8 +28,8 @@ import ( "go.uber.org/zap" ) -// AddLockedTables add locked tables id to store -func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +// AddLockedTables add locked tables id to store +func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName, maxChunkSize int) (string, error) { h.mu.Lock() defer h.mu.Unlock() @@ -41,8 +41,12 @@ func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.Table return "", err } - // Load tables to check duplicate when inserting. - rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id FROM mysql.stats_table_locked") + // Load tables to check duplicate before insert. + recordSet, err := exec.ExecuteInternal(ctx, "SELECT table_id FROM mysql.stats_table_locked") + if err != nil { + return "", err + } + rows, err := sqlexec.DrainRecordSet(ctx, recordSet, maxChunkSize) if err != nil { return "", err } From aac37c105ab6b4fc2785fd5475288fda7699468a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 17:40:03 +0800 Subject: [PATCH 07/10] statistics: format --- statistics/handle/lock_stats_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index 1d6a3385e1331..3257df99fa238 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -// AddLockedTables add locked tables id to store +// AddLockedTables add locked tables id to store. func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName, maxChunkSize int) (string, error) { h.mu.Lock() defer h.mu.Unlock() From 57f31c7003e59db44dc6a804cc05261f2555c361 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 28 Aug 2023 17:59:17 +0800 Subject: [PATCH 08/10] statistics: use handle.TiDBGlobalStats --- executor/historical_stats_test.go | 4 ++-- server/handler/optimizor/statistics_handler_test.go | 2 +- statistics/handle/dump.go | 6 +++--- statistics/handle/dump_test.go | 4 ++-- statistics/handle/handle.go | 2 ++ 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 973ecac880083..7288a8958b062 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -343,7 +343,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // only has p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.Nil(t, jsTable.Partitions["global"]) + require.Nil(t, jsTable.Partitions[handle.TiDBGlobalStats]) // change static to dynamic then assert tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") @@ -365,7 +365,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // has both global and p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.NotNil(t, jsTable.Partitions["global"]) + require.NotNil(t, jsTable.Partitions[handle.TiDBGlobalStats]) } func TestDumpHistoricalStatsFallback(t *testing.T) { diff --git a/server/handler/optimizor/statistics_handler_test.go b/server/handler/optimizor/statistics_handler_test.go index 7b1966dfd6131..f9cdf89d2e0d5 100644 --- a/server/handler/optimizor/statistics_handler_test.go +++ b/server/handler/optimizor/statistics_handler_test.go @@ -180,7 +180,7 @@ func testDumpPartitionTableStats(t *testing.T, client *testserverclient.TestServ jsonTable := &handle.JSONTable{} err = json.Unmarshal(b, jsonTable) require.NoError(t, err) - require.NotNil(t, jsonTable.Partitions["global"]) + require.NotNil(t, jsonTable.Partitions[handle.TiDBGlobalStats]) require.Len(t, jsonTable.Partitions, expectedLen) } check(false) diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 8340ed18d6073..56bd83c9cb626 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -195,7 +195,7 @@ func (h *Handle) DumpHistoricalStatsBySnapshot( } // dump its global-stats if existed if tbl != nil { - jsonTbl.Partitions["global"] = tbl + jsonTbl.Partitions[TiDBGlobalStats] = tbl } return jsonTbl, fallbackTbls, nil } @@ -233,7 +233,7 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table return nil, errors.Trace(err) } if tbl != nil { - jsonTbl.Partitions["global"] = tbl + jsonTbl.Partitions[TiDBGlobalStats] = tbl } return jsonTbl, nil } @@ -396,7 +396,7 @@ func (h *Handle) LoadStatsFromJSON(is infoschema.InfoSchema, jsonTbl *JSONTable) } } // load global-stats if existed - if globalStats, ok := jsonTbl.Partitions["global"]; ok { + if globalStats, ok := jsonTbl.Partitions[TiDBGlobalStats]; ok { if err := h.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil { return errors.Trace(err) } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index bd7d44de26e69..bd56020d6591d 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -137,7 +137,7 @@ func TestDumpGlobalStats(t *testing.T) { stats := getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.Nil(t, stats.Partitions["global"]) + require.Nil(t, stats.Partitions[handle.TiDBGlobalStats]) // global-stats is existed tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") @@ -145,7 +145,7 @@ func TestDumpGlobalStats(t *testing.T) { stats = getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.NotNil(t, stats.Partitions["global"]) + require.NotNil(t, stats.Partitions[handle.TiDBGlobalStats]) } func TestLoadGlobalStats(t *testing.T) { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index a96045b65f696..42547598490c4 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -54,6 +54,8 @@ import ( ) const ( + // TiDBGlobalStats represents the global-stats for a partitioned table. + TiDBGlobalStats = "global" // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats MaxPartitionMergeBatchSize = 256 ) From 0df9eda1b27cced00aeb899bba4c3a66704e0022 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Aug 2023 11:18:52 +0800 Subject: [PATCH 09/10] statistics: address comment --- statistics/handle/lock_stats_handler.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index 3257df99fa238..1b52d1b6810ab 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -28,6 +28,9 @@ import ( "go.uber.org/zap" ) +// Stats logger. +var log = logutil.BgLogger().With(zap.String("category", "stats")) + // AddLockedTables add locked tables id to store. func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName, maxChunkSize int) (string, error) { h.mu.Lock() @@ -51,13 +54,12 @@ func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.Table return "", err } - dupTables := make([]string, 0) - tableLocked := make([]int64, 0) + dupTables := make([]string, 0, len(tables)) + tableLocked := make([]int64, 0, len(rows)) for _, row := range rows { tableLocked = append(tableLocked, row.GetInt64(0)) } - log := logutil.BgLogger().With(zap.String("category", "stats")) log.Info("lock table", zap.Int64s("tableIDs", tids)) // Insert locked tables. From 31d4effdff4341cd1e27486c1317975c41fd2dd0 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Aug 2023 11:23:04 +0800 Subject: [PATCH 10/10] statistics: rename --- statistics/handle/lock_stats_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index 1b52d1b6810ab..4db1b60b46ff1 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -29,7 +29,7 @@ import ( ) // Stats logger. -var log = logutil.BgLogger().With(zap.String("category", "stats")) +var statsLogger = logutil.BgLogger().With(zap.String("category", "stats")) // AddLockedTables add locked tables id to store. func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName, maxChunkSize int) (string, error) { @@ -60,7 +60,7 @@ func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.Table tableLocked = append(tableLocked, row.GetInt64(0)) } - log.Info("lock table", zap.Int64s("tableIDs", tids)) + statsLogger.Info("lock table", zap.Int64s("tableIDs", tids)) // Insert locked tables. for i, tid := range tids {