Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support plan replayer load #29247

Merged
merged 14 commits into from
Nov 1, 2021
15 changes: 10 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildLoadStats(v)
case *plannercore.IndexAdvise:
return b.buildIndexAdvise(v)
case *plannercore.PlanReplayerSingle:
return b.buildPlanReplayerSingle(v)
case *plannercore.PlanReplayer:
return b.buildPlanReplayer(v)
case *plannercore.PhysicalLimit:
return b.buildLimit(v)
case *plannercore.Prepare:
Expand Down Expand Up @@ -906,13 +906,18 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor
return e
}

func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor {
func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executor {
if v.Load {
e := &PlanReplayerLoadExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
info: &PlanReplayerLoadInfo{Path: v.File, Ctx: b.ctx},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why hold b.ctx twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PlanReplayerLoadInfo will use Ctx when alone. You can see handlePlanReplayerLoad for details.

}
return e
}
e := &PlanReplayerSingleExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmt: v.ExecStmt,
Analyze: v.Analyze,
Load: v.Load,
File: v.File,
}
return e
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9289,7 +9289,7 @@ func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) {
res := tk.MustQuery("plan replayer dump explain select * from t_dump_single")
path := s.testData.ConvertRowsToStrings(res.Rows())

reader, err := zip.OpenReader(filepath.Join(domain.GetPlanReplayerDirName(), path[0]))
reader, err := zip.OpenReader(path[0])
c.Assert(err, IsNil)
defer reader.Close()
for _, file := range reader.File {
Expand Down
162 changes: 158 additions & 4 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"archive/zip"
"bytes"
"context"
"crypto/rand"
"encoding/base64"
Expand All @@ -33,21 +34,24 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
ExecStmt ast.StmtNode
Analyze bool
Load bool
File string

endFlag bool
}
Expand Down Expand Up @@ -412,9 +416,11 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
if len(sRows) == 0 || len(sRows[0]) != 2 {
return errors.New(fmt.Sprintf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName))
}
fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "%s", sRows[0][1])
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
Expand Down Expand Up @@ -474,3 +480,151 @@ func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
}
return rows, nil
}

type PlanReplayerLoadExec struct {
baseExecutor
info *PlanReplayerLoadInfo
}

type PlanReplayerLoadInfo struct {
Path string
Ctx sessionctx.Context
}

type planReplayerLoadKeyType int

func (k planReplayerLoadKeyType) String() string {
return "plan_replayer_load_var"
}

const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0

func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if len(e.info.Path) == 0 {
return errors.New("plan replayer: file path is empty")
}
val := e.ctx.Value(PlanReplayerLoadVarKey)
if val != nil {
e.ctx.SetValue(PlanReplayerLoadVarKey, nil)
return errors.New("plan replayer: previous plan replayer load option isn't closed normally")
rebelice marked this conversation as resolved.
Show resolved Hide resolved
}
e.ctx.SetValue(PlanReplayerLoadVarKey, e.info)
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return errors.AddStack(err)
}
vars := ctx.GetSessionVars()
for name, value := range varMap {
sysVar := variable.GetSysVar(name)
if sysVar == nil {
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
}
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession)
if err != nil {
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
continue
}
err = vars.SetSystemVar(name, sVal)
}
}
}
return nil
}

func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error {
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(r)
sqls := strings.Split(buf.String(), ";")
if len(sqls) != 3 {
return errors.New("plan replayer: create schema and tables failed")
}
// create database
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use sessioncontext's ctx instead of context.TODO()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the meanings of the two are different. Refer to other usage methods, I think it is more appropriate to use context.Background(). Updated.

logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[1])
if err != nil {
return err
}
// create table
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[2])
if err != nil {
return err
}
return nil
}

func loadStats(ctx sessionctx.Context, f *zip.File) error {
jsonTbl := &handle.JSONTable{}
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
}
defer r.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(r)
if err := json.Unmarshal(buf.Bytes(), jsonTbl); err != nil {
return errors.AddStack(err)
}
do := domain.GetDomain(ctx)
h := do.StatsHandle()
if h == nil {
return errors.New("plan replayer: hanlde is nil")
}
return h.LoadStatsFromJSON(ctx.GetInfoSchema().(infoschema.InfoSchema), jsonTbl)
}

func (e *PlanReplayerLoadInfo) Update(data []byte) error{
b := bytes.NewReader(data)
z, err := zip.NewReader(b, int64(len(data)))
if err != nil {
return errors.AddStack(err)
}

// load variable
err = loadVariables(e.Ctx, z)
if err != nil {
return err
}

// build schema and table
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "schema") == 0 {
err = createSchemaAndTables(e.Ctx, zipFile)
if err != nil {
return err
}
}
}

// load stats
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "stats") == 0 {
err = loadStats(e.Ctx, zipFile)
if err != nil {
return err
}
}
}
return nil
}
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ type LoadStats struct {
Path string
}

// PlanReplayerSingle represents a plan replayer plan.
type PlanReplayerSingle struct {
// PlanReplayer represents a plan replayer plan.
type PlanReplayer struct {
baseSchemaProducer
ExecStmt ast.StmtNode
Analyze bool
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4356,7 +4356,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
}

func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
p := &PlanReplayerSingle{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
Expand Down
26 changes: 26 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,23 @@ func (cc *clientConn) handleIndexAdvise(ctx context.Context, indexAdviseInfo *ex
return nil
}

func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLoadInfo *executor.PlanReplayerLoadInfo) error {
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if planReplayerLoadInfo == nil {
return errors.New("plan replayer load: info is empty")
}
data, err := cc.getDataFromPath(ctx, planReplayerLoadInfo.Path)
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
return planReplayerLoadInfo.Update(data)
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
Expand Down Expand Up @@ -1960,6 +1977,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
}
}

planReplayerLoad := cc.ctx.Value(executor.PlanReplayerLoadVarKey)
if planReplayerLoad != nil {
handled = true
defer cc.ctx.SetValue(executor.PlanReplayerLoadVarKey, nil)
if err := cc.handlePlanReplayerLoad(ctx, planReplayerLoad.(*executor.PlanReplayerLoadInfo)); err != nil {
return handled, err
}
}

return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
}

Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,7 @@ var querySpecialKeys = []fmt.Stringer{
executor.LoadDataVarKey,
executor.LoadStatsVarKey,
executor.IndexAdviseVarKey,
executor.PlanReplayerLoadVarKey,
}

func (s *session) hasQuerySpecial() bool {
Expand Down