diff --git a/executor/adapter.go b/executor/adapter.go index 308676dc704a7..b324fdb331a69 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -189,6 +189,8 @@ type ExecStmt struct { // SnapshotTS stores the timestamp for stale read. // It is not equivalent to session variables's snapshot ts, it only use to build the executor. SnapshotTS uint64 + // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. + ExplicitStaleness bool // InfoSchema stores a reference to the schema information. InfoSchema infoschema.InfoSchema // Plan stores a reference to the final physical plan. @@ -288,6 +290,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { } a.InfoSchema = ret.InfoSchema a.SnapshotTS = ret.SnapshotTS + a.ExplicitStaleness = ret.ExplicitStaleness p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema) if err != nil { return 0, err @@ -791,6 +794,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti) b.snapshotTS = a.SnapshotTS + b.explicitStaleness = a.ExplicitStaleness e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) diff --git a/executor/builder.go b/executor/builder.go index 4a1b45965a531..0662b963c57d2 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -83,6 +83,8 @@ type executorBuilder struct { err error // err is set when there is error happened during Executor building process. hasLock bool Ti *TelemetryInfo + // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. + explicitStaleness bool } // CTEStorages stores resTbl and iterInTbl for CTEExec. @@ -1371,6 +1373,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu return e } +// IsStaleness returns if the query is staleness +func (b *executorBuilder) IsStaleness() bool { + return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness +} + // `getSnapshotTS` returns the timestamp of the snapshot that a reader should read. func (b *executorBuilder) getSnapshotTS() (uint64, error) { // `refreshForUpdateTSForRC` should always be invoked before returning the cached value to @@ -2642,6 +2649,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + return nil, errors.New("can not stale read temporary table") + } + tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2736,6 +2747,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } ts := v.GetTableScan() + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2944,13 +2960,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea } func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ret.ranges = is.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) @@ -3091,13 +3112,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexLookUpReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) ret.ranges = is.Ranges @@ -3197,6 +3223,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + ret, err := buildNoRangeIndexMergeReader(b, v) if err != nil { b.err = err @@ -3216,7 +3248,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg } } } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) executorCounterIndexMergeReaderExecutor.Inc() @@ -3972,6 +4003,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model } func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor { + if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err @@ -4104,6 +4140,11 @@ func fullRangePartition(idxArr []int) bool { } func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor { + if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/compiler.go b/executor/compiler.go index 5f0454c66390a..c763c43067047 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -69,16 +69,17 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm lowerPriority = needLowerPriority(finalPlan) } return &ExecStmt{ - GoCtx: ctx, - SnapshotTS: ret.SnapshotTS, - InfoSchema: ret.InfoSchema, - Plan: finalPlan, - LowerPriority: lowerPriority, - Text: stmtNode.Text(), - StmtNode: stmtNode, - Ctx: c.Ctx, - OutputNames: names, - Ti: &TelemetryInfo{}, + GoCtx: ctx, + SnapshotTS: ret.SnapshotTS, + ExplicitStaleness: ret.ExplicitStaleness, + InfoSchema: ret.InfoSchema, + Plan: finalPlan, + LowerPriority: lowerPriority, + Text: stmtNode.Text(), + StmtNode: stmtNode, + Ctx: c.Ctx, + OutputNames: names, + Ti: &TelemetryInfo{}, }, nil } diff --git a/executor/point_get.go b/executor/point_get.go index 9d09a6acf6fb0..3d1908d0fe394 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -42,6 +42,11 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { + if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() { + b.err = errors.New("can not stale read temporary table") + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 44def0981bed0..d7fa8a8bdec27 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -15,6 +15,7 @@ package executor_test import ( "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -772,3 +773,92 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) { tk.MustExec("commit") c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3) } + +func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + time.Sleep(time.Second) + tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table") + + queries := []struct { + sql string + }{ + { + sql: "select * from tmp1 where id=1", + }, + { + sql: "select * from tmp1 where code=1", + }, + { + sql: "select * from tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where id > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1", + }, + { + sql: "select * from tmp1 tablesample regions()", + }, + { + sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2", + }, + } + + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW()" + sql[idx:] + } + + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read temporary table") + } + } + + tk.MustExec("start transaction read only as of timestamp NOW()") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set transaction read only as of timestamp NOW()") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } +} diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 5e6f41b7b4fb8..70e1391d3361c 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -132,8 +132,9 @@ const ( // PreprocessorReturn is used to retain information obtained in the preprocessor. type PreprocessorReturn struct { - SnapshotTS uint64 - InfoSchema infoschema.InfoSchema + SnapshotTS uint64 + ExplicitStaleness bool + InfoSchema infoschema.InfoSchema } // preprocessor is an ast.Visitor that preprocess @@ -1420,6 +1421,7 @@ func (p *preprocessor) handleAsOf(node *ast.AsOfClause) { return } p.SnapshotTS = ts + p.ExplicitStaleness = true p.InfoSchema = is } if p.SnapshotTS != ts {