Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: speed up replace into statement #7027

Merged
merged 11 commits into from
Jul 18, 2018
28 changes: 27 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -214,7 +215,6 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, t tabl
return errors.Trace(err)
}
handles = append(handles, handle)
break
}
}
}
Expand Down Expand Up @@ -249,3 +249,29 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) {
delete(b.dupKVs, string(uk.newKV.key))
}
}

// getOldRow gets the table record row from storage for batch check.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) {
oldValue, ok := b.dupOldRowValues[string(t.RecordKey(handle))]
if !ok {
return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(ctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(ctx, col.ToInfo())
if err != nil {
return nil, errors.Trace(err)
}
}
}
}
return oldRow, nil
}
25 changes: 2 additions & 23 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -207,30 +205,11 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) (err error) {
// Get the table record row from storage for update.
oldValue, ok := e.dupOldRowValues[string(e.Table.RecordKey(handle))]
if !ok {
return errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
cols := e.Table.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(e.ctx, e.Table.Meta(), handle, cols, oldValue)
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return errors.Trace(err)
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(e.ctx, col.ToInfo())
if err != nil {
return errors.Trace(err)
}
}
}
}

// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
Expand Down
172 changes: 133 additions & 39 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
Expand Down Expand Up @@ -44,7 +46,122 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
return nil
}

func (e *ReplaceExec) exec(rows []types.DatumRow) error {
// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) (bool, error) {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
return false, errors.Trace(err)
}
rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow)
if err != nil {
return false, errors.Trace(err)
}
if rowUnchanged {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return true, nil
}
err = e.Table.RemoveRecord(e.ctx, handle, oldRow)
if err != nil {
return false, errors.Trace(err)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
cleanupRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow})
if err != nil {
return false, errors.Trace(err)
}
if len(cleanupRows) > 0 {
// The length of need-to-cleanup rows should be at most 1, due to we only input 1 row.
e.deleteDupKeys(cleanupRows[0])
}
return false, nil
}

// addRow adds a row when all the duplicate key were checked.
func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) {
// Set kv.PresumeKeyNotExists is safe here, because we've already removed all duplicated rows.
e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil)
h, err := e.Table.AddRecord(e.ctx, row, false)
e.ctx.Txn().DelOption(kv.PresumeKeyNotExists)
if err != nil {
return 0, errors.Trace(err)
}
if !e.ctx.GetSessionVars().ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
return h, nil
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
// Keep on removing duplicated rows.
for {
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return errors.Trace(err)
}
rowUnchanged, err := e.removeRow(handle, r.row)
if err != nil {
return errors.Trace(err)
}
if rowUnchanged {
return nil
}
continue
}
}

rowUnchanged, foundDupKey, err := e.removeIndexRow(r)
if err != nil {
return errors.Trace(err)
}
if rowUnchanged {
return nil
}
if foundDupKey {
continue
}
break
}

// No duplicated rows now, insert the row.
newHandle, err := e.addRow(r.row)
if err != nil {
return errors.Trace(err)
}
e.fillBackKeys(e.Table, r, newHandle)
return nil
}

// removeIndexRow removes the row which has a duplicated key.
// the return values:
// 1. bool: true when the row is unchanged. This means no need to remove, and then add the row.
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return false, found, errors.Trace(err)
}
rowUnchanged, err := e.removeRow(handle, r.row)
if err != nil {
return false, found, errors.Trace(err)
}
return rowUnchanged, found, nil
}
}
return false, false, nil
}

func (e *ReplaceExec) exec(newRows []types.DatumRow) error {
/*
* MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE):
* 1. Try to insert the new row into the table
Expand All @@ -57,46 +174,23 @@ func (e *ReplaceExec) exec(rows []types.DatumRow) error {
* because in this case, one row was inserted after the duplicate was deleted.
* See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html
*/
idx := 0
rowsLen := len(rows)
sc := e.ctx.GetSessionVars().StmtCtx
for {
if idx >= rowsLen {
break
}
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
if err1 != nil && !kv.ErrKeyExists.Equal(err1) {
return errors.Trace(err1)
}
oldRow, err1 := e.Table.Row(e.ctx, h)
if err1 != nil {
return errors.Trace(err1)
}
rowUnchanged, err1 := types.EqualDatums(sc, oldRow, row)
if err1 != nil {
return errors.Trace(err1)
}
if rowUnchanged {
// If row unchanged, we do not need to do insert.
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
idx++
continue
}
// Remove current row and try replace again.
err1 = e.Table.RemoveRecord(e.ctx, h, oldRow)
if err1 != nil {
return errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

// Batch get the to-be-replaced rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
if err != nil {
return errors.Trace(err)
}
}
if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
}
Expand Down