Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
Browse files Browse the repository at this point in the history
…part-master-merge
  • Loading branch information
mjonss committed Feb 10, 2023
2 parents 618ca02 + 6cb7374 commit a608929
Show file tree
Hide file tree
Showing 23 changed files with 962 additions and 327 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ bazel_test: failpoint-enable bazel_ci_prepare


bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -1224,6 +1225,15 @@ func (w *Writer) flushKVs(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

failpoint.Inject("orphanWriterGoRoutine", func() {
_ = common.KillMySelf()
// mimic we meet context cancel error when `addSST`
<-ctx.Done()
time.Sleep(5 * time.Second)
failpoint.Return(errors.Trace(ctx.Err()))
})

err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
o.logger.Error("restore failed", log.ShortError(err))
return errors.Trace(err)
}

failpoint.Inject("orphanWriterGoRoutine", func() {
// don't exit too quickly to expose panic
defer time.Sleep(time.Second * 10)
})
defer procedure.Close()

err = procedure.Run(ctx)
Expand Down
14 changes: 13 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func (tr *TableRestore) restoreEngine(
metrics, _ := metric.FromContext(ctx)

// Restore table data
ChunkLoop:
for chunkIndex, chunk := range cp.Chunks {
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
Expand All @@ -524,9 +525,15 @@ func (tr *TableRestore) restoreEngine(
}
checkFlushLock.Unlock()

failpoint.Inject("orphanWriterGoRoutine", func() {
if chunkIndex > 0 {
<-pCtx.Done()
}
})

select {
case <-pCtx.Done():
return nil, pCtx.Err()
break ChunkLoop
default:
}

Expand Down Expand Up @@ -615,6 +622,11 @@ func (tr *TableRestore) restoreEngine(
}

wg.Wait()
select {
case <-pCtx.Done():
return nil, pCtx.Err()
default:
}

// Report some statistics into the log for debugging.
totalKVSize := uint64(0)
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_checkpoint_chunks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ for i in $(seq "$CHUNK_COUNT"); do
done
done

PKG="github.com/pingcap/tidb/br/pkg/lightning"
export GO_FAILPOINTS="$PKG/backend/local/orphanWriterGoRoutine=return();$PKG/restore/orphanWriterGoRoutine=return();$PKG/orphanWriterGoRoutine=return()"
# test won't panic
do_run_lightning config

# Set the failpoint to kill the lightning instance as soon as
# one file (after writing totally $ROW_COUNT rows) is imported.
# If checkpoint does work, this should kill exactly $CHUNK_COUNT instances of lightnings.
Expand Down
8 changes: 7 additions & 1 deletion ddl/dist_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import (
)

// CheckBackfillJobFinishInterval is export for test.
var CheckBackfillJobFinishInterval = 300 * time.Millisecond
var (
CheckBackfillJobFinishInterval = 300 * time.Millisecond
telemetryDistReorgUsage = metrics.TelemetryDistReorgCnt
)

const (
distPhysicalTableConcurrency = 16
Expand All @@ -48,6 +51,9 @@ const (
func initDistReorg(reorgMeta *model.DDLReorgMeta) {
isDistReorg := variable.DDLEnableDistributeReorg.Load()
reorgMeta.IsDistReorg = isDistReorg
if isDistReorg {
metrics.TelemetryDistReorgCnt.Inc()
}
}

// BackfillJobRangeMeta is export for test.
Expand Down
15 changes: 15 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m
func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}

failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging &&
MockDMLExecutionStateMerging != nil {
MockDMLExecutionStateMerging()
}
})

sctx, err1 := w.sessPool.get()
if err1 != nil {
err = err1
Expand Down Expand Up @@ -1789,6 +1798,12 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
// MockDMLExecution is only used for test.
var MockDMLExecution func()

// MockDMLExecutionMerging is only used for test.
var MockDMLExecutionMerging func()

// MockDMLExecutionStateMerging is only used for test.
var MockDMLExecutionStateMerging func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
Expand Down
72 changes: 44 additions & 28 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -198,6 +199,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
return nil
})

failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecutionMerging != nil {
MockDMLExecutionMerging()
}
})
logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
Expand Down Expand Up @@ -252,40 +259,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if err != nil {
return false, err
}
tempIdxVal = tempIdxVal.FilterOverwritten()

// Extract the operations on the original index and replay them later.
for _, elem := range tempIdxVal {
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
continue
}

if handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
if elem.Handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}
}

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)
originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
handle: handle,
delete: isDelete,
unique: unique,
skip: false,
}
if !isDelete {
idxRecord.vals = originVal
idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
idxRecord := &temporaryIndexRecord{
handle: elem.Handle,
delete: elem.Delete,
unique: elem.Distinct,
skip: false,
}
if !elem.Delete {
idxRecord.vals = elem.Value
idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)

lastKey = indexKey
return true, nil
})
Expand Down
1 change: 1 addition & 0 deletions ddl/indexmergetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_test(
"//ddl/internal/callback",
"//ddl/testutil",
"//domain",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
Expand Down
Loading

0 comments on commit a608929

Please sign in to comment.