From 2c69740fde0f708df54cb4d654012de42ce16b28 Mon Sep 17 00:00:00 2001 From: rebelice Date: Mon, 1 Nov 2021 11:22:51 +0800 Subject: [PATCH] *: support plan replayer load (#29247) --- executor/builder.go | 15 ++- executor/plan_replayer.go | 189 +++++++++++++++++++++++++++++++++-- planner/core/common_plans.go | 4 +- planner/core/planbuilder.go | 2 +- server/conn.go | 26 +++++ server/plan_replayer_test.go | 57 ++++++++++- session/session.go | 1 + 7 files changed, 271 insertions(+), 23 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d064618fcebb7..d1231cffe1a37 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -164,8 +164,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildLoadStats(v) case *plannercore.IndexAdvise: return b.buildIndexAdvise(v) - case *plannercore.PlanReplayerSingle: - return b.buildPlanReplayerSingle(v) + case *plannercore.PlanReplayer: + return b.buildPlanReplayer(v) case *plannercore.PhysicalLimit: return b.buildLimit(v) case *plannercore.Prepare: @@ -907,13 +907,18 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor return e } -func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor { +func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executor { + if v.Load { + e := &PlanReplayerLoadExec{ + baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()), + info: &PlanReplayerLoadInfo{Path: v.File, Ctx: b.ctx}, + } + return e + } e := &PlanReplayerSingleExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), ExecStmt: v.ExecStmt, Analyze: v.Analyze, - Load: v.Load, - File: v.File, } return e } diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index b6a0905452d69..a8b005b4470be 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -16,6 +16,7 @@ package executor import ( "archive/zip" + "bytes" "context" "crypto/rand" "encoding/base64" @@ -30,9 +31,11 @@ import ( 
"github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" @@ -41,13 +44,14 @@ import ( "go.uber.org/zap" ) +var _ Executor = &PlanReplayerSingleExec{} +var _ Executor = &PlanReplayerLoadExec{} + // PlanReplayerSingleExec represents a plan replayer executor. type PlanReplayerSingleExec struct { baseExecutor ExecStmt ast.StmtNode Analyze bool - Load bool - File string endFlag bool } @@ -270,7 +274,7 @@ func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Doma func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error { varMap := make(map[string]string) - recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show variables") + recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables") if err != nil { return err } @@ -297,7 +301,7 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error { } func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error { - recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show bindings") + recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show bindings") if err != nil { return err } @@ -321,7 +325,7 @@ func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error { } func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error { - recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show global bindings") + recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show global bindings") if err != nil { return err } @@ -349,13 +353,13 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, 
sql string, isAnalyze b var err error if isAnalyze { // Explain analyze - recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain analyze %s", sql)) + recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql)) if err != nil { return err } } else { // Explain - recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain %s", sql)) + recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql)) if err != nil { return err } @@ -400,7 +404,7 @@ func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, } func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error { - recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName)) + recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName)) if err != nil { return err } @@ -412,9 +416,11 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte if err != nil { return errors.AddStack(err) } - for _, row := range sRows { - fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t")) + if len(sRows) == 0 || len(sRows[0]) != 2 { + return errors.New(fmt.Sprintf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName)) } + fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName) + fmt.Fprintf(fw, "%s", sRows[0][1]) if len(recordSets) > 0 { if err := recordSets[0].Close(); err != nil { return err @@ -474,3 +480,166 @@ func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) { } return rows, nil } + +// PlanReplayerLoadExec represents a plan replayer load executor. 
+type PlanReplayerLoadExec struct { + baseExecutor + info *PlanReplayerLoadInfo +} + +// PlanReplayerLoadInfo contains file path and session context. +type PlanReplayerLoadInfo struct { + Path string + Ctx sessionctx.Context +} + +type planReplayerLoadKeyType int + +func (k planReplayerLoadKeyType) String() string { + return "plan_replayer_load_var" +} + +// PlanReplayerLoadVarKey is a variable key for plan replayer load. +const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0 + +// Next implements the Executor Next interface. +func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.GrowAndReset(e.maxChunkSize) + if len(e.info.Path) == 0 { + return errors.New("plan replayer: file path is empty") + } + val := e.ctx.Value(PlanReplayerLoadVarKey) + if val != nil { + e.ctx.SetValue(PlanReplayerLoadVarKey, nil) + return errors.New("plan replayer: previous plan replayer load option isn't closed normally, please try again") + } + e.ctx.SetValue(PlanReplayerLoadVarKey, e.info) + return nil +} + +func loadVariables(ctx sessionctx.Context, z *zip.Reader) error { + for _, zipFile := range z.File { + if strings.Compare(zipFile.Name, "variables.toml") == 0 { + varMap := make(map[string]string) + v, err := zipFile.Open() + if err != nil { + return errors.AddStack(err) + } + defer v.Close() + _, err = toml.DecodeReader(v, &varMap) + if err != nil { + return errors.AddStack(err) + } + vars := ctx.GetSessionVars() + for name, value := range varMap { + sysVar := variable.GetSysVar(name) + if sysVar == nil { + return variable.ErrUnknownSystemVar.GenWithStackByArgs(name) + } + sVal, err := sysVar.Validate(vars, value, variable.ScopeSession) + if err != nil { + logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err)) + continue + } + err = vars.SetSystemVar(name, sVal) + if err != nil { + return err + } + } + } + } + return nil +} + +func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error { + r, err := 
f.Open() + if err != nil { + return errors.AddStack(err) + } + defer r.Close() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(r) + if err != nil { + return errors.AddStack(err) + } + sqls := strings.Split(buf.String(), ";") + if len(sqls) != 3 { + return errors.New("plan replayer: create schema and tables failed") + } + c := context.Background() + // create database + _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0]) + logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err)) + // use database + _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1]) + if err != nil { + return err + } + // create table + _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2]) + if err != nil { + return err + } + return nil +} + +func loadStats(ctx sessionctx.Context, f *zip.File) error { + jsonTbl := &handle.JSONTable{} + r, err := f.Open() + if err != nil { + return errors.AddStack(err) + } + defer r.Close() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(r) + if err != nil { + return errors.AddStack(err) + } + if err := json.Unmarshal(buf.Bytes(), jsonTbl); err != nil { + return errors.AddStack(err) + } + do := domain.GetDomain(ctx) + h := do.StatsHandle() + if h == nil { + return errors.New("plan replayer: hanlde is nil") + } + return h.LoadStatsFromJSON(ctx.GetInfoSchema().(infoschema.InfoSchema), jsonTbl) +} + +// Update reads data as a plan replayer zip archive and loads its variables, schemas and tables, and table stats into the session. 
+func (e *PlanReplayerLoadInfo) Update(data []byte) error { + b := bytes.NewReader(data) + z, err := zip.NewReader(b, int64(len(data))) + if err != nil { + return errors.AddStack(err) + } + + // load variable + err = loadVariables(e.Ctx, z) + if err != nil { + return err + } + + // build schema and table + for _, zipFile := range z.File { + path := strings.Split(zipFile.Name, "/") + if len(path) == 2 && strings.Compare(path[0], "schema") == 0 { + err = createSchemaAndTables(e.Ctx, zipFile) + if err != nil { + return err + } + } + } + + // load stats + for _, zipFile := range z.File { + path := strings.Split(zipFile.Name, "/") + if len(path) == 2 && strings.Compare(path[0], "stats") == 0 { + err = loadStats(e.Ctx, zipFile) + if err != nil { + return err + } + } + } + return nil +} diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index fbcf238dbe46a..eb8555e961e00 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -965,8 +965,8 @@ type LoadStats struct { Path string } -// PlanReplayerSingle represents a plan replayer plan. -type PlanReplayerSingle struct { +// PlanReplayer represents a plan replayer plan. 
+type PlanReplayer struct { baseSchemaProducer ExecStmt ast.StmtNode Analyze bool diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index da83858283aac..4033bc7ff055c 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -4356,7 +4356,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp } func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan { - p := &PlanReplayerSingle{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File} + p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File} schema := newColumnsWithNames(1) schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128)) p.SetSchema(schema.col2Schema()) diff --git a/server/conn.go b/server/conn.go index ba4ca6457e30c..652ae6da2f0ab 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1681,6 +1681,23 @@ func (cc *clientConn) handleIndexAdvise(ctx context.Context, indexAdviseInfo *ex return nil } +func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLoadInfo *executor.PlanReplayerLoadInfo) error { + if cc.capability&mysql.ClientLocalFiles == 0 { + return errNotAllowedCommand + } + if planReplayerLoadInfo == nil { + return errors.New("plan replayer load: info is empty") + } + data, err := cc.getDataFromPath(ctx, planReplayerLoadInfo.Path) + if err != nil { + return err + } + if len(data) == 0 { + return nil + } + return planReplayerLoadInfo.Update(data) +} + func (cc *clientConn) audit(eventType plugin.GeneralEvent) { err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { audit := plugin.DeclareAuditManifest(p.Manifest) @@ -1960,6 +1977,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo } } + planReplayerLoad := cc.ctx.Value(executor.PlanReplayerLoadVarKey) + if planReplayerLoad != nil { + handled = true + defer cc.ctx.SetValue(executor.PlanReplayerLoadVarKey, nil) + if err := 
cc.handlePlanReplayerLoad(ctx, planReplayerLoad.(*executor.PlanReplayerLoadInfo)); err != nil { + return handled, err + } + } + return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount()) } diff --git a/server/plan_replayer_test.go b/server/plan_replayer_test.go index 7ba76ca7e52a8..903f771463ee8 100644 --- a/server/plan_replayer_test.go +++ b/server/plan_replayer_test.go @@ -15,14 +15,18 @@ package server import ( - "archive/zip" "bytes" "database/sql" + "io" "io/ioutil" + "os" "path/filepath" "testing" + "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -50,8 +54,12 @@ func TestDumpPlanReplayerAPI(t *testing.T) { }() client.waitUntilServerOnline() + dom, err := session.GetDomain(store) + require.NoError(t, err) + statsHandler := &StatsHandler{dom} + planReplayerHandler := &PlanReplayerHandler{} - filename := prepareData4PlanReplayer(t, client) + filename := prepareData4PlanReplayer(t, client, statsHandler) router := mux.NewRouter() router.Handle("/plan_replayer/dump/{filename}", planReplayerHandler) @@ -65,13 +73,46 @@ func TestDumpPlanReplayerAPI(t *testing.T) { body, err := ioutil.ReadAll(resp0.Body) require.NoError(t, err) - zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body))) + path := "/tmp/plan_replayer.zip" + fp, err := os.Create(path) require.NoError(t, err) + require.NotNil(t, fp) + defer func() { + require.NoError(t, fp.Close()) + require.NoError(t, os.Remove(path)) + }() - require.Equal(t, len(zipReader.File), 9) + _, err = io.Copy(fp, bytes.NewReader(body)) + require.NoError(t, err) + require.NoError(t, fp.Sync()) + + db, err := sql.Open("mysql", client.getDSN(func(config *mysql.Config) { + config.AllowAllFiles = true + })) + require.NoError(t, err, "Error connecting") + defer func() { + err := 
db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + + tk.MustExec("use planReplayer") + tk.MustExec("drop table planReplayer.t") + tk.MustExec(`plan replayer load "/tmp/plan_replayer.zip"`) + rows := tk.MustQuery("show stats_meta") + require.True(t, rows.Next(), "unexpected data") + var dbName, tableName string + var modifyCount, count int64 + var other interface{} + err = rows.Scan(&dbName, &tableName, &other, &other, &modifyCount, &count) + require.NoError(t, err) + require.Equal(t, "planReplayer", dbName) + require.Equal(t, "t", tableName) + require.Equal(t, int64(4), modifyCount) + require.Equal(t, int64(8), count) } -func prepareData4PlanReplayer(t *testing.T, client *testServerClient) string { +func prepareData4PlanReplayer(t *testing.T, client *testServerClient, statHandle *StatsHandler) string { db, err := sql.Open("mysql", client.getDSN()) require.NoError(t, err, "Error connecting") defer func() { @@ -80,11 +121,17 @@ func prepareData4PlanReplayer(t *testing.T, client *testServerClient) string { }() tk := testkit.NewDBTestKit(t, db) + h := statHandle.do.StatsHandle() tk.MustExec("create database planReplayer") tk.MustExec("use planReplayer") tk.MustExec("create table t(a int)") + err = h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) tk.MustExec("insert into t values(1), (2), (3), (4)") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) tk.MustExec("analyze table t") + tk.MustExec("insert into t values(5), (6), (7), (8)") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) rows := tk.MustQuery("plan replayer dump explain select * from t") require.True(t, rows.Next(), "unexpected data") var filename string diff --git a/session/session.go b/session/session.go index 27782745fcc94..090b6c6f31caf 100644 --- a/session/session.go +++ b/session/session.go @@ -1634,6 +1634,7 @@ var querySpecialKeys = []fmt.Stringer{ executor.LoadDataVarKey, executor.LoadStatsVarKey, executor.IndexAdviseVarKey, + 
executor.PlanReplayerLoadVarKey, } func (s *session) hasQuerySpecial() bool {