Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support plan replayer load #29247

Merged
merged 14 commits into from
Nov 1, 2021
15 changes: 10 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildLoadStats(v)
case *plannercore.IndexAdvise:
return b.buildIndexAdvise(v)
case *plannercore.PlanReplayerSingle:
return b.buildPlanReplayerSingle(v)
case *plannercore.PlanReplayer:
return b.buildPlanReplayer(v)
case *plannercore.PhysicalLimit:
return b.buildLimit(v)
case *plannercore.Prepare:
Expand Down Expand Up @@ -907,13 +907,18 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor
return e
}

func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor {
func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executor {
if v.Load {
e := &PlanReplayerLoadExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
info: &PlanReplayerLoadInfo{Path: v.File, Ctx: b.ctx},
}
return e
}
e := &PlanReplayerSingleExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmt: v.ExecStmt,
Analyze: v.Analyze,
Load: v.Load,
File: v.File,
}
return e
}
Expand Down
189 changes: 179 additions & 10 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"archive/zip"
"bytes"
"context"
"crypto/rand"
"encoding/base64"
Expand All @@ -30,9 +31,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -41,13 +44,14 @@ import (
"go.uber.org/zap"
)

var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
ExecStmt ast.StmtNode
Analyze bool
Load bool
File string

endFlag bool
}
Expand Down Expand Up @@ -270,7 +274,7 @@ func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Doma

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show variables")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
if err != nil {
return err
}
Expand All @@ -297,7 +301,7 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
}

func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show bindings")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show bindings")
if err != nil {
return err
}
Expand All @@ -321,7 +325,7 @@ func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
}

func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show global bindings")
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show global bindings")
if err != nil {
return err
}
Expand Down Expand Up @@ -349,13 +353,13 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze b
var err error
if isAnalyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain analyze %s", sql))
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
if err != nil {
return err
}
} else {
// Explain
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain %s", sql))
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql))
if err != nil {
return err
}
Expand Down Expand Up @@ -400,7 +404,7 @@ func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable,
}

func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
if err != nil {
return err
}
Expand All @@ -412,9 +416,11 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
if len(sRows) == 0 || len(sRows[0]) != 2 {
return errors.New(fmt.Sprintf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName))
}
fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "%s", sRows[0][1])
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
Expand Down Expand Up @@ -474,3 +480,166 @@ func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
}
return rows, nil
}

// PlanReplayerLoadExec represents a plan replayer load executor.
type PlanReplayerLoadExec struct {
baseExecutor
info *PlanReplayerLoadInfo
}

// PlanReplayerLoadInfo contains file path and session context.
type PlanReplayerLoadInfo struct {
Path string
Ctx sessionctx.Context
}

type planReplayerLoadKeyType int

func (k planReplayerLoadKeyType) String() string {
return "plan_replayer_load_var"
}

// PlanReplayerLoadVarKey is a variable key for plan replayer load.
const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0

// Next implements the Executor Next interface.
func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if len(e.info.Path) == 0 {
return errors.New("plan replayer: file path is empty")
}
val := e.ctx.Value(PlanReplayerLoadVarKey)
if val != nil {
e.ctx.SetValue(PlanReplayerLoadVarKey, nil)
return errors.New("plan replayer: previous plan replayer load option isn't closed normally, please try again")
}
e.ctx.SetValue(PlanReplayerLoadVarKey, e.info)
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return errors.AddStack(err)
}
vars := ctx.GetSessionVars()
for name, value := range varMap {
sysVar := variable.GetSysVar(name)
if sysVar == nil {
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
}
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession)
if err != nil {
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
continue
}
err = vars.SetSystemVar(name, sVal)
if err != nil {
return err
}
}
}
}
return nil
}

func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error {
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
return errors.AddStack(err)
}
sqls := strings.Split(buf.String(), ";")
if len(sqls) != 3 {
return errors.New("plan replayer: create schema and tables failed")
}
c := context.Background()
// create database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
if err != nil {
return err
}
// create table
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
if err != nil {
return err
}
return nil
}

func loadStats(ctx sessionctx.Context, f *zip.File) error {
jsonTbl := &handle.JSONTable{}
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
return errors.AddStack(err)
}
if err := json.Unmarshal(buf.Bytes(), jsonTbl); err != nil {
return errors.AddStack(err)
}
do := domain.GetDomain(ctx)
h := do.StatsHandle()
if h == nil {
return errors.New("plan replayer: hanlde is nil")
}
return h.LoadStatsFromJSON(ctx.GetInfoSchema().(infoschema.InfoSchema), jsonTbl)
}

// Update updates the data of the corresponding table.
func (e *PlanReplayerLoadInfo) Update(data []byte) error {
b := bytes.NewReader(data)
z, err := zip.NewReader(b, int64(len(data)))
if err != nil {
return errors.AddStack(err)
}

// load variable
err = loadVariables(e.Ctx, z)
if err != nil {
return err
}

// build schema and table
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "schema") == 0 {
err = createSchemaAndTables(e.Ctx, zipFile)
if err != nil {
return err
}
}
}

// load stats
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "stats") == 0 {
err = loadStats(e.Ctx, zipFile)
if err != nil {
return err
}
}
}
return nil
}
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ type LoadStats struct {
Path string
}

// PlanReplayerSingle represents a plan replayer plan.
type PlanReplayerSingle struct {
// PlanReplayer represents a plan replayer plan.
type PlanReplayer struct {
baseSchemaProducer
ExecStmt ast.StmtNode
Analyze bool
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4356,7 +4356,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
}

func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
p := &PlanReplayerSingle{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
Expand Down
26 changes: 26 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,23 @@ func (cc *clientConn) handleIndexAdvise(ctx context.Context, indexAdviseInfo *ex
return nil
}

func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLoadInfo *executor.PlanReplayerLoadInfo) error {
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if planReplayerLoadInfo == nil {
return errors.New("plan replayer load: info is empty")
}
data, err := cc.getDataFromPath(ctx, planReplayerLoadInfo.Path)
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
return planReplayerLoadInfo.Update(data)
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
Expand Down Expand Up @@ -1960,6 +1977,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
}
}

planReplayerLoad := cc.ctx.Value(executor.PlanReplayerLoadVarKey)
if planReplayerLoad != nil {
handled = true
defer cc.ctx.SetValue(executor.PlanReplayerLoadVarKey, nil)
if err := cc.handlePlanReplayerLoad(ctx, planReplayerLoad.(*executor.PlanReplayerLoadInfo)); err != nil {
return handled, err
}
}

return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
}

Expand Down
Loading