lightning: add error message for pre-deduplication (#44317)
ref #41629
lance6716 authored Jun 7, 2023
1 parent aedbcd0 commit 6bab55c
Showing 16 changed files with 433 additions and 68 deletions.
20 changes: 17 additions & 3 deletions br/pkg/lightning/backend/local/engine.go
@@ -66,6 +66,13 @@ const (
importMutexStateReadLock
)

const (
// DupDetectDirSuffix is used by pre-deduplication to store the encoded index KV.
DupDetectDirSuffix = ".dupdetect"
// DupResultDirSuffix is used by pre-deduplication to store the duplicated row ID.
DupResultDirSuffix = ".dupresult"
)

// engineMeta contains some fields that are necessary to continue the engine restore/import process.
// These fields should be written to disk when we update the chunk checkpoint.
type engineMeta struct {
@@ -159,14 +166,21 @@ func (e *Engine) Close() error {
return err
}

// Cleanup remove meta and db files
// Cleanup removes meta, db, and duplicate detection files
func (e *Engine) Cleanup(dataDir string) error {
if err := os.RemoveAll(e.sstDir); err != nil {
return errors.Trace(err)
}
uuid := e.UUID.String()
if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupDetectDirSuffix)); err != nil {
return errors.Trace(err)
}
if err := os.RemoveAll(filepath.Join(dataDir, uuid+DupResultDirSuffix)); err != nil {
return errors.Trace(err)
}

dbPath := filepath.Join(dataDir, e.UUID.String())
return os.RemoveAll(dbPath)
dbPath := filepath.Join(dataDir, uuid)
return errors.Trace(os.RemoveAll(dbPath))
}

// Exist checks if the db folder exists (meta sometimes won't be flushed before lightning exits)
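For context, the cleanup convention above can be sketched as a standalone Go snippet. Everything here except the two suffix strings is illustrative (the package and the cleanupEngineDirs helper are not part of this PR); it only shows how the pre-deduplication directories share the engine UUID prefix with the db directory:

package local

import (
	"fmt"
	"os"
	"path/filepath"
)

// Suffixes mirror DupDetectDirSuffix / DupResultDirSuffix above.
const (
	dupDetectDirSuffix = ".dupdetect"
	dupResultDirSuffix = ".dupresult"
)

// cleanupEngineDirs removes the engine's db directory together with the two
// pre-deduplication directories that share its UUID prefix. os.RemoveAll
// returns nil for paths that do not exist, so engines that never ran
// pre-deduplication need no special casing.
func cleanupEngineDirs(dataDir, engineUUID string) error {
	for _, suffix := range []string{dupDetectDirSuffix, dupResultDirSuffix, ""} {
		if err := os.RemoveAll(filepath.Join(dataDir, engineUUID+suffix)); err != nil {
			return fmt.Errorf("cleanup engine %s: %w", engineUUID, err)
		}
	}
	return nil
}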
1 change: 1 addition & 0 deletions br/pkg/lightning/duplicate/detector.go
@@ -199,6 +199,7 @@ type Handler interface {
Begin(key []byte) error
// Append appends a keyID to the current duplicate key.
// Multiple keyIDs are appended in lexicographical order.
// NOTE: keyID may be changed after the call returns, so implementations must copy it if they retain it.
Append(keyID []byte) error
// End is called when all keyIDs of the current duplicate key have been appended.
End() error
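The new NOTE on Append matters because the detector may hand implementations a reused buffer. A minimal sketch of a conforming Handler, assuming the four-method interface shown above (the collectHandler type itself is hypothetical; the stdlib slices package stands in for the golang.org/x/exp/slices used elsewhere in this PR):

package duplicate

import "slices"

// collectHandler records up to two keyIDs per duplicate key. Because the
// caller may overwrite keyID's backing array after Append returns, the
// slice must be cloned before it is retained.
type collectHandler struct {
	keyIDs [][]byte
}

func (h *collectHandler) Begin(key []byte) error {
	h.keyIDs = h.keyIDs[:0]
	return nil
}

func (h *collectHandler) Append(keyID []byte) error {
	if len(h.keyIDs) >= 2 {
		return nil // two keyIDs are enough to report a conflict
	}
	// slices.Clone allocates a fresh copy; appending keyID directly would
	// alias a buffer the detector is free to reuse on the next call.
	h.keyIDs = append(h.keyIDs, slices.Clone(keyID))
	return nil
}

func (h *collectHandler) End() error   { return nil }
func (h *collectHandler) Close() error { return nil }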
7 changes: 6 additions & 1 deletion br/pkg/lightning/errormanager/errormanager.go
@@ -133,7 +133,7 @@ const (
insertIntoConflictErrorV2 = `
INSERT INTO %s.` + conflictErrorV2TableName + `
(task_id, table_name, path, offset, error, row_id, row_data)
VALUES (?, ?, ?, ?, ?, ?, ?);
`
)

@@ -156,6 +156,11 @@ func (em *ErrorManager) TypeErrorsRemain() int64 {
return em.remainingError.Type.Load()
}

// RemainRecord returns the number of error records that can still be recorded.
func (em *ErrorManager) RemainRecord() int64 {
return em.maxErrRecords.Load()
}

// New creates a new error manager.
func New(db *sql.DB, cfg *config.Config, logger log.Logger) *ErrorManager {
maxErrRecords := &atomic.Int64{}
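RemainRecord is what lets callers skip expensive work once the record budget is spent (see the RemainRecord() <= 0 check in chunk_process.go below). A rough sketch of that pattern, assuming nothing beyond sync/atomic (recordBudget and its methods are illustrative, not the ErrorManager API):

package errormanager

import "sync/atomic"

// recordBudget models a shared budget of error rows that may still be
// recorded to the downstream conflict table.
type recordBudget struct {
	remaining atomic.Int64
}

func newRecordBudget(max int64) *recordBudget {
	b := &recordBudget{}
	b.remaining.Store(max)
	return b
}

// Remain plays the role of RemainRecord: callers check it before building
// an expensive duplicate-key message.
func (b *recordBudget) Remain() int64 { return b.remaining.Load() }

// tryRecord consumes one slot and reports whether the record may be kept.
func (b *recordBudget) tryRecord() bool {
	return b.remaining.Add(-1) >= 0
}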
5 changes: 5 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
@@ -58,10 +58,13 @@ go_library(
"//parser/mysql",
"//planner/core",
"//store/driver",
"//store/driver/txn",
"//store/pdtypes",
"//table",
"//table/tables",
"//tablecodec",
"//types",
"//util/codec",
"//util/collate",
"//util/dbterror",
"//util/engine",
@@ -147,7 +150,9 @@ go_test(
"//store/mockstore",
"//store/pdtypes",
"//table/tables",
"//tablecodec",
"//types",
"//util/codec",
"//util/dbutil",
"//util/extsort",
"//util/mock",
121 changes: 118 additions & 3 deletions br/pkg/lightning/importer/chunk_process.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
kv2 "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
@@ -36,7 +37,11 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/extsort"
"go.uber.org/zap"
)
@@ -199,7 +204,15 @@ func (cr *chunkProcessor) process(

logTask := logger.Begin(zap.InfoLevel, "restore file")

readTotalDur, encodeTotalDur, encodeErr := cr.encodeLoop(ctx, kvsCh, t, logger, kvEncoder, deliverCompleteCh, rc)
readTotalDur, encodeTotalDur, encodeErr := cr.encodeLoop(
ctx,
kvsCh,
t,
logger,
kvEncoder,
deliverCompleteCh,
rc,
)
var deliverErr error
select {
case deliverResult, ok := <-deliverCompleteCh:
@@ -233,6 +246,36 @@ func (cr *chunkProcessor) encodeLoop(
) (readTotalDur time.Duration, encodeTotalDur time.Duration, err error) {
defer close(kvsCh)

// When AddIndexBySQL is enabled, we use all PKs and UKs to run pre-deduplication,
// and then strip almost all secondary indexes before running encodeLoop. When
// encodeLoop meets a duplicated row marked by pre-deduplication, it needs the
// original table structure to generate the duplicate error message, so we create
// a new encoder with the original table structure here.
originalTableEncoder := kvEncoder
if rc.cfg.TikvImporter.AddIndexBySQL {
encTable, err := tables.TableFromMeta(t.alloc, t.tableInfo.Desired)
if err != nil {
return 0, 0, errors.Trace(err)
}

originalTableEncoder, err = rc.encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
SessionOptions: encode.SessionOptions{
SQLMode: rc.cfg.TiDB.SQLMode,
Timestamp: cr.chunk.Timestamp,
SysVars: rc.sysVars,
// use chunk.PrevRowIDMax as the auto-random seed, so it stays the same after recovering from a checkpoint.
AutoRandomSeed: cr.chunk.Chunk.PrevRowIDMax,
},
Path: cr.chunk.Key.Path,
Table: encTable,
Logger: logger,
})
if err != nil {
return 0, 0, errors.Trace(err)
}
defer originalTableEncoder.Close()
}

send := func(kvs []deliveredKVs) error {
select {
case kvsCh <- kvs:
@@ -376,11 +419,32 @@ }
}
if isDupIgnored {
cr.parser.RecycleRow(lastRow)
lastOffset := curOffset
curOffset = newOffset

if rc.errorMgr.RemainRecord() <= 0 {
continue
}

dupMsg := cr.getDuplicateMessage(
originalTableEncoder,
lastRow,
lastOffset,
dupIgnoreRowsIter.UnsafeValue(),
t.tableInfo.Desired,
logger,
)
rowText := tidb.EncodeRowForRecord(ctx, t.encTable, rc.cfg.TiDB.SQLMode, lastRow.Row, cr.chunk.ColumnPermutation)
// TODO: fill error message
err = rc.errorMgr.RecordConflictErrorV2(ctx, logger, t.tableName, cr.chunk.Key.Path, newOffset, "", lastRow.RowID, rowText)
err = rc.errorMgr.RecordConflictErrorV2(
ctx,
logger,
t.tableName,
cr.chunk.Key.Path,
newOffset,
dupMsg,
lastRow.RowID,
rowText,
)
if err != nil {
return 0, 0, err
}
@@ -452,6 +516,57 @@ func (cr *chunkProcessor) encodeLoop(
return
}

// getDuplicateMessage builds the duplicate message like a SQL error. When it meets
// an internal error, that error's message is returned instead of the duplicate message.
// If the index is not found (which is not expected), an empty string is returned.
func (cr *chunkProcessor) getDuplicateMessage(
kvEncoder encode.Encoder,
lastRow mydump.Row,
lastOffset int64,
encodedIdxID []byte,
tableInfo *model.TableInfo,
logger log.Logger,
) string {
_, idxID, err := codec.DecodeVarint(encodedIdxID)
if err != nil {
return err.Error()
}
kvs, err := kvEncoder.Encode(lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation, lastOffset)
if err != nil {
return err.Error()
}

if idxID == conflictOnHandle {
for _, kv := range kvs.(*kv2.Pairs).Pairs {
if tablecodec.IsRecordKey(kv.Key) {
dupErr := txn.ExtractKeyExistsErrFromHandle(kv.Key, kv.Val, tableInfo)
return dupErr.Error()
}
}
// should not happen
logger.Warn("fail to find conflict record key",
zap.String("file", cr.chunk.FileMeta.Path),
zap.Any("row", lastRow.Row))
} else {
for _, kv := range kvs.(*kv2.Pairs).Pairs {
_, decodedIdxID, isRecordKey, err := tablecodec.DecodeKeyHead(kv.Key)
if err != nil {
return err.Error()
}
if !isRecordKey && decodedIdxID == idxID {
dupErr := txn.ExtractKeyExistsErrFromIndex(kv.Key, kv.Val, tableInfo, idxID)
return dupErr.Error()
}
}
// should not happen
logger.Warn("fail to find conflict index key",
zap.String("file", cr.chunk.FileMeta.Path),
zap.Int64("idxID", idxID),
zap.Any("row", lastRow.Row))
}
return ""
}

//nolint:nakedret // TODO: refactor
func (cr *chunkProcessor) deliverLoop(
ctx context.Context,
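The lookup inside getDuplicateMessage boils down to: decode the index ID that pre-deduplication stored as a varint, re-encode the offending row, then scan the resulting KV pairs for the record key (when the conflict was on the handle) or the matching index key. A simplified sketch, where the pair struct and findConflictPair are hypothetical and encoding/binary stands in for TiDB's comparable varint codec:

package importer

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// conflictOnHandle mirrors the sentinel in dup_detect.go: -1 means the
// conflict happened on the table's record (handle) key, not on an index.
const conflictOnHandle = int64(-1)

// pair is a stand-in for the encoder's KV pair; isRecord and idxID stand in
// for what tablecodec.IsRecordKey / DecodeKeyHead derive from the key bytes.
type pair struct {
	isRecord bool
	idxID    int64
	key, val []byte
}

// findConflictPair picks the KV pair that the duplicate-key error should be
// built from, given the index ID recovered from the pre-deduplication result.
func findConflictPair(encodedIdxID []byte, kvs []pair) (*pair, error) {
	idxID, n := binary.Varint(encodedIdxID)
	if n <= 0 {
		return nil, errors.New("malformed index ID")
	}
	for i := range kvs {
		kv := &kvs[i]
		if idxID == conflictOnHandle && kv.isRecord {
			return kv, nil // duplicate on the primary/record key
		}
		if idxID != conflictOnHandle && !kv.isRecord && kv.idxID == idxID {
			return kv, nil // duplicate on this unique index
		}
	}
	return nil, fmt.Errorf("no KV pair matches index %d", idxID)
}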
73 changes: 61 additions & 12 deletions br/pkg/lightning/importer/dup_detect.go
@@ -29,9 +29,12 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/extsort"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

@@ -105,38 +108,63 @@ func makeDupHandlerConstructor(
}
}

// ErrDuplicateKey is an error class for duplicate key errors.
var ErrDuplicateKey = errors.Normalize("duplicate key detected on indexID %d of KeyID: %v", errors.RFCCodeText("Lightning:PreDedup:ErrDuplicateKey"))

var (
_ duplicate.Handler = &errorOnDup{}
_ duplicate.Handler = &replaceOnDup{}
_ duplicate.Handler = &ignoreOnDup{}
)

type errorOnDup struct{}
type errorOnDup struct {
idxID int64
keyIDs [][]byte
}

func (errorOnDup) Begin(key []byte) error {
// TODO: add more useful information to the error message.
return errors.Errorf("duplicate key detected: %X", key)
func (h *errorOnDup) Begin(key []byte) error {
idxID, err := decodeIndexID(key)
if err != nil {
return err
}
h.idxID = idxID
return nil
}

func (errorOnDup) Append(_ []byte) error { return nil }
func (errorOnDup) End() error { return nil }
func (errorOnDup) Close() error { return nil }
func (h *errorOnDup) Append(keyID []byte) error {
if len(h.keyIDs) >= 2 {
// we only need 2 keyIDs to report the error.
return nil
}
h.keyIDs = append(h.keyIDs, slices.Clone(keyID))
return nil
}
func (h *errorOnDup) End() error {
return ErrDuplicateKey.GenWithStackByArgs(h.idxID, h.keyIDs)
}
func (*errorOnDup) Close() error { return nil }

type replaceOnDup struct {
// All keyIDs except the last one will be written to w.
// keyIDs written to w are ignored during importing.
w extsort.Writer
keyID []byte
idxID []byte // Varint encoded indexID
}

func (h *replaceOnDup) Begin(_ []byte) error {
func (h *replaceOnDup) Begin(key []byte) error {
h.keyID = h.keyID[:0]
idxID, err := decodeIndexID(key)
if err != nil {
return err
}
h.idxID = codec.EncodeVarint(nil, idxID)
return nil
}

func (h *replaceOnDup) Append(keyID []byte) error {
if len(h.keyID) > 0 {
if err := h.w.Put(h.keyID, nil); err != nil {
if err := h.w.Put(h.keyID, h.idxID); err != nil {
return err
}
}
@@ -157,10 +185,16 @@ type ignoreOnDup struct {
// keyIDs written to w are ignored during importing.
w extsort.Writer
first bool
idxID []byte // Varint encoded indexID
}

func (h *ignoreOnDup) Begin(_ []byte) error {
func (h *ignoreOnDup) Begin(key []byte) error {
h.first = true
idxID, err := decodeIndexID(key)
if err != nil {
return err
}
h.idxID = codec.EncodeVarint(nil, idxID)
return nil
}

@@ -169,7 +203,7 @@ func (h *ignoreOnDup) Append(keyID []byte) error {
h.first = false
return nil
}
return h.w.Put(keyID, nil)
return h.w.Put(keyID, h.idxID)
}

func (*ignoreOnDup) End() error {
@@ -319,7 +353,7 @@ func simplifyTable(
usedColOffsets := make(map[int]struct{})
for _, idxInfo := range tblInfo.Indices {
if idxInfo.Primary || idxInfo.Unique {
usedIndices = append(usedIndices, idxInfo)
usedIndices = append(usedIndices, idxInfo.Clone())
for _, col := range idxInfo.Columns {
usedColOffsets[col.Offset] = struct{}{}
}
@@ -368,3 +402,18 @@ }
}
return newTblInfo, newColPerm
}

const conflictOnHandle = int64(-1)

func decodeIndexID(key []byte) (int64, error) {
switch {
case tablecodec.IsRecordKey(key):
return conflictOnHandle, nil
case tablecodec.IsIndexKey(key):
_, idxID, _, err := tablecodec.DecodeIndexKey(key)
return idxID, errors.Trace(err)

default:
return 0, errors.Errorf("unexpected key: %X, expected a record key or index key", key)
}
}
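Taken together, the three handlers implement different conflict policies: errorOnDup aborts with the new ErrDuplicateKey, while replaceOnDup keeps the last row and ignoreOnDup keeps the first, writing every skipped row ID to the external sorter with the varint-encoded index ID as its value so encodeLoop can later report which index the row conflicted on. A stripped-down sketch of the keep-first policy, assuming a Put-style writer (sortedWriter and keepFirst are illustrative; the real Begin decodes the index ID from the duplicate key via decodeIndexID):

package importer

// sortedWriter is a stand-in for extsort.Writer: skipped row IDs are
// persisted, sorted by key, so the import phase can look them up.
type sortedWriter interface {
	Put(key, value []byte) error
}

// keepFirst keeps the first keyID of each duplicate group and writes every
// later one to w with the encoded index ID as the value, marking the row
// to be skipped and recording which index it conflicted on.
type keepFirst struct {
	w     sortedWriter
	idxID []byte // encoded index ID of the current duplicate group
	first bool
}

func (h *keepFirst) Begin(encodedIdxID []byte) error {
	h.first = true
	h.idxID = encodedIdxID
	return nil
}

func (h *keepFirst) Append(keyID []byte) error {
	if h.first {
		h.first = false
		return nil // the first occurrence survives the import
	}
	return h.w.Put(keyID, h.idxID)
}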
