Skip to content

Commit

Permalink
domain: revise plan replayer process log (#40126)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Dec 26, 2022
1 parent 2f6401a commit fc241b2
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 45 deletions.
54 changes: 40 additions & 14 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ type planReplayerHandle struct {
}

// SendTask send dumpTask in background task handler
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// we directly remove the task key if we put task in channel successfully, if the task was failed to dump,
// the task handle will re-add the task in next loop
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
return true
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
logutil.BgLogger().Warn("discard one plan replayer dump task",
zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest))
return false
}
}

Expand All @@ -209,9 +211,13 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-task] collect plan replayer task failed", zap.Error(err))
return err
}
if unhandled {
logutil.BgLogger().Debug("[plan-replayer-task] collect plan replayer task success",
zap.String("sql-digest", key.SQLDigest),
zap.String("plan-digest", key.PlanDigest))
tasks = append(tasks, key)
}
}
Expand Down Expand Up @@ -351,16 +357,36 @@ type planReplayerTaskDumpWorker struct {

func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
w.handleTask(task)
}
}

func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
sqlDigest := task.SQLDigest
planDigest := task.PlanDigest
check := true
occupy := true
handleTask := true
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
zap.Bool("check", check),
zap.Bool("occupy", occupy),
zap.Bool("handle", handleTask))
}()
if task.IsContinuesCapture {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
check = false
return
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
occupy = w.status.occupyRunningTaskKey(task)
if !occupy {
return
}
handleTask = w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}

// HandleTask handled task
Expand All @@ -373,7 +399,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
logutil.BgLogger().Warn("[plan-replayer-capture] check task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -386,7 +412,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -409,7 +435,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -421,7 +447,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand Down
52 changes: 43 additions & 9 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
PlanReplayerTaskMetaIsCapture = "isCapture"
// PlanReplayerTaskMetaIsContinues indicates whether this task is continues task
PlanReplayerTaskMetaIsContinues = "isContinues"
// PlanReplayerTaskMetaSQLDigest indicates the sql digest of this task
PlanReplayerTaskMetaSQLDigest = "sqlDigest"
// PlanReplayerTaskMetaPlanDigest indicates the plan digest of this task
PlanReplayerTaskMetaPlanDigest = "planDigest"
)

type tableNamePair struct {
Expand Down Expand Up @@ -180,25 +184,53 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
var records []PlanReplayerStatusRecord
sqls := make([]string, 0)
for _, execStmt := range task.ExecStmts {
sqls = append(sqls, execStmt.Text())
}
if task.IsCapture {
logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result",
zap.String("sql-digest", task.SQLDigest),
zap.String("plan-digest", task.PlanDigest),
zap.Strings("sql", sqls),
zap.Bool("isContinues", task.IsContinuesCapture))
} else {
logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result",
zap.Strings("sqls", sqls))
}
defer func() {
errMsg := ""
if err != nil {
logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
if task.IsCapture {
logutil.BgLogger().Info("[plan-replayer-dump] dump file failed",
zap.String("sql-digest", task.SQLDigest),
zap.String("plan-digest", task.PlanDigest),
zap.Strings("sql", sqls),
zap.Bool("isContinues", task.IsContinuesCapture))
} else {
logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result",
zap.Strings("sqls", sqls))
}
errMsg = err.Error()
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip writer failed", zap.Error(err), zap.String("filename", fileName))
err1 := zw.Close()
if err1 != nil {
logutil.BgLogger().Error("[plan-replayer-dump] Closing zip writer failed", zap.Error(err), zap.String("filename", fileName))
errMsg = errMsg + "," + err1.Error()
}
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
err2 := zf.Close()
if err2 != nil {
logutil.BgLogger().Error("[plan-replayer-dump] Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
errMsg = errMsg + "," + err2.Error()
}
if len(errMsg) > 0 {
for i, record := range records {
record.FailedReason = err.Error()
record.FailedReason = errMsg
records[i] = record
}
}
insertPlanReplayerStatus(ctx, sctx, records)
}()

// Dump SQLMeta
if err = dumpSQLMeta(zw, task); err != nil {
return err
Expand Down Expand Up @@ -299,6 +331,8 @@ func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error {
varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10)
varMap[PlanReplayerTaskMetaIsCapture] = strconv.FormatBool(task.IsCapture)
varMap[PlanReplayerTaskMetaIsContinues] = strconv.FormatBool(task.IsContinuesCapture)
varMap[PlanReplayerTaskMetaSQLDigest] = task.SQLDigest
varMap[PlanReplayerTaskMetaPlanDigest] = task.PlanDigest
if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
Expand Down
47 changes: 31 additions & 16 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}

return stmt, nil
}

Expand All @@ -183,17 +180,25 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode
if handle == nil {
return
}
captured := false
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx)
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Int("tasks", len(tasks)),
zap.Bool("captured", captured))
}()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
Expand All @@ -215,16 +220,26 @@ func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.Stm
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
captured := false
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check continues capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Bool("captured", captured))
}()

existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
if captured {
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
startTS uint64, isContinuesCapture bool) bool {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
Expand All @@ -239,7 +254,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
return domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down
31 changes: 25 additions & 6 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -153,15 +155,32 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
sql := "delete from mysql.stats_meta_history where NOW() - create_time >= %?"
_, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
sql := "select count(*) from mysql.stats_meta_history where NOW() - create_time >= %?"
rs, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
if rs == nil {
return nil
}
var rows []chunk.Row
defer terror.Call(rs.Close)
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return errors.Trace(err)
}
count := rows[0].GetInt64(0)
if count > 0 {
sql = "delete from mysql.stats_meta_history where NOW() - create_time >= %?"
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
if err != nil {
return err
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}
return nil
}

func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {
Expand Down

0 comments on commit fc241b2

Please sign in to comment.