Skip to content

Commit 73692d1

Browse files
lysuzz-jason
authored andcommitted
executor: improve wide table insert & update performance (#7935) (#8024)
1 parent 80e7584 commit 73692d1

File tree

6 files changed

+70
-47
lines changed

6 files changed

+70
-47
lines changed

executor/builder.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
548548
hasRefCols: v.NeedFillDefaultValue,
549549
SelectExec: selectExec,
550550
}
551+
err := ivs.initInsertColumns()
552+
if err != nil {
553+
b.err = err
554+
return nil
555+
}
551556

552557
if v.IsReplace {
553558
return b.buildReplace(ivs)
@@ -572,25 +577,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
572577
GenColumns: v.GenCols.Columns,
573578
GenExprs: v.GenCols.Exprs,
574579
}
575-
tableCols := tbl.Cols()
576-
columns, err := insertVal.getColumns(tableCols)
580+
err := insertVal.initInsertColumns()
577581
if err != nil {
578-
b.err = errors.Trace(err)
582+
b.err = err
579583
return nil
580584
}
581585
loadDataExec := &LoadDataExec{
582586
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
583587
IsLocal: v.IsLocal,
584588
loadDataInfo: &LoadDataInfo{
585-
row: make([]types.Datum, len(columns)),
589+
row: make([]types.Datum, len(insertVal.insertColumns)),
586590
InsertValues: insertVal,
587591
Path: v.Path,
588592
Table: tbl,
589593
FieldsInfo: v.FieldsInfo,
590594
LinesInfo: v.LinesInfo,
591595
IgnoreLines: v.IgnoreLines,
592596
Ctx: b.ctx,
593-
columns: columns,
594597
},
595598
}
596599

executor/insert.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,10 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
129129
// Next implements Exec Next interface.
130130
func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
131131
chk.Reset()
132-
cols, err := e.getColumns(e.Table.Cols())
133-
if err != nil {
134-
return errors.Trace(err)
135-
}
136-
137132
if len(e.children) > 0 && e.children[0] != nil {
138-
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
133+
return e.insertRowsFromSelect(ctx, e.exec)
139134
}
140-
return errors.Trace(e.insertRows(cols, e.exec))
135+
return e.insertRows(e.exec)
141136
}
142137

143138
// Close implements the Executor Close interface.
@@ -154,6 +149,7 @@ func (e *InsertExec) Open(ctx context.Context) error {
154149
if e.SelectExec != nil {
155150
return e.SelectExec.Open(ctx)
156151
}
152+
e.initEvalBuffer()
157153
return nil
158154
}
159155

executor/insert_common.go

+45-22
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,13 @@ type InsertValues struct {
5050
GenColumns []*ast.ColumnName
5151
GenExprs []expression.Expression
5252

53+
insertColumns []*table.Column
54+
5355
// colDefaultVals is used to store casted default value.
5456
// Because not every insert statement needs colDefaultVals, so we will init the buffer lazily.
55-
colDefaultVals []defaultVal
57+
colDefaultVals []defaultVal
58+
evalBuffer chunk.MutRow
59+
evalBufferTypes []*types.FieldType
5660
}
5761

5862
type defaultVal struct {
@@ -61,16 +65,18 @@ type defaultVal struct {
6165
valid bool
6266
}
6367

64-
// getColumns gets the explicitly specified columns of an insert statement. There are three cases:
68+
// initInsertColumns sets the explicitly specified columns of an insert statement. There are three cases:
6569
// There are three types of insert statements:
6670
// 1 insert ... values(...) --> name type column
6771
// 2 insert ... set x=y... --> set type column
6872
// 3 insert ... (select ..) --> name type column
6973
// See https://dev.mysql.com/doc/refman/5.7/en/insert.html
70-
func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, error) {
74+
func (e *InsertValues) initInsertColumns() error {
7175
var cols []*table.Column
7276
var err error
7377

78+
tableCols := e.Table.Cols()
79+
7480
if len(e.SetList) > 0 {
7581
// Process `set` type column.
7682
columns := make([]string, 0, len(e.SetList))
@@ -82,10 +88,10 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
8288
}
8389
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
8490
if err != nil {
85-
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
91+
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
8692
}
8793
if len(cols) == 0 {
88-
return nil, errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
94+
return errors.Errorf("INSERT INTO %s: empty column", e.Table.Meta().Name.O)
8995
}
9096
} else if len(e.Columns) > 0 {
9197
// Process `name` type column.
@@ -98,7 +104,7 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
98104
}
99105
cols, err = table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
100106
if err != nil {
101-
return nil, errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
107+
return errors.Errorf("INSERT INTO %s: %s", e.Table.Meta().Name.O, err)
102108
}
103109
} else {
104110
// If e.Columns are empty, use all columns instead.
@@ -114,10 +120,25 @@ func (e *InsertValues) getColumns(tableCols []*table.Column) ([]*table.Column, e
114120
// Check column whether is specified only once.
115121
err = table.CheckOnce(cols)
116122
if err != nil {
117-
return nil, errors.Trace(err)
123+
return err
118124
}
125+
e.insertColumns = cols
126+
return nil
127+
}
119128

120-
return cols, nil
129+
func (e *InsertValues) initEvalBuffer() {
130+
numCols := len(e.Table.Cols())
131+
if e.hasExtraHandle {
132+
numCols++
133+
}
134+
e.evalBufferTypes = make([]*types.FieldType, numCols)
135+
for i, col := range e.Table.Cols() {
136+
e.evalBufferTypes[i] = &col.FieldType
137+
}
138+
if e.hasExtraHandle {
139+
e.evalBufferTypes[len(e.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong)
140+
}
141+
e.evalBuffer = chunk.MutRowFromTypes(e.evalBufferTypes)
121142
}
122143

123144
func (e *InsertValues) lazilyInitColDefaultValBuf() (ok bool) {
@@ -150,7 +171,7 @@ func (e *InsertValues) processSetList() error {
150171
}
151172

152173
// insertRows processes `insert|replace into values ()` or `insert|replace into set x=y`
153-
func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types.Datum) error) (err error) {
174+
func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err error) {
154175
// For `insert|replace into set x=y`, process the set list here.
155176
if err = e.processSetList(); err != nil {
156177
return errors.Trace(err)
@@ -159,7 +180,7 @@ func (e *InsertValues) insertRows(cols []*table.Column, exec func(rows [][]types
159180
rows := make([][]types.Datum, 0, len(e.Lists))
160181
for i, list := range e.Lists {
161182
e.rowCount++
162-
row, err := e.evalRow(cols, list, i)
183+
row, err := e.evalRow(list, i)
163184
if err != nil {
164185
return errors.Trace(err)
165186
}
@@ -189,7 +210,7 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int
189210

190211
// evalRow evaluates a to-be-inserted row. The value of the column may base on another column,
191212
// so we use setValueForRefColumn to fill the empty row some default values when needFillDefaultValues is true.
192-
func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
213+
func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]types.Datum, error) {
193214
rowLen := len(e.Table.Cols())
194215
if e.hasExtraHandle {
195216
rowLen++
@@ -204,18 +225,20 @@ func (e *InsertValues) evalRow(cols []*table.Column, list []expression.Expressio
204225
}
205226
}
206227

228+
e.evalBuffer.SetDatums(row...)
207229
for i, expr := range list {
208-
val, err := expr.Eval(chunk.MutRowFromDatums(row).ToRow())
209-
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
230+
val, err := expr.Eval(e.evalBuffer.ToRow())
231+
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
210232
return nil, errors.Trace(err)
211233
}
212-
val1, err := table.CastValue(e.ctx, val, cols[i].ToInfo())
213-
if err = e.handleErr(cols[i], &val, rowIdx, err); err != nil {
234+
val1, err := table.CastValue(e.ctx, val, e.insertColumns[i].ToInfo())
235+
if err = e.handleErr(e.insertColumns[i], &val, rowIdx, err); err != nil {
214236
return nil, errors.Trace(err)
215237
}
216238

217-
offset := cols[i].Offset
218-
row[offset], hasValue[offset] = val1, true
239+
offset := e.insertColumns[i].Offset
240+
row[offset], hasValue[offset] = *val1.Copy(), true
241+
e.evalBuffer.SetDatum(offset, val1)
219242
}
220243

221244
return e.fillRow(row, hasValue)
@@ -251,7 +274,7 @@ func (e *InsertValues) setValueForRefColumn(row []types.Datum, hasValue []bool)
251274
return nil
252275
}
253276

254-
func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.Column, exec func(rows [][]types.Datum) error) error {
277+
func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows [][]types.Datum) error) error {
255278
// process `insert|replace into ... select ... from ...`
256279
selectExec := e.children[0]
257280
fields := selectExec.retTypes()
@@ -275,7 +298,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
275298
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
276299
innerRow := types.CopyRow(innerChunkRow.GetDatumRow(fields))
277300
e.rowCount++
278-
row, err := e.getRow(cols, innerRow)
301+
row, err := e.getRow(innerRow)
279302
if err != nil {
280303
return errors.Trace(err)
281304
}
@@ -305,16 +328,16 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C
305328
// getRow gets the row which from `insert into select from` or `load data`.
306329
// The input values from these two statements are datums instead of
307330
// expressions which are used in `insert into set x=y`.
308-
func (e *InsertValues) getRow(cols []*table.Column, vals []types.Datum) ([]types.Datum, error) {
331+
func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) {
309332
row := make([]types.Datum, len(e.Table.Cols()))
310333
hasValue := make([]bool, len(e.Table.Cols()))
311334
for i, v := range vals {
312-
casted, err := table.CastValue(e.ctx, v, cols[i].ToInfo())
335+
casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo())
313336
if e.filterErr(err) != nil {
314337
return nil, errors.Trace(err)
315338
}
316339

317-
offset := cols[i].Offset
340+
offset := e.insertColumns[i].Offset
318341
row[offset] = casted
319342
hasValue[offset] = true
320343
}

executor/load_data.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table,
4444
InsertValues: insertVal,
4545
Table: tbl,
4646
Ctx: ctx,
47-
columns: cols,
4847
}
4948
}
5049

@@ -81,6 +80,9 @@ func (e *LoadDataExec) Close() error {
8180

8281
// Open implements the Executor Open interface.
8382
func (e *LoadDataExec) Open(ctx context.Context) error {
83+
if e.loadDataInfo.insertColumns != nil {
84+
e.loadDataInfo.initEvalBuffer()
85+
}
8486
return nil
8587
}
8688

@@ -95,7 +97,6 @@ type LoadDataInfo struct {
9597
LinesInfo *ast.LinesClause
9698
IgnoreLines uint64
9799
Ctx sessionctx.Context
98-
columns []*table.Column
99100
}
100101

101102
// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
@@ -274,7 +275,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum {
274275
e.row[i].SetString(string(cols[i].str))
275276
}
276277
}
277-
row, err := e.getRow(e.columns, e.row)
278+
row, err := e.getRow(e.row)
278279
if err != nil {
279280
e.handleWarning(err,
280281
fmt.Sprintf("Load Data: insert data:%v failed:%v", e.row, errors.ErrorStack(err)))

executor/replace.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
4242
if e.SelectExec != nil {
4343
return e.SelectExec.Open(ctx)
4444
}
45+
e.initEvalBuffer()
4546
return nil
4647
}
4748

@@ -178,13 +179,8 @@ func (e *ReplaceExec) exec(newRows [][]types.Datum) error {
178179
// Next implements the Executor Next interface.
179180
func (e *ReplaceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
180181
chk.Reset()
181-
cols, err := e.getColumns(e.Table.Cols())
182-
if err != nil {
183-
return errors.Trace(err)
184-
}
185-
186182
if len(e.children) > 0 && e.children[0] != nil {
187-
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
183+
return e.insertRowsFromSelect(ctx, e.exec)
188184
}
189-
return errors.Trace(e.insertRows(cols, e.exec))
185+
return e.insertRows(e.exec)
190186
}

executor/update.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type UpdateExec struct {
4242
// columns2Handle stores relationship between column ordinal to its table handle.
4343
// the columns ordinals is present in ordinal range format, @see executor.cols2Handle
4444
columns2Handle cols2HandleSlice
45+
evalBuffer chunk.MutRow
4546
}
4647

4748
func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
@@ -141,6 +142,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
141142
fields := e.children[0].retTypes()
142143
globalRowIdx := 0
143144
chk := e.children[0].newFirstChunk()
145+
e.evalBuffer = chunk.MutRowFromTypes(fields)
144146
for {
145147
err := e.children[0].Next(ctx, chk)
146148
if err != nil {
@@ -181,17 +183,19 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error
181183

182184
func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) {
183185
newRowData := types.CopyRow(oldRow)
186+
e.evalBuffer.SetDatums(newRowData...)
184187
for _, assign := range e.OrderedList {
185188
handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index))
186189
if handleFound && e.canNotUpdate(oldRow[handleIdx]) {
187190
continue
188191
}
189-
val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow())
192+
val, err := assign.Expr.Eval(e.evalBuffer.ToRow())
190193

191194
if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil {
192-
return nil, errors.Trace(err1)
195+
return nil, err1
193196
}
194-
newRowData[assign.Col.Index] = val
197+
newRowData[assign.Col.Index] = *val.Copy()
198+
e.evalBuffer.SetDatum(assign.Col.Index, val)
195199
}
196200
return newRowData, nil
197201
}

0 commit comments

Comments
 (0)