Skip to content

Commit

Permalink
executor: add several sql hint related to session variables (#11809)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreyes authored and sre-bot committed Sep 17, 2019
1 parent fa675ca commit e173c7f
Show file tree
Hide file tree
Showing 16 changed files with 239 additions and 35 deletions.
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
builder.Request.ReplicaRead = sv.GetReplicaRead()
return builder
}

Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower
vars.SetReplicaRead(kv.ReplicaReadFollower)

concurrency := 10

Expand Down
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().GetReplicaRead(), 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -925,7 +925,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if err != nil {
return 0, err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
Expand All @@ -949,7 +949,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower)
tk.MustExec("analyze table t")
}

Expand Down
93 changes: 92 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,13 +1366,101 @@ func (e *UnionExec) Close() error {
return e.baseExecutor.Close()
}

func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) []*ast.TableOptimizerHint {
switch x := stmtNode.(type) {
case *ast.SelectStmt:
return x.TableHints
case *ast.UpdateStmt:
return x.TableHints
case *ast.DeleteStmt:
return x.TableHints
// TODO: support hint for InsertStmt
case *ast.ExplainStmt:
return extractStmtHintsFromStmtNode(x.Stmt)
default:
return nil
}
}

func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHints, warns []error) {
var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint
for _, hint := range hints {
switch hint.HintName.L {
case "memory_quota":
memoryQuotaHintList = append(memoryQuotaHintList, hint)
case "no_index_merge":
noIndexMergeHintList = append(noIndexMergeHintList, hint)
case "use_toja":
useToJAHintList = append(useToJAHintList, hint)
case "read_consistent_replica":
readReplicaHintList = append(readReplicaHintList, hint)
}
}
// Handle MEMORY_QUOTA
if len(memoryQuotaHintList) != 0 {
if len(memoryQuotaHintList) > 1 {
warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect")
warns = append(warns, warn)
}
hint := memoryQuotaHintList[len(memoryQuotaHintList)-1]
// Executor use MemoryQuota <= 0 to indicate no memory limit, here use < 0 to handle hint syntax error.
if hint.MemoryQuota < 0 {
warn := errors.New("The use of MEMORY_QUOTA hint is invalid, valid usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)")
warns = append(warns, warn)
} else {
stmtHints.HasMemQuotaHint = true
stmtHints.MemQuotaQuery = hint.MemoryQuota
if hint.MemoryQuota == 0 {
warn := errors.New("Setting the MEMORY_QUOTA to 0 means no memory limit")
warns = append(warns, warn)
}
}
}
// Handle USE_TOJA
if len(useToJAHintList) != 0 {
if len(useToJAHintList) > 1 {
warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect")
warns = append(warns, warn)
}
hint := useToJAHintList[len(useToJAHintList)-1]
stmtHints.HasAllowInSubqToJoinAndAggHint = true
stmtHints.AllowInSubqToJoinAndAgg = hint.HintFlag
}
// Handle NO_INDEX_MERGE
if len(noIndexMergeHintList) != 0 {
if len(noIndexMergeHintList) > 1 {
warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasEnableIndexMergeHint = true
stmtHints.EnableIndexMerge = false
}
// Handle READ_CONSISTENT_REPLICA
if len(readReplicaHintList) != 0 {
if len(readReplicaHintList) > 1 {
warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasReplicaReadHint = true
stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower)
}
return
}

// ResetContextOfStmt resets the StmtContext and session variables.
// Before every execution, we must clear statement context.
func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
hints := extractStmtHintsFromStmtNode(s)
stmtHints, hintWarns := handleStmtHints(hints)
vars := ctx.GetSessionVars()
memQuota := vars.MemQuotaQuery
if stmtHints.HasMemQuotaHint {
memQuota = stmtHints.MemQuotaQuery
}
sc := &stmtctx.StatementContext{
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down Expand Up @@ -1504,5 +1592,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
return err
}
vars.StmtCtx = sc
for _, warn := range hintWarns {
vars.StmtCtx.AppendWarning(warn)
}
return
}
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
// and has no correlated column from the current level plan(if the correlated column is from upper level,
// we can treat it as constant, because the upper LogicalApply cannot be eliminated since current node is a join node),
// and don't need to append a scalar value, we can rewrite it to inner join.
if er.sctx.GetSessionVars().AllowInSubqToJoinAndAgg && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
if er.sctx.GetSessionVars().GetAllowInSubqToJoinAndAgg() && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
// We need to try to eliminate the agg and the projection produced by this operation.
er.b.optFlag |= flagEliminateAgg
er.b.optFlag |= flagEliminateProjection
Expand Down
2 changes: 1 addition & 1 deletion planner/core/indexmerge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGenerateion(c *C) {
lp = lp.Children()[0]
}
}
ds.ctx.GetSessionVars().EnableIndexMerge = true
ds.ctx.GetSessionVars().SetEnableIndexMerge(true)
idxMergeStartIndex := len(ds.possibleAccessPaths)
_, err = lp.recursiveDeriveStats()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo) (*property.S
}
}
// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().EnableIndexMerge {
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().GetEnableIndexMerge() {
needConsiderIndexMerge := true
for i := 1; i < len(ds.possibleAccessPaths); i++ {
if len(ds.possibleAccessPaths[i].accessConds) != 0 {
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
if s.sessionVars.ReplicaRead.IsFollowerRead() {
if s.sessionVars.GetReplicaRead().IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
Expand Down
56 changes: 53 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2808,9 +2808,59 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)

// Test MEMORY_QUOTA hint
tk.MustExec("select /*+ MEMORY_QUOTA(1 MB) */ 1;")
val := int64(1) * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(1 GB) */ 1;")
val = int64(1) * 1024 * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(1 GB), MEMORY_QUOTA(1 MB) */ 1;")
val = int64(1) * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(0 GB) */ 1;")
val = int64(0)
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)

// Test NO_INDEX_MERGE hint
tk.Se.GetSessionVars().SetEnableIndexMerge(true)
tk.MustExec("select /*+ NO_INDEX_MERGE() */ 1;")
c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse)
tk.MustExec("select /*+ NO_INDEX_MERGE(), NO_INDEX_MERGE() */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse)

// Test USE_TOJA hint
tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(true)
tk.MustExec("select /*+ USE_TOJA(false) */ 1;")
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsFalse)
tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(false)
tk.MustExec("select /*+ USE_TOJA(true) */ 1;")
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue)
tk.MustExec("select /*+ USE_TOJA(false), USE_TOJA(true) */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue)

// Test READ_CONSISTENT_REPLICA hint
tk.Se.GetSessionVars().SetReplicaRead(kv.ReplicaReadLeader)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA(), READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}
16 changes: 16 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SQLWarn struct {
// It should be reset before executing a statement.
type StatementContext struct {
// Set the following variables before execution
StmtHints

// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
Expand Down Expand Up @@ -137,6 +138,21 @@ type StatementContext struct {
Tables []TableEntry
}

// StmtHints are SessionVars related sql hints.
type StmtHints struct {
// Hint flags
HasAllowInSubqToJoinAndAggHint bool
HasEnableIndexMergeHint bool
HasMemQuotaHint bool
HasReplicaReadHint bool

// Hint Information
AllowInSubqToJoinAndAgg bool
EnableIndexMerge bool
MemQuotaQuery int64
ReplicaRead byte
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
func (sc *StatementContext) GetNowTsCached() time.Time {
if !sc.stmtTimeCached {
Expand Down
Loading

0 comments on commit e173c7f

Please sign in to comment.