Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

[cherry-pick] restore: split & scatter regions concurrently(tidb#27034) #1429

Merged
merged 4 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod
for _, peer := range region.GetPeers() {
peers = append(peers, peer.String())
}
enc.AddUint64("ID", region.Id)
enc.AddUint64("ID", region.GetId())
enc.AddString("startKey", redact.Key(region.GetStartKey()))
enc.AddString("endKey", redact.Key(region.GetEndKey()))
enc.AddString("epoch", region.GetRegionEpoch().String())
Expand Down
2 changes: 2 additions & 0 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ func (importer *FileImporter) Import(
logutil.Region(info.Region),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.Key("file-simple-start", file.StartKey),
logutil.Key("file-simple-end", file.EndKey),
logutil.ShortError(e))
continue regionLoop
}
Expand Down
148 changes: 119 additions & 29 deletions pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
Expand Down Expand Up @@ -74,6 +75,7 @@ type brContextManager struct {

// This 'set' of table ID allow us to handle each table just once.
hasTable map[int64]CreatedTable
mu sync.Mutex
}

func (manager *brContextManager) Close(ctx context.Context) {
Expand All @@ -86,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) {

func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
placementRuleTables := make([]*model.TableInfo, 0, len(tables))
manager.mu.Lock()
defer manager.mu.Unlock()

for _, tbl := range tables {
if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
Expand All @@ -98,6 +102,8 @@ func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTabl
}

func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
manager.mu.Lock()
defer manager.mu.Unlock()
placementRuleTables := make([]*model.TableInfo, 0, len(tables))

for _, table := range tables {
Expand Down Expand Up @@ -183,6 +189,8 @@ type tikvSender struct {
inCh chan<- DrainResult

wg *sync.WaitGroup

tableWaiters *sync.Map
}

func (b *tikvSender) PutSink(sink TableSink) {
Expand All @@ -192,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) {
}

// RestoreBatch hands a batch of ranges over to the sender's input channel.
// Before sending, it logs how many batches are currently buffered in that
// channel; the send itself blocks while the channel is full.
func (b *tikvSender) RestoreBatch(ranges DrainResult) {
	queued := len(b.inCh)
	log.Info("restore batch: waiting ranges", zap.Int("range", queued))
	b.inCh <- ranges
}

Expand All @@ -200,29 +209,52 @@ func NewTiKVSender(
ctx context.Context,
cli *Client,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)

sender := &tikvSender{
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
tableWaiters: new(sync.Map),
}

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
return sender, nil
}

func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
// Close shuts the sender down. It closes the input channel to signal that no
// more batches will be sent, then blocks until the background workers have
// released b.wg.
func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}

// drainResultAndDone pairs a batch that has passed through the split worker
// with the callback that marks the batch as restored.
type drainResultAndDone struct {
// result is the batch whose files are to be restored.
result DrainResult
// done is the callback returned by registerTableIsRestoring for this batch;
// restoreWorker invokes it once the batch's files have been restored.
done func()
}

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
concurrency uint,
) {
defer log.Debug("split worker closed")
eg, ectx := errgroup.WithContext(ctx)
defer func() {
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
}()
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
Expand All @@ -231,44 +263,102 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult,
if !ok {
return
}
if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
b.sink.EmitError(err)
return
}
next <- result
// When the batcher has sent all ranges of a table, it marks the table
// 'all done' (BlankTablesAfterSend), and then we can send it to checksum.
//
// With a sole worker running those batch tasks sequentially, everything is fine. However,
// with multiple workers this becomes buggy, for example:
// |------table 1, ranges 1------|------table 1, ranges 2------|
// The batcher sends batches: [
// {Ranges: ranges 1},
// {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// ]
// And two workers run concurrently:
// worker 1: {Ranges: ranges 1}
// worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// If worker 2 finishes its job before worker 1 is done, the table isn't fully restored yet,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
}
next <- drainResultAndDone{
result: result,
done: done,
}
return nil
})
}
}
}

// registerTableIsRestoring records that one more batch is in flight for each
// of the given tables by incrementing the table's wait group (creating the
// wait group on first use).
//
// The returned function decrements every wait group that was incremented
// here; call it once the batch has been restored.
func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
	pending := make([]*sync.WaitGroup, len(ts))
	for idx := range ts {
		stored, _ := b.tableWaiters.LoadOrStore(ts[idx].Table.ID, new(sync.WaitGroup))
		waiter := stored.(*sync.WaitGroup)
		waiter.Add(1)
		pending[idx] = waiter
	}
	markDone := func() {
		for _, waiter := range pending {
			waiter.Done()
		}
	}
	return markDone
}

// waitTablesDone blocks the calling goroutine until none of the given tables
// is marked as 'current restoring' any more.
//
// Each table's wait group is removed from the waiter map before waiting on it.
// A table that has no registered wait group indicates a bug and triggers a
// panic-level log.
func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
	for idx := range ts {
		tbl := ts[idx].Table
		waiter, registered := b.tableWaiters.LoadAndDelete(tbl.ID)
		if !registered {
			log.Panic("bug! table done before register!",
				zap.Any("wait-table-map", b.tableWaiters),
				zap.Stringer("table", tbl.Name))
		}
		waiter.(*sync.WaitGroup).Wait()
	}
}

func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
}()
for {
select {
case <-ctx.Done():
return
case result, ok := <-ranges:
case r, ok := <-ranges:
if !ok {
return
}
files := result.Files()
if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
b.sink.EmitError(err)
return
}

log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
b.sink.EmitTables(result.BlankTablesAfterSend...)
files := r.result.Files()
// There has been a worker in the `RestoreFiles` procedure.
// Spawning a raw goroutine won't make too many requests to TiKV.
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
return e
}
log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
r.done()
b.waitTablesDone(r.result.BlankTablesAfterSend)
b.sink.EmitTables(r.result.BlankTablesAfterSend...)
return nil
})
}
}
}

func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}
2 changes: 2 additions & 0 deletions pkg/restore/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
"rewrite start key",
logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule))
}
oldKey := rg.EndKey
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
logutil.Key("origin-key", oldKey),
logutil.Key("key", rg.EndKey),
logutil.RewriteRule(rule))
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if errScan != nil {
if berrors.ErrPDBatchScanRegion.Equal(errScan) {
log.Warn("inconsistent region info get.", logutil.ShortError(errScan))
time.Sleep(time.Second)
continue SplitRegions
}
return errors.Trace(errScan)
}
if len(regions) == 0 {
log.Warn("split regions cannot scan any region")
return nil
}
splitKeyMap := GetSplitKeys(rewriteRules, sortedRanges, regions)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
Expand Down
36 changes: 32 additions & 4 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ const (
FlagMergeRegionSizeBytes = "merge-region-size-bytes"
// FlagMergeRegionKeyCount is the flag name of merge small regions by key count
FlagMergeRegionKeyCount = "merge-region-key-count"
// FlagPDConcurrency controls the concurrency of PD-related operations like split & scatter.
FlagPDConcurrency = "pd-concurrency"
// FlagBatchFlushInterval controls how long to wait before a restore batch is automatically sent.
FlagBatchFlushInterval = "batch-flush-interval"

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultDDLConcurrency = 16
)

Expand Down Expand Up @@ -70,9 +76,15 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
flags.Uint64(FlagMergeRegionSizeBytes, restore.DefaultMergeRegionSizeBytes,
"the threshold of merging small regions (Default 96MB, region split size)")
flags.Uint64(FlagMergeRegionKeyCount, restore.DefaultMergeRegionKeyCount,
"the threshold of merging smalle regions (Default 960_000, region split key count)")
"the threshold of merging small regions (Default 960_000, region split key count)")
flags.Uint(FlagPDConcurrency, defaultPDConcurrency,
"concurrency pd-relative operations like split & scatter.")
flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval,
"after how long a restore batch would be auto sended.")
_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
_ = flags.MarkHidden(FlagMergeRegionKeyCount)
_ = flags.MarkHidden(FlagPDConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
}

// ParseFromFlags parses the config from the flag set.
Expand All @@ -98,7 +110,9 @@ type RestoreConfig struct {
Config
RestoreCommonConfig

NoSchema bool `json:"no-schema" toml:"no-schema"`
NoSchema bool `json:"no-schema" toml:"no-schema"`
PDConcurrency uint `json:"pd-concurrency" toml:"pd-concurrency"`
BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
Expand Down Expand Up @@ -129,6 +143,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
cfg.PDConcurrency, err = flags.GetUint(FlagPDConcurrency)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagPDConcurrency)
}
cfg.BatchFlushInterval, err = flags.GetDuration(FlagBatchFlushInterval)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagBatchFlushInterval)
}
return nil
}

Expand All @@ -146,6 +168,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
if cfg.Config.SwitchModeInterval == 0 {
cfg.Config.SwitchModeInterval = defaultSwitchInterval
}
if cfg.PDConcurrency == 0 {
cfg.PDConcurrency = defaultPDConcurrency
}
if cfg.BatchFlushInterval == 0 {
cfg.BatchFlushInterval = defaultBatchFlushInterval
}
}

// CheckRestoreDBAndTable is used to check whether the restore dbs or tables have been backup
Expand Down Expand Up @@ -379,14 +407,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
int64(rangeSize+len(files)+len(tables)),
!cfg.LogProgress)
defer updateCh.Close()
sender, err := restore.NewTiKVSender(ctx, client, updateCh)
sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency)
if err != nil {
return errors.Trace(err)
}
manager := restore.NewBRContextManager(client)
batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh)
batcher.SetThreshold(batchSize)
batcher.EnableAutoCommit(ctx, time.Second)
batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval)
go restoreTableStream(ctx, rangeStream, batcher, errCh)

var finish <-chan struct{}
Expand Down
12 changes: 11 additions & 1 deletion pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {
}
}

// IdleCount reports how many workers are currently idle in the pool,
// i.e. the current length of pool.workers.
func (pool *WorkerPool) IdleCount() int {
	idle := len(pool.workers)
	return idle
}

// Limit returns the pool's configured limit, converted to int.
func (pool *WorkerPool) Limit() int {
	limit := int(pool.limit)
	return limit
}

// Apply executes a task.
func (pool *WorkerPool) Apply(fn taskFunc) {
worker := pool.apply()
Expand Down Expand Up @@ -93,5 +103,5 @@ func (pool *WorkerPool) recycle(worker *Worker) {

// HasWorker reports whether the pool currently has at least one unallocated
// (idle) worker.
func (pool *WorkerPool) HasWorker() bool {
	idle := pool.IdleCount()
	return idle > 0
}