From 9dff1e161b4eaa9658c3d7c76700be7eac566edc Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 2 Mar 2022 14:02:40 +0800 Subject: [PATCH] *: support small cached table feature on 5.4 (#32703) --- ddl/db_cache_test.go | 6 +- ddl/placement_policy_ddl_test.go | 2 +- domain/domain.go | 25 +- domain/sysvar_cache.go | 9 + executor/adapter.go | 4 + executor/builder.go | 95 +++++- executor/distsql.go | 30 +- executor/executor.go | 15 + executor/index_merge_reader.go | 5 + executor/slow_query_test.go | 2 +- executor/table_reader.go | 15 +- executor/union_scan_test.go | 63 ++++ expression/integration_serial_test.go | 22 +- infoschema/builder.go | 14 +- infoschema/infoschema_test.go | 6 +- metrics/metrics.go | 1 + metrics/server.go | 9 + planner/core/common_plans.go | 14 - planner/core/exhaust_physical_plans.go | 9 +- planner/core/integration_test.go | 36 +++ planner/core/logical_plan_builder.go | 42 +-- planner/core/logical_plans.go | 4 - planner/core/physical_plans.go | 2 - planner/core/prepare_test.go | 71 +++++ planner/core/stats.go | 5 +- planner/optimize.go | 20 +- session/session.go | 4 +- session/session_test.go | 13 +- sessionctx/variable/sysvar.go | 37 ++- sessionctx/variable/sysvar_test.go | 37 +++ sessionctx/variable/tidb_vars.go | 10 + sessionctx/variable/varsutil_test.go | 4 + table/table.go | 7 +- table/tables/cache.go | 114 ++++--- table/tables/cache_test.go | 419 +++++++++++++++++-------- table/tables/state_remote.go | 83 +++-- table/tables/state_remote_test.go | 18 +- tests/readonlytest/go.mod | 2 +- tests/readonlytest/go.sum | 59 ++++ tests/readonlytest/readonly_test.go | 111 +++++-- 40 files changed, 1045 insertions(+), 399 deletions(-) diff --git a/ddl/db_cache_test.go b/ddl/db_cache_test.go index c1f591869e3b9..1adbe69272f5d 100644 --- a/ddl/db_cache_test.go +++ b/ddl/db_cache_test.go @@ -240,10 +240,14 @@ func TestCacheTableSizeLimit(t *testing.T) { require.NoError(t, err) } + lastReadFromCache := func(tk *testkit.TestKit) bool { + return tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache + } + cached := false for i := 0; i < 200; i++ { tk.MustQuery("select count(*) from (select * from cache_t2 limit 1) t1").Check(testkit.Rows("1")) - if tk.HasPlan("select * from cache_t2", "UnionScan") { + if lastReadFromCache(tk) { cached = true break } diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index 28efb3bae3e3c..c75891d90b349 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -119,7 +119,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, nil, []*model.PolicyInfo{p1, p2, p3, p4, p5}, diff --git a/domain/domain.go b/domain/domain.go index 58f46588ac0aa..4d47435bba959 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -95,7 +95,6 @@ type Domain struct { serverID uint64 serverIDSession *concurrency.Session isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. - renewLeaseCh chan func() // It is used to call the renewLease function of the cache table. onClose func() sysExecutorFactory func(*Domain) (pools.Resource, error) } @@ -162,7 +161,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -284,7 +283,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -729,7 +728,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio indexUsageSyncLease: idxUsageSyncLease, planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, - renewLeaseCh: make(chan func(), 10), expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp), } @@ -856,10 +854,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Add(4) + do.wg.Add(3) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() - go do.renewLease() go do.globalConfigSyncerKeeper() if !skipRegisterToDashboard { do.wg.Add(1) @@ -1777,22 +1774,6 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { do.infoCache.Insert(is, 0) } -func (do *Domain) renewLease() { - defer func() { - do.wg.Done() - logutil.BgLogger().Info("renew lease goroutine exited.") - }() - for { - select { - case <-do.exit: - close(do.renewLeaseCh) - return - case op := <-do.renewLeaseCh: - op() - } - } -} - func init() { initByLDFlagsForGlobalKill() } diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index 125b1fdecbf52..64480e325b529 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -230,6 +230,8 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) { break } topsqlstate.GlobalState.ReportIntervalSeconds.Store(val) + case variable.TiDBSuperReadOnly: + variable.VarTiDBSuperReadOnly.Store(variable.TiDBOptOn(sVal)) case variable.TiDBRestrictedReadOnly: variable.RestrictedReadOnly.Store(variable.TiDBOptOn(sVal)) case variable.TiDBStoreLimit: @@ -239,6 +241,13 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) { break } storekv.StoreLimit.Store(val) + case variable.TiDBTableCacheLease: + var val int64 + val, err = strconv.ParseInt(sVal, 10, 64) + if err != nil { + break + } + variable.TableCacheLease.Store(val) case variable.TiDBPersistAnalyzeOptions: variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal)) case variable.TiDBEnableColumnTracking: diff --git a/executor/adapter.go b/executor/adapter.go index fa96d82ea5e84..955f0b83fca75 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -966,6 +966,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo sessVars.DurationParse = 0 // Clean the stale read flag when statement execution finish sessVars.StmtCtx.IsStaleness = false + + if sessVars.StmtCtx.ReadFromTableCache { + metrics.ReadFromTableCacheCounter.Inc() + } } // CloseRecordSet will finish the execution of current statement and do some record work diff --git a/executor/builder.go b/executor/builder.go index a718577cea424..0242b9070cd38 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" @@ -46,6 +45,7 @@ import ( plannerutil "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/table" @@ -1067,7 +1067,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco return x } us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)} - us.cacheTable = v.CacheTable // Get the handle column index of the below Plan. us.belowHandleCols = v.HandleCols us.mutableRow = chunk.MutRowFromTypes(retTypes(us)) @@ -1083,6 +1082,13 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.collators = append(us.collators, collate.GetCollator(tp.Collate)) } + startTS, err := b.getSnapshotTS() + sessionVars := b.ctx.GetSessionVars() + if err != nil { + b.err = err + return nil + } + switch x := reader.(type) { case *TableReaderExecutor: us.desc = x.desc @@ -1090,6 +1096,23 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.columns = x.columns us.table = x.table us.virtualColumnIndex = x.virtualColumnIndex + tbl := x.Table() + if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable { + cachedTable := tbl.(table.CachedTable) + leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second + // Determine whether the cache can be used. + cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration) + if cacheData != nil { + sessionVars.StmtCtx.ReadFromTableCache = true + x.dummy = true + us.cacheTable = cacheData + } else { + if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt { + store := b.ctx.GetStore() + cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration) + } + } + } case *IndexReaderExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -1103,6 +1126,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions) us.columns = x.columns us.table = x.table + + tbl := x.Table() + if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable { + cachedTable := tbl.(table.CachedTable) + // Determine whether the cache can be used. + leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second + cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration) + if cacheData != nil { + sessionVars.StmtCtx.ReadFromTableCache = true + x.dummy = true + us.cacheTable = cacheData + } else { + if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt { + store := b.ctx.GetStore() + cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration) + } + } + } case *IndexLookUpExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -1117,6 +1158,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.columns = x.columns us.table = x.table us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns) + + tbl := x.Table() + if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable { + cachedTable := tbl.(table.CachedTable) + leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second + // Determine whether the cache can be used. + cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration) + if cacheData != nil { + sessionVars.StmtCtx.ReadFromTableCache = true + x.dummy = true + us.cacheTable = cacheData + } else { + if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt { + store := b.ctx.GetStore() + cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration) + } + } + } case *IndexMergeReaderExecutor: // IndexMergeReader doesn't care order for now. So we will not set desc and useIndex. us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions) @@ -3138,6 +3197,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E return nil } + if ret.table.Meta().TempTableType != model.TempTableNone { + ret.dummy = true + } + ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -3365,6 +3428,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E return nil } + if ret.table.Meta().TempTableType != model.TempTableNone { + ret.dummy = true + } + ret.ranges = is.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) @@ -3530,6 +3597,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo return nil } + if ret.table.Meta().TempTableType != model.TempTableNone { + ret.dummy = true + } + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) ret.ranges = is.Ranges @@ -4782,25 +4853,15 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64 b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name)) return nil } - cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS) + sessVars := b.ctx.GetSessionVars() + leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second + cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration) if cacheData != nil { - b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true + sessVars.StmtCtx.ReadFromTableCache = true return cacheData } if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt { - go func() { - defer func() { - if r := recover(); r != nil { - logutil.BgLogger().Error("panic in the recoverable goroutine", - zap.Reflect("r", r), - zap.Stack("stack trace")) - } - }() - err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS) - if err != nil { - log.Warn("Update Lock Info Error") - } - }() + tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration) } return nil } diff --git a/executor/distsql.go b/executor/distsql.go index 734074eef8baf..da474928258f6 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -194,11 +194,20 @@ type IndexReaderExecutor struct { memTracker *memory.Tracker selectResultHook // for testing + + // If dummy flag is set, this is not a real IndexReader, it just provides the KV ranges for UnionScan. + // Used by the temporary table, cached table. + dummy bool +} + +// Table implements the dataSourceExecutor interface. +func (e *IndexReaderExecutor) Table() table.Table { + return e.table } // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() (err error) { - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { return nil } @@ -212,7 +221,7 @@ func (e *IndexReaderExecutor) Close() (err error) { // Next implements the Executor Next interface. func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { req.Reset() return nil } @@ -282,7 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) // Treat temporary table as dummy table, avoid sending distsql request to TiKV. // In a test case IndexReaderExecutor is mocked and e.table is nil. // Avoid sending distsql request to TIKV. - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { return nil } @@ -380,6 +389,10 @@ type IndexLookUpExecutor struct { // cancelFunc is called when close the executor cancelFunc context.CancelFunc + + // If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan. + // Used by the temporary table, cached table. + dummy bool } type getHandleType int8 @@ -395,6 +408,11 @@ type checkIndexValue struct { idxTblCols []*table.Column } +// Table implements the dataSourceExecutor interface. +func (e *IndexLookUpExecutor) Table() table.Table { + return e.table +} + // Open implements the Executor Open interface. func (e *IndexLookUpExecutor) Open(ctx context.Context) error { var err error @@ -411,7 +429,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { } // Treat temporary table as dummy table, avoid sending distsql request to TiKV. - if e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { return nil } @@ -677,7 +695,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { return nil } @@ -705,7 +723,7 @@ func (e *IndexLookUpExecutor) Close() error { // Next implements Exec Next interface. func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { req.Reset() return nil } diff --git a/executor/executor.go b/executor/executor.go index 163588e55b6d9..497d01e484172 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -102,6 +102,21 @@ var ( GlobalDiskUsageTracker *disk.Tracker ) +var ( + _ dataSourceExecutor = &TableReaderExecutor{} + _ dataSourceExecutor = &IndexReaderExecutor{} + _ dataSourceExecutor = &IndexLookUpExecutor{} + _ dataSourceExecutor = &IndexMergeReaderExecutor{} +) + +// dataSourceExecutor is a table DataSource converted Executor. +// Currently, there are TableReader/IndexReader/IndexLookUp/IndexMergeReader. +// Note, partition reader is special and the caller should handle it carefully. +type dataSourceExecutor interface { + Executor + Table() table.Table +} + type baseExecutor struct { ctx sessionctx.Context id int diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index a712e02f580bf..66dde5b0e743a 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -118,6 +118,11 @@ type IndexMergeReaderExecutor struct { isCorColInPartialAccess []bool } +// Table implements the dataSourceExecutor interface. +func (e *IndexMergeReaderExecutor) Table() table.Table { + return e.table +} + // Open implements the Executor Open interface func (e *IndexMergeReaderExecutor) Open(ctx context.Context) (err error) { e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans)) diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 9828263402ac1..3c6a2fd2d484a 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -54,7 +54,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu } func newSlowQueryRetriever() (*slowQueryRetriever, error) { - newISBuilder, err := infoschema.NewBuilder(nil, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + newISBuilder, err := infoschema.NewBuilder(nil, nil).InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/executor/table_reader.go b/executor/table_reader.go index 958b8cc442061..fb654281bcbb7 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -111,6 +111,10 @@ type TableReaderExecutor struct { // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. extraPIDColumnIndex offsetOptional + + // If dummy flag is set, this is not a real TableReader, it just provides the KV ranges for UnionScan. + // Used by the temporary table, cached table. + dummy bool } // offsetOptional may be a positive integer, or invalid. @@ -128,6 +132,11 @@ func (i offsetOptional) value() int { return int(i - 1) } +// Table implements the dataSourceExecutor interface. +func (e *TableReaderExecutor) Table() table.Table { + return e.table +} + // Open initializes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -180,7 +189,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. // Calculate the kv ranges here, UnionScan rely on this kv ranges. // cached table and temporary table are similar - if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { kvReq, err := e.buildKVReq(ctx, firstPartRanges) if err != nil { return err @@ -218,7 +227,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. req.Reset() return nil @@ -263,7 +272,7 @@ func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID in // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { - if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + if e.dummy { return nil } diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index 6133010ec29da..6f1fd994a5268 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -17,6 +17,7 @@ package executor_test import ( "fmt" "testing" + "time" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" @@ -416,3 +417,65 @@ func TestForApplyAndUnionScan(t *testing.T) { tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows()) tk.MustExec("rollback") } + +func TestIssue32422(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + + tk.MustExec("create table t (id int, c int, index(id));") + tk.MustExec("insert into t values (3,3), (4,4), (5,5);") + tk.MustExec("alter table t cache;") + + var cacheUsed bool + for i := 0; i < 20; i++ { + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache { + cacheUsed = true + break + } + time.Sleep(50 * time.Millisecond) + } + require.True(t, cacheUsed) + + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + + // Some extra tests. + // Since cached table use UnionScanExec utilities, check what happens when they work together. + // In these cases, the cache data serve as the snapshot, tikv is skipped, and txn membuffer works the same way. + tk.MustExec("begin") + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + tk.MustExec("insert into t values (6, 6)") + // Check for the new added data. + tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan") + tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + // Check for the old data. + tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Point get + tk.HasPlan("select id+1, c from t where id = 6", "PointGet") + tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Index Lookup + tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp") + tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + // Index Reader + tk.HasPlan("select id from t where id = 6", "IndexReader") + tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4")) + require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache) + + tk.MustExec("rollback") +} diff --git a/expression/integration_serial_test.go b/expression/integration_serial_test.go index d43a5342d8b23..f6d134da36c22 100644 --- a/expression/integration_serial_test.go +++ b/expression/integration_serial_test.go @@ -3890,7 +3890,7 @@ func TestPreparePlanCache(t *testing.T) { tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1")) } -func TestPreparePlanCacheNotForCacheTable(t *testing.T) { +func TestPreparePlanCacheOnCachedTable(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() @@ -3911,28 +3911,24 @@ func TestPreparePlanCacheNotForCacheTable(t *testing.T) { tk.MustExec("create table t(a int);") tk.MustExec("alter table t cache") - var useCache bool + var readFromTableCache bool for i := 0; i < 50; i++ { tk.MustQuery("select * from t where a = 1") - if tk.HasPlan("select * from t where a = 1", "Union") { - useCache = true + if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache { + readFromTableCache = true + break } } - require.True(t, useCache) + require.True(t, readFromTableCache) // already read cache after reading first time - tk.MustQuery("explain format = 'brief' select * from t where a = 1").Check(testkit.Rows( - "Projection 10.00 root test.t.a", - "└─UnionScan 10.00 root eq(test.t.a, 1)", - " └─TableReader 10.00 root data:Selection", - " └─Selection 10.00 cop[tikv] eq(test.t.a, 1)", - " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo")) - tk.MustExec("prepare stmt from 'select * from t where a = ?';") tk.MustExec("set @a = 1;") tk.MustExec("execute stmt using @a;") tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0")) tk.MustExec("execute stmt using @a;") - tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0")) + readFromTableCache = tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache + require.True(t, readFromTableCache) + tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1")) } func TestIssue16205(t *testing.T) { diff --git a/infoschema/builder.go b/infoschema/builder.go index 12b8807f84385..7085fccc8bd06 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -48,9 +48,8 @@ type Builder struct { // TODO: store is only used by autoid allocators // detach allocators from storage, use passed transaction in the feature store kv.Storage - // TODO: renewLeaseCh is only used to pass data between table and domain - renewLeaseCh chan func() - factory func() (pools.Resource, error) + + factory func() (pools.Resource, error) } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -694,7 +693,7 @@ func (b *Builder) tableFromMeta(alloc autoid.Allocators, tblInfo *model.TableInf return nil, errors.Trace(err) } - err = t.Init(b.renewLeaseCh, tmp.(sqlexec.SQLExecutor)) + err = t.Init(tmp.(sqlexec.SQLExecutor)) if err != nil { return nil, errors.Trace(err) } @@ -738,7 +737,7 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Resource, error)) *Builder { +func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { return &Builder{ store: store, is: &infoSchema{ @@ -747,9 +746,8 @@ func NewBuilder(store kv.Storage, renewCh chan func(), factory func() (pools.Res ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, - dirtyDB: make(map[string]bool), - renewLeaseCh: renewCh, - factory: factory, + dirtyDB: make(map[string]bool), + factory: factory, } } diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index f3adc34ed7a15..29befd88cd720 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -107,7 +107,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil, nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -253,7 +253,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -318,7 +318,7 @@ func TestGetBundle(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() diff --git a/metrics/metrics.go b/metrics/metrics.go index b24eb7b152750..47f1d0a49b9f5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -162,6 +162,7 @@ func RegisterMetrics() { prometheus.MustRegister(TopSQLReportDataHistogram) prometheus.MustRegister(PDApiExecutionHistogram) prometheus.MustRegister(CPUProfileCounter) + prometheus.MustRegister(ReadFromTableCacheCounter) tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() diff --git a/metrics/server.go b/metrics/server.go index 440a833e6f03e..661f8c05fba73 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -129,6 +129,15 @@ var ( Help: "Counter of query using plan cache.", }, []string{LblType}) + ReadFromTableCacheCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "read_from_tablecache_total", + Help: "Counter of query read from table cache.", + }, + ) + HandShakeErrorCounter = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "tidb", diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 39d361ee4045c..00b6c8ed7e28b 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -427,20 +427,6 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, sessVars := sctx.GetSessionVars() stmtCtx := sessVars.StmtCtx prepared := preparedStmt.PreparedAst - if prepared.UseCache { - // disable the cache if cache table in prepared statement - for _, vInfo := range preparedStmt.VisitInfos { - tbl, err := is.TableByName(model.NewCIStr(vInfo.db), model.NewCIStr(vInfo.table)) - // if table does not exist, skip it, maybe it is a `create table` statement - if err != nil { - continue - } - if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable { - prepared.UseCache = false - break - } - } - } stmtCtx.UseCache = prepared.UseCache var bindSQL string diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index db9c16e722d78..1e31c2d516e4e 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" @@ -51,7 +52,6 @@ func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) us := PhysicalUnionScan{ Conditions: p.conditions, HandleCols: p.handleCols, - CacheTable: p.cacheTable, }.Init(p.ctx, p.stats, p.blockOffset, childProp) return []PhysicalPlan{us}, true, nil } @@ -2349,6 +2349,13 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo if (isTopN || isLimit) && considerIndexMerge { return false // TopN and Limit cannot be pushed down to IndexMerge } + if c.tableInfo.TableCacheStatusType != model.TableCacheStatusDisable { + // Don't push to cop for cached table, it brings more harm than good: + // 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation. + // 2. Cached table use UnionScan to read the cache data, and push to cop is not supported when an UnionScan exists. + // Once aggregation is pushed to cop, the cache data can't be use any more. + return false + } case *LogicalUnionAll: if storeTp == kv.TiFlash { ret = ret && c.canPushToCopImpl(storeTp, true) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 769e9782ace39..4fafc6ebd6221 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -5224,5 +5224,41 @@ func (s *testIntegrationSuite) TestIssue31202(c *C) { tk.MustQuery("explain format = 'brief' select * from t31202 use index (primary);").Check(testkit.Rows( "TableReader 10000.00 root data:TableFullScan", "└─TableFullScan 10000.00 cop[tikv] table:t31202 keep order:false, stats:pseudo")) + tk.MustExec("drop table if exists t31202") } + +func (s *testIntegrationSuite) TestAggPushToCopForCachedTable(c *C) { + store, _ := s.store, s.dom + tk := testkit.NewTestKit(c, store) + + tk.MustExec("use test") + tk.MustExec(`create table t32157( + process_code varchar(8) NOT NULL, + ctrl_class varchar(2) NOT NULL, + ctrl_type varchar(1) NOT NULL, + oper_no varchar(12) DEFAULT NULL, + modify_date datetime DEFAULT NULL, + d_c_flag varchar(2) NOT NULL, + PRIMARY KEY (process_code,ctrl_class,d_c_flag));`) + tk.MustExec("insert into t32157 values ('GDEP0071', '05', '1', '10000', '2016-06-29 00:00:00', 'C')") + tk.MustExec("insert into t32157 values ('GDEP0071', '05', '0', '0000', '2016-06-01 00:00:00', 'D')") + tk.MustExec("alter table t32157 cache") + + tk.MustQuery("explain format = 'brief' select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows( + "StreamAgg 1.00 root funcs:count(1)->Column#8]\n" + + "[└─UnionScan 10.00 root eq(test.t32157.process_code, \"GDEP0071\")]\n" + + "[ └─TableReader 10.00 root data:Selection]\n" + + "[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" + + "[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo")) + + var readFromCacheNoPanic bool + for i := 0; i < 10; i++ { + tk.MustQuery("select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows("2")) + if tk.Se.GetSessionVars().StmtCtx.ReadFromTableCache { + readFromCacheNoPanic = true + break + } + } + c.Assert(readFromCacheNoPanic, IsTrue) +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index a67dc8da7808a..e7950661d6e0a 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "github.com/pingcap/log" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" @@ -58,8 +57,6 @@ import ( "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" - "go.uber.org/zap" - "golang.org/x/sync/singleflight" ) const ( @@ -4181,46 +4178,11 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as var result LogicalPlan = ds dirty := tableHasDirtyContent(b.ctx, tableInfo) - if dirty || tableInfo.TempTableType == model.TempTableLocal { + if dirty || tableInfo.TempTableType == model.TempTableLocal || tableInfo.TableCacheStatusType == model.TableCacheStatusEnable { us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset()) us.SetChildren(ds) result = us } - // If a table is a cache table, it is judged whether it satisfies the conditions of read cache. - if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable && b.ctx.GetSessionVars().SnapshotTS == 0 && !b.ctx.GetSessionVars().StmtCtx.IsStaleness { - cachedTable := tbl.(table.CachedTable) - txn, err := b.ctx.Txn(true) - if err != nil { - return nil, err - } - // Use the TS of the transaction to determine whether the cache can be used. - cacheData := cachedTable.TryReadFromCache(txn.StartTS()) - if cacheData != nil { - sessionVars.StmtCtx.ReadFromTableCache = true - us := LogicalUnionScan{handleCols: handleCols, cacheTable: cacheData}.Init(b.ctx, b.getSelectOffset()) - us.SetChildren(ds) - result = us - } else { - if !b.inUpdateStmt && !b.inDeleteStmt && !sessionVars.StmtCtx.InExplainStmt { - startTS := txn.StartTS() - store := b.ctx.GetStore() - go func() { - defer func() { - if r := recover(); r != nil { - } - }() - _, err, _ := sf.Do(fmt.Sprintf("%d", tableInfo.ID), func() (interface{}, error) { - err := cachedTable.UpdateLockForRead(ctx, store, startTS) - if err != nil { - log.Warn("Update Lock Info Error", zap.Error(err)) - } - return nil, nil - }) - terror.Log(err) - }() - } - } - } if sessionVars.StmtCtx.TblInfo2UnionScan == nil { sessionVars.StmtCtx.TblInfo2UnionScan = make(map[*model.TableInfo]bool) @@ -4244,8 +4206,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as return result, nil } -var sf singleflight.Group - func (b *PlanBuilder) timeRangeForSummaryTable() QueryTimeRange { const defaultSummaryDuration = 30 * time.Minute hints := b.TableHints() diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index c322f7e9489c1..784a1f3770e4c 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" @@ -527,9 +526,6 @@ type LogicalUnionScan struct { conditions []expression.Expression handleCols HandleCols - - // cacheTable not nil means it's reading from cached table. - cacheTable kv.MemBuffer } // DataSource represents a tableScan without condition push down. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 6293bba4b5073..3e3fb05cd60f6 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -1186,8 +1186,6 @@ type PhysicalUnionScan struct { Conditions []expression.Expression HandleCols HandleCols - - CacheTable kv.MemBuffer } // ExtractCorrelatedCols implements PhysicalPlan interface. diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 6b336384ef8da..9654642f1244c 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -2695,3 +2695,74 @@ func (s *testPlanSerialSuite) TestPartitionWithVariedDatasources(c *C) { } } } + +func (s *testPlanSerialSuite) TestCachedTable(c *C) { + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + c.Assert(store.Close(), IsNil) + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(true) + + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + + tk.MustExec("create table t (a int, b int, index i_b(b))") + tk.MustExec("insert into t values (1, 1), (2, 2)") + tk.MustExec("alter table t cache") + + tk.MustExec("prepare tableScan from 'select * from t where a>=?'") + tk.MustExec("prepare indexScan from 'select b from t use index(i_b) where b>?'") + tk.MustExec("prepare indexLookup from 'select a from t use index(i_b) where b>? and b= 0 && distance <= (1500*time.Millisecond) { - c.renewCh <- c.renewLease(ts, RenewReadLease, data) + + var triggerFailpoint bool + failpoint.Inject("mockRenewLeaseABA1", func(_ failpoint.Value) { + triggerFailpoint = true + }) + + if distance >= 0 && distance <= leaseDuration/2 || triggerFailpoint { + select { + case c.renewReadLease <- struct{}{}: + go c.renewLease(ts, data, data.Lease, leaseDuration) + default: + } } return data.MemBuffer } @@ -101,15 +102,15 @@ func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { // newCachedTable creates a new CachedTable Instance func newCachedTable(tbl *TableCommon) (table.Table, error) { ret := &cachedTable{ - TableCommon: *tbl, + TableCommon: *tbl, + renewReadLease: make(chan struct{}, 1), } return ret, nil } // Init is an extra operation for cachedTable after TableFromMeta, // Because cachedTable need some additional parameter that can't be passed in TableFromMeta. -func (c *cachedTable) Init(renewCh chan func(), exec sqlexec.SQLExecutor) error { - c.renewCh = renewCh +func (c *cachedTable) Init(exec sqlexec.SQLExecutor) error { raw, ok := exec.(sqlExec) if !ok { return errors.New("Need sqlExec rather than sqlexec.SQLExecutor") @@ -163,18 +164,37 @@ func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) return buffer, startTS, totalSize, nil } -func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64) error { +func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { + select { + case c.renewReadLease <- struct{}{}: + go c.updateLockForRead(ctx, store, ts, leaseDuration) + default: + // There is a inflight calling already. + } +} + +func (c *cachedTable) updateLockForRead(ctx context.Context, store kv.Storage, ts uint64, leaseDuration time.Duration) { + defer func() { + if r := recover(); r != nil { + log.Error("panic in the recoverable goroutine", + zap.Reflect("r", r), + zap.Stack("stack trace")) + } + <-c.renewReadLease + }() + // Load data from original table and the update lock information. tid := c.Meta().ID - lease := leaseFromTS(ts) + lease := leaseFromTS(ts, leaseDuration) succ, err := c.handle.LockForRead(ctx, tid, lease) if err != nil { - return errors.Trace(err) + log.Warn("lock cached table for read", zap.Error(err)) + return } if succ { mb, startTS, totalSize, err := c.loadDataFromOriginalTable(store, lease) if err != nil { - return errors.Trace(err) + return } c.cacheData.Store(&cacheData{ @@ -185,7 +205,6 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t atomic.StoreInt64(&c.totalSize, totalSize) } // Current status is not suitable to cache. - return nil } const cachedTableSizeLimit = 64 * (1 << 20) @@ -225,20 +244,31 @@ func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []typ return c.TableCommon.RemoveRecord(sctx, h, r) } -func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() { - return func() { - tid := c.Meta().ID - lease := leaseFromTS(ts) - succ, err := c.handle.RenewLease(context.Background(), tid, lease, op) - if err != nil { - log.Warn("Renew read lease error", zap.Error(err)) - } - if succ { - c.cacheData.Store(&cacheData{ - Start: data.Start, - Lease: lease, - MemBuffer: data.MemBuffer, - }) - } +// TestMockRenewLeaseABA2 is used by test function TestRenewLeaseABAFailPoint. +var TestMockRenewLeaseABA2 chan struct{} + +func (c *cachedTable) renewLease(ts uint64, data *cacheData, oldLease uint64, leaseDuration time.Duration) { + defer func() { <-c.renewReadLease }() + + failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) { + <-TestMockRenewLeaseABA2 + }) + + tid := c.Meta().ID + lease := leaseFromTS(ts, leaseDuration) + newLease, err := c.handle.RenewReadLease(context.Background(), tid, oldLease, lease) + if err != nil && !kv.IsTxnRetryableError(err) { + log.Warn("Renew read lease error", zap.Error(err)) } + if newLease > 0 { + c.cacheData.Store(&cacheData{ + Start: data.Start, + Lease: newLease, + MemBuffer: data.MemBuffer, + }) + } + + failpoint.Inject("mockRenewLeaseABA2", func(_ failpoint.Value) { + TestMockRenewLeaseABA2 <- struct{}{} + }) } diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index 8bf41abb3110c..f2d7670cf5979 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -19,15 +19,23 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/stmtsummary" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) +func lastReadFromCache(tk *testkit.TestKit) bool { + return tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache +} + func TestCacheTableBasicScan(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() @@ -40,114 +48,87 @@ func TestCacheTableBasicScan(t *testing.T) { "(10, 110, 1010), (12, 112, 1012), (14, 114, 1014), (16, 116, 1016), (18, 118, 1018)", ) tk.MustExec("alter table tmp1 cache") - assertSelect := func() { - // For TableReader - // First read will read from original table - tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows( + + // For TableReader + // First read will read from original table + tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows( + "5 105 1005", "7 117 1007", "9 109 1009", + "10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018", + )) + // Test for join two cache table + tk.MustExec("drop table if exists join_t1, join_t2, join_t3") + tk.MustExec("create table join_t1 (id int)") + tk.MustExec("insert into join_t1 values(1)") + tk.MustExec("alter table join_t1 cache") + tk.MustQuery("select *from join_t1").Check(testkit.Rows("1")) + tk.MustExec("create table join_t2 (id int)") + tk.MustExec("insert into join_t2 values(2)") + tk.MustExec("alter table join_t2 cache") + tk.MustQuery("select *from join_t2").Check(testkit.Rows("2")) + tk.MustExec("create table join_t3 (id int)") + tk.MustExec("insert into join_t3 values(3)") + planUsed := false + for i := 0; i < 10; i++ { + tk.MustQuery("select *from join_t1 join join_t2").Check(testkit.Rows("1 2")) + if lastReadFromCache(tk) { + planUsed = true + break + } + } + require.True(t, planUsed) + + // Test for join a cache table and a normal table + for i := 0; i < 10; i++ { + tk.MustQuery("select * from join_t1 join join_t3").Check(testkit.Rows("1 3")) + if lastReadFromCache(tk) { + // if tk.HasPlan("select *from join_t1 join join_t3", "UnionScan") { + planUsed = true + break + } + } + require.True(t, planUsed) + + // Second read will from cache table + for i := 0; i < 100; i++ { + tk.MustQuery("select * from tmp1 where id>4 order by id").Check(testkit.Rows( "5 105 1005", "7 117 1007", "9 109 1009", "10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018", )) - // Test for join two cache table - tk.MustExec("drop table if exists join_t1, join_t2, join_t3") - tk.MustExec("create table join_t1 (id int)") - tk.MustExec("insert into join_t1 values(1)") - tk.MustExec("alter table join_t1 cache") - tk.MustQuery("select *from join_t1").Check(testkit.Rows("1")) - tk.MustExec("create table join_t2 (id int)") - tk.MustExec("insert into join_t2 values(2)") - tk.MustExec("alter table join_t2 cache") - tk.MustQuery("select *from join_t2").Check(testkit.Rows("2")) - tk.MustExec("create table join_t3 (id int)") - tk.MustExec("insert into join_t3 values(3)") - planUsed := false - for i := 0; i < 10; i++ { - tk.MustQuery("select *from join_t1 join join_t2").Check(testkit.Rows("1 2")) - if tk.HasPlan("select *from join_t1 join join_t2", "UnionScan") { - planUsed = true - break - } - } - require.True(t, planUsed) - result := tk.MustQuery("explain format = 'brief' select *from join_t1 join join_t2") - result.Check(testkit.Rows( - "HashJoin 100000000.00 root CARTESIAN inner join", - "├─UnionScan(Build) 10000.00 root ", - "│ └─TableReader 10000.00 root data:TableFullScan", - "│ └─TableFullScan 10000.00 cop[tikv] table:join_t2 keep order:false, stats:pseudo", - "└─UnionScan(Probe) 10000.00 root ", - " └─TableReader 10000.00 root data:TableFullScan", - " └─TableFullScan 10000.00 cop[tikv] table:join_t1 keep order:false, stats:pseudo")) - // Test for join a cache table and a normal table - for i := 0; i < 10; i++ { - tk.MustQuery("select *from join_t1 join join_t3").Check(testkit.Rows("1 3")) - if tk.HasPlan("select *from join_t1 join join_t3", "UnionScan") { - planUsed = true - break - } - } - require.True(t, planUsed) - result = tk.MustQuery("explain format = 'brief' select *from join_t1 join join_t3") - result.Check(testkit.Rows( - "Projection 100000000.00 root test.join_t1.id, test.join_t3.id", - "└─HashJoin 100000000.00 root CARTESIAN inner join", - " ├─UnionScan(Build) 10000.00 root ", - " │ └─TableReader 10000.00 root data:TableFullScan", - " │ └─TableFullScan 10000.00 cop[tikv] table:join_t1 keep order:false, stats:pseudo", - " └─TableReader(Probe) 10000.00 root data:TableFullScan", - " └─TableFullScan 10000.00 cop[tikv] table:join_t3 keep order:false, stats:pseudo")) - - // Second read will from cache table - for i := 0; i < 100; i++ { - tk.MustQuery("select * from tmp1 where id>4 order by id").Check(testkit.Rows( - "5 105 1005", "7 117 1007", "9 109 1009", - "10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018", - )) - if tk.HasPlan("select * from tmp1 where id>4 order by id", "UnionScan") { - planUsed = true - break - } - } - require.True(t, planUsed) - result = tk.MustQuery("explain format = 'brief' select * from tmp1 where id>4 order by id") - result.Check(testkit.Rows("UnionScan 3333.33 root gt(test.tmp1.id, 4)", - "└─TableReader 3333.33 root data:TableRangeScan", - " └─TableRangeScan 3333.33 cop[tikv] table:tmp1 range:(4,+inf], keep order:true, stats:pseudo")) - // For IndexLookUpReader - for i := 0; i < 10; i++ { - tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows( - "5 105 1005", "9 109 1009", "10 110 1010", - "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", - )) - if tk.HasPlan("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u", "UnionScan") { - planUsed = true - break - } + if lastReadFromCache(tk) { + // if tk.HasPlan("select * from tmp1 where id>4 order by id", "UnionScan") { + planUsed = true + break } - require.True(t, planUsed) - result = tk.MustQuery("explain format = 'brief' select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u") - result.Check(testkit.Rows("UnionScan 3333.33 root gt(test.tmp1.u, 101)", - "└─IndexLookUp 3333.33 root ", - " ├─IndexRangeScan(Build) 3333.33 cop[tikv] table:tmp1, index:u(u) range:(101,+inf], keep order:true, stats:pseudo", - " └─TableRowIDScan(Probe) 3333.33 cop[tikv] table:tmp1 keep order:false, stats:pseudo")) - tk.MustQuery("show warnings").Check(testkit.Rows()) - - // For IndexReader - tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows( - "3 113", "5 105", "7 117", "9 109", "10 110", - "12 112", "14 114", "16 116", "18 118", - )) - tk.MustQuery("show warnings").Check(testkit.Rows()) + } + require.True(t, planUsed) - // For IndexMerge, cache table should not use index merge - tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows( - "9 109 1009", "10 110 1010", + // For IndexLookUpReader + for i := 0; i < 10; i++ { + tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows( + "5 105 1005", "9 109 1009", "10 110 1010", "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", )) - - tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 IndexMerge is inapplicable or disabled. Cannot use IndexMerge on TableCache.")) + if lastReadFromCache(tk) { + planUsed = true + break + } } - assertSelect() - + require.True(t, planUsed) + + // For IndexReader + tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows( + "3 113", "5 105", "7 117", "9 109", "10 110", + "12 112", "14 114", "16 116", "18 118", + )) + tk.MustQuery("show warnings").Check(testkit.Rows()) + + // For IndexMerge, cache table should not use index merge + tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows( + "9 109 1009", "10 110 1010", + "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", + )) + + tk.MustQuery("show warnings").Check(testkit.Rows()) } func TestCacheCondition(t *testing.T) { @@ -160,44 +141,51 @@ func TestCacheCondition(t *testing.T) { tk.MustExec("alter table t2 cache") // Explain should not trigger cache. - tk.MustQuery("explain select * from t2") for i := 0; i < 10; i++ { + tk.MustQuery("explain select * from t2") time.Sleep(100 * time.Millisecond) - require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) + require.False(t, lastReadFromCache(tk)) } // Insert should not trigger cache. tk.MustExec("insert into t2 values (1,1)") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) - require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) + require.False(t, lastReadFromCache(tk)) } // Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id > 0") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) - require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) + require.False(t, lastReadFromCache(tk)) } // Contains PointGet Update should not trigger cache. tk.MustExec("update t2 set v = v + 1 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) - require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) + require.False(t, lastReadFromCache(tk)) } // Contains PointGet Delete should not trigger cache. tk.MustExec("delete from t2 where id = 2") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) - require.False(t, tk.HasPlan("select * from t2 where id>0", "UnionScan")) + require.False(t, lastReadFromCache(tk)) } // Normal query should trigger cache. tk.MustQuery("select * from t2") - for !tk.HasPlan("select * from t2 where id>0", "UnionScan") { - tk.MustExec("select * from t2") + cacheUsed := false + for i := 0; i < 100; i++ { + tk.MustQuery("select * from t2") + if lastReadFromCache(tk) { + cacheUsed = true + break + } + time.Sleep(100 * time.Millisecond) } + require.True(t, cacheUsed) } func TestCacheTableBasicReadAndWrite(t *testing.T) { @@ -215,31 +203,38 @@ func TestCacheTableBasicReadAndWrite(t *testing.T) { tk.MustExec("alter table write_tmp1 cache") // Read and add read lock - tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", - "3 113 1003")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) // read lock should valid var i int for i = 0; i < 10; i++ { - if tk.HasPlan("select * from write_tmp1", "UnionScan") { + if lastReadFromCache(tk) { break } // Wait for the cache to be loaded. time.Sleep(50 * time.Millisecond) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "3 113 1003")) } require.True(t, i < 10) tk.MustExec("use test") tk1.MustExec("insert into write_tmp1 values (2, 222, 222)") // write lock exists - require.False(t, tk.HasPlan("select * from write_tmp1", "UnionScan")) + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", + "2 222 222", + "3 113 1003")) + require.False(t, lastReadFromCache(tk)) + // wait write lock expire and check cache can be used again - for !tk.HasPlan("select * from write_tmp1", "UnionScan") { - tk.MustExec("select * from write_tmp1") + for !lastReadFromCache(tk) { + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows( + "1 101 1001", + "2 222 222", + "3 113 1003")) } tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) tk1.MustExec("update write_tmp1 set v = 3333 where id = 2") - for !tk.HasPlan("select * from write_tmp1", "UnionScan") { - tk.MustExec("select * from write_tmp1") + for !lastReadFromCache(tk) { + tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 222", "3 113 1003")) } tk.MustQuery("select * from write_tmp1").Check(testkit.Rows("1 101 1001", "2 222 3333", "3 113 1003")) } @@ -258,7 +253,8 @@ func TestCacheTableComplexRead(t *testing.T) { var i int for i = 0; i < 100; i++ { time.Sleep(100 * time.Millisecond) - if tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + if lastReadFromCache(tk1) { break } } @@ -269,14 +265,16 @@ func TestCacheTableComplexRead(t *testing.T) { tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) for i = 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) - if tk2.HasPlan("select * from complex_cache where id > 7", "UnionScan") { + tk2.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + if lastReadFromCache(tk2) { break } } require.True(t, i < 10) tk2.MustExec("commit") - tk1.HasPlan("select * from complex_cache where id > 7", "UnionScan") + tk1.MustQuery("select * from complex_cache where id > 7").Check(testkit.Rows("9 109 1009")) + require.True(t, lastReadFromCache(tk1)) tk1.MustExec("commit") } @@ -298,7 +296,8 @@ func TestBeginSleepABA(t *testing.T) { tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) cacheUsed := false for i := 0; i < 100; i++ { - if tk1.HasPlan("select * from aba", "UnionScan") { + tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + if lastReadFromCache(tk1) { cacheUsed = true break } @@ -309,7 +308,8 @@ func TestBeginSleepABA(t *testing.T) { tk1.MustExec("begin") cacheUsed = false for i := 0; i < 100; i++ { - if tk1.HasPlan("select * from aba", "UnionScan") { + tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + if lastReadFromCache(tk1) { cacheUsed = true break } @@ -320,9 +320,10 @@ func TestBeginSleepABA(t *testing.T) { tk2.MustExec("update aba set v = 2") // And then make the cache available again. - for i := 0; i < 50; i++ { + cacheUsed = false + for i := 0; i < 100; i++ { tk2.MustQuery("select * from aba").Check(testkit.Rows("1 2")) - if tk2.HasPlan("select * from aba", "UnionScan") { + if lastReadFromCache(tk2) { cacheUsed = true break } @@ -331,7 +332,8 @@ func TestBeginSleepABA(t *testing.T) { require.True(t, cacheUsed) // tk1 should not use the staled cache, because the data is changed. - require.False(t, tk1.HasPlan("select * from aba", "UnionScan")) + tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + require.False(t, lastReadFromCache(tk1)) } func TestCacheTablePointGet(t *testing.T) { @@ -468,13 +470,13 @@ func TestCacheTableWriteOperatorWaitLockLease(t *testing.T) { tk.MustExec("drop table if exists wait_tb1") tk.MustExec("create table wait_tb1(id int)") tk.MustExec("alter table wait_tb1 cache") - tk.MustExec("select *from wait_tb1") var i int for i = 0; i < 10; i++ { - time.Sleep(100 * time.Millisecond) - if tk.HasPlan("select *from wait_tb1", "UnionScan") { + tk.MustQuery("select * from wait_tb1").Check(testkit.Rows()) + if lastReadFromCache(tk) { break } + time.Sleep(100 * time.Millisecond) } require.True(t, i < 10) stmtsummary.StmtSummaryByDigestMap.Clear() @@ -483,3 +485,166 @@ func TestCacheTableWriteOperatorWaitLockLease(t *testing.T) { tk.MustQuery("select DIGEST_TEXT from INFORMATION_SCHEMA.STATEMENTS_SUMMARY where MAX_BACKOFF_TIME > 0 or MAX_WAIT_TIME > 0").Check(testkit.Rows("insert into `wait_tb1` values ( ? )")) } + +func TestTableCacheLeaseVariable(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + // Check default value. + tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("3")) + + // Check a valid value. + tk.MustExec("set @@global.tidb_table_cache_lease = 1;") + tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("1")) + + // Check a invalid value, the valid range is [2, 10] + tk.MustExec("set @@global.tidb_table_cache_lease = 111;") + tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_table_cache_lease value: '111'")) + tk.MustQuery("select @@global.tidb_table_cache_lease").Check(testkit.Rows("10")) + + // Change to a non-default value and verify the behaviour. + tk.MustExec("set @@global.tidb_table_cache_lease = 2;") + + tk.MustExec("drop table if exists test_lease_variable;") + tk.MustExec(`create table test_lease_variable(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`) + tk.MustExec(`insert into test_lease_variable(c0, c1, c2) values (1, null, 'green');`) + tk.MustExec(`alter table test_lease_variable cache;`) + + cached := false + for i := 0; i < 20; i++ { + tk.MustQuery("select * from test_lease_variable").Check(testkit.Rows("1 green")) + if lastReadFromCache(tk) { + cached = true + break + } + time.Sleep(50 * time.Millisecond) + } + require.True(t, cached) + + start := time.Now() + tk.MustExec("update test_lease_variable set c0 = 2") + duration := time.Since(start) + + // The lease is 2s, check how long the write operation takes. + require.True(t, duration > time.Second) + require.True(t, duration < 3*time.Second) +} + +func TestMetrics(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists test_metrics;") + tk.MustExec(`create table test_metrics(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`) + tk.MustExec(`create table nt (c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`) + tk.MustExec(`insert into test_metrics(c0, c1, c2) values (1, null, 'green');`) + tk.MustExec(`alter table test_metrics cache;`) + + tk.MustQuery("select * from test_metrics").Check(testkit.Rows("1 green")) + cached := false + for i := 0; i < 20; i++ { + if lastReadFromCache(tk) { + cached = true + break + } + time.Sleep(50 * time.Millisecond) + tk.MustQuery("select * from test_metrics").Check(testkit.Rows("1 green")) + } + require.True(t, cached) + + counter := metrics.ReadFromTableCacheCounter + pb := &dto.Metric{} + + queries := []string{ + // Table scan + "select * from test_metrics", + // Index scan + "select c0 from test_metrics use index(uk) where c0 > 1", + // Index Lookup + "select c1 from test_metrics use index(uk) where c0 = 1", + // Point Get + "select c0 from test_metrics use index(uk) where c0 = 1", + // // Aggregation + "select count(*) from test_metrics", + // Join + "select * from test_metrics as a join test_metrics as b on a.c0 = b.c0 where a.c1 != 'xxx'", + } + counter.Write(pb) + i := pb.GetCounter().GetValue() + + for _, query := range queries { + tk.MustQuery(query) + i++ + counter.Write(pb) + hit := pb.GetCounter().GetValue() + require.Equal(t, i, hit) + } + + // A counter-example that doesn't increase metrics.ReadFromTableCacheCounter. + tk.MustQuery("select * from nt") + counter.Write(pb) + hit := pb.GetCounter().GetValue() + require.Equal(t, i, hit) +} + +func TestRenewLeaseABAFailPoint(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tables.TestMockRenewLeaseABA2 = make(chan struct{}) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t_lease;") + tk.MustExec(`create table t_lease(a int, b int);`) + tk.MustExec(`insert into t_lease values (1, 1)`) + tk.MustExec(`alter table t_lease cache`) + + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk2.MustExec("use test") + + // Load the cache data by this query. + var cacheUsed bool + for i := 0; i < 10; i++ { + tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1")) + if lastReadFromCache(tk) { + cacheUsed = true + break + } + time.Sleep(50 * time.Millisecond) + } + require.True(t, cacheUsed) + + // Renew lease by this query, mock the operation is delayed. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA1", `return`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA2", `return`)) + tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 1")) + + // Make the cache data stale after writing: read lock-> write lock + tk1.MustExec("update t_lease set b = 2 where a = 1") + + // Mock reading from another TiDB instance: write lock -> read lock + is := tk2.Session().GetInfoSchema().(infoschema.InfoSchema) + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t_lease")) + require.NoError(t, err) + lease := oracle.GoTimeToTS(time.Now().Add(20 * time.Second)) // A big enough future time + tk2.MustExec("update mysql.table_cache_meta set lock_type = 'READ', lease = ? where tid = ?", lease, tbl.Meta().ID) + + // Then the stagnant renew lease operation finally arrive. + tables.TestMockRenewLeaseABA2 <- struct{}{} + + <-tables.TestMockRenewLeaseABA2 + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA1")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/table/tables/mockRenewLeaseABA2")) + + // The renew lease operation should not success, + // And the session should not read from a staled cache data. + tk.MustQuery("select * from t_lease").Check(testkit.Rows("1 2")) + require.False(t, lastReadFromCache(tk)) +} diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go index aeddd5b972ab2..3ffb2fc73a086 100644 --- a/table/tables/state_remote.go +++ b/table/tables/state_remote.go @@ -67,10 +67,13 @@ type StateRemote interface { LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error) // LockForWrite try to add a write lock to the table with the specified tableID - LockForWrite(ctx context.Context, tid int64) (uint64, error) + LockForWrite(ctx context.Context, tid int64, leaseDuration time.Duration) (uint64, error) - // RenewLease attempt to renew the read / write lock on the table with the specified tableID - RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error) + // RenewReadLease attempt to renew the read lock lease on the table with the specified tableID + RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) + + // RenewWriteLease attempt to renew the write lock lease on the table with the specified tableID + RenewWriteLease(ctx context.Context, tid int64, newTs uint64) (bool, error) } type sqlExec interface { @@ -96,7 +99,7 @@ func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLoc return lockType, lease, err } -func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint64) ( /*succ*/ bool, error) { +func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, newLease uint64) ( /*succ*/ bool, error) { h.Lock() defer h.Unlock() succ := false @@ -108,7 +111,7 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 // The old lock is outdated, clear orphan lock. if now > lease { succ = true - if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + if err := h.updateRow(ctx, tid, "READ", newLease); err != nil { return errors.Trace(err) } return nil @@ -121,8 +124,8 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 return nil } succ = true - if ts > lease { // Note the check, don't decrease lease value! - if err := h.updateRow(ctx, tid, "READ", ts); err != nil { + if newLease > lease { // Note the check, don't decrease lease value! + if err := h.updateRow(ctx, tid, "READ", newLease); err != nil { return errors.Trace(err) } } @@ -133,12 +136,12 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint6 } // LockForWrite try to add a write lock to the table with the specified tableID, return the write lock lease. -func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) { +func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, leaseDuration time.Duration) (uint64, error) { h.Lock() defer h.Unlock() var ret uint64 for { - waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid) + waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid, leaseDuration) if err != nil { return 0, err } @@ -151,13 +154,13 @@ func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64 return ret, nil } -func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) { +func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, leaseDuration time.Duration) (waitAndRetry time.Duration, ts uint64, err error) { err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, lease, oldReadLease, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - ts = leaseFromTS(now) + ts = leaseFromTS(now, leaseDuration) // The lease is outdated, so lock is invalid, clear orphan lock of any kind. if now > lease { if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil { @@ -214,27 +217,17 @@ func waitForLeaseExpire(oldReadLease, now uint64) time.Duration { return 0 } -func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease uint64, op RenewLeaseType) (bool, error) { - h.Lock() - defer h.Unlock() - - switch op { - case RenewReadLease: - return h.renewReadLease(ctx, tid, newLease) - case RenewWriteLease: - return h.renewWriteLease(ctx, tid, newLease) - } - return false, errors.New("wrong renew lease type") -} - -func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { - var succ bool +// RenewReadLease renew the read lock lease. +// Return the current lease value on success, and return 0 on fail. +func (h *stateRemoteHandle) RenewReadLease(ctx context.Context, tid int64, oldLocalLease, newValue uint64) (uint64, error) { + var newLease uint64 err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { - lockType, oldLease, _, err := h.loadRow(ctx, tid) + lockType, remoteLease, _, err := h.loadRow(ctx, tid) if err != nil { return errors.Trace(err) } - if now >= oldLease { + + if now >= remoteLease { // read lock had already expired, fail to renew return nil } @@ -243,19 +236,36 @@ func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLe return nil } - if newLease > oldLease { // lease should never decrease! - err = h.updateRow(ctx, tid, "READ", newLease) + // It means that the lease had already been changed by some other TiDB instances. + if oldLocalLease != remoteLease { + // 1. Data in [cacheDataTS -------- oldLocalLease) time range is also immutable. + // 2. Data in [ now ------------------- remoteLease) time range is immutable. + // + // If now < oldLocalLease, it means data in all the time range is immutable, + // so the old cache data is still available. + if now < oldLocalLease { + newLease = remoteLease + } + // Otherwise, there might be write operation during the oldLocalLease and the new remoteLease + // Make renew lease operation fail. + return nil + } + + if newValue > remoteLease { // lease should never decrease! + err = h.updateRow(ctx, tid, "READ", newValue) if err != nil { return errors.Trace(err) } + newLease = newValue + } else { + newLease = remoteLease } - succ = true return nil }) - return succ, err + return newLease, err } -func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { +func (h *stateRemoteHandle) RenewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) { var succ bool err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error { lockType, oldLease, _, err := h.loadRow(ctx, tid) @@ -304,6 +314,11 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co return errors.Trace(err) } + _, err = h.execSQL(ctx, "set @@session.tidb_retry_limit = 0") + if err != nil { + return errors.Trace(err) + } + rows, err := h.execSQL(ctx, "select @@tidb_current_ts") if err != nil { return errors.Trace(err) @@ -324,7 +339,7 @@ func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Co } func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) { - chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid) + chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %?", tid) if err != nil { return 0, 0, 0, errors.Trace(err) } diff --git a/table/tables/state_remote_test.go b/table/tables/state_remote_test.go index dc4e9272b1830..5d75c5e47d129 100644 --- a/table/tables/state_remote_test.go +++ b/table/tables/state_remote_test.go @@ -79,9 +79,9 @@ func TestStateRemote(t *testing.T) { // Renew read lock lease operation. leaseVal = oracle.GoTimeToTS(physicalTime.Add(400 * time.Millisecond)) - succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) + leaseVal, err = h.RenewReadLease(ctx, 5, lease, leaseVal) require.NoError(t, err) - require.True(t, succ) + require.True(t, leaseVal > 0) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockRead) @@ -89,7 +89,7 @@ func TestStateRemote(t *testing.T) { require.Equal(t, lease, leaseVal) // Check write lock. - writeLease, err := h.LockForWrite(ctx, 5) + writeLease, err := h.LockForWrite(ctx, 5, 3*time.Second) require.NoError(t, err) lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) @@ -99,25 +99,25 @@ func TestStateRemote(t *testing.T) { require.Greater(t, writeLease, leaseVal) // Lock for write again - writeLease, err = h.LockForWrite(ctx, 5) + writeLease, err = h.LockForWrite(ctx, 5, 3*time.Second) require.NoError(t, err) - lockType, _, err = h.Load(ctx, 5) + lockType, lease, err = h.Load(ctx, 5) require.NoError(t, err) require.Equal(t, lockType, tables.CachedTableLockWrite) require.Equal(t, lockType.String(), "WRITE") // Renew read lock lease should fail when the write lock is hold. - succ, err = h.RenewLease(ctx, 5, leaseVal, tables.RenewReadLease) + leaseVal, err = h.RenewReadLease(ctx, 5, lease, lease+1) require.NoError(t, err) - require.False(t, succ) + require.False(t, leaseVal > 0) // Acquire read lock should also fail when the write lock is hold. - succ, err = h.LockForRead(ctx, 5, leaseVal) + succ, err = h.LockForRead(ctx, 5, lease+1) require.NoError(t, err) require.False(t, succ) // Renew write lease. - succ, err = h.RenewLease(ctx, 5, writeLease+1, tables.RenewWriteLease) + succ, err = h.RenewWriteLease(ctx, 5, writeLease+1) require.NoError(t, err) require.True(t, succ) diff --git a/tests/readonlytest/go.mod b/tests/readonlytest/go.mod index a4c912bf9de6b..cc0b22e3b1343 100644 --- a/tests/readonlytest/go.mod +++ b/tests/readonlytest/go.mod @@ -6,7 +6,7 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/pingcap/tidb v2.0.11+incompatible github.com/stretchr/testify v1.7.0 - go.uber.org/goleak v1.1.11 + go.uber.org/goleak v1.1.12 ) replace github.com/pingcap/tidb => ../../ diff --git a/tests/readonlytest/go.sum b/tests/readonlytest/go.sum index f3facf536b3f1..387cfacb99a28 100644 --- a/tests/readonlytest/go.sum +++ b/tests/readonlytest/go.sum @@ -43,6 +43,10 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.16.1/go.mod h1:LaNorbty3ehnU3rEjXSNV/NRgQA0O8Y+uh6bPe5UOk4= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0/go.mod h1:ZPW/Z0kLCTdDZaDbYTetxc9Cxl/2lNqxYHYNOF2bti0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0/go.mod h1:GJzjM4SR9T0KyX5gKCVyz1ytD8FeWeUPCwtFCt1AyfE= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0/go.mod h1:eHWhQKXc1Gv1DvWH//UzgWjWFEo0Pp4pH2vBzjBw8Fc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw= @@ -151,6 +155,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -175,6 +181,7 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsouza/fake-gcs-server v1.19.0/go.mod h1:JtXHY/QzHhtyIxsNfIuQ+XgHtRb5B/w8nqbL5O8zqo0= @@ -352,6 +359,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI= github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI= @@ -434,6 +442,7 @@ github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= +github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= @@ -450,6 +459,7 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -496,6 +506,7 @@ github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d/go.mod h1:g4vx// github.com/pingcap/errors v0.11.5-0.20201029093017-5a7df2af2ac7/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= @@ -506,20 +517,31 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210805052247-76981389e818/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= +github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 h1:SvWCbCPh1YeHd9yQLksvJYAgft6wLTY1aNG81tpyscQ= github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY= +github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d/go.mod h1:7j18ezaWTao2LHOyMlsc2Dg1vW+mDY9dEbPzVyOlaeM= github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= +github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= +github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20210802080519-94b831c6db55/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -569,6 +591,7 @@ github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0/go.mod h1:919LwcH0M7/W4fcZ0/jy0qGght1GIhqyS/EgWGH2j5Q= @@ -619,8 +642,11 @@ github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJH github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210926100628-3cc2459779ca/go.mod h1:KwtZXt0JD+bP9bWW2ka0ir3Wp3oTEfZUTh22bs2sI4o= +github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20210323121136-78679e5e209d/go.mod h1:Jw9KG11C/23Rr7DW4XWQ7H5xOgGZo6DFL1OKAF4+Igw= github.com/tikv/pd v1.1.0-beta.0.20210818082359-acba1da0018d/go.mod h1:rammPjeZgpvfrQRPkijcx8tlxF1XM5+m6kRXrkDzCAA= +github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= +github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -668,6 +694,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -688,6 +715,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= @@ -697,11 +725,14 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.8.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -712,6 +743,7 @@ go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -728,8 +760,10 @@ golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -770,6 +804,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -808,6 +843,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -817,6 +853,9 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -909,6 +948,9 @@ golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -998,12 +1040,15 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= +gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= @@ -1158,6 +1203,7 @@ gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mN gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -1187,7 +1233,20 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= +modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= +modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254= +modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk= +modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk= +modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA= +modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs= +modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE= +modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM= +modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= +modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= +modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE= moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/tests/readonlytest/readonly_test.go b/tests/readonlytest/readonly_test.go index 8674ea4c45dba..bf2f267531788 100644 --- a/tests/readonlytest/readonly_test.go +++ b/tests/readonlytest/readonly_test.go @@ -27,9 +27,14 @@ import ( ) var ( - tidbRootPassword = flag.String("passwd", "", "tidb root password") - tidbStartPort = flag.Int("tidb_start_port", 4000, "first tidb server listening port") - ReadOnlyErrMsg = "Error 1836: Running in read-only mode" + tidbRootPassword = flag.String("passwd", "", "tidb root password") + tidbAPort = flag.Int("tidb_a_port", 4001, "first tidb server listening port") + tidbBPort = flag.Int("tidb_b_port", 4002, "second tidb server listening port") + ReadOnlyErrMsg = "Error 1836: Running in read-only mode" + ConflictErrMsg = "Error 1105: can't turn off tidb_super_read_only when tidb_restricted_read_only is on" + PriviledgedErrMsg = "Error 1227: Access denied; you need (at least one of) the SUPER or SYSTEM_VARIABLES_ADMIN privilege(s) for this operation" + TiDBRestrictedReadOnly = "tidb_restricted_read_only" + TiDBSuperReadOnly = "tidb_super_read_only" ) type ReadOnlySuite struct { @@ -38,34 +43,40 @@ type ReadOnlySuite struct { rdb *sql.DB } -func checkVariable(t *testing.T, db *sql.DB, on bool) { +func checkVariable(t *testing.T, db *sql.DB, variable string, on bool) { var name, status string - rs, err := db.Query("show variables like 'tidb_restricted_read_only'") + rs, err := db.Query(fmt.Sprintf("show variables like '%s'", variable)) require.NoError(t, err) require.True(t, rs.Next()) require.NoError(t, rs.Scan(&name, &status)) - require.Equal(t, name, "tidb_restricted_read_only") + require.Equal(t, name, variable) if on { - require.Equal(t, status, "ON") + require.Equal(t, "ON", status) } else { - require.Equal(t, status, "OFF") + require.Equal(t, "OFF", status) } require.NoError(t, rs.Close()) } -func setReadOnly(t *testing.T, db *sql.DB, status int) { - _, err := db.Exec(fmt.Sprintf("set global tidb_restricted_read_only=%d", status)) +func setVariableNoError(t *testing.T, db *sql.DB, variable string, status int) { + _, err := db.Exec(fmt.Sprintf("set global %s=%d", variable, status)) require.NoError(t, err) } +func setVariable(t *testing.T, db *sql.DB, variable string, status int) error { + _, err := db.Exec(fmt.Sprintf("set global %s=%d", variable, status)) + return err +} + func createReadOnlySuite(t *testing.T) (s *ReadOnlySuite, clean func()) { s = new(ReadOnlySuite) var err error - s.db, err = sql.Open("mysql", fmt.Sprintf("root:%s@(%s:%d)/test", *tidbRootPassword, "127.0.0.1", *tidbStartPort+1)) + s.db, err = sql.Open("mysql", fmt.Sprintf("root:%s@(%s:%d)/test", *tidbRootPassword, "127.0.0.1", *tidbAPort)) require.NoError(t, err) - setReadOnly(t, s.db, 0) + setVariableNoError(t, s.db, TiDBRestrictedReadOnly, 0) + setVariableNoError(t, s.db, TiDBSuperReadOnly, 0) _, err = s.db.Exec("drop user if exists 'u1'@'%'") require.NoError(t, err) @@ -73,7 +84,7 @@ func createReadOnlySuite(t *testing.T) (s *ReadOnlySuite, clean func()) { require.NoError(t, err) _, err = s.db.Exec("grant all privileges on test.* to 'u1'@'%'") require.NoError(t, err) - s.udb, err = sql.Open("mysql", fmt.Sprintf("u1:password@(%s:%d)/test", "127.0.0.1", *tidbStartPort+2)) + s.udb, err = sql.Open("mysql", fmt.Sprintf("u1:password@(%s:%d)/test", "127.0.0.1", *tidbBPort)) require.NoError(t, err) _, err = s.db.Exec("drop user if exists 'r1'@'%'") require.NoError(t, err) @@ -83,7 +94,7 @@ func createReadOnlySuite(t *testing.T) (s *ReadOnlySuite, clean func()) { require.NoError(t, err) _, err = s.db.Exec("grant RESTRICTED_REPLICA_WRITER_ADMIN on *.* to 'r1'@'%'") require.NoError(t, err) - s.rdb, err = sql.Open("mysql", fmt.Sprintf("r1:password@(%s:%d)/test", "127.0.0.1", *tidbStartPort+2)) + s.rdb, err = sql.Open("mysql", fmt.Sprintf("r1:password@(%s:%d)/test", "127.0.0.1", *tidbBPort)) require.NoError(t, err) clean = func() { require.NoError(t, s.db.Close()) @@ -96,27 +107,82 @@ func createReadOnlySuite(t *testing.T) (s *ReadOnlySuite, clean func()) { func TestRestriction(t *testing.T) { s, clean := createReadOnlySuite(t) defer clean() - _, err := s.db.Exec("set global tidb_restricted_read_only=1") + + var err error + _, err = s.db.Exec("drop table if exists t") require.NoError(t, err) + _, err = s.udb.Exec("create table t (a int primary key, b int)") + require.NoError(t, err) + _, err = s.udb.Exec("insert into t values (1, 1)") + require.NoError(t, err) + _, err = s.udb.Exec("update t set b = 2 where a = 1") + require.NoError(t, err) + + setVariable(t, s.db, TiDBRestrictedReadOnly, 1) time.Sleep(1) - checkVariable(t, s.udb, true) + checkVariable(t, s.udb, TiDBRestrictedReadOnly, true) + checkVariable(t, s.udb, TiDBSuperReadOnly, true) + + checkVariable(t, s.rdb, TiDBRestrictedReadOnly, true) + checkVariable(t, s.rdb, TiDBSuperReadOnly, true) + + // can't create table _, err = s.udb.Exec("create table t(a int)") require.Error(t, err) require.Equal(t, err.Error(), ReadOnlyErrMsg) + + // can't do point update when tidb_restricted_read_only is on + _, err = s.udb.Exec("update t set b = 2 where a = 1") + require.Error(t, err) + require.Equal(t, err.Error(), ReadOnlyErrMsg) + + // can't insert + _, err = s.udb.Exec("insert into t values (2, 3)") + require.Error(t, err) + require.Equal(t, err.Error(), ReadOnlyErrMsg) + + // can't turn off tidb_super_read_only if tidb_restricted_read_only is on + err = setVariable(t, s.db, TiDBSuperReadOnly, 0) + require.Error(t, err) + require.Equal(t, err.Error(), ConflictErrMsg) + + // can't change global variable + err = setVariable(t, s.udb, TiDBSuperReadOnly, 0) + require.Error(t, err) + require.Equal(t, err.Error(), PriviledgedErrMsg) + + err = setVariable(t, s.rdb, TiDBSuperReadOnly, 0) + require.Error(t, err) + require.Equal(t, err.Error(), PriviledgedErrMsg) + + // turn off tidb_restricted_read_only does not affect tidb_super_read_only + setVariableNoError(t, s.db, TiDBRestrictedReadOnly, 0) + + checkVariable(t, s.udb, TiDBRestrictedReadOnly, false) + checkVariable(t, s.rdb, TiDBRestrictedReadOnly, false) + + checkVariable(t, s.udb, TiDBSuperReadOnly, true) + checkVariable(t, s.rdb, TiDBSuperReadOnly, true) + + // it is now allowed to turn off tidb_super_read_only + setVariableNoError(t, s.db, TiDBSuperReadOnly, 0) + + checkVariable(t, s.udb, TiDBRestrictedReadOnly, false) + checkVariable(t, s.rdb, TiDBRestrictedReadOnly, false) + + checkVariable(t, s.udb, TiDBSuperReadOnly, false) + checkVariable(t, s.rdb, TiDBSuperReadOnly, false) } func TestRestrictionWithConnectionPool(t *testing.T) { s, clean := createReadOnlySuite(t) defer clean() - _, err := s.db.Exec("set global tidb_restricted_read_only=0") - require.NoError(t, err) + var err error _, err = s.db.Exec("drop table if exists t") require.NoError(t, err) _, err = s.db.Exec("create table t (a int)") require.NoError(t, err) - time.Sleep(1) - checkVariable(t, s.udb, false) conn, err := s.udb.Conn(context.Background()) require.NoError(t, err) @@ -142,8 +208,7 @@ func TestRestrictionWithConnectionPool(t *testing.T) { time.Sleep(1 * time.Second) timer := time.NewTimer(10 * time.Second) - _, err = s.db.Exec("set global tidb_restricted_read_only=1") - require.NoError(t, err) + setVariableNoError(t, s.db, TiDBRestrictedReadOnly, 1) select { case <-timer.C: require.Fail(t, "failed") @@ -161,8 +226,6 @@ func TestReplicationWriter(t *testing.T) { require.NoError(t, err) _, err = s.db.Exec("create table t (a int)") require.NoError(t, err) - time.Sleep(1) - checkVariable(t, s.udb, false) conn, err := s.rdb.Conn(context.Background()) require.NoError(t, err)