
planner,executor: fix cached table query with filter condition #32590

Merged · 9 commits · Mar 9, 2022
94 changes: 38 additions & 56 deletions executor/builder.go
@@ -152,7 +152,6 @@ func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
var e Executor
switch v := p.(type) {
case nil:
return nil
@@ -267,13 +266,13 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.Analyze:
return b.buildAnalyze(v)
case *plannercore.PhysicalTableReader:
e = b.buildTableReader(v)
return b.buildTableReader(v)
case *plannercore.PhysicalTableSample:
return b.buildTableSample(v)
case *plannercore.PhysicalIndexReader:
e = b.buildIndexReader(v)
return b.buildIndexReader(v)
case *plannercore.PhysicalIndexLookUpReader:
e = b.buildIndexLookUpReader(v)
return b.buildIndexLookUpReader(v)
case *plannercore.PhysicalWindow:
return b.buildWindow(v)
case *plannercore.PhysicalShuffle:
@@ -304,57 +303,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
return nil
}

if tblExec, ok := e.(dataSourceExecutor); ok {
tbl := tblExec.Table()
tableInfo := tbl.Meta()
// When reading from a cached table, check whether it satisfies the conditions of read cache.
if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
physicalPlan := p.(plannercore.PhysicalPlan)
return b.buildCachedTableExecutor(tbl, physicalPlan, e)
}
}

return e
}

// buildCachedTableExecutor adds an UnionScan to the original Executor to make the reader read from table cache.
func (b *executorBuilder) buildCachedTableExecutor(tbl table.Table, p plannercore.PhysicalPlan, e Executor) Executor {
if b.ctx.GetSessionVars().SnapshotTS != 0 || b.ctx.GetSessionVars().StmtCtx.IsStaleness {
return e
}

cachedTable := tbl.(table.CachedTable)
startTS, err := b.getSnapshotTS()
if err != nil {
b.err = errors.Trace(err)
return nil
}

leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
sessionVars := b.ctx.GetSessionVars()
// Use the TS of the transaction to determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
switch raw := e.(type) {
case *TableReaderExecutor:
raw.dummy = true
case *IndexReaderExecutor:
raw.dummy = true
case *IndexLookUpExecutor:
raw.dummy = true
}
us := plannercore.PhysicalUnionScan{CacheTable: cacheData}.Init(b.ctx, nil, -1)
us.SetChildren(p)
e = b.buildUnionScanFromReader(e, us)
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
return e
}

func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor {
@@ -1118,7 +1066,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
return x
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)}
us.cacheTable = v.CacheTable
// Get the handle column index of the below Plan.
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))
@@ -1134,13 +1081,21 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us.collators = append(us.collators, collate.GetCollator(tp.Collate))
}

startTS, err := b.getSnapshotTS()
sessionVars := b.ctx.GetSessionVars()
if err != nil {
b.err = err
return nil
}

switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1154,6 +1109,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1168,6 +1124,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
@@ -1181,6 +1138,31 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
return us
}

type bypassDataSourceExecutor interface {
dataSourceExecutor
setDummy()
}

func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourceExecutor, vars *variable.SessionVars, startTS uint64) {
tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
vars.StmtCtx.ReadFromTableCache = true
x.setDummy()
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !vars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
}
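
The rationale, as the diff shows it: the removed buildCachedTableExecutor wrapped the finished reader in a brand-new PhysicalUnionScan initialized with nil conditions, so a filter on a cached-table query could be dropped once the cache was hit; handleCachedTable instead runs inside buildUnionScanFromReader, where the planner-built UnionScan already carries v.Conditions. A minimal sketch of what a reader must provide to join the bypass (the type below is hypothetical, not part of this PR, and the rest of its Executor plumbing is elided):

package sketch

import "github.com/pingcap/tidb/table"

// toyReaderExecutor is illustrative only: it shows just the two methods that
// handleCachedTable needs; the real readers also implement Executor.
type toyReaderExecutor struct {
	tbl   table.Table
	dummy bool
}

// Table exposes the data source (the dataSourceExecutor half of the contract).
func (e *toyReaderExecutor) Table() table.Table { return e.tbl }

// setDummy marks the reader so its storage read is skipped; the UnionScanExec
// above it then serves rows from us.cacheTable, the cached MemBuffer.
func (e *toyReaderExecutor) setDummy() { e.dummy = true }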

// buildMergeJoin builds MergeJoinExec executor.
func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Executor {
leftExec := b.build(v.Children()[0])
8 changes: 8 additions & 0 deletions executor/distsql.go
@@ -207,6 +207,10 @@ func (e *IndexReaderExecutor) Table() table.Table {
return e.table
}

func (e *IndexReaderExecutor) setDummy() {
e.dummy = true
}

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() (err error) {
if e.dummy {
@@ -412,6 +416,10 @@ func (e *IndexLookUpExecutor) Table() table.Table {
return e.table
}

func (e *IndexLookUpExecutor) setDummy() {
e.dummy = true
}

// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
4 changes: 4 additions & 0 deletions executor/table_reader.go
@@ -118,6 +118,10 @@ func (e *TableReaderExecutor) Table() table.Table {
return e.table
}

func (e *TableReaderExecutor) setDummy() {
e.dummy = true
}

// Open initializes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
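The setDummy implementations above are one-liners because the bypass itself lives in each reader's open/read/close path (the dummy check in IndexReaderExecutor.Close is visible in the distsql.go hunk). A self-contained toy, not TiDB's actual reader code, showing the shape of that short-circuit:

package main

import "fmt"

// toyReader stands in for a reader executor: when dummy is set it fetches
// nothing itself, mirroring how a dummy reader leaves row production to the
// UnionScan layered over the table cache.
type toyReader struct {
	rows  []string
	dummy bool
}

func (r *toyReader) next() (string, bool) {
	if r.dummy || len(r.rows) == 0 {
		return "", false // bypassed: no storage read happens here
	}
	row := r.rows[0]
	r.rows = r.rows[1:]
	return row, true
}

func main() {
	r := &toyReader{rows: []string{"x"}, dummy: true}
	if _, ok := r.next(); !ok {
		fmt.Println("reader bypassed; rows come from the cache layer")
	}
}
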
63 changes: 63 additions & 0 deletions executor/union_scan_test.go
@@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/tablecodec"
@@ -457,6 +458,68 @@ func TestIssue28073(t *testing.T) {
require.False(t, exist)
}

func TestIssue32422(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t (id int, c int, index(id));")
tk.MustExec("insert into t values (3,3), (4,4), (5,5);")
tk.MustExec("alter table t cache;")

var cacheUsed bool
for i := 0; i < 20; i++ {
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
cacheUsed = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cacheUsed)

tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))

// Some extra tests.
// Since cached tables use the UnionScanExec utilities, check what happens when the two work together.
// In these cases, the cached data serves as the snapshot, TiKV is skipped, and the txn membuffer works the same way.
tk.MustExec("begin")
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
tk.MustExec("insert into t values (6, 6)")
// Check for the newly added data.
tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan")
tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
// Check for the old data.
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Point get
tk.HasPlan("select id+1, c from t where id = 6", "PointGet")
tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Lookup
tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp")
tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Reader
tk.HasPlan("select id from t where id = 6", "IndexReader")
tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

tk.MustExec("rollback")
}

func BenchmarkUnionScanRead(b *testing.B) {
store, clean := testkit.CreateMockStore(b)
defer clean()
1 change: 0 additions & 1 deletion planner/core/exhaust_physical_plans.go
@@ -52,7 +52,6 @@ func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty)
us := PhysicalUnionScan{
Conditions: p.conditions,
HandleCols: p.handleCols,
CacheTable: p.cacheTable,
}.Init(p.ctx, p.stats, p.blockOffset, childProp)
return []PhysicalPlan{us}, true, nil
}
7 changes: 4 additions & 3 deletions planner/core/integration_test.go
@@ -6098,9 +6098,10 @@ func TestAggPushToCopForCachedTable(t *testing.T) {

tk.MustQuery("explain format = 'brief' select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows(
"StreamAgg 1.00 root funcs:count(1)->Column#8]\n" +
"[└─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))
"[└─UnionScan 10.00 root eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))

var readFromCacheNoPanic bool
for i := 0; i < 10; i++ {
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
@@ -4233,7 +4233,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

var result LogicalPlan = ds
dirty := tableHasDirtyContent(b.ctx, tableInfo)
if dirty || tableInfo.TempTableType == model.TempTableLocal {
if dirty || tableInfo.TempTableType == model.TempTableLocal || tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
if tableInfo.Partition != nil && b.optFlag&flagPartitionProcessor == 0 {
4 changes: 0 additions & 4 deletions planner/core/logical_plans.go
@@ -20,7 +20,6 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
@@ -533,9 +532,6 @@ type LogicalUnionScan struct {
conditions []expression.Expression

handleCols HandleCols

// cacheTable not nil means it's reading from cached table.
cacheTable kv.MemBuffer
}

// DataSource represents a tableScan without condition push down.
2 changes: 0 additions & 2 deletions planner/core/physical_plans.go
@@ -1185,8 +1185,6 @@ type PhysicalUnionScan struct {
Conditions []expression.Expression

HandleCols HandleCols

CacheTable kv.MemBuffer
}

// ExtractCorrelatedCols implements PhysicalPlan interface.