*: support small cached table feature on 5.4 (#32703)
tiancaiamao authored Mar 2, 2022
1 parent 55f3b24 commit 9dff1e1
Showing 40 changed files with 1,045 additions and 399 deletions.
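
At a high level, this change wires the cached (small) table feature through the 5.4 executor: a table whose cache status is enabled keeps an in-memory snapshot guarded by a lease, a read at a given startTS may be served from that snapshot only while the lease still covers it, and a miss falls back to TiKV while asking UpdateLockForRead to extend the lease for later statements. The stand-alone Go sketch below illustrates the lease decision; the types are stand-ins, and only the TryReadFromCache(startTS, leaseDuration) / UpdateLockForRead(ctx, store, startTS, leaseDuration) shapes are taken from the diff that follows.

package main

import (
	"fmt"
	"time"
)

// cacheState is a stand-in for what a table.CachedTable tracks internally.
// Real TiDB expresses the lease as a TSO timestamp; time.Time keeps it simple.
type cacheState struct {
	rows       []string  // placeholder for the cached snapshot
	leaseUntil time.Time // reads at or after this point must go to TiKV
}

// tryReadFromCache mirrors TryReadFromCache in this diff: serve the snapshot
// while the lease covers the read, otherwise return nil so the caller does a
// normal TiKV read (and may ask to renew the lease).
func (c *cacheState) tryReadFromCache(readTime time.Time) []string {
	if readTime.Before(c.leaseUntil) {
		return c.rows
	}
	return nil
}

func main() {
	c := &cacheState{
		rows:       []string{"r1", "r2"},
		leaseUntil: time.Now().Add(3 * time.Second), // e.g. a 3-second table cache lease
	}
	if rows := c.tryReadFromCache(time.Now()); rows != nil {
		fmt.Println("served from cache:", rows)
	} else {
		fmt.Println("cache miss: read TiKV, then UpdateLockForRead renews the lease")
	}
}
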
6 changes: 5 additions & 1 deletion ddl/db_cache_test.go
@@ -240,10 +240,14 @@ func TestCacheTableSizeLimit(t *testing.T) {
require.NoError(t, err)
}

+lastReadFromCache := func(tk *testkit.TestKit) bool {
+return tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache
+}
+
cached := false
for i := 0; i < 200; i++ {
tk.MustQuery("select count(*) from (select * from cache_t2 limit 1) t1").Check(testkit.Rows("1"))
-if tk.HasPlan("select * from cache_t2", "UnionScan") {
+if lastReadFromCache(tk) {
cached = true
break
}
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
@@ -119,7 +119,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

-builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(
+builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
25 changes: 3 additions & 22 deletions domain/domain.go
@@ -95,7 +95,6 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.
-renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)
}
@@ -162,7 +161,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

-newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
+newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
@@ -284,7 +283,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
-builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
+builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
@@ -729,7 +728,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
-renewLeaseCh: make(chan func(), 10),
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

@@ -856,10 +854,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
-do.wg.Add(4)
+do.wg.Add(3)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
-go do.renewLease()
go do.globalConfigSyncerKeeper()
if !skipRegisterToDashboard {
do.wg.Add(1)
@@ -1777,22 +1774,6 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache.Insert(is, 0)
}

-func (do *Domain) renewLease() {
-defer func() {
-do.wg.Done()
-logutil.BgLogger().Info("renew lease goroutine exited.")
-}()
-for {
-select {
-case <-do.exit:
-close(do.renewLeaseCh)
-return
-case op := <-do.renewLeaseCh:
-op()
-}
-}
-}
-
func init() {
initByLDFlagsForGlobalKill()
}
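
For context, the deleted renewLease goroutine was a plain worker loop draining a channel of closures until domain shutdown; it becomes dead code here because lease renewal now happens synchronously at the read sites in executor/builder.go (below), which receive the lease duration directly. A self-contained sketch of the retired pattern, with illustrative names:

package main

import "fmt"

// worker drains a channel of closures until exit is closed. This is the shape
// of the removed Domain.renewLease loop; the names are illustrative, not TiDB's.
func worker(ops <-chan func(), exit <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-exit:
			return
		case op := <-ops:
			op()
		}
	}
}

func main() {
	ops := make(chan func(), 10) // renewLeaseCh was likewise buffered at 10
	exit := make(chan struct{})
	done := make(chan struct{})
	go worker(ops, exit, done)

	ran := make(chan struct{})
	ops <- func() { fmt.Println("renew lease"); close(ran) }
	<-ran // wait for the op to run so the shutdown below is deterministic

	close(exit)
	<-done
}
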
9 changes: 9 additions & 0 deletions domain/sysvar_cache.go
@@ -230,6 +230,8 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
break
}
topsqlstate.GlobalState.ReportIntervalSeconds.Store(val)
+case variable.TiDBSuperReadOnly:
+variable.VarTiDBSuperReadOnly.Store(variable.TiDBOptOn(sVal))
case variable.TiDBRestrictedReadOnly:
variable.RestrictedReadOnly.Store(variable.TiDBOptOn(sVal))
case variable.TiDBStoreLimit:
@@ -239,6 +241,13 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
break
}
storekv.StoreLimit.Store(val)
+case variable.TiDBTableCacheLease:
+var val int64
+val, err = strconv.ParseInt(sVal, 10, 64)
+if err != nil {
+break
+}
+variable.TableCacheLease.Store(val)
case variable.TiDBPersistAnalyzeOptions:
variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal))
case variable.TiDBEnableColumnTracking:
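
Both new cases follow this function's existing pattern: parse the pushed-down value, bail out on a parse error, and publish the result through a process-wide atomic so concurrent sessions observe it without locking. A standalone rendering of the int64 case (the sync/atomic plumbing stands in for whatever atomic type variable.TableCacheLease actually is):

package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// tableCacheLease stands in for variable.TableCacheLease (a lease in seconds).
var tableCacheLease int64

// setTableCacheLease mirrors the TiDBTableCacheLease case above: parse the
// string form of the sysvar, then atomically publish it for concurrent readers.
func setTableCacheLease(sVal string) error {
	val, err := strconv.ParseInt(sVal, 10, 64)
	if err != nil {
		return err
	}
	atomic.StoreInt64(&tableCacheLease, val)
	return nil
}

func main() {
	if err := setTableCacheLease("3"); err != nil {
		panic(err)
	}
	fmt.Println(atomic.LoadInt64(&tableCacheLease)) // prints 3
}
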
4 changes: 4 additions & 0 deletions executor/adapter.go
@@ -966,6 +966,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finishes
sessVars.StmtCtx.IsStaleness = false

+if sessVars.StmtCtx.ReadFromTableCache {
+metrics.ReadFromTableCacheCounter.Inc()
+}
}

// CloseRecordSet will finish the execution of current statement and do some record work
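
ReadFromTableCacheCounter itself is defined in TiDB's metrics package rather than in this hunk; it is presumably a plain Prometheus counter along the lines of the sketch below (the namespace, subsystem, and metric name are guesses for illustration, not taken from the PR):

package metrics

import "github.com/prometheus/client_golang/prometheus"

// ReadFromTableCacheCounter counts statements whose reads were served from the
// table cache; FinishExecuteStmt above bumps it once per qualifying statement.
var ReadFromTableCacheCounter = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "tidb",
	Subsystem: "server",
	Name:      "read_from_tablecache_total", // assumed name, not verified
	Help:      "Counter of statements that read from the cached table snapshot.",
})

func init() {
	prometheus.MustRegister(ReadFromTableCacheCounter)
}
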
95 changes: 78 additions & 17 deletions executor/builder.go
@@ -30,7 +30,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
-"github.com/pingcap/log"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
@@ -46,6 +45,7 @@ import (
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
+"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
@@ -1067,7 +1067,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
return x
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)}
-us.cacheTable = v.CacheTable
// Get the handle column index of the below Plan.
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))
@@ -1083,13 +1082,37 @@
us.collators = append(us.collators, collate.GetCollator(tp.Collate))
}

+startTS, err := b.getSnapshotTS()
+if err != nil {
+b.err = err
+return nil
+}
+sessionVars := b.ctx.GetSessionVars()
+
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
+tbl := x.Table()
+if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
+cachedTable := tbl.(table.CachedTable)
+leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
+// Determine whether the cache can be used.
+cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
+if cacheData != nil {
+sessionVars.StmtCtx.ReadFromTableCache = true
+x.dummy = true
+us.cacheTable = cacheData
+} else {
+if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
+store := b.ctx.GetStore()
+cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
+}
+}
+}
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1103,6 +1126,24 @@
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table

+tbl := x.Table()
+if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
+cachedTable := tbl.(table.CachedTable)
+// Determine whether the cache can be used.
+leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
+cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
+if cacheData != nil {
+sessionVars.StmtCtx.ReadFromTableCache = true
+x.dummy = true
+us.cacheTable = cacheData
+} else {
+if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
+store := b.ctx.GetStore()
+cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
+}
+}
+}
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1117,6 +1158,24 @@
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)

+tbl := x.Table()
+if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
+cachedTable := tbl.(table.CachedTable)
+leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
+// Determine whether the cache can be used.
+cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
+if cacheData != nil {
+sessionVars.StmtCtx.ReadFromTableCache = true
+x.dummy = true
+us.cacheTable = cacheData
+} else {
+if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
+store := b.ctx.GetStore()
+cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
+}
+}
+}
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care about order for now, so we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
@@ -3138,6 +3197,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
return nil
}

+if ret.table.Meta().TempTableType != model.TempTableNone {
+ret.dummy = true
+}
+
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
@@ -3365,6 +3428,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E
return nil
}

+if ret.table.Meta().TempTableType != model.TempTableNone {
+ret.dummy = true
+}
+
ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
@@ -3530,6 +3597,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
return nil
}

+if ret.table.Meta().TempTableType != model.TempTableNone {
+ret.dummy = true
+}
+
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)

ret.ranges = is.Ranges
@@ -4782,25 +4853,15 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name))
return nil
}
-cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS)
+sessVars := b.ctx.GetSessionVars()
+leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
+cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
-b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true
+sessVars.StmtCtx.ReadFromTableCache = true
return cacheData
}
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
-go func() {
-defer func() {
-if r := recover(); r != nil {
-logutil.BgLogger().Error("panic in the recoverable goroutine",
-zap.Reflect("r", r),
-zap.Stack("stack trace"))
-}
-}()
-err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS)
-if err != nil {
-log.Warn("Update Lock Info Error")
-}
-}()
+tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
}
return nil
}
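
The try-read-else-renew block appears three times in buildUnionScanFromReader (for TableReaderExecutor, IndexReaderExecutor, and IndexLookUpExecutor), with a similar branch here in getCacheTable. A follow-up could hoist it into a single helper; the sketch below is hypothetical, not code from this PR, and uses stand-in types so it compiles on its own:

package main

import (
	"context"
	"fmt"
	"time"
)

// cacheData and cachedTable are stand-ins for the real types; the actual
// table.CachedTable interface lives in TiDB's table package.
type cacheData struct{ rows []string }

type cachedTable interface {
	TryReadFromCache(startTS uint64, lease time.Duration) *cacheData
	UpdateLockForRead(ctx context.Context, startTS uint64, lease time.Duration)
}

// tryCacheRead hoists the repeated branch: serve from the snapshot when the
// lease covers startTS; otherwise, for a plain read (not INSERT/UPDATE/DELETE
// or EXPLAIN), ask to renew the lease so a later statement can hit the cache.
// The caller would set StmtCtx.ReadFromTableCache, x.dummy, and us.cacheTable
// from the return values.
func tryCacheRead(t cachedTable, startTS uint64, lease time.Duration, plainRead bool) (*cacheData, bool) {
	if data := t.TryReadFromCache(startTS, lease); data != nil {
		return data, true
	}
	if plainRead {
		t.UpdateLockForRead(context.Background(), startTS, lease)
	}
	return nil, false
}

// fakeTable lets the sketch run without TiKV: it reports a hit only while fresh.
type fakeTable struct{ fresh bool }

func (f *fakeTable) TryReadFromCache(uint64, time.Duration) *cacheData {
	if f.fresh {
		return &cacheData{rows: []string{"r1"}}
	}
	return nil
}

func (f *fakeTable) UpdateLockForRead(context.Context, uint64, time.Duration) {
	fmt.Println("lease renewal requested")
}

func main() {
	data, hit := tryCacheRead(&fakeTable{fresh: false}, 42, 3*time.Second, true)
	fmt.Println(data, hit) // <nil> false, after printing "lease renewal requested"
}
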
30 changes: 24 additions & 6 deletions executor/distsql.go
@@ -194,11 +194,20 @@ type IndexReaderExecutor struct {
memTracker *memory.Tracker

selectResultHook // for testing

+// If the dummy flag is set, this is not a real IndexReader; it only provides
+// the KV ranges for UnionScan. Used by temporary tables and cached tables.
+dummy bool
}

+// Table implements the dataSourceExecutor interface.
+func (e *IndexReaderExecutor) Table() table.Table {
+return e.table
+}
+
// Close clears all resources held by the current object.
func (e *IndexReaderExecutor) Close() (err error) {
-if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
return nil
}

@@ -212,7 +221,7 @@ func (e *IndexReaderExecutor) Close() (err error) {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
-if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
req.Reset()
return nil
}
@@ -282,7 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
// Treat the temporary table as a dummy table: avoid sending distsql requests to TiKV.
// In some test cases IndexReaderExecutor is mocked and e.table is nil;
// avoid sending distsql requests to TiKV in that case as well.
-if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
return nil
}

@@ -380,6 +389,10 @@ type IndexLookUpExecutor struct {

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc

+// If the dummy flag is set, this is not a real IndexLookUpReader; it only
+// provides the KV ranges for UnionScan. Used by temporary tables and cached tables.
+dummy bool
}

type getHandleType int8
@@ -395,6 +408,11 @@ type checkIndexValue struct {
idxTblCols []*table.Column
}

+// Table implements the dataSourceExecutor interface.
+func (e *IndexLookUpExecutor) Table() table.Table {
+return e.table
+}
+
// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
@@ -411,7 +429,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
}

// Treat the temporary table as a dummy table to avoid sending distsql requests to TiKV.
-if e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
return nil
}

@@ -677,7 +695,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
-if e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
return nil
}

@@ -705,7 +723,7 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
-if e.table.Meta().TempTableType != model.TempTableNone {
+if e.dummy {
req.Reset()
return nil
}
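
The Table() methods added above implement a dataSourceExecutor interface whose definition sits outside these hunks; inferred from the call sites, it presumably looks like this (Executor and table.Table are the existing TiDB types, so the fragment only compiles inside the repo):

// Inferred shape, not copied from the PR: the accessor that lets
// buildUnionScanFromReader fetch the underlying table from any of the three
// reader executors uniformly.
type dataSourceExecutor interface {
	Executor
	Table() table.Table
}
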
(Diffs for the remaining 33 changed files are not shown.)