This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

[cherry-pick] restore: split & scatter regions concurrently (tidb#27034) #1429

Merged: 4 commits, Sep 8, 2021
2 changes: 1 addition & 1 deletion pkg/logutil/logging.go
@@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod
for _, peer := range region.GetPeers() {
peers = append(peers, peer.String())
}
enc.AddUint64("ID", region.Id)
enc.AddUint64("ID", region.GetId())
enc.AddString("startKey", redact.Key(region.GetStartKey()))
enc.AddString("endKey", redact.Key(region.GetEndKey()))
enc.AddString("epoch", region.GetRegionEpoch().String())
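Why this one-line fix matters: Go's protobuf-generated getters are nil-safe, so a nil region reaching the log marshaler yields a zero value instead of a panic, while direct field access would crash. A minimal standalone sketch; the Region type below is a hypothetical stand-in for the generated metapb.Region, not BR code:

    package main

    import "fmt"

    // Region stands in for a protobuf-generated message such as metapb.Region.
    type Region struct {
        Id uint64
    }

    // GetId mirrors the nil-safe getter emitted by protoc-gen-go.
    func (r *Region) GetId() uint64 {
        if r == nil {
            return 0
        }
        return r.Id
    }

    func main() {
        var region *Region          // a nil region reaching a log call
        fmt.Println(region.GetId()) // prints 0
        // fmt.Println(region.Id)   // would panic: nil pointer dereference
    }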
2 changes: 2 additions & 0 deletions pkg/restore/import.go
@@ -260,6 +260,8 @@ func (importer *FileImporter) Import(
logutil.Region(info.Region),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.Key("file-simple-start", file.StartKey),
logutil.Key("file-simple-end", file.EndKey),
logutil.ShortError(e))
continue regionLoop
}
149 changes: 120 additions & 29 deletions pkg/restore/pipeline_items.go
@@ -10,6 +10,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
@@ -74,6 +75,7 @@ type brContextManager struct {

// This 'set' of table ID allow us to handle each table just once.
hasTable map[int64]CreatedTable
mu sync.Mutex
}

func (manager *brContextManager) Close(ctx context.Context) {
@@ -86,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) {

func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
placementRuleTables := make([]*model.TableInfo, 0, len(tables))
manager.mu.Lock()
defer manager.mu.Unlock()

for _, tbl := range tables {
if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
@@ -98,6 +102,8 @@ }
}

func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
manager.mu.Lock()
defer manager.mu.Unlock()
placementRuleTables := make([]*model.TableInfo, 0, len(tables))

for _, table := range tables {
@@ -183,6 +189,8 @@ type tikvSender struct {
inCh chan<- DrainResult

wg *sync.WaitGroup

tableWaiters *sync.Map
}

func (b *tikvSender) PutSink(sink TableSink) {
@@ -192,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) {
}

func (b *tikvSender) RestoreBatch(ranges DrainResult) {
log.Info("restore batch: waiting ranges", zap.Int("range", len(b.inCh)))
b.inCh <- ranges
}

@@ -200,29 +209,52 @@ func NewTiKVSender(
ctx context.Context,
cli *Client,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)

sender := &tikvSender{
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
tableWaiters: new(sync.Map),
}

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
return sender, nil
}

func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}

type drainResultAndDone struct {
result DrainResult
done func()
}

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
concurrency uint,
) {
defer log.Debug("split worker closed")
eg, ectx := errgroup.WithContext(ctx)
defer func() {
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
}()
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
@@ -231,44 +263,103 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult,
if !ok {
return
}
if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
b.sink.EmitError(err)
return
}
next <- result
// When the batcher has sent all ranges of a table, it marks the table
// 'all done' (BlankTablesAfterSend), and then we can send it to checksum.
//
// With a sole worker running these batch tasks sequentially, everything is fine;
// with multiple workers, however, this becomes buggy. For example:
// |------table 1, ranges 1------|------table 1, ranges 2------|
// The batcher sends the batches: [
// {Ranges: ranges 1},
// {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// ]
// and two workers run concurrently:
// worker 1: {Ranges: ranges 1}
// worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// If worker 2 finishes before worker 1 does, the table would be sent to checksum
// while not yet fully restored, hence the checksum would fail.
// (A standalone sketch of the per-table gating that prevents this follows this file's diff.)
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
}
next <- drainResultAndDone{
result: result,
done: done,
}
return nil
})
}
}
}

// registerTableIsRestoring marks some tables as 'currently restoring'.
// It returns a function that marks the restore of these tables as done.
func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
wgs := make([]*sync.WaitGroup, 0, len(ts))
for _, t := range ts {
i, _ := b.tableWaiters.LoadOrStore(t.Table.ID, new(sync.WaitGroup))
wg := i.(*sync.WaitGroup)
wg.Add(1)
wgs = append(wgs, wg)
}
return func() {
for _, wg := range wgs {
wg.Done()
}
}
}

// waitTablesDone blocks the current goroutine until none of the provided
// tables is 'currently restoring' anymore.
func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
for _, t := range ts {
wg, ok := b.tableWaiters.Load(t.Table.ID)
if !ok {
log.Panic("bug! table done before register!",
zap.Any("wait-table-map", b.tableWaiters),
zap.Stringer("table", t.Table.Name))
}
b.tableWaiters.Delete(t.Table.ID)
wg.(*sync.WaitGroup).Wait()
}
}

func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
}()
for {
select {
case <-ctx.Done():
return
case result, ok := <-ranges:
case r, ok := <-ranges:
if !ok {
return
}
files := result.Files()
if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
b.sink.EmitError(err)
return
}

log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
b.sink.EmitTables(result.BlankTablesAfterSend...)
files := r.result.Files()
// RestoreFiles already bounds its own concurrency with an internal worker pool,
// so spawning a raw goroutine here won't flood TiKV with requests.
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
return e
}
log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
r.done()
b.waitTablesDone(r.result.BlankTablesAfterSend)
b.sink.EmitTables(r.result.BlankTablesAfterSend...)
return nil
})
}
}
}

func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}
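The heart of this patch is the registerTableIsRestoring / waitTablesDone pair above: each batch bumps a per-table WaitGroup before its split is scheduled, and a table is emitted for checksum only after every one of its batches has called done. Below is a minimal, self-contained sketch of that gating, assuming plain int64 table IDs in place of CreatedTable; it illustrates the pattern rather than reproducing the BR implementation:

    package main

    import (
        "fmt"
        "sync"
    )

    // tableGate tracks, per table ID, how many in-flight batches still
    // belong to that table.
    type tableGate struct {
        waiters sync.Map // table ID (int64) -> *sync.WaitGroup
    }

    // register marks the given tables as "currently restoring" and returns
    // a callback that marks this batch as done (cf. registerTableIsRestoring).
    func (g *tableGate) register(ids []int64) func() {
        wgs := make([]*sync.WaitGroup, 0, len(ids))
        for _, id := range ids {
            i, _ := g.waiters.LoadOrStore(id, new(sync.WaitGroup))
            wg := i.(*sync.WaitGroup)
            wg.Add(1)
            wgs = append(wgs, wg)
        }
        return func() {
            for _, wg := range wgs {
                wg.Done()
            }
        }
    }

    // wait blocks until every registered batch of the given tables has
    // finished (cf. waitTablesDone).
    func (g *tableGate) wait(ids []int64) {
        for _, id := range ids {
            if wg, ok := g.waiters.Load(id); ok {
                g.waiters.Delete(id)
                wg.(*sync.WaitGroup).Wait()
            }
        }
    }

    func main() {
        g := new(tableGate)
        done1 := g.register([]int64{1}) // batch 1 of table 1
        done2 := g.register([]int64{1}) // batch 2, carries BlankTablesAfterSend

        go done1()         // batch 1 finishes on another worker
        done2()            // this worker finished its own batch...
        g.wait([]int64{1}) // ...but must still wait for batch 1
        fmt.Println("table 1 fully restored; safe to checksum")
    }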
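On the split side, this gating is paired with a bounded error group: utils.NewWorkerPool(concurrency, "split") caps how many SplitRanges calls run at once, and the deferred eg.Wait() surfaces the first failure to the sink. A rough equivalent using only golang.org/x/sync/errgroup, where SetLimit plays the role that BR's worker pool helper plays above (a sketch under that assumption, not the BR helper itself):

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        eg := new(errgroup.Group)
        eg.SetLimit(2) // at most 2 concurrent splits, like NewWorkerPool(2, "split")

        for batch := 0; batch < 5; batch++ {
            batch := batch // capture loop variable (pre-Go 1.22 semantics)
            eg.Go(func() error {
                fmt.Println("splitting ranges of batch", batch)
                return nil // a real task would propagate the SplitRanges error
            })
        }
        // Wait returns the first non-nil error, which splitWorker forwards
        // to b.sink.EmitError in its deferred cleanup.
        if err := eg.Wait(); err != nil {
            fmt.Println("split failed:", err)
        }
    }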
2 changes: 2 additions & 0 deletions pkg/restore/range.go
@@ -34,12 +34,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
"rewrite start key",
logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule))
}
oldKey := rg.EndKey
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
logutil.Key("origin-key", oldKey),
logutil.Key("key", rg.EndKey),
logutil.RewriteRule(rule))
}
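For context on the hunk above: SortRanges rewrites each range's end key through the rewrite rules, and the new origin-key field makes the before/after pair visible in debug logs. A minimal sketch of prefix-based key rewriting, assuming rules shaped like BR's rewrite rules with an old and a new key prefix; replacePrefix here is a simplified stand-in, not the actual implementation:

    package main

    import (
        "bytes"
        "fmt"
    )

    // rewriteRule is a hypothetical simplification of a BR rewrite rule.
    type rewriteRule struct {
        OldKeyPrefix []byte
        NewKeyPrefix []byte
    }

    // replacePrefix returns the rewritten key and the rule applied, or the
    // original key and nil when no rule matches.
    func replacePrefix(key []byte, rules []rewriteRule) ([]byte, *rewriteRule) {
        for i := range rules {
            if bytes.HasPrefix(key, rules[i].OldKeyPrefix) {
                rewritten := append([]byte{}, rules[i].NewKeyPrefix...)
                rewritten = append(rewritten, key[len(rules[i].OldKeyPrefix):]...)
                return rewritten, &rules[i]
            }
        }
        return key, nil
    }

    func main() {
        rules := []rewriteRule{{OldKeyPrefix: []byte("t_old"), NewKeyPrefix: []byte("t_new")}}
        newKey, rule := replacePrefix([]byte("t_old_row1"), rules)
        fmt.Printf("%s matched=%v\n", newKey, rule != nil) // t_new_row1 matched=true
    }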
9 changes: 5 additions & 4 deletions pkg/restore/split.go
@@ -89,12 +89,13 @@ SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if errScan != nil {
if berrors.ErrPDBatchScanRegion.Equal(errScan) {
log.Warn("inconsistent region info get.", logutil.ShortError(errScan))
time.Sleep(time.Second)
continue SplitRegions
}
return errors.Trace(errScan)
}
if len(regions) == 0 {
log.Warn("split regions cannot scan any region")
return nil
}
splitKeyMap := GetSplitKeys(rewriteRules, sortedRanges, regions)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
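The split.go hunk adds two guards: a retry when PD reports an inconsistent batch of regions (regions may split or merge while being paginated over), and an early return when the scan finds no region at all. A minimal sketch of that retry shape, with scanRegions and errInconsistent as hypothetical stand-ins for PaginateScanRegion and berrors.ErrPDBatchScanRegion:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var errInconsistent = errors.New("scanned regions are not contiguous")

    // scanRegions stands in for PaginateScanRegion: the first attempt sees a
    // transient, inconsistent view while regions are splitting or merging.
    func scanRegions(attempt int) ([]string, error) {
        if attempt == 0 {
            return nil, errInconsistent
        }
        return []string{"region-1", "region-2"}, nil
    }

    func main() {
        const splitRetryTimes = 3
        for i := 0; i < splitRetryTimes; i++ {
            regions, err := scanRegions(i)
            if err != nil {
                if errors.Is(err, errInconsistent) {
                    time.Sleep(time.Second) // let PD converge, then rescan
                    continue
                }
                panic(err) // non-retryable errors propagate
            }
            if len(regions) == 0 {
                fmt.Println("no region scanned, nothing to split")
                return
            }
            fmt.Println("scanned:", regions)
            return // proceed to compute split keys from the regions
        }
    }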
36 changes: 32 additions & 4 deletions pkg/task/restore.go
@@ -34,9 +34,15 @@ const (
FlagMergeRegionSizeBytes = "merge-region-size-bytes"
// FlagMergeRegionKeyCount is the flag name of merge small regions by key count
FlagMergeRegionKeyCount = "merge-region-key-count"
// FlagPDConcurrency is the flag name for the concurrency of PD-related operations like split & scatter.
FlagPDConcurrency = "pd-concurrency"
// FlagBatchFlushInterval is the flag name for the interval after which a restore batch is flushed automatically.
FlagBatchFlushInterval = "batch-flush-interval"

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultDDLConcurrency = 16
)

@@ -70,9 +76,15 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
flags.Uint64(FlagMergeRegionSizeBytes, restore.DefaultMergeRegionSizeBytes,
"the threshold of merging small regions (Default 96MB, region split size)")
flags.Uint64(FlagMergeRegionKeyCount, restore.DefaultMergeRegionKeyCount,
"the threshold of merging smalle regions (Default 960_000, region split key count)")
"the threshold of merging small regions (Default 960_000, region split key count)")
flags.Uint(FlagPDConcurrency, defaultPDConcurrency,
"concurrency pd-relative operations like split & scatter.")
flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval,
"after how long a restore batch would be auto sended.")
_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
_ = flags.MarkHidden(FlagMergeRegionKeyCount)
_ = flags.MarkHidden(FlagPDConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
}

// ParseFromFlags parses the config from the flag set.
@@ -98,7 +110,9 @@ type RestoreConfig struct {
Config
RestoreCommonConfig

NoSchema bool `json:"no-schema" toml:"no-schema"`
NoSchema bool `json:"no-schema" toml:"no-schema"`
PDConcurrency uint `json:"pd-concurrency" toml:"pd-concurrency"`
BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
@@ -129,6 +143,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
cfg.PDConcurrency, err = flags.GetUint(FlagPDConcurrency)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagPDConcurrency)
}
cfg.BatchFlushInterval, err = flags.GetDuration(FlagBatchFlushInterval)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagBatchFlushInterval)
}
return nil
}

@@ -146,6 +168,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
if cfg.Config.SwitchModeInterval == 0 {
cfg.Config.SwitchModeInterval = defaultSwitchInterval
}
if cfg.PDConcurrency == 0 {
cfg.PDConcurrency = defaultPDConcurrency
}
if cfg.BatchFlushInterval == 0 {
cfg.BatchFlushInterval = defaultBatchFlushInterval
}
}

// CheckRestoreDBAndTable is used to check whether the DBs or tables to restore have been backed up
@@ -379,14 +407,14 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
int64(rangeSize+len(files)+len(tables)),
!cfg.LogProgress)
defer updateCh.Close()
sender, err := restore.NewTiKVSender(ctx, client, updateCh)
sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency)
if err != nil {
return errors.Trace(err)
}
manager := restore.NewBRContextManager(client)
batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh)
batcher.SetThreshold(batchSize)
batcher.EnableAutoCommit(ctx, time.Second)
batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval)
go restoreTableStream(ctx, rangeStream, batcher, errCh)

var finish <-chan struct{}
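To see the two new knobs end to end, here is a minimal, standalone pflag sketch mirroring DefineRestoreCommonFlags and ParseFromFlags above; the argument values being parsed are illustrative, and both flags stay hidden from help output just as in the patch:

    package main

    import (
        "fmt"
        "time"

        "github.com/spf13/pflag"
    )

    func main() {
        flags := pflag.NewFlagSet("restore", pflag.ContinueOnError)
        flags.Uint("pd-concurrency", 1,
            "the concurrency of PD-related operations like split & scatter")
        flags.Duration("batch-flush-interval", 16*time.Second,
            "the interval after which a restore batch is flushed automatically")
        _ = flags.MarkHidden("pd-concurrency") // tuning knobs, hidden from help
        _ = flags.MarkHidden("batch-flush-interval")

        // Illustrative invocation; both flags fall back to their defaults when omitted.
        _ = flags.Parse([]string{"--pd-concurrency=4", "--batch-flush-interval=32s"})

        pdConcurrency, _ := flags.GetUint("pd-concurrency")
        flushInterval, _ := flags.GetDuration("batch-flush-interval")
        fmt.Println(pdConcurrency, flushInterval) // 4 32s
    }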