Skip to content

Commit

Permalink
executor: fix prepared stale read statement not work (#25746) (#25800)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 19, 2021
1 parent 344c5bf commit 6cae8ae
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 15 deletions.
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
assertScope := val.(string)
if len(assertScope) > 0 {
if builder.IsStaleness && assertScope != builder.TxnScope {
panic("batch point get staleness option fail")
panic("request builder get staleness option fail")
}
}
})
Expand Down
12 changes: 12 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -678,6 +679,8 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
b.snapshotTS = v.SnapshotTS
b.explicitStaleness = v.IsStaleness
b.txnScope = v.TxnScope
if b.snapshotTS != 0 {
b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
}
Expand All @@ -691,6 +694,15 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
plan: v.Plan,
outputNames: v.OutputNames(),
}
failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
if strconv.FormatUint(b.snapshotTS, 10) != assertTS ||
assertTxnScope != b.txnScope {
panic("execute prepare statement have wrong staleness option")
}
})

return e
}

Expand Down
42 changes: 42 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,45 @@ func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) {
tk.MustExec(query.sql)
}
}

func (s *testStaleTxnSerialSuite) TestStaleReadPrepare(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")
time.Sleep(2 * time.Second)
conf := *config.GetGlobalConfig()
oldConf := conf
defer config.StoreGlobalConfig(&oldConf)
conf.Labels = map[string]string{
placement.DCLabelKey: "sh",
}
config.StoreGlobalConfig(&conf)
time1 := time.Now()
tso := oracle.ComposeTS(time1.Unix()*1000, 0)
time.Sleep(200 * time.Millisecond)
failpoint.Enable("github.com/pingcap/tidb/executor/assertExecutePrepareStatementStalenessOption",
fmt.Sprintf(`return("%v_%v")`, tso, "sh"))
tk.MustExec(fmt.Sprintf(`prepare p1 from "select * from t as of timestamp '%v'"`, time1.Format("2006-1-2 15:04:05")))
tk.MustExec("execute p1")
// assert execute prepared statement in stale read txn
tk.MustExec(`prepare p2 from "select * from t"`)
tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%v'", time1.Format("2006-1-2 15:04:05")))
tk.MustExec("execute p2")
tk.MustExec("commit")

// assert execute prepared statement in stale read txn
tk.MustExec(fmt.Sprintf("set transaction read only as of timestamp '%v'", time1.Format("2006-1-2 15:04:05")))
tk.MustExec("execute p2")
failpoint.Disable("github.com/pingcap/tidb/executor/assertExecutePrepareStatementStalenessOption")

// test prepared stale select in stale txn
tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert("execute p1", NotNil)
tk.MustExec("commit")

// assert execute prepared statement should be error after set transaction read only as of
tk.MustExec(fmt.Sprintf(`set transaction read only as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert("execute p1", NotNil)
}
83 changes: 69 additions & 14 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -184,6 +186,8 @@ type Execute struct {
PrepareParams []types.Datum
ExecID uint32
SnapshotTS uint64
IsStaleness bool
TxnScope string
Stmt ast.StmtNode
StmtType string
Plan Plan
Expand Down Expand Up @@ -257,19 +261,11 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
vars.PreparedParams = append(vars.PreparedParams, val)
}
}

var snapshotTS uint64
if preparedObj.SnapshotTSEvaluator != nil {
if vars.InTxn() {
return ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
}
// if preparedObj.SnapshotTSEvaluator != nil, it is a stale read SQL:
// which means its infoschema is specified by the SQL, not the current/latest infoschema
var err error
snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
if err != nil {
return errors.Trace(err)
}
snapshotTS, txnScope, isStaleness, err := e.handleExecuteBuilderOption(sctx, preparedObj)
if err != nil {
return err
}
if isStaleness {
is, err = domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
Expand All @@ -291,15 +287,74 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
prepared.SchemaVersion = is.SchemaMetaVersion()
}
err := e.getPhysicalPlan(ctx, sctx, is, preparedObj)
err = e.getPhysicalPlan(ctx, sctx, is, preparedObj)
if err != nil {
return err
}
e.SnapshotTS = snapshotTS
e.TxnScope = txnScope
e.IsStaleness = isStaleness
e.Stmt = prepared.Stmt
return nil
}

func (e *Execute) handleExecuteBuilderOption(sctx sessionctx.Context,
preparedObj *CachedPrepareStmt) (snapshotTS uint64, txnScope string, isStaleness bool, err error) {
snapshotTS = 0
txnScope = oracle.GlobalTxnScope
isStaleness = false
err = nil
vars := sctx.GetSessionVars()
readTS := vars.TxnReadTS.PeakTxnReadTS()
if readTS > 0 {
// It means we meet following case:
// 1. prepare p from 'select * from t as of timestamp now() - x seconds'
// 1. set transaction read only as of timestamp ts2
// 2. execute prepare p
// The execute statement would be refused due to timestamp conflict
if preparedObj.SnapshotTSEvaluator != nil {
err = ErrAsOf.FastGenWithCause("as of timestamp can't be set after set transaction read only as of.")
return
}
snapshotTS = vars.TxnReadTS.UseTxnReadTS()
isStaleness = true
txnScope = config.GetTxnScopeFromConfig()
return
}
// It means we meet following case:
// 1. prepare p from 'select * from t as of timestamp ts1'
// 1. begin
// 2. execute prepare p
// The execute statement would be refused due to timestamp conflict
if preparedObj.SnapshotTSEvaluator != nil {
if vars.InTxn() {
err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
return
}
// if preparedObj.SnapshotTSEvaluator != nil, it is a stale read SQL:
// which means its infoschema is specified by the SQL, not the current/latest infoschema
snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
if err != nil {
err = errors.Trace(err)
return
}
isStaleness = true
txnScope = config.GetTxnScopeFromConfig()
return
}
// It means we meet following case:
// 1. prepare p from 'select * from t'
// 1. start transaction read only as of timestamp ts1
// 2. execute prepare p
if vars.InTxn() && vars.TxnCtx.IsStaleness {
isStaleness = true
snapshotTS = vars.TxnCtx.StartTS
txnScope = vars.TxnCtx.TxnScope
return
}
return
}

func (e *Execute) checkPreparedPriv(ctx context.Context, sctx sessionctx.Context,
preparedObj *CachedPrepareStmt, is infoschema.InfoSchema) error {
if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
Expand Down

0 comments on commit 6cae8ae

Please sign in to comment.