From 922d1041a71c581b3cd9efac50fc8ca29f7ee50a Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 10 Jul 2018 17:10:54 +0800 Subject: [PATCH 1/7] opt replace into --- executor/batch_checker.go | 27 ++++++++ executor/insert.go | 25 +------ executor/replace.go | 133 +++++++++++++++++++++++++++----------- 3 files changed, 124 insertions(+), 61 deletions(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index ea052481db457..97429b537d9b3 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -16,6 +16,7 @@ package executor import ( "github.com/juju/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -249,3 +250,29 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) { delete(b.dupKVs, string(uk.newKV.key)) } } + +func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) { + // Get the table record row from storage for update. + oldValue, ok := b.dupOldRowValues[string(t.RecordKey(handle))] + if !ok { + return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle) + } + cols := t.WritableCols() + oldRow, oldRowMap, err := tables.DecodeRawRowData(ctx, t.Meta(), handle, cols, oldValue) + if err != nil { + return nil, errors.Trace(err) + } + // Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue. + for _, col := range cols { + if col.State != model.StatePublic && oldRow[col.Offset].IsNull() { + _, found := oldRowMap[col.ID] + if !found { + oldRow[col.Offset], err = table.GetColOriginDefaultValue(ctx, col.ToInfo()) + if err != nil { + return nil, errors.Trace(err) + } + } + } + } + return oldRow, nil +} diff --git a/executor/insert.go b/executor/insert.go index 1d8dbfd716c1b..e29204ab22872 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -17,9 +17,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" - "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -207,30 +205,11 @@ func (e *InsertExec) Open(ctx context.Context) error { } // updateDupRow updates a duplicate row to a new row. -func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) (err error) { - // Get the table record row from storage for update. - oldValue, ok := e.dupOldRowValues[string(e.Table.RecordKey(handle))] - if !ok { - return errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle) - } - cols := e.Table.WritableCols() - oldRow, oldRowMap, err := tables.DecodeRawRowData(e.ctx, e.Table.Meta(), handle, cols, oldValue) +func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error { + oldRow, err := e.getOldRow(e.ctx, e.Table, handle) if err != nil { return errors.Trace(err) } - // Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue. - for _, col := range cols { - if col.State != model.StatePublic && oldRow[col.Offset].IsNull() { - _, found := oldRowMap[col.ID] - if !found { - oldRow[col.Offset], err = table.GetColOriginDefaultValue(e.ctx, col.ToInfo()) - if err != nil { - return errors.Trace(err) - } - } - } - } - // Do update row. updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate) if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) { diff --git a/executor/replace.go b/executor/replace.go index b112de8297e36..2c864b3273651 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -16,6 +16,8 @@ package executor import ( "github.com/juju/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "golang.org/x/net/context" @@ -44,7 +46,49 @@ func (e *ReplaceExec) Open(ctx context.Context) error { return nil } -func (e *ReplaceExec) exec(rows []types.DatumRow) error { +func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) error { + oldRow, err := e.getOldRow(e.ctx, e.Table, handle) + if err != nil { + return errors.Trace(err) + } + rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow) + if err != nil { + return errors.Trace(err) + } + err = e.Table.RemoveRecord(e.ctx, handle, oldRow) + if err != nil { + return errors.Trace(err) + } + e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil) + if !rowUnchanged { + e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) + } + + // cleanup keys map + cleanupRow, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow}) + if err != nil { + return errors.Trace(err) + } + if len(cleanupRow) > 0 { + e.deleteDupKeys(cleanupRow[0]) + } + return nil +} + +func (e *ReplaceExec) insertRow(row types.DatumRow) (int64, error) { + e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) + h, err := e.Table.AddRecord(e.ctx, row, false) + e.ctx.Txn().DelOption(kv.PresumeKeyNotExists) + if err != nil { + return 0, errors.Trace(err) + } + if !e.ctx.GetSessionVars().ImportingData { + e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row) + } + return h, nil +} + +func (e *ReplaceExec) exec(newRows []types.DatumRow) error { /* * MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE): * 1. Try to insert the new row into the table @@ -57,46 +101,59 @@ func (e *ReplaceExec) exec(rows []types.DatumRow) error { * because in this case, one row was inserted after the duplicate was deleted. * See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html */ - idx := 0 - rowsLen := len(rows) - sc := e.ctx.GetSessionVars().StmtCtx - for { - if idx >= rowsLen { + err := e.batchGetInsertKeys(e.ctx, e.Table, newRows) + if err != nil { + return errors.Trace(err) + } + + // Batch get the to-be-updated rows in storage. + err = e.initDupOldRowValue(e.ctx, e.Table, newRows) + if err != nil { + return errors.Trace(err) + } + + for i, r := range e.toBeCheckedRows { + for { + if r.handleKey != nil { + if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { + handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) + if err != nil { + return errors.Trace(err) + } + err = e.removeRow(handle, newRows[i]) + if err != nil { + return errors.Trace(err) + } + continue + } + } + + foundDupKey := false + for _, uk := range r.uniqueKeys { + if val, found := e.dupKVs[string(uk.newKV.key)]; found { + handle, err := tables.DecodeHandle(val) + if err != nil { + return errors.Trace(err) + } + err = e.removeRow(handle, newRows[i]) + if err != nil { + return errors.Trace(err) + } + foundDupKey = true + break + } + } + if foundDupKey { + continue + } + newHandle, err := e.insertRow(newRows[i]) + if err != nil { + return errors.Trace(err) + } + e.fillBackKeys(e.Table, r, newHandle) break } - row := rows[idx] - h, err1 := e.Table.AddRecord(e.ctx, row, false) - if err1 == nil { - e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row) - idx++ - continue - } - if err1 != nil && !kv.ErrKeyExists.Equal(err1) { - return errors.Trace(err1) - } - oldRow, err1 := e.Table.Row(e.ctx, h) - if err1 != nil { - return errors.Trace(err1) - } - rowUnchanged, err1 := types.EqualDatums(sc, oldRow, row) - if err1 != nil { - return errors.Trace(err1) - } - if rowUnchanged { - // If row unchanged, we do not need to do insert. - e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) - idx++ - continue - } - // Remove current row and try replace again. - err1 = e.Table.RemoveRecord(e.ctx, h, oldRow) - if err1 != nil { - return errors.Trace(err1) - } - e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil) - e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) } - if e.lastInsertID != 0 { e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID) } From c79ef01b822b0af9db98490d1d5ee590124e6c68 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 10 Jul 2018 17:34:52 +0800 Subject: [PATCH 2/7] add comments --- executor/batch_checker.go | 4 ++-- executor/insert.go | 2 +- executor/replace.go | 15 +++++++++------ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 97429b537d9b3..481c9f86ef4c6 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -251,8 +251,8 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) { } } -func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) { - // Get the table record row from storage for update. +// decodeOldRow decodes the table record row from storage for batch check. +func (b *batchChecker) decodeOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) { oldValue, ok := b.dupOldRowValues[string(t.RecordKey(handle))] if !ok { return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle) diff --git a/executor/insert.go b/executor/insert.go index e29204ab22872..0080acb89a2e9 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -206,7 +206,7 @@ func (e *InsertExec) Open(ctx context.Context) error { // updateDupRow updates a duplicate row to a new row. func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error { - oldRow, err := e.getOldRow(e.ctx, e.Table, handle) + oldRow, err := e.decodeOldRow(e.ctx, e.Table, handle) if err != nil { return errors.Trace(err) } diff --git a/executor/replace.go b/executor/replace.go index 2c864b3273651..f1c84f26b6c3f 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -47,7 +47,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error { } func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) error { - oldRow, err := e.getOldRow(e.ctx, e.Table, handle) + oldRow, err := e.decodeOldRow(e.ctx, e.Table, handle) if err != nil { return errors.Trace(err) } @@ -64,18 +64,20 @@ func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) error { e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) } - // cleanup keys map - cleanupRow, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow}) + // Cleanup keys map, because the record was removed. + cleanupRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow}) if err != nil { return errors.Trace(err) } - if len(cleanupRow) > 0 { - e.deleteDupKeys(cleanupRow[0]) + if len(cleanupRows) > 0 { + // The length of need-to-cleanup rows should be at most 1, due to we only input 1 row. + e.deleteDupKeys(cleanupRows[0]) } return nil } func (e *ReplaceExec) insertRow(row types.DatumRow) (int64, error) { + // Set kv.PresumeKeyNotExists is safe here, because we've already removed all duplicated rows. e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) h, err := e.Table.AddRecord(e.ctx, row, false) e.ctx.Txn().DelOption(kv.PresumeKeyNotExists) @@ -106,7 +108,7 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { return errors.Trace(err) } - // Batch get the to-be-updated rows in storage. + // Batch get the to-be-replaced rows in storage. err = e.initDupOldRowValue(e.ctx, e.Table, newRows) if err != nil { return errors.Trace(err) @@ -146,6 +148,7 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { if foundDupKey { continue } + // No duplicated rows now, insert the row and go to the next row. newHandle, err := e.insertRow(newRows[i]) if err != nil { return errors.Trace(err) From 3e8e4d30803760a043a211e3e059658b05b25d7b Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 10 Jul 2018 21:12:33 +0800 Subject: [PATCH 3/7] need more dupKey row values --- executor/batch_checker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 481c9f86ef4c6..bee238590707b 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -215,7 +215,6 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx sessionctx.Context, t tabl return errors.Trace(err) } handles = append(handles, handle) - break } } } From adbcbf33bfea6ecdf27da9b047921566042f8f02 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Thu, 12 Jul 2018 15:32:53 +0800 Subject: [PATCH 4/7] address comments --- executor/batch_checker.go | 4 ++-- executor/insert.go | 2 +- executor/replace.go | 40 +++++++++++++++++++++++++-------------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index bee238590707b..44267fde63b08 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -250,8 +250,8 @@ func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) { } } -// decodeOldRow decodes the table record row from storage for batch check. -func (b *batchChecker) decodeOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) { +// getOldRow gets the table record row from storage for batch check. +func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64) (types.DatumRow, error) { oldValue, ok := b.dupOldRowValues[string(t.RecordKey(handle))] if !ok { return nil, errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle) diff --git a/executor/insert.go b/executor/insert.go index 0080acb89a2e9..e29204ab22872 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -206,7 +206,7 @@ func (e *InsertExec) Open(ctx context.Context) error { // updateDupRow updates a duplicate row to a new row. func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error { - oldRow, err := e.decodeOldRow(e.ctx, e.Table, handle) + oldRow, err := e.getOldRow(e.ctx, e.Table, handle) if err != nil { return errors.Trace(err) } diff --git a/executor/replace.go b/executor/replace.go index f1c84f26b6c3f..1c8554210c347 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -46,37 +46,42 @@ func (e *ReplaceExec) Open(ctx context.Context) error { return nil } -func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) error { - oldRow, err := e.decodeOldRow(e.ctx, e.Table, handle) +// removeRow removes the duplicate row and cleanup its keys in the key-value map, +// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do. +func (e *ReplaceExec) removeRow(handle int64, newRow types.DatumRow) (bool, error) { + oldRow, err := e.getOldRow(e.ctx, e.Table, handle) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) + } + if rowUnchanged { + e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) + return true, nil } err = e.Table.RemoveRecord(e.ctx, handle, oldRow) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, handle, nil) - if !rowUnchanged { - e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) - } + e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) // Cleanup keys map, because the record was removed. cleanupRows, err := e.getKeysNeedCheck(e.ctx, e.Table, []types.DatumRow{oldRow}) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } if len(cleanupRows) > 0 { // The length of need-to-cleanup rows should be at most 1, due to we only input 1 row. e.deleteDupKeys(cleanupRows[0]) } - return nil + return false, nil } -func (e *ReplaceExec) insertRow(row types.DatumRow) (int64, error) { +// addRow adds a row when all the duplicate key were checked. +func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) { // Set kv.PresumeKeyNotExists is safe here, because we've already removed all duplicated rows. e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) h, err := e.Table.AddRecord(e.ctx, row, false) @@ -116,16 +121,20 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { for i, r := range e.toBeCheckedRows { for { + rowUnchanged := false if r.handleKey != nil { if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) if err != nil { return errors.Trace(err) } - err = e.removeRow(handle, newRows[i]) + rowUnchanged, err = e.removeRow(handle, newRows[i]) if err != nil { return errors.Trace(err) } + if rowUnchanged { + break + } continue } } @@ -137,7 +146,7 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { if err != nil { return errors.Trace(err) } - err = e.removeRow(handle, newRows[i]) + rowUnchanged, err = e.removeRow(handle, newRows[i]) if err != nil { return errors.Trace(err) } @@ -145,11 +154,14 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { break } } + if rowUnchanged { + break + } if foundDupKey { continue } // No duplicated rows now, insert the row and go to the next row. - newHandle, err := e.insertRow(newRows[i]) + newHandle, err := e.addRow(newRows[i]) if err != nil { return errors.Trace(err) } From c6cf3cab9c3918364512eb20100bff6355210406 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 17 Jul 2018 13:09:29 +0800 Subject: [PATCH 5/7] address comments --- executor/replace.go | 47 ++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/executor/replace.go b/executor/replace.go index 1c8554210c347..99126bbca5fe7 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -95,6 +95,29 @@ func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) { return h, nil } +// removeIndexRow removes the row which has a duplicated key. +// the return values: +// 1. bool: true when the row is unchanged. This means no need to remove, and then add the row. +// 2. bool: true when found the duplicated key. This only means that duplicated key was found, +// and the row was removed. +// 3. error: the error. +func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) { + for _, uk := range r.uniqueKeys { + if val, found := e.dupKVs[string(uk.newKV.key)]; found { + handle, err := tables.DecodeHandle(val) + if err != nil { + return false, found, errors.Trace(err) + } + rowUnchanged, err := e.removeRow(handle, r.row) + if err != nil { + return false, found, errors.Trace(err) + } + return rowUnchanged, found, nil + } + } + return false, false, nil +} + func (e *ReplaceExec) exec(newRows []types.DatumRow) error { /* * MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE): @@ -119,16 +142,15 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { return errors.Trace(err) } - for i, r := range e.toBeCheckedRows { + for _, r := range e.toBeCheckedRows { for { - rowUnchanged := false if r.handleKey != nil { if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) if err != nil { return errors.Trace(err) } - rowUnchanged, err = e.removeRow(handle, newRows[i]) + rowUnchanged, err := e.removeRow(handle, r.row) if err != nil { return errors.Trace(err) } @@ -139,20 +161,9 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { } } - foundDupKey := false - for _, uk := range r.uniqueKeys { - if val, found := e.dupKVs[string(uk.newKV.key)]; found { - handle, err := tables.DecodeHandle(val) - if err != nil { - return errors.Trace(err) - } - rowUnchanged, err = e.removeRow(handle, newRows[i]) - if err != nil { - return errors.Trace(err) - } - foundDupKey = true - break - } + rowUnchanged, foundDupKey, err := e.removeIndexRow(r) + if err != nil { + return errors.Trace(err) } if rowUnchanged { break @@ -161,7 +172,7 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { continue } // No duplicated rows now, insert the row and go to the next row. - newHandle, err := e.addRow(newRows[i]) + newHandle, err := e.addRow(r.row) if err != nil { return errors.Trace(err) } From 3cb2720c70ed9fa4d14cce635cb8698ceea93220 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 17 Jul 2018 17:33:23 +0800 Subject: [PATCH 6/7] address comments --- executor/replace.go | 81 +++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/executor/replace.go b/executor/replace.go index 99126bbca5fe7..d6d30e08214e1 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -95,6 +95,49 @@ func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) { return h, nil } +// replaceRow remove all duplicate rows for one row, then insert it. +func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error { + // Keep on removing duplicated rows. + for { + if r.handleKey != nil { + if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { + handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) + if err != nil { + return errors.Trace(err) + } + rowUnchanged, err := e.removeRow(handle, r.row) + if err != nil { + return errors.Trace(err) + } + if rowUnchanged { + return nil + } + continue + } + } + + rowUnchanged, foundDupKey, err := e.removeIndexRow(r) + if err != nil { + return errors.Trace(err) + } + if rowUnchanged { + return nil + } + if foundDupKey { + continue + } + break + } + + // No duplicated rows now, insert the row. + newHandle, err := e.addRow(r.row) + if err != nil { + return errors.Trace(err) + } + e.fillBackKeys(e.Table, r, newHandle) + return nil +} + // removeIndexRow removes the row which has a duplicated key. // the return values: // 1. bool: true when the row is unchanged. This means no need to remove, and then add the row. @@ -143,41 +186,9 @@ func (e *ReplaceExec) exec(newRows []types.DatumRow) error { } for _, r := range e.toBeCheckedRows { - for { - if r.handleKey != nil { - if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { - handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) - if err != nil { - return errors.Trace(err) - } - rowUnchanged, err := e.removeRow(handle, r.row) - if err != nil { - return errors.Trace(err) - } - if rowUnchanged { - break - } - continue - } - } - - rowUnchanged, foundDupKey, err := e.removeIndexRow(r) - if err != nil { - return errors.Trace(err) - } - if rowUnchanged { - break - } - if foundDupKey { - continue - } - // No duplicated rows now, insert the row and go to the next row. - newHandle, err := e.addRow(r.row) - if err != nil { - return errors.Trace(err) - } - e.fillBackKeys(e.Table, r, newHandle) - break + err = e.replaceRow(r) + if err != nil { + return errors.Trace(err) } } if e.lastInsertID != 0 { From 17ffd906b8a65ee1a40d0c1e4b4aac9bd135e5b4 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Wed, 18 Jul 2018 10:39:56 +0800 Subject: [PATCH 7/7] address comments --- executor/replace.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/replace.go b/executor/replace.go index d6d30e08214e1..d959cdfb911f5 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -95,7 +95,7 @@ func (e *ReplaceExec) addRow(row types.DatumRow) (int64, error) { return h, nil } -// replaceRow remove all duplicate rows for one row, then insert it. +// replaceRow removes all duplicate rows for one row, then inserts it. func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error { // Keep on removing duplicated rows. for {