Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
fix

fix

fix

fix

fix

fix

fix

fix
  • Loading branch information
Yisaer committed Jan 29, 2023
1 parent f2f9827 commit d1a8122
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 16 deletions.
16 changes: 13 additions & 3 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand Down Expand Up @@ -164,7 +164,18 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sql", record.OriginSQL),
zap.Error(err))
// try insert record without original sql
_, err = exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.Error(err))
}
}
}

Expand Down Expand Up @@ -379,6 +390,7 @@ func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
occupy := true
handleTask := true
defer func() {
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpWorker", nil, false)
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
Expand Down Expand Up @@ -431,7 +443,6 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
if task.InExecute && len(task.NormalizedSQL) > 0 {
p := parser.New()
stmts, _, err := p.ParseSQL(task.NormalizedSQL)
Expand Down Expand Up @@ -538,7 +549,6 @@ type PlanReplayerDumpTask struct {
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
InExecute bool
NormalizedSQL string
Expand Down
9 changes: 8 additions & 1 deletion domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,

// For capture task, we dump stats in storage only if EnableHistoricalStatsForCapture is disabled.
// For manual plan replayer dump command, we directly dump stats in storage
if !variable.EnableHistoricalStatsForCapture.Load() || !task.IsCapture {
if task.IsCapture {
if !task.IsContinuesCapture && variable.EnableHistoricalStatsForCapture.Load() {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
}
}
} else {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
Expand Down
31 changes: 19 additions & 12 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,17 +1412,7 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
a.checkPlanReplayerCapture(txnTS)

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
Expand Down Expand Up @@ -1485,6 +1475,23 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
}

func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
if kv.GetInternalSourceType(a.GoCtx) == kv.InternalTxnStats {
return
}
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
Expand Down Expand Up @@ -2112,7 +2119,6 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
Expand All @@ -2121,6 +2127,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
dumpTask.EncodedPlan, _ = GetEncodedPlan(stmtCtx, false)
if _, ok := stmtNode.(*ast.ExecuteStmt); ok {
nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest()
dumpTask.InExecute = true
Expand Down
11 changes: 11 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kv

import (
"context"

"github.com/tikv/client-go/v2/util"
)

Expand Down Expand Up @@ -136,6 +138,15 @@ type RequestSource = util.RequestSource
// WithInternalSourceType create context with internal source.
var WithInternalSourceType = util.WithInternalSourceType

// GetInternalSourceType get internal source
func GetInternalSourceType(ctx context.Context) string {
v := ctx.Value(util.RequestSourceKey)
if v == nil {
return ""
}
return v.(util.RequestSource).RequestSourceType
}

const (
// InternalTxnOthers is the type of requests that consume low resources.
// This reduces the size of metrics.
Expand Down
2 changes: 2 additions & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool s
if err != nil {
return nil, err
}
handle.mu.ctx.GetSessionVars().InRestrictedSQL = true
return handle, nil
}

Expand Down Expand Up @@ -1634,6 +1635,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.String("source", source),
zap.Error(err1))
}
}
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/historical_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64, source
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.String("source", source),
zap.Error(err))
}
}

0 comments on commit d1a8122

Please sign in to comment.