Skip to content

Commit

Permalink
executor: speed up replace into statement (#7027)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and coocood committed Jul 18, 2018
1 parent edcc012 commit 50193eb
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 63 deletions.
28 changes: 27 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -214,7 +215,6 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, t tabl
return errors.Trace(err)
}
handles = append(handles, handle)
break
}
}
}
Expand Down Expand Up @@ -249,3 +249,29 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) {
delete(b.dupKVs, string(uk.newKV.key))
}
}

// getOldRow returns the decoded old row for handle, using the raw value that
// was previously cached in b.dupOldRowValues under the table's record key.
//
// A NotFound error is returned when no cached value exists for the handle.
// After decoding, any writable column that is not in the public state, is
// NULL in the decoded row, and is absent from the decoded column map gets its
// origin default value filled in.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) {
	rawValue, cached := b.dupOldRowValues[string(t.RecordKey(handle))]
	if !cached {
		return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
	}

	writableCols := t.WritableCols()
	decodedRow, decodedCols, err := tables.DecodeRawRowData(ctx, t.Meta(), handle, writableCols, rawValue)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Write-only and write-reorg columns may be missing from the stored value;
	// back-fill them with their origin default.
	for _, c := range writableCols {
		if c.State == model.StatePublic || !decodedRow[c.Offset].IsNull() {
			continue
		}
		if _, present := decodedCols[c.ID]; present {
			// The column was stored explicitly as NULL; keep it.
			continue
		}
		decodedRow[c.Offset], err = table.GetColOriginDefaultValue(ctx, c.ToInfo())
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	return decodedRow, nil
}
25 changes: 2 additions & 23 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -207,30 +205,11 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) (err error) {
// Get the table record row from storage for update.
oldValue, ok := e.dupOldRowValues[string(e.Table.RecordKey(handle))]
if !ok {
return errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := e.Table.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(e.ctx, e.Table.Meta(), handle, cols, oldValue)
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(e.ctx, col.ToInfo())
if err != nil {
return errors.Trace(err)
}
}
}
}

// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
Expand Down
172 changes: 133 additions & 39 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
Expand Down Expand Up @@ -44,7 +46,122 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
return nil
}

func (e *ReplaceExec) exec(rows []types.DatumRow) error {
// removeRow deletes the existing row identified by handle so that newRow can
// take its place, and drops the removed row's keys from the duplicate-key map.
//
// The returned bool is true when the existing row is equal to newRow; in that
// case nothing is removed, one affected row is counted, and the caller has no
// insert left to do. Otherwise the record is removed, a delete is registered
// as a dirty-table operation, one affected row is counted, and false is
// returned.
func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) (bool, error) {
	existing, err := e.getOldRow(e.ctx, e.Table, handle)
	if err != nil {
		return false, errors.Trace(err)
	}

	identical, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, existing, newRow)
	switch {
	case err != nil:
		return false, errors.Trace(err)
	case identical:
		e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
		return true, nil
	}

	if err = e.Table.RemoveRecord(e.ctx, handle, existing); err != nil {
		return false, errors.Trace(err)
	}
	e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil)
	e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

	// The record is gone, so its keys must no longer be reported as duplicates.
	staleRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{existing})
	if err != nil {
		return false, errors.Trace(err)
	}
	// Only one row was passed in, so only the first result entry is cleaned up.
	if len(staleRows) > 0 {
		e.deleteDupKeys(staleRows[0])
	}
	return false, nil
}

// addRow inserts row into the table and returns the handle of the new record.
// It is meant to be called once every duplicate of the row has been removed.
func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) {
	// Duplicates were removed beforehand, so the key can be presumed absent
	// for the duration of this single AddRecord call.
	e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil)
	newHandle, addErr := e.Table.AddRecord(e.ctx, row, false)
	e.ctx.Txn().DelOption(kv.PresumeKeyNotExists)
	if addErr != nil {
		return 0, errors.Trace(addErr)
	}

	// The dirty-table record is skipped while the session is importing data.
	if e.ctx.GetSessionVars().ImportingData {
		return newHandle, nil
	}
	e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, newHandle, row)
	return newHandle, nil
}

// replaceRow removes every existing row that conflicts with r, either through
// its handle key or through one of its unique keys, and then inserts r.row.
// If any conflicting row turns out to be equal to r.row, the function returns
// early without inserting anything.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
	// Each pass removes at most one conflicting row; repeat until none is left.
	for {
		if hk := r.handleKey; hk != nil {
			if _, dup := e.dupKVs[string(hk.newKV.key)]; dup {
				handle, err := tablecodec.DecodeRowKey(hk.newKV.key)
				if err != nil {
					return errors.Trace(err)
				}
				unchanged, err := e.removeRow(handle, r.row)
				if err != nil {
					return errors.Trace(err)
				}
				if unchanged {
					return nil
				}
				// Re-check from the top: other conflicts may remain.
				continue
			}
		}

		unchanged, dupFound, err := e.removeIndexRow(r)
		if err != nil {
			return errors.Trace(err)
		}
		if unchanged {
			return nil
		}
		if !dupFound {
			break
		}
	}

	// All conflicts are gone; insert the row and record its keys.
	insertedHandle, err := e.addRow(r.row)
	if err != nil {
		return errors.Trace(err)
	}
	e.fillBackKeys(e.Table, r, insertedHandle)
	return nil
}

// removeIndexRow looks for the first unique key of r that is present in the
// duplicate-key map and removes the row that key points to.
//
// Return values:
//  1. bool: true when the existing row equals r.row, so there is nothing to
//     remove and nothing to add.
//  2. bool: true when a duplicated unique key was found (also true on the
//     error paths that follow the lookup).
//  3. error: any error from decoding the handle or removing the row.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
	for _, uk := range r.uniqueKeys {
		encodedHandle, dup := e.dupKVs[string(uk.newKV.key)]
		if !dup {
			continue
		}
		handle, err := tables.DecodeHandle(encodedHandle)
		if err != nil {
			return false, true, errors.Trace(err)
		}
		unchanged, err := e.removeRow(handle, r.row)
		if err != nil {
			return false, true, errors.Trace(err)
		}
		// Only the first matching key is handled per call.
		return unchanged, true, nil
	}
	return false, false, nil
}

func (e *ReplaceExec) exec(newRows []types.DatumRow) error {
/*
* MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE):
* 1. Try to insert the new row into the table
Expand All @@ -57,46 +174,23 @@ func (e *ReplaceExec) exec(rows []types.DatumRow) error {
* because in this case, one row was inserted after the duplicate was deleted.
* See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html
*/
idx := 0
rowsLen := len(rows)
sc := e.ctx.GetSessionVars().StmtCtx
for {
if idx >= rowsLen {
break
}
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
if err1 != nil && !kv.ErrKeyExists.Equal(err1) {
return errors.Trace(err1)
}
oldRow, err1 := e.Table.Row(e.ctx, h)
if err1 != nil {
return errors.Trace(err1)
}
rowUnchanged, err1 := types.EqualDatums(sc, oldRow, row)
if err1 != nil {
return errors.Trace(err1)
}
if rowUnchanged {
// If row unchanged, we do not need to do insert.
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
idx++
continue
}
// Remove current row and try replace again.
err1 = e.Table.RemoveRecord(e.ctx, h, oldRow)
if err1 != nil {
return errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

// Batch get the to-be-replaced rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
if err != nil {
return errors.Trace(err)
}
}
if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
}
Expand Down

0 comments on commit 50193eb

Please sign in to comment.