Skip to content

Commit

Permalink
ddl: use kv.Key as reorg progress marker instead of kv.Handle (#20908)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Nov 11, 2020
1 parent 0c3b171 commit d184120
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 276 deletions.
178 changes: 91 additions & 87 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package ddl

import (
"bytes"
"context"
"math"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -120,14 +121,14 @@ type backfiller interface {
// backfillResult carries the outcome a backfill worker reports for a task:
// the row counters, the position reached, and any error.
// NOTE(review): this is a diff view with the +/- markers stripped; the
// nextHandle line is the removed field and nextKey is its replacement.
type backfillResult struct {
// addedCount and scanCount are accumulated across batches by mergeBackfillCtxToResult.
addedCount int
scanCount int
// nextHandle is the pre-change progress marker (removed by this commit).
nextHandle kv.Handle
// nextKey is the progress marker after this commit; mergeBackfillCtxToResult
// overwrites it with each batch's taskCtx.nextKey.
nextKey kv.Key
err error
}

// backfillTaskContext is the context of one batch of adding indices or updating column values.
// After the batch finishes, the result in backfillTaskContext is merged into backfillResult.
type backfillTaskContext struct {
nextHandle kv.Handle
nextKey kv.Key
done bool
addedCount int
scanCount int
Expand Down Expand Up @@ -175,20 +176,15 @@ func closeBackfillWorkers(workers []*backfillWorker) {

// reorgBackfillTask describes one range of a physical table that is sent to a backfill worker.
// NOTE(review): diff view with the +/- markers stripped — startHandle, endHandle and
// endIncluded (with its comment) are the removed fields; startKey and endKey replace them.
type reorgBackfillTask struct {
physicalTableID int64
startHandle kv.Handle
endHandle kv.Handle
// endIncluded indicates whether the range includes the endHandle.
// When the last handle is math.MaxInt64, set endIncluded to true to
// tell the worker to backfill the index of endHandle.
endIncluded bool
// startKey and endKey bound the task's range; sendRangeTaskToWorkers copies
// them from a kv.KeyRange's StartKey and EndKey.
startKey kv.Key
endKey kv.Key
}

// String renders the task as "physicalTableID_<id>_[<start>,<end>" plus a closing
// bracket; the backfill worker log calls use it for their "task" field.
// NOTE(review): diff view with the +/- markers stripped — the rightParenthesis lines and
// the first return are the removed implementation (")" or "]" depending on endIncluded);
// the physicalID/startKey/endKey lines and the second return are the replacement,
// which always closes with "]".
func (r *reorgBackfillTask) String() string {
rightParenthesis := ")"
if r.endIncluded {
rightParenthesis = "]"
}
return "physicalTableID_" + strconv.FormatInt(r.physicalTableID, 10) + "_" + "[" + r.startHandle.String() + "," + r.endHandle.String() + rightParenthesis
physicalID := strconv.FormatInt(r.physicalTableID, 10)
// Keys are printed through tryDecodeToHandleString, which falls back to hex when a key does not decode.
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
return "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey + "]"
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
Expand All @@ -203,7 +199,7 @@ func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32)

// mergeBackfillCtxToResult merges the partial result in taskCtx into result.
// The added/scanned counters are accumulated, while the next-position marker
// is overwritten with the value from taskCtx.
// NOTE(review): diff view with the +/- markers stripped — the nextHandle assignment
// is the removed line and the nextKey assignment replaces it.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextHandle = taskCtx.nextHandle
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}
Expand All @@ -226,7 +222,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
result := &backfillResult{
err: nil,
addedCount: 0,
nextHandle: handleRange.startHandle,
nextKey: handleRange.startKey,
}
lastLogCount := 0
lastLogTime := time.Now()
Expand Down Expand Up @@ -265,19 +261,25 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,

if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
logutil.BgLogger().Info("[ddl] backfill worker back fill index", zap.Int("workerID", w.id), zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount), zap.String("nextHandle", toString(taskCtx.nextHandle)), zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds()))
logutil.BgLogger().Info("[ddl] backfill worker back fill index",
zap.Int("workerID", w.id),
zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount),
zap.String("nextHandle", tryDecodeToHandleString(taskCtx.nextKey)),
zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds()))
lastLogTime = time.Now()
}

handleRange.startHandle = taskCtx.nextHandle
handleRange.startKey = taskCtx.nextKey
if taskCtx.done {
break
}
}
logutil.BgLogger().Info("[ddl] backfill worker finish task", zap.Int("workerID", w.id),
zap.String("task", task.String()), zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount), zap.String("nextHandle", toString(result.nextHandle)),
zap.String("task", task.String()),
zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount),
zap.String("nextHandle", tryDecodeToHandleString(result.nextKey)),
zap.String("takeTime", time.Since(startTime).String()))
return result
}
Expand All @@ -297,7 +299,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller) {
logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String()))
failpoint.Inject("mockBackfillRunErr", func() {
if w.id == 0 {
result := &backfillResult{addedCount: 0, nextHandle: nil, err: errors.Errorf("mock backfill error")}
result := &backfillResult{addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
w.resultCh <- result
failpoint.Continue()
}
Expand All @@ -314,13 +316,12 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller) {
// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
// to speed up backfilling data in tables with dispersed handles.
// The `t` should be a non-partitioned table or a partition.
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endHandle kv.Handle) ([]kv.KeyRange, error) {
startRecordKey := t.RecordKey(startHandle)
endRecordKey := t.RecordKey(endHandle)

logutil.BgLogger().Info("[ddl] split table range from PD", zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("startHandle", toString(startHandle)), zap.String("endHandle", toString(endHandle)))
kvRange := kv.KeyRange{StartKey: startRecordKey, EndKey: endRecordKey}
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) {
logutil.BgLogger().Info("[ddl] split table range from PD",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("endHandle", tryDecodeToHandleString(endKey)))
kvRange := kv.KeyRange{StartKey: startKey, EndKey: endKey}
s, ok := store.(tikv.Storage)
if !ok {
// Only support split ranges in tikv.Storage now.
Expand All @@ -339,10 +340,11 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endH
return ranges, nil
}

func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAddedCount *int64, startHandle kv.Handle) (kv.Handle, int64, error) {
func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
var (
addedCount int64
nextHandle = startHandle
nextKey = startKey
firstErr error
)
for i := 0; i < taskCnt; i++ {
Expand All @@ -362,11 +364,11 @@ func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAd
if firstErr == nil {
*totalAddedCount += int64(result.addedCount)
addedCount += int64(result.addedCount)
nextHandle = result.nextHandle
nextKey = result.nextKey
}
}

return nextHandle, addedCount, errors.Trace(firstErr)
return nextKey, addedCount, errors.Trace(firstErr)
}

// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results,
Expand All @@ -376,69 +378,78 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
workers[i].taskCh <- task
}

startHandle := batchTasks[0].startHandle
startKey := batchTasks[0].startKey
taskCnt := len(batchTasks)
startTime := time.Now()
nextHandle, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startHandle)
nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.d)
}

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextHandle)
err1 := reorgInfo.UpdateReorgMeta(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount), zap.String("startHandle", toString(startHandle)),
zap.String("nextHandle", toString(nextHandle)), zap.Int64("batchAddedCount", taskAddedCount),
zap.String("taskFailedError", err.Error()), zap.String("takeTime", elapsedTime.String()),
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("nextHandle", tryDecodeToHandleString(nextKey)),
zap.Int64("batchAddedCount", taskAddedCount),
zap.String("taskFailedError", err.Error()),
zap.String("takeTime", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
return errors.Trace(err)
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.reorgCtx.setNextHandle(nextHandle)
w.reorgCtx.setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount), zap.String("startHandle", toString(startHandle)),
zap.String("nextHandle", toString(nextHandle)), zap.Int64("batchAddedCount", taskAddedCount), zap.String("takeTime", elapsedTime.String()))
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("nextHandle", tryDecodeToHandleString(nextKey)),
zap.Int64("batchAddedCount", taskAddedCount),
zap.String("takeTime", elapsedTime.String()))
return nil
}

// NOTE(review): diff view with the +/- markers stripped. The lines of the removed
// decodeHandleRange (which decoded both ends of a kv.KeyRange into handles) are
// interleaved below with the added tryDecodeToHandleString, so this span is not
// compilable as written.
func decodeHandleRange(keyRange kv.KeyRange) (kv.Handle, kv.Handle, error) {
startHandle, err := tablecodec.DecodeRowKey(keyRange.StartKey)
// tryDecodeToHandleString renders key as a string; every call visible in this diff
// feeds a log field or reorgBackfillTask.String. It returns the decoded handle's
// String() when tablecodec.DecodeRowKey succeeds. Otherwise it looks for "_r" in key:
// without it the whole key is printed as "key: <hex>"; with it, a key whose last byte
// is zero is retried without that byte and, on success, shown as "<handle>.next";
// anything else is printed as the hex of the bytes after "_r".
func tryDecodeToHandleString(key kv.Key) string {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return nil, nil, errors.Trace(err)
}
endHandle, err := tablecodec.DecodeRowKey(keyRange.EndKey)
if err != nil {
return nil, nil, errors.Trace(err)
// Locate the first "_r" so only the bytes following it are treated as the handle part.
recordPrefixIdx := bytes.Index(key, []byte("_r"))
if recordPrefixIdx == -1 {
return fmt.Sprintf("key: %x", key)
}
handleBytes := key[recordPrefixIdx+2:]
// A trailing zero byte is stripped and the decode retried; a successful retry is
// reported with a ".next" suffix.
terminatedWithZero := len(handleBytes) > 0 && handleBytes[len(handleBytes)-1] == 0
if terminatedWithZero {
handle, err := tablecodec.DecodeRowKey(key[:len(key)-1])
if err == nil {
return handle.String() + ".next"
}
}
return fmt.Sprintf("%x", handleBytes)
}

return startHandle, endHandle, nil
return handle.String()
}

// sendRangeTaskToWorkers sends tasks to workers, and returns the remaining kvRanges that are not handled.
func (w *worker) sendRangeTaskToWorkers(workers []*backfillWorker, reorgInfo *reorgInfo,
totalAddedCount *int64, kvRanges []kv.KeyRange, globalEndHandle kv.Handle) ([]kv.KeyRange, error) {
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := make([]*reorgBackfillTask, 0, len(workers))
physicalTableID := reorgInfo.PhysicalTableID

// Build reorg tasks.
for _, keyRange := range kvRanges {
startHandle, endHandle, err := decodeHandleRange(keyRange)
if err != nil {
return nil, errors.Trace(err)
}

endIncluded := false
if endHandle.Equal(globalEndHandle) {
endIncluded = true
}
task := &reorgBackfillTask{physicalTableID, startHandle, endHandle, endIncluded}
task := &reorgBackfillTask{
physicalTableID: physicalTableID,
startKey: keyRange.StartKey,
endKey: keyRange.EndKey}
batchTasks = append(batchTasks, task)

if len(batchTasks) >= len(workers) {
Expand Down Expand Up @@ -520,7 +531,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
if err != nil {
Expand All @@ -530,7 +541,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
if err := w.isReorgRunnable(reorgInfo.d); err != nil {
return errors.Trace(err)
}
if startHandle == nil && endHandle == nil {
if startKey == nil && endKey == nil {
return nil
}

Expand All @@ -548,7 +559,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}()

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -619,48 +630,41 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}
})

logutil.BgLogger().Info("[ddl] start backfill workers to reorg record", zap.Int("workerCnt", len(backfillWorkers)),
zap.Int("regionCnt", len(kvRanges)), zap.String("startHandle", toString(startHandle)), zap.String("endHandle", toString(endHandle)))
remains, err := w.sendRangeTaskToWorkers(backfillWorkers, reorgInfo, &totalAddedCount, kvRanges, endHandle)
logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
zap.Int("workerCnt", len(backfillWorkers)),
zap.Int("regionCnt", len(kvRanges)),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("endHandle", tryDecodeToHandleString(endKey)))
remains, err := w.sendRangeTaskToWorkers(backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
startKey = remains[0].StartKey
}
return nil
}

// recordIterFunc is the callback type used for low-level record iteration;
// iterateSnapshotRows accepts one as its fn argument.
// Its parameters are a handle (h), a row key (rowKey) and the raw record bytes (rawRecord).
// NOTE(review): presumably returning more == false stops the iteration — the
// call site is not visible in this view; confirm against iterateSnapshotRows.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle kv.Handle, endHandle kv.Handle, endIncluded bool, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64,
startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
var firstKey kv.Key
if startHandle == nil {
if startKey == nil {
firstKey = t.RecordPrefix()
} else {
firstKey = t.RecordKey(startHandle)
firstKey = startKey
}

var upperBound kv.Key
if endHandle == nil {
if endKey == nil {
upperBound = t.RecordPrefix().PrefixNext()
} else {
if endIncluded {
if endHandle.IsInt() && endHandle.IntValue() == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
upperBound = t.RecordKey(endHandle.Next())
}
} else {
upperBound = t.RecordKey(endHandle)
}
upperBound = endKey.PrefixNext()
}

ver := kv.Version{Ver: version}
Expand Down
Loading

0 comments on commit d184120

Please sign in to comment.