Skip to content

Commit

Permalink
*: support plan replayer load (#29247)
Browse files Browse the repository at this point in the history
  • Loading branch information
rebelice authored Nov 1, 2021
1 parent afa7dfb commit 2c69740
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 23 deletions.
15 changes: 10 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildLoadStats(v)
case *plannercore.IndexAdvise:
return b.buildIndexAdvise(v)
case *plannercore.PlanReplayerSingle:
return b.buildPlanReplayerSingle(v)
case *plannercore.PlanReplayer:
return b.buildPlanReplayer(v)
case *plannercore.PhysicalLimit:
return b.buildLimit(v)
case *plannercore.Prepare:
Expand Down Expand Up @@ -907,13 +907,18 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor
return e
}

func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor {
func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executor {
if v.Load {
e := &PlanReplayerLoadExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
info: &PlanReplayerLoadInfo{Path: v.File, Ctx: b.ctx},
}
return e
}
e := &PlanReplayerSingleExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmt: v.ExecStmt,
Analyze: v.Analyze,
Load: v.Load,
File: v.File,
}
return e
}
Expand Down
189 changes: 179 additions & 10 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"archive/zip"
"bytes"
"context"
"crypto/rand"
"encoding/base64"
Expand All @@ -30,9 +31,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -41,13 +44,14 @@ import (
"go.uber.org/zap"
)

var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
ExecStmt ast.StmtNode
Analyze bool
Load bool
File string

endFlag bool
}
Expand Down Expand Up @@ -270,7 +274,7 @@ func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Doma

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show variables")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
if err != nil {
return err
}
Expand All @@ -297,7 +301,7 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
}

func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show bindings")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show bindings")
if err != nil {
return err
}
Expand All @@ -321,7 +325,7 @@ func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
}

func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show global bindings")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show global bindings")
if err != nil {
return err
}
Expand Down Expand Up @@ -349,13 +353,13 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze b
var err error
if isAnalyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain analyze %s", sql))
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
if err != nil {
return err
}
} else {
// Explain
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain %s", sql))
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql))
if err != nil {
return err
}
Expand Down Expand Up @@ -400,7 +404,7 @@ func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable,
}

func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
if err != nil {
return err
}
Expand All @@ -412,9 +416,11 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
if len(sRows) == 0 || len(sRows[0]) != 2 {
return errors.New(fmt.Sprintf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName))
}
fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "%s", sRows[0][1])
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
Expand Down Expand Up @@ -474,3 +480,166 @@ func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
}
return rows, nil
}

// PlanReplayerLoadExec represents a plan replayer load executor.
type PlanReplayerLoadExec struct {
baseExecutor
info *PlanReplayerLoadInfo
}

// PlanReplayerLoadInfo contains file path and session context.
type PlanReplayerLoadInfo struct {
Path string
Ctx sessionctx.Context
}

type planReplayerLoadKeyType int

func (k planReplayerLoadKeyType) String() string {
return "plan_replayer_load_var"
}

// PlanReplayerLoadVarKey is a variable key for plan replayer load.
const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0

// Next implements the Executor Next interface.
func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if len(e.info.Path) == 0 {
return errors.New("plan replayer: file path is empty")
}
val := e.ctx.Value(PlanReplayerLoadVarKey)
if val != nil {
e.ctx.SetValue(PlanReplayerLoadVarKey, nil)
return errors.New("plan replayer: previous plan replayer load option isn't closed normally, please try again")
}
e.ctx.SetValue(PlanReplayerLoadVarKey, e.info)
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return errors.AddStack(err)
}
vars := ctx.GetSessionVars()
for name, value := range varMap {
sysVar := variable.GetSysVar(name)
if sysVar == nil {
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
}
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession)
if err != nil {
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
continue
}
err = vars.SetSystemVar(name, sVal)
if err != nil {
return err
}
}
}
}
return nil
}

func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error {
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
return errors.AddStack(err)
}
sqls := strings.Split(buf.String(), ";")
if len(sqls) != 3 {
return errors.New("plan replayer: create schema and tables failed")
}
c := context.Background()
// create database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
if err != nil {
return err
}
// create table
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
if err != nil {
return err
}
return nil
}

func loadStats(ctx sessionctx.Context, f *zip.File) error {
jsonTbl := &handle.JSONTable{}
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
return errors.AddStack(err)
}
if err := json.Unmarshal(buf.Bytes(), jsonTbl); err != nil {
return errors.AddStack(err)
}
do := domain.GetDomain(ctx)
h := do.StatsHandle()
if h == nil {
return errors.New("plan replayer: hanlde is nil")
}
return h.LoadStatsFromJSON(ctx.GetInfoSchema().(infoschema.InfoSchema), jsonTbl)
}

// Update updates the data of the corresponding table.
func (e *PlanReplayerLoadInfo) Update(data []byte) error {
b := bytes.NewReader(data)
z, err := zip.NewReader(b, int64(len(data)))
if err != nil {
return errors.AddStack(err)
}

// load variable
err = loadVariables(e.Ctx, z)
if err != nil {
return err
}

// build schema and table
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "schema") == 0 {
err = createSchemaAndTables(e.Ctx, zipFile)
if err != nil {
return err
}
}
}

// load stats
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "stats") == 0 {
err = loadStats(e.Ctx, zipFile)
if err != nil {
return err
}
}
}
return nil
}
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ type LoadStats struct {
Path string
}

// PlanReplayerSingle represents a plan replayer plan.
type PlanReplayerSingle struct {
// PlanReplayer represents a plan replayer plan.
type PlanReplayer struct {
baseSchemaProducer
ExecStmt ast.StmtNode
Analyze bool
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4356,7 +4356,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
}

func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
p := &PlanReplayerSingle{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
Expand Down
26 changes: 26 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,23 @@ func (cc *clientConn) handleIndexAdvise(ctx context.Context, indexAdviseInfo *ex
return nil
}

func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLoadInfo *executor.PlanReplayerLoadInfo) error {
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if planReplayerLoadInfo == nil {
return errors.New("plan replayer load: info is empty")
}
data, err := cc.getDataFromPath(ctx, planReplayerLoadInfo.Path)
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
return planReplayerLoadInfo.Update(data)
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
Expand Down Expand Up @@ -1960,6 +1977,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
}
}

planReplayerLoad := cc.ctx.Value(executor.PlanReplayerLoadVarKey)
if planReplayerLoad != nil {
handled = true
defer cc.ctx.SetValue(executor.PlanReplayerLoadVarKey, nil)
if err := cc.handlePlanReplayerLoad(ctx, planReplayerLoad.(*executor.PlanReplayerLoadInfo)); err != nil {
return handled, err
}
}

return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
}

Expand Down
Loading

0 comments on commit 2c69740

Please sign in to comment.