Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC branch for cached table based on release-5.4 #31476

Closed
wants to merge 8 commits into from
6 changes: 5 additions & 1 deletion ddl/db_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,14 @@ func TestCacheTableSizeLimit(t *testing.T) {
require.NoError(t, err)
}

lastReadFromCache := func(tk *testkit.TestKit) bool {
return tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache
}

cached := false
for i := 0; i < 200; i++ {
tk.MustQuery("select count(*) from (select * from cache_t2 limit 1) t1").Check(testkit.Rows("1"))
if tk.HasPlan("select * from cache_t2", "UnionScan") {
if lastReadFromCache(tk) {
cached = true
break
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *testDDLSuite) TestPlacementPolicyInUse(c *C) {
t4.State = model.StatePublic
db1.Tables = append(db1.Tables, t4)

builder, err := infoschema.NewBuilder(store, nil, nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(
[]*model.DBInfo{db1, db2, dbP},
nil,
[]*model.PolicyInfo{p1, p2, p3, p4, p5},
Expand Down
25 changes: 3 additions & 22 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false.
renewLeaseCh chan func() // It is used to call the renewLease function of the cache table.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
return nil, false, currentSchemaVersion, nil, err
}

newISBuilder, err := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, bundles, policies, neededSchemaVersion)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
}
Expand Down Expand Up @@ -284,7 +283,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.Store(), do.renewLeaseCh, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest())
phyTblIDs := make([]int64, 0, len(diffs))
actions := make([]uint64, 0, len(diffs))
for _, diff := range diffs {
Expand Down Expand Up @@ -729,7 +728,6 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

Expand Down Expand Up @@ -856,10 +854,9 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
do.wg.Add(4)
do.wg.Add(3)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
go do.renewLease()
go do.globalConfigSyncerKeeper()
if !skipRegisterToDashboard {
do.wg.Add(1)
Expand Down Expand Up @@ -1777,22 +1774,6 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache.Insert(is, 0)
}

func (do *Domain) renewLease() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("renew lease goroutine exited.")
}()
for {
select {
case <-do.exit:
close(do.renewLeaseCh)
return
case op := <-do.renewLeaseCh:
op()
}
}
}

func init() {
initByLDFlagsForGlobalKill()
}
Expand Down
7 changes: 7 additions & 0 deletions domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
break
}
storekv.StoreLimit.Store(val)
case variable.TiDBTableCacheLease:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.TableCacheLease.Store(val)
case variable.TiDBPersistAnalyzeOptions:
variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal))
case variable.TiDBEnableColumnTracking:
Expand Down
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false

if sessVars.StmtCtx.ReadFromTableCache {
metrics.ReadFromTableCacheCounter.Inc()
}
}

// CloseRecordSet will finish the execution of current statement and do some record work
Expand Down
95 changes: 78 additions & 17 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
Expand All @@ -46,6 +45,7 @@ import (
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -1067,7 +1067,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
return x
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)}
us.cacheTable = v.CacheTable
// Get the handle column index of the below Plan.
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))
Expand All @@ -1083,13 +1082,37 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.collators = append(us.collators, collate.GetCollator(tp.Collate))
}

startTS, err := b.getSnapshotTS()
sessionVars := b.ctx.GetSessionVars()
if err != nil {
b.err = err
return nil
}

switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
// Determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -1103,6 +1126,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table

tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -1117,6 +1158,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)

tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
// Determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
Expand Down Expand Up @@ -3138,6 +3197,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
Expand Down Expand Up @@ -3365,6 +3428,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
Expand Down Expand Up @@ -3530,6 +3597,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)

ret.ranges = is.Ranges
Expand Down Expand Up @@ -4785,25 +4856,15 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name))
return nil
}
cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS)
sessVars := b.ctx.GetSessionVars()
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true
sessVars.StmtCtx.ReadFromTableCache = true
return cacheData
}
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
go func() {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("panic in the recoverable goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
}()
err := tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS)
if err != nil {
log.Warn("Update Lock Info Error")
}
}()
tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
}
return nil
}
30 changes: 24 additions & 6 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,20 @@ type IndexReaderExecutor struct {
memTracker *memory.Tracker

selectResultHook // for testing

// If dummy flag is set, this is not a real IndexReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
dummy bool
}

// Table implements the dataSourceExecutor interface.
func (e *IndexReaderExecutor) Table() table.Table {
return e.table
}

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() (err error) {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
return nil
}

Expand All @@ -212,7 +221,7 @@ func (e *IndexReaderExecutor) Close() (err error) {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
req.Reset()
return nil
}
Expand Down Expand Up @@ -282,7 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// In a test case IndexReaderExecutor is mocked and e.table is nil.
// Avoid sending distsql request to TIKV.
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
return nil
}

Expand Down Expand Up @@ -383,6 +392,10 @@ type IndexLookUpExecutor struct {

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc

// If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
dummy bool
}

type getHandleType int8
Expand All @@ -398,6 +411,11 @@ type checkIndexValue struct {
idxTblCols []*table.Column
}

// Table implements the dataSourceExecutor interface.
func (e *IndexLookUpExecutor) Table() table.Table {
return e.table
}

// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
Expand All @@ -414,7 +432,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
}

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
if e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
return nil
}

Expand Down Expand Up @@ -683,7 +701,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
return nil
}

Expand Down Expand Up @@ -711,7 +729,7 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta().TempTableType != model.TempTableNone {
if e.dummy {
req.Reset()
return nil
}
Expand Down
Loading