Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: add stale read compatibility for temporary table #25206

Merged
merged 10 commits into from
Jun 7, 2021
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ type ExecStmt struct {
// SnapshotTS stores the timestamp for stale read.
// It is not equivalent to session variables's snapshot ts, it only use to build the executor.
SnapshotTS uint64
// ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly.
ExplicitStaleness bool
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Expand Down Expand Up @@ -288,6 +290,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.SnapshotTS
a.ExplicitStaleness = ret.ExplicitStaleness
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
return 0, err
Expand Down Expand Up @@ -791,6 +794,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
b.snapshotTS = a.SnapshotTS
b.explicitStaleness = a.ExplicitStaleness
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
47 changes: 44 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type executorBuilder struct {
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
// ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly.
explicitStaleness bool
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand Down Expand Up @@ -1371,6 +1373,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
return e
}

// IsStaleness returns if the query is staleness
func (b *executorBuilder) IsStaleness() bool {
return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness
}

// `getSnapshotTS` returns the timestamp of the snapshot that a reader should read.
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
Expand Down Expand Up @@ -2642,6 +2649,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, err
}
ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
return nil, errors.New("can not stale read temporary table")
}

tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
Expand Down Expand Up @@ -2736,6 +2747,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}

ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
Expand Down Expand Up @@ -2944,13 +2960,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
}

func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexReader(b, v)
if err != nil {
b.err = err
return nil
}

is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
Expand Down Expand Up @@ -3091,13 +3112,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexLookUpReader(b, v)
if err != nil {
b.err = err
return nil
}

is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)

ret.ranges = is.Ranges
Expand Down Expand Up @@ -3197,6 +3223,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
}

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

ret, err := buildNoRangeIndexMergeReader(b, v)
if err != nil {
b.err = err
Expand All @@ -3216,7 +3248,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
}
}
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
executorCounterIndexMergeReaderExecutor.Inc()

Expand Down Expand Up @@ -3972,6 +4003,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down Expand Up @@ -4104,6 +4140,11 @@ func fullRangePartition(idxArr []int) bool {
}

func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor {
if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down
21 changes: 11 additions & 10 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
lowerPriority = needLowerPriority(finalPlan)
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.SnapshotTS,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
LowerPriority: lowerPriority,
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
GoCtx: ctx,
SnapshotTS: ret.SnapshotTS,
ExplicitStaleness: ret.ExplicitStaleness,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
LowerPriority: lowerPriority,
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
Expand Down
90 changes: 90 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor_test

import (
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -772,3 +773,92 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) {
tk.MustExec("commit")
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3)
}

func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

tk.MustExec("set @@tidb_enable_global_temporary_table=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists tmp1")
tk.MustExec("create global temporary table tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
"on commit delete rows")
time.Sleep(time.Second)
tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table")

queries := []struct {
sql string
}{
{
sql: "select * from tmp1 where id=1",
},
{
sql: "select * from tmp1 where code=1",
},
{
sql: "select * from tmp1 where id in (1, 2, 3)",
},
{
sql: "select * from tmp1 where code in (1, 2, 3)",
},
{
sql: "select * from tmp1 where id > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
},
{
sql: "select * from tmp1 tablesample regions()",
},
{
sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
},
}

addStaleReadToSQL := func(sql string) string {
idx := strings.Index(sql, " where ")
if idx < 0 {
return ""
}
return sql[0:idx] + " as of timestamp NOW()" + sql[idx:]
}

for _, query := range queries {
sql := addStaleReadToSQL(query.sql)
if sql != "" {
tk.MustGetErrMsg(sql, "can not stale read temporary table")
}
}

tk.MustExec("start transaction read only as of timestamp NOW()")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set transaction read only as of timestamp NOW()")
tk.MustExec("start transaction")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}
}
6 changes: 4 additions & 2 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ const (

// PreprocessorReturn is used to retain information obtained in the preprocessor.
type PreprocessorReturn struct {
SnapshotTS uint64
InfoSchema infoschema.InfoSchema
SnapshotTS uint64
ExplicitStaleness bool
InfoSchema infoschema.InfoSchema
}

// preprocessor is an ast.Visitor that preprocess
Expand Down Expand Up @@ -1420,6 +1421,7 @@ func (p *preprocessor) handleAsOf(node *ast.AsOfClause) {
return
}
p.SnapshotTS = ts
p.ExplicitStaleness = true
p.InfoSchema = is
}
if p.SnapshotTS != ts {
Expand Down