Skip to content

Commit

Permalink
sessionctx: add validation for tidb_enable_plan_replayer_continues_ca…
Browse files Browse the repository at this point in the history
…pture (#40787)
  • Loading branch information
Yisaer authored Jan 29, 2023
1 parent 0bcb859 commit fe4a81d
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 17 deletions.
1 change: 0 additions & 1 deletion domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ go_library(
"//privilege/privileges",
"//sessionctx",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics/handle",
"//telemetry",
Expand Down
16 changes: 13 additions & 3 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand Down Expand Up @@ -164,7 +164,18 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sql", record.OriginSQL),
zap.Error(err))
// try insert record without original sql
_, err = exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.Error(err))
}
}
}

Expand Down Expand Up @@ -379,6 +390,7 @@ func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
occupy := true
handleTask := true
defer func() {
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpWorker", nil, false)
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
Expand Down Expand Up @@ -431,7 +443,6 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
if task.InExecute && len(task.NormalizedSQL) > 0 {
p := parser.New()
stmts, _, err := p.ParseSQL(task.NormalizedSQL)
Expand Down Expand Up @@ -538,7 +549,6 @@ type PlanReplayerDumpTask struct {
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
InExecute bool
NormalizedSQL string
Expand Down
9 changes: 8 additions & 1 deletion domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,

// For capture task, we dump stats in storage only if EnableHistoricalStatsForCapture is disabled.
// For manual plan replayer dump command, we directly dump stats in storage
if !variable.EnableHistoricalStatsForCapture.Load() || !task.IsCapture {
if task.IsCapture {
if !task.IsContinuesCapture && variable.EnableHistoricalStatsForCapture.Load() {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
}
}
} else {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
Expand Down
31 changes: 19 additions & 12 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,17 +1412,7 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
a.checkPlanReplayerCapture(txnTS)

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
Expand Down Expand Up @@ -1485,6 +1475,23 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
}

func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
if kv.GetInternalSourceType(a.GoCtx) == kv.InternalTxnStats {
return
}
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
Expand Down Expand Up @@ -2112,7 +2119,6 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
Expand All @@ -2121,6 +2127,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
dumpTask.EncodedPlan, _ = GetEncodedPlan(stmtCtx, false)
if _, ok := stmtNode.(*ast.ExecuteStmt); ok {
nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest()
dumpTask.InExecute = true
Expand Down
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ func TestPlanReplayerCapture(t *testing.T) {
func TestPlanReplayerContinuesCapture(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@global.tidb_enable_historical_stats='OFF'")
_, err := tk.Exec("set @@global.tidb_enable_plan_replayer_continues_capture='ON'")
require.Error(t, err)
require.Equal(t, err.Error(), "tidb_enable_historical_stats should be enabled before enabling tidb_enable_plan_replayer_continues_capture")

tk.MustExec("set @@global.tidb_enable_historical_stats='ON'")
tk.MustExec("set @@global.tidb_enable_plan_replayer_continues_capture='ON'")

prHandle := dom.GetPlanReplayerHandle()
tk.MustExec("delete from mysql.plan_replayer_status;")
tk.MustExec("use test")
Expand Down
11 changes: 11 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kv

import (
"context"

"github.com/tikv/client-go/v2/util"
)

Expand Down Expand Up @@ -136,6 +138,15 @@ type RequestSource = util.RequestSource
// WithInternalSourceType create context with internal source.
var WithInternalSourceType = util.WithInternalSourceType

// GetInternalSourceType get internal source
func GetInternalSourceType(ctx context.Context) string {
v := ctx.Value(util.RequestSourceKey)
if v == nil {
return ""
}
return v.(util.RequestSource).RequestSourceType
}

const (
// InternalTxnOthers is the type of requests that consume low resources.
// This reduces the size of metrics.
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,12 +1188,29 @@ var defaultSysVars = []*SysVar{
/* The system variables below have GLOBAL and SESSION scope */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerContinuesCapture, Value: BoolToOnOff(false), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
historicalStatsEnabled, err := s.GlobalVarsAccessor.GetGlobalSysVar(TiDBEnableHistoricalStats)
if err != nil {
return err
}
if !TiDBOptOn(historicalStatsEnabled) && TiDBOptOn(val) {
return errors.Errorf("%v should be enabled before enabling %v", TiDBEnableHistoricalStats, TiDBEnablePlanReplayerContinuesCapture)
}
s.EnablePlanReplayedContinuesCapture = TiDBOptOn(val)
return nil
},
GetSession: func(vars *SessionVars) (string, error) {
return BoolToOnOff(vars.EnablePlanReplayedContinuesCapture), nil
},
Validation: func(vars *SessionVars, s string, s2 string, flag ScopeFlag) (string, error) {
historicalStatsEnabled, err := vars.GlobalVarsAccessor.GetGlobalSysVar(TiDBEnableHistoricalStats)
if err != nil {
return "", err
}
if !TiDBOptOn(historicalStatsEnabled) && TiDBOptOn(s) {
return "", errors.Errorf("%v should be enabled before enabling %v", TiDBEnableHistoricalStats, TiDBEnablePlanReplayerContinuesCapture)
}
return s, nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(true), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.String("source", source),
zap.Error(err1))
}
}
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/historical_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64, source
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.String("source", source),
zap.Error(err))
}
}

0 comments on commit fe4a81d

Please sign in to comment.