Skip to content

Commit

Permalink
planner,executor: fix cached table query with filter condition (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Mar 1, 2022
1 parent cba396e commit fd8fd82
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 67 deletions.
119 changes: 63 additions & 56 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
var e Executor
switch v := p.(type) {
case nil:
return nil
Expand Down Expand Up @@ -258,13 +257,13 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.Analyze:
return b.buildAnalyze(v)
case *plannercore.PhysicalTableReader:
e = b.buildTableReader(v)
return b.buildTableReader(v)
case *plannercore.PhysicalTableSample:
return b.buildTableSample(v)
case *plannercore.PhysicalIndexReader:
e = b.buildIndexReader(v)
return b.buildIndexReader(v)
case *plannercore.PhysicalIndexLookUpReader:
e = b.buildIndexLookUpReader(v)
return b.buildIndexLookUpReader(v)
case *plannercore.PhysicalWindow:
return b.buildWindow(v)
case *plannercore.PhysicalShuffle:
Expand Down Expand Up @@ -295,57 +294,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
return nil
}

if tblExec, ok := e.(dataSourceExecutor); ok {
tbl := tblExec.Table()
tableInfo := tbl.Meta()
// When reading from a cached table, check whether it satisfies the conditions of read cache.
if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
physicalPlan := p.(plannercore.PhysicalPlan)
return b.buildCachedTableExecutor(tbl, physicalPlan, e)
}
}

return e
}

// buildCachedTableExecutor adds an UnionScan to the original Executor to make the reader read from table cache.
func (b *executorBuilder) buildCachedTableExecutor(tbl table.Table, p plannercore.PhysicalPlan, e Executor) Executor {
if b.ctx.GetSessionVars().SnapshotTS != 0 || b.ctx.GetSessionVars().StmtCtx.IsStaleness {
return e
}

cachedTable := tbl.(table.CachedTable)
startTS, err := b.getSnapshotTS()
if err != nil {
b.err = errors.Trace(err)
return nil
}

leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
sessionVars := b.ctx.GetSessionVars()
// Use the TS of the transaction to determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
switch raw := e.(type) {
case *TableReaderExecutor:
raw.dummy = true
case *IndexReaderExecutor:
raw.dummy = true
case *IndexLookUpExecutor:
raw.dummy = true
}
us := plannercore.PhysicalUnionScan{CacheTable: cacheData}.Init(b.ctx, nil, -1)
us.SetChildren(p)
e = b.buildUnionScanFromReader(e, us)
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
return e
}

func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor {
Expand Down Expand Up @@ -1119,7 +1067,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
return x
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)}
us.cacheTable = v.CacheTable
// Get the handle column index of the below Plan.
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))
Expand All @@ -1135,13 +1082,37 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.collators = append(us.collators, collate.GetCollator(tp.Collate))
}

startTS, err := b.getSnapshotTS()
sessionVars := b.ctx.GetSessionVars()
if err != nil {
b.err = err
return nil
}

switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
// Determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -1155,6 +1126,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table

tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -1169,6 +1158,24 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)

tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
// Determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
x.dummy = true
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
Expand Down
63 changes: 63 additions & 0 deletions executor/union_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -416,3 +417,65 @@ func TestForApplyAndUnionScan(t *testing.T) {
tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows())
tk.MustExec("rollback")
}

func TestIssue32422(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t (id int, c int, index(id));")
tk.MustExec("insert into t values (3,3), (4,4), (5,5);")
tk.MustExec("alter table t cache;")

var cacheUsed bool
for i := 0; i < 20; i++ {
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
cacheUsed = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cacheUsed)

tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))

// Some extra tests.
// Since cached table use UnionScanExec utilities, check what happens when they work together.
// In these cases, the cache data serve as the snapshot, tikv is skipped, and txn membuffer works the same way.
tk.MustExec("begin")
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
tk.MustExec("insert into t values (6, 6)")
// Check for the new added data.
tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan")
tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
// Check for the old data.
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Point get
tk.HasPlan("select id+1, c from t where id = 6", "PointGet")
tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Lookup
tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp")
tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Reader
tk.HasPlan("select id from t where id = 6", "IndexReader")
tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

tk.MustExec("rollback")
}
1 change: 0 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty)
us := PhysicalUnionScan{
Conditions: p.conditions,
HandleCols: p.handleCols,
CacheTable: p.cacheTable,
}.Init(p.ctx, p.stats, p.blockOffset, childProp)
return []PhysicalPlan{us}, true, nil
}
Expand Down
7 changes: 4 additions & 3 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5215,9 +5215,10 @@ func (s *testIntegrationSuite) TestAggPushToCopForCachedTable(c *C) {

tk.MustQuery("explain format = 'brief' select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows(
"StreamAgg 1.00 root funcs:count(1)->Column#8]\n" +
"[└─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))
"[└─UnionScan 10.00 root eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))

var readFromCacheNoPanic bool
for i := 0; i < 10; i++ {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4178,7 +4178,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

var result LogicalPlan = ds
dirty := tableHasDirtyContent(b.ctx, tableInfo)
if dirty || tableInfo.TempTableType == model.TempTableLocal {
if dirty || tableInfo.TempTableType == model.TempTableLocal || tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
result = us
Expand Down
4 changes: 0 additions & 4 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -527,9 +526,6 @@ type LogicalUnionScan struct {
conditions []expression.Expression

handleCols HandleCols

// cacheTable not nil means it's reading from cached table.
cacheTable kv.MemBuffer
}

// DataSource represents a tableScan without condition push down.
Expand Down
2 changes: 0 additions & 2 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,8 +1186,6 @@ type PhysicalUnionScan struct {
Conditions []expression.Expression

HandleCols HandleCols

CacheTable kv.MemBuffer
}

// ExtractCorrelatedCols implements PhysicalPlan interface.
Expand Down

0 comments on commit fd8fd82

Please sign in to comment.