-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: support plan replayer load #29247
Changes from 5 commits
4a0b926
2849b9c
365d045
4192582
5218642
70e9218
79a0f39
66549a0
3dd09ab
ec5dca8
bbf2175
718199a
a53897a
29c6ef9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package executor | |
|
||
import ( | ||
"archive/zip" | ||
"bytes" | ||
"context" | ||
"crypto/rand" | ||
"encoding/base64" | ||
|
@@ -33,21 +34,24 @@ import ( | |
"github.com/pingcap/tidb/parser/ast" | ||
"github.com/pingcap/tidb/parser/model" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/sessionctx/variable" | ||
"github.com/pingcap/tidb/statistics/handle" | ||
"github.com/pingcap/tidb/infoschema" | ||
"github.com/pingcap/tidb/util/chunk" | ||
"github.com/pingcap/tidb/util/logutil" | ||
"github.com/pingcap/tidb/util/printer" | ||
"github.com/pingcap/tidb/util/sqlexec" | ||
"go.uber.org/zap" | ||
) | ||
|
||
var _ Executor = &PlanReplayerSingleExec{} | ||
var _ Executor = &PlanReplayerLoadExec{} | ||
|
||
// PlanReplayerSingleExec represents a plan replayer executor. | ||
type PlanReplayerSingleExec struct { | ||
baseExecutor | ||
ExecStmt ast.StmtNode | ||
Analyze bool | ||
Load bool | ||
File string | ||
|
||
endFlag bool | ||
} | ||
|
@@ -412,9 +416,11 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte | |
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
for _, row := range sRows { | ||
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t")) | ||
if len(sRows) == 0 || len(sRows[0]) != 2 { | ||
return errors.New(fmt.Sprintf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName)) | ||
} | ||
fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName) | ||
fmt.Fprintf(fw, "%s", sRows[0][1]) | ||
if len(recordSets) > 0 { | ||
if err := recordSets[0].Close(); err != nil { | ||
return err | ||
|
@@ -474,3 +480,151 @@ func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) { | |
} | ||
return rows, nil | ||
} | ||
|
||
type PlanReplayerLoadExec struct { | ||
baseExecutor | ||
info *PlanReplayerLoadInfo | ||
} | ||
|
||
type PlanReplayerLoadInfo struct { | ||
Path string | ||
Ctx sessionctx.Context | ||
} | ||
|
||
type planReplayerLoadKeyType int | ||
|
||
func (k planReplayerLoadKeyType) String() string { | ||
return "plan_replayer_load_var" | ||
} | ||
|
||
const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0 | ||
|
||
func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error { | ||
req.GrowAndReset(e.maxChunkSize) | ||
if len(e.info.Path) == 0 { | ||
return errors.New("plan replayer: file path is empty") | ||
} | ||
val := e.ctx.Value(PlanReplayerLoadVarKey) | ||
if val != nil { | ||
e.ctx.SetValue(PlanReplayerLoadVarKey, nil) | ||
return errors.New("plan replayer: previous plan replayer load option isn't closed normally") | ||
rebelice marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
e.ctx.SetValue(PlanReplayerLoadVarKey, e.info) | ||
return nil | ||
} | ||
|
||
func loadVariables(ctx sessionctx.Context, z *zip.Reader) error { | ||
for _, zipFile := range z.File { | ||
if strings.Compare(zipFile.Name, "variables.toml") == 0 { | ||
varMap := make(map[string]string) | ||
v, err := zipFile.Open() | ||
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
defer v.Close() | ||
_, err = toml.DecodeReader(v, &varMap) | ||
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
vars := ctx.GetSessionVars() | ||
for name, value := range varMap { | ||
sysVar := variable.GetSysVar(name) | ||
if sysVar == nil { | ||
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name) | ||
} | ||
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession) | ||
if err != nil { | ||
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err)) | ||
continue | ||
} | ||
err = vars.SetSystemVar(name, sVal) | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error { | ||
r, err := f.Open() | ||
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
defer r.Close() | ||
buf := new(bytes.Buffer) | ||
buf.ReadFrom(r) | ||
sqls := strings.Split(buf.String(), ";") | ||
if len(sqls) != 3 { | ||
return errors.New("plan replayer: create schema and tables failed") | ||
} | ||
// create database | ||
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[0]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not use sessioncontext's ctx instead of context.TODO()? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the meanings of the two are different. Refer to other usage methods, I think it is more appropriate to use |
||
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err)) | ||
// use database | ||
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[1]) | ||
if err != nil { | ||
return err | ||
} | ||
// create table | ||
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sqls[2]) | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func loadStats(ctx sessionctx.Context, f *zip.File) error { | ||
jsonTbl := &handle.JSONTable{} | ||
r, err := f.Open() | ||
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
defer r.Close() | ||
buf := new(bytes.Buffer) | ||
buf.ReadFrom(r) | ||
if err := json.Unmarshal(buf.Bytes(), jsonTbl); err != nil { | ||
return errors.AddStack(err) | ||
} | ||
do := domain.GetDomain(ctx) | ||
h := do.StatsHandle() | ||
if h == nil { | ||
return errors.New("plan replayer: hanlde is nil") | ||
} | ||
return h.LoadStatsFromJSON(ctx.GetInfoSchema().(infoschema.InfoSchema), jsonTbl) | ||
} | ||
|
||
func (e *PlanReplayerLoadInfo) Update(data []byte) error{ | ||
b := bytes.NewReader(data) | ||
z, err := zip.NewReader(b, int64(len(data))) | ||
if err != nil { | ||
return errors.AddStack(err) | ||
} | ||
|
||
// load variable | ||
err = loadVariables(e.Ctx, z) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// build schema and table | ||
for _, zipFile := range z.File { | ||
path := strings.Split(zipFile.Name, "/") | ||
if len(path) == 2 && strings.Compare(path[0], "schema") == 0 { | ||
err = createSchemaAndTables(e.Ctx, zipFile) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
// load stats | ||
for _, zipFile := range z.File { | ||
path := strings.Split(zipFile.Name, "/") | ||
if len(path) == 2 && strings.Compare(path[0], "stats") == 0 { | ||
err = loadStats(e.Ctx, zipFile) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why hold b.ctx twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PlanReplayerLoadInfo
will useCtx
when alone. You can seehandlePlanReplayerLoad
for details.