From bc6224c75decc089dc0c602713d5ae434b6b15eb Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 18:59:20 +0800 Subject: [PATCH 01/20] restore: make split and restore pipelined. Signed-off-by: Hillium --- pkg/restore/batcher.go | 94 ++++++++++++++----------- pkg/restore/pipeline_items.go | 127 ++++++++++++++++++++++++++-------- pkg/restore/util.go | 2 + 3 files changed, 155 insertions(+), 68 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 1a4c1256d..1c65178f9 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/log" "go.uber.org/zap" @@ -39,8 +41,8 @@ type Batcher struct { autoCommitJoiner chan<- struct{} // everythingIsDone is for waiting for worker done: that is, after we send a // signal to autoCommitJoiner, we must give it enough time to get things done. - // Then, it should notify us by this waitgroup. - // Use waitgroup instead of a trivial channel for further extension. + // Then, it should notify us by this wait group. + // Use wait group instead of a trivial channel for further extension. everythingIsDone *sync.WaitGroup // sendErr is for output error information. sendErr chan<- error @@ -60,6 +62,29 @@ func (b *Batcher) Len() int { return int(atomic.LoadInt32(&b.size)) } +// contextCleaner is the worker goroutine that cleaning the 'context'. +// (e.g. make regions leave restore mode) +func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { + defer b.everythingIsDone.Done() + for { + select { + case <-ctx.Done(): + return + case tbls, ok := <-tables: + if !ok { + return + } + if err := b.manager.Leave(ctx, tbls); err != nil { + b.sendErr <- err + return + } + for _, tbl := range tbls { + b.outCh <- tbl + } + } + } +} + // NewBatcher creates a new batcher by a sender and a context manager. // the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where). // the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit). @@ -84,8 +109,11 @@ func NewBatcher( everythingIsDone: new(sync.WaitGroup), batchSizeThreshold: 1, } - b.everythingIsDone.Add(1) + b.everythingIsDone.Add(2) go b.sendWorker(ctx, sendChan) + restoredTables := make(chan []CreatedTable, 8) + go b.contextCleaner(ctx, restoredTables) + sender.PutSink(restoredTables, errCh) return b, output } @@ -105,7 +133,7 @@ func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) { // DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, // and then disable auto commit. func (b *Batcher) DisableAutoCommit() { - b.joinWorker() + b.joinAutoCommitWorker() b.autoCommitJoiner = nil } @@ -114,9 +142,9 @@ func (b *Batcher) waitUntilSendDone() { b.everythingIsDone.Wait() } -// joinWorker blocks the current goroutine until the worker can gracefully stop. +// joinAutoCommitWorker blocks the current goroutine until the worker can gracefully stop. // return immediately when auto commit disabled. 
-func (b *Batcher) joinWorker() { +func (b *Batcher) joinAutoCommitWorker() { if b.autoCommitJoiner != nil { log.Debug("gracefully stopping worker goroutine") b.autoCommitJoiner <- struct{}{} @@ -129,14 +157,11 @@ func (b *Batcher) joinWorker() { func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { sendUntil := func(lessOrEqual int) { for b.Len() > lessOrEqual { - tbls, err := b.Send(ctx) + err := b.Send(ctx) if err != nil { b.sendErr <- err return } - for _, t := range tbls { - b.outCh <- t - } } } @@ -148,6 +173,7 @@ func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { sendUntil(0) case SendAllThenClose: sendUntil(0) + b.sender.Close() b.everythingIsDone.Done() return } @@ -181,7 +207,8 @@ func (b *Batcher) asyncSend(t SendType) { } } -type drainResult struct { +// DrainResult is the collection of some ranges and theirs metadata. +type DrainResult struct { // TablesToSend are tables that would be send at this batch. TablesToSend []CreatedTable // BlankTablesAfterSend are tables that will be full-restored after this batch send. @@ -190,8 +217,16 @@ type drainResult struct { Ranges []rtree.Range } -func newDrainResult() drainResult { - return drainResult{ +func (result DrainResult) Files() []*backup.File { + var files []*backup.File + for _, fs := range result.Ranges { + files = append(files, fs.Files...) + } + return files +} + +func newDrainResult() DrainResult { + return DrainResult{ TablesToSend: make([]CreatedTable, 0), BlankTablesAfterSend: make([]CreatedTable, 0), RewriteRules: EmptyRewriteRule(), @@ -217,7 +252,7 @@ func newDrainResult() drainResult { // |--|-------| // |t2|t3 | // as you can see, all restored ranges would be removed. -func (b *Batcher) drainRanges() drainResult { +func (b *Batcher) drainRanges() DrainResult { result := newDrainResult() b.cachedTablesMu.Lock() @@ -271,42 +306,22 @@ func (b *Batcher) drainRanges() drainResult { // Send sends all pending requests in the batcher. // returns tables sent FULLY in the current batch. -func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) { +func (b *Batcher) Send(ctx context.Context) error { drainResult := b.drainRanges() tbs := drainResult.TablesToSend ranges := drainResult.Ranges - log.Info("restore batch start", append( ZapRanges(ranges), ZapTables(tbs), )..., ) - + // Leave is called at b.contextCleaner if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil { - return nil, err - } - defer func() { - if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil { - log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode", - append( - ZapRanges(ranges), - ZapTables(tbs), - zap.Error(err))..., - ) - } - if len(drainResult.BlankTablesAfterSend) > 0 { - log.Debug("table fully restored", - ZapTables(drainResult.BlankTablesAfterSend), - zap.Int("ranges", len(ranges)), - ) - } - }() - - if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil { - return nil, err + return err } - return drainResult.BlankTablesAfterSend, nil + b.sender.RestoreBatch(drainResult) + return nil } func (b *Batcher) sendIfFull() { @@ -342,7 +357,6 @@ func (b *Batcher) Close() { b.waitUntilSendDone() close(b.outCh) close(b.sendCh) - b.sender.Close() } // SetThreshold sets the threshold that how big the batch size reaching need to send batch. 
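// For illustration only: a minimal BatchSender stub matching the contract this patch
// introduces. PutSink is called once before the first RestoreBatch, RestoreBatch is
// asynchronous, and fully restored tables come back through the sink channel instead
// of being returned from Batcher.Send. The name dummySender is hypothetical; the real
// implementation is tikvSender in pipeline_items.go below.
type dummySender struct {
	outCh chan<- []CreatedTable
	errCh chan<- error
}

func (d *dummySender) PutSink(outCh chan<- []CreatedTable, errCh chan<- error) {
	d.outCh, d.errCh = outCh, errCh
}

func (d *dummySender) RestoreBatch(result DrainResult) {
	// A real sender would split and restore result.Ranges here (possibly in other
	// goroutines) and emit the tables only after their data is fully restored.
	d.outCh <- result.BlankTablesAfterSend
}

func (d *dummySender) Close() {
	close(d.outCh)
}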
diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index d99f5bd8e..a0189419a 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -4,9 +4,12 @@ package restore import ( "context" + "sync" + "sync/atomic" + + "golang.org/x/sync/errgroup" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" "github.com/pingcap/parser/model" "go.uber.org/zap" @@ -129,8 +132,12 @@ func Exhaust(ec <-chan error) []error { // BatchSender is the abstract of how the batcher send a batch. type BatchSender interface { + // PutSink sets the sink of this sender, user to this interface promise + // call this function at least once before first call to `RestoreBatch`. + // TODO abstract the sink type + PutSink(outCh chan<- []CreatedTable, errCh chan<- error) // RestoreBatch will send the restore request. - RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error + RestoreBatch(ranges DrainResult) Close() } @@ -138,6 +145,21 @@ type tikvSender struct { client *Client updateCh glue.Progress rejectStoreMap map[uint64]bool + + outCh atomic.Value + errCh atomic.Value + inCh chan<- DrainResult + + wg *sync.WaitGroup +} + +func (b *tikvSender) PutSink(outCh chan<- []CreatedTable, errCh chan<- error) { + b.outCh.Store(outCh) + b.errCh.Store(errCh) +} + +func (b *tikvSender) RestoreBatch(ranges DrainResult) { + b.inCh <- ranges } // NewTiKVSender make a sender that send restore requests to TiKV. @@ -159,42 +181,91 @@ func NewTiKVSender( rejectStoreMap[store.GetId()] = true } } + inCh := make(chan DrainResult, 1) + midCh := make(chan DrainResult, 4) - return &tikvSender{ + sender := &tikvSender{ client: cli, updateCh: updateCh, rejectStoreMap: rejectStoreMap, - }, nil -} - -func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error { - if err := SplitRanges(ctx, b.client, ranges, rewriteRules, b.updateCh); err != nil { - log.Error("failed on split range", - zap.Any("ranges", ranges), - zap.Error(err), - ) - return err + inCh: inCh, + wg: new(sync.WaitGroup), } - files := []*backup.File{} - for _, fs := range ranges { - files = append(files, fs.Files...) - } + sender.wg.Add(2) + go sender.splitWorker(ctx, inCh, midCh) + go sender.restoreWorker(ctx, midCh) + return sender, nil +} - if err := b.client.RestoreFiles(files, rewriteRules, b.rejectStoreMap, b.updateCh); err != nil { - return err +func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) { + defer log.Debug("split worker closed") + // TODO remove this magic number, allow us create it by config. 
+ pool := utils.NewWorkerPool(4, "split & scatter") + eg, ectx := errgroup.WithContext(ctx) + defer func() { + if err := eg.Wait(); err != nil { + b.errCh.Load().(chan<- error) <- err + } + b.wg.Done() + close(next) + }() + for { + select { + case <-ctx.Done(): + return + case result, ok := <-ranges: + if !ok { + return + } + pool.ApplyOnErrorGroup(eg, func() error { + if err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil { + log.Error("failed on split range", + zap.Any("ranges", ranges), + zap.Error(err), + ) + return err + } + next <- result + return nil + }) + } } +} - log.Info("restore batch done", - append( - ZapRanges(ranges), - zap.Int("file count", len(files)), - )..., - ) - - return nil +func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) { + defer func() { + log.Debug("restore worker closed") + b.wg.Done() + close(b.outCh.Load().(chan<- []CreatedTable)) + }() + for { + select { + case <-ctx.Done(): + return + case result, ok := <-ranges: + if !ok { + return + } + files := result.Files() + if err := b.client.RestoreFiles(files, result.RewriteRules, b.rejectStoreMap, b.updateCh); err != nil { + b.errCh.Load().(chan<- error) <- err + return + } + + log.Info("restore batch done", + append( + ZapRanges(result.Ranges), + zap.Int("file count", len(files)), + )..., + ) + b.outCh.Load().(chan<- []CreatedTable) <- result.BlankTablesAfterSend + } + } } func (b *tikvSender) Close() { - // don't close update channel here, since we may need it then. + close(b.inCh) + b.wg.Wait() + log.Debug("tikv sender closed") } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index a7d8c5e33..c026f5e4a 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -555,6 +555,7 @@ func waitForRemoveRejectStores( } // ZapTables make zap field of table for debuging, including table names. +// TODO make it a lazy stringer func ZapTables(tables []CreatedTable) zapcore.Field { tableNames := make([]string, 0, len(tables)) for _, t := range tables { @@ -564,6 +565,7 @@ func ZapTables(tables []CreatedTable) zapcore.Field { } // ZapRanges make zap fields for debuging, which contains kv, size and count of ranges. +// TODO make it a lazy zap object func ZapRanges(ranges []rtree.Range) []zapcore.Field { totalKV := uint64(0) totalSize := uint64(0) From fb6880cd245c9bdc7f46795a538506fa83b9de52 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 19:22:49 +0800 Subject: [PATCH 02/20] restore: adapt the unit test. Signed-off-by: Hillium --- pkg/restore/batcher.go | 7 +++--- pkg/restore/batcher_test.go | 48 +++++++++++++++++++++---------------- pkg/restore/util.go | 4 ++-- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 1c65178f9..57099bc35 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -62,8 +62,8 @@ func (b *Batcher) Len() int { return int(atomic.LoadInt32(&b.size)) } -// contextCleaner is the worker goroutine that cleaning the 'context'. -// (e.g. make regions leave restore mode) +// contextCleaner is the worker goroutine that cleaning the 'context' +// (e.g. make regions leave restore mode). func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { defer b.everythingIsDone.Done() for { @@ -217,8 +217,9 @@ type DrainResult struct { Ranges []rtree.Range } +// Files returns all files of this drain result. 
func (result DrainResult) Files() []*backup.File { - var files []*backup.File + var files = make([]*backup.File, 0, len(result.Ranges)*2) for _, fs := range result.Ranges { files = append(files, fs.Files...) } diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go index 53a9fbbaa..d11855f5f 100644 --- a/pkg/restore/batcher_test.go +++ b/pkg/restore/batcher_test.go @@ -30,30 +30,36 @@ type drySender struct { rewriteRules *restore.RewriteRules ranges []rtree.Range nBatch int + + outCh chan<- []restore.CreatedTable + errCh chan<- error } -func (d *drySender) RestoreBatch( - _ctx context.Context, - ranges []rtree.Range, - rewriteRules *restore.RewriteRules, -) error { - d.mu.Lock() - defer d.mu.Unlock() - log.Info("fake restore range", restore.ZapRanges(ranges)...) - d.nBatch++ - d.rewriteRules.Append(*rewriteRules) - d.ranges = append(d.ranges, ranges...) - return nil +func (sender *drySender) PutSink(outCh chan<- []restore.CreatedTable, errCh chan<- error) { + sender.outCh = outCh + sender.errCh = errCh } -func (d *drySender) Close() {} +func (sender *drySender) RestoreBatch(ranges restore.DrainResult) { + sender.mu.Lock() + defer sender.mu.Unlock() + log.Info("fake restore range", restore.ZapRanges(ranges.Ranges)...) + sender.nBatch++ + sender.rewriteRules.Append(*ranges.RewriteRules) + sender.ranges = append(sender.ranges, ranges.Ranges...) + sender.outCh <- ranges.BlankTablesAfterSend +} + +func (sender *drySender) Close() { + close(sender.outCh) +} func waitForSend() { time.Sleep(10 * time.Millisecond) } -func (d *drySender) Ranges() []rtree.Range { - return d.ranges +func (sender *drySender) Ranges() []rtree.Range { + return sender.ranges } func newDrySender() *drySender { @@ -109,8 +115,8 @@ func (manager recordCurrentTableManager) Has(tables ...restore.TableWithRange) b return true } -func (d *drySender) HasRewriteRuleOfKey(prefix string) bool { - for _, rule := range d.rewriteRules.Table { +func (sender *drySender) HasRewriteRuleOfKey(prefix string) bool { + for _, rule := range sender.rewriteRules.Table { if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) { return true } @@ -118,12 +124,12 @@ func (d *drySender) HasRewriteRuleOfKey(prefix string) bool { return false } -func (d *drySender) RangeLen() int { - return len(d.ranges) +func (sender *drySender) RangeLen() int { + return len(sender.ranges) } -func (d *drySender) BatchCount() int { - return d.nBatch +func (sender *drySender) BatchCount() int { + return sender.nBatch } var ( diff --git a/pkg/restore/util.go b/pkg/restore/util.go index c026f5e4a..547d1c706 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -555,7 +555,7 @@ func waitForRemoveRejectStores( } // ZapTables make zap field of table for debuging, including table names. -// TODO make it a lazy stringer +// TODO make it a lazy stringer. func ZapTables(tables []CreatedTable) zapcore.Field { tableNames := make([]string, 0, len(tables)) for _, t := range tables { @@ -565,7 +565,7 @@ func ZapTables(tables []CreatedTable) zapcore.Field { } // ZapRanges make zap fields for debuging, which contains kv, size and count of ranges. -// TODO make it a lazy zap object +// TODO make it a lazy zap object. 
func ZapRanges(ranges []rtree.Range) []zapcore.Field { totalKV := uint64(0) totalSize := uint64(0) From 781e634ae0579f069f36fb9c8f7525a24f2d7967 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 16 Jul 2020 22:55:35 +0800 Subject: [PATCH 03/20] restore: disable concurrent split for now Signed-off-by: Hillium --- pkg/restore/pipeline_items.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index a0189419a..6857a1b82 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -22,6 +22,7 @@ import ( const ( defaultBatcherOutputChannelSize = 1024 + splitConcurrency = 1 ) // ContextManager is the struct to manage a TiKV 'context' for restore. @@ -182,7 +183,7 @@ func NewTiKVSender( } } inCh := make(chan DrainResult, 1) - midCh := make(chan DrainResult, 4) + midCh := make(chan DrainResult, splitConcurrency) sender := &tikvSender{ client: cli, @@ -200,8 +201,7 @@ func NewTiKVSender( func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) { defer log.Debug("split worker closed") - // TODO remove this magic number, allow us create it by config. - pool := utils.NewWorkerPool(4, "split & scatter") + pool := utils.NewWorkerPool(splitConcurrency, "split & scatter") eg, ectx := errgroup.WithContext(ctx) defer func() { if err := eg.Wait(); err != nil { From d63e437d8401160ec138a060168cb4e5da2b6607 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 17 Jul 2020 14:56:53 +0800 Subject: [PATCH 04/20] restore: remove concurrency limit, fixed checksum time summary Signed-off-by: Hillium --- pkg/restore/batcher.go | 6 ++---- pkg/restore/batcher_test.go | 2 +- pkg/restore/client.go | 8 +++++--- pkg/restore/pipeline_items.go | 6 ++---- pkg/restore/util.go | 37 +++++++++++++++++++++++------------ pkg/task/restore.go | 19 ++++++++++++------ 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 57099bc35..1ffc82f7d 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -312,10 +312,8 @@ func (b *Batcher) Send(ctx context.Context) error { tbs := drainResult.TablesToSend ranges := drainResult.Ranges log.Info("restore batch start", - append( - ZapRanges(ranges), - ZapTables(tbs), - )..., + ZapRanges(ranges), + ZapTables(tbs), ) // Leave is called at b.contextCleaner if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil { diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go index d11855f5f..b706dc09f 100644 --- a/pkg/restore/batcher_test.go +++ b/pkg/restore/batcher_test.go @@ -43,7 +43,7 @@ func (sender *drySender) PutSink(outCh chan<- []restore.CreatedTable, errCh chan func (sender *drySender) RestoreBatch(ranges restore.DrainResult) { sender.mu.Lock() defer sender.mu.Unlock() - log.Info("fake restore range", restore.ZapRanges(ranges.Ranges)...) + log.Info("fake restore range", restore.ZapRanges(ranges.Ranges)) sender.nBatch++ sender.rewriteRules.Append(*ranges.RewriteRules) sender.ranges = append(sender.ranges, ranges.Ranges...) 
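// Note on the call-site change above: ZapRanges (see the util.go hunk later in this
// patch) now returns a single lazily marshalled zap field instead of a []zapcore.Field,
// so the `...` spread is dropped at every caller, e.g.:
//
//	log.Info("restore batch start", ZapRanges(ranges), ZapTables(tbs))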
diff --git a/pkg/restore/client.go b/pkg/restore/client.go index d328d5526..2f457aa63 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -827,13 +827,10 @@ func (rc *Client) GoValidateChecksum( outCh := make(chan struct{}, 1) workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { - start := time.Now() wg := new(sync.WaitGroup) defer func() { log.Info("all checksum ended") wg.Wait() - elapsed := time.Since(start) - summary.CollectDuration("restore checksum", elapsed) outCh <- struct{}{} close(outCh) }() @@ -861,6 +858,11 @@ func (rc *Client) GoValidateChecksum( } func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client) error { + start := time.Now() + defer func() { + elapsed := time.Since(start) + summary.CollectDuration("restore checksum", elapsed) + }() if tbl.OldTable.NoChecksum() { log.Warn("table has no checksum, skipping checksum", zap.Stringer("table", tbl.OldTable.Info.Name), diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 6857a1b82..f7dfca245 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -254,10 +254,8 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul } log.Info("restore batch done", - append( - ZapRanges(result.Ranges), - zap.Int("file count", len(files)), - )..., + ZapRanges(result.Ranges), + zap.Int("file count", len(files)), ) b.outCh.Load().(chan<- []CreatedTable) <- result.BlankTablesAfterSend } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 547d1c706..fca77bede 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -555,30 +555,41 @@ func waitForRemoveRejectStores( } // ZapTables make zap field of table for debuging, including table names. -// TODO make it a lazy stringer. func ZapTables(tables []CreatedTable) zapcore.Field { - tableNames := make([]string, 0, len(tables)) - for _, t := range tables { - tableNames = append(tableNames, fmt.Sprintf("%s.%s", t.OldTable.Db.Name, t.OldTable.Info.Name)) - } - return zap.Strings("tables", tableNames) + return zap.Array("tables", tableSliceArrayMixIn(tables)) } // ZapRanges make zap fields for debuging, which contains kv, size and count of ranges. // TODO make it a lazy zap object. 
-func ZapRanges(ranges []rtree.Range) []zapcore.Field { +func ZapRanges(ranges []rtree.Range) zapcore.Field { + return zap.Object("rangeInfo", rangesSliceObjectMixin(ranges)) +} + +type tableSliceArrayMixIn []CreatedTable + +func (ss tableSliceArrayMixIn) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, s := range ss { + encoder.AppendString(fmt.Sprintf("%s.%s", + utils.EncloseName(s.Table.Name.String()), + utils.EncloseName(s.OldTable.Db.Name.String()))) + } + return nil +} + +type rangesSliceObjectMixin []rtree.Range + +func (rs rangesSliceObjectMixin) MarshalLogObject(encoder zapcore.ObjectEncoder) error { totalKV := uint64(0) totalSize := uint64(0) - for _, r := range ranges { + for _, r := range rs { for _, f := range r.Files { totalKV += f.GetTotalKvs() totalSize += f.GetTotalBytes() } } - return []zap.Field{ - zap.Int("ranges", len(ranges)), - zap.Uint64("total kv", totalKV), - zap.Uint64("total size", totalSize), - } + encoder.AddInt("ranges", len(rs)) + encoder.AddUint64("total kv", totalKV) + encoder.AddUint64("total size", totalSize) + return nil } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index f53765c0f..dbe13eb36 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -25,11 +25,12 @@ import ( ) const ( - flagOnline = "online" - flagNoSchema = "no-schema" + flagOnline = "online" + flagNoSchema = "no-schema" + flagForceSendInterval = "force-send-interval" defaultRestoreConcurrency = 128 - maxRestoreBatchSizeLimit = 256 + maxRestoreBatchSizeLimit = 65535 defaultDDLConcurrency = 16 ) @@ -58,8 +59,9 @@ var ( type RestoreConfig struct { Config - Online bool `json:"online" toml:"online"` - NoSchema bool `json:"no-schema" toml:"no-schema"` + Online bool `json:"online" toml:"online"` + NoSchema bool `json:"no-schema" toml:"no-schema"` + ForceSendInterval time.Duration `json:"force-send-interval" toml:"force-send-interval"` } // DefineRestoreFlags defines common flags for the restore command. @@ -67,9 +69,13 @@ func DefineRestoreFlags(flags *pflag.FlagSet) { // TODO remove experimental tag if it's stable flags.Bool(flagOnline, false, "(experimental) Whether online when restore") flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones") + flags.Duration(flagForceSendInterval, time.Second, "send restore request force "+ + "even we hasn't collected enough ranges as --concurrency requests") // Do not expose this flag _ = flags.MarkHidden(flagNoSchema) + // --concurrency is hidden yet, so we hide this too. + _ = flags.MarkHidden(flagForceSendInterval) } // ParseFromFlags parses the restore-related flags from the flag set. 
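// The new flag stays hidden but can still be set explicitly. An illustrative
// invocation (the PD address and storage URL are placeholders, not part of this patch):
//
//	br restore full --pd "$PD_ADDR" -s "s3://bucket/prefix" --force-send-interval 3s
//
// A larger interval lets the batcher wait longer for enough ranges to accumulate
// before it forces out an under-sized batch.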
@@ -91,6 +97,7 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.ForceSendInterval, err = flags.GetDuration(flagForceSendInterval) if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultRestoreConcurrency @@ -259,7 +266,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf manager := restore.NewBRContextManager(client) batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh) batcher.SetThreshold(batchSize) - batcher.EnableAutoCommit(ctx, time.Second) + batcher.EnableAutoCommit(ctx, cfg.ForceSendInterval) go restoreTableStream(ctx, rangeStream, cfg.RemoveTiFlash, cfg.PD, client, batcher, errCh) var finish <-chan struct{} From 5f1449a664033e88f360527903c68257a825c7ca Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 17 Jul 2020 14:59:39 +0800 Subject: [PATCH 05/20] restore: handle a error has been omitted past Signed-off-by: Hillium --- pkg/task/restore.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/task/restore.go b/pkg/task/restore.go index dbe13eb36..ac4ba09fc 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -98,6 +98,9 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } cfg.ForceSendInterval, err = flags.GetDuration(flagForceSendInterval) + if err != nil { + return errors.Trace(err) + } if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultRestoreConcurrency From 32197cd053e51a3100c7dfe99f2434aeebdbda82 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 17 Jul 2020 15:49:23 +0800 Subject: [PATCH 06/20] restore: abstract sink type Signed-off-by: Hillium --- pkg/restore/batcher.go | 3 ++- pkg/restore/batcher_test.go | 12 ++++----- pkg/restore/pipeline_items.go | 48 ++++++++++++++++++++++++++--------- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 1ffc82f7d..398a38c86 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -113,7 +113,8 @@ func NewBatcher( go b.sendWorker(ctx, sendChan) restoredTables := make(chan []CreatedTable, 8) go b.contextCleaner(ctx, restoredTables) - sender.PutSink(restoredTables, errCh) + sink := chanTableSink{restoredTables, errCh} + sender.PutSink(sink) return b, output } diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go index b706dc09f..072cd31b0 100644 --- a/pkg/restore/batcher_test.go +++ b/pkg/restore/batcher_test.go @@ -31,13 +31,11 @@ type drySender struct { ranges []rtree.Range nBatch int - outCh chan<- []restore.CreatedTable - errCh chan<- error + sink restore.TableSink } -func (sender *drySender) PutSink(outCh chan<- []restore.CreatedTable, errCh chan<- error) { - sender.outCh = outCh - sender.errCh = errCh +func (sender *drySender) PutSink(sink restore.TableSink) { + sender.sink = sink } func (sender *drySender) RestoreBatch(ranges restore.DrainResult) { @@ -47,11 +45,11 @@ func (sender *drySender) RestoreBatch(ranges restore.DrainResult) { sender.nBatch++ sender.rewriteRules.Append(*ranges.RewriteRules) sender.ranges = append(sender.ranges, ranges.Ranges...) - sender.outCh <- ranges.BlankTablesAfterSend + sender.sink.EmitTables(ranges.BlankTablesAfterSend...) 
} func (sender *drySender) Close() { - close(sender.outCh) + sender.sink.Close() } func waitForSend() { diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index f7dfca245..83d813848 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -5,7 +5,6 @@ package restore import ( "context" "sync" - "sync/atomic" "golang.org/x/sync/errgroup" @@ -25,6 +24,31 @@ const ( splitConcurrency = 1 ) +// TableSink is the 'sink' of restored data by a sender. +type TableSink interface { + EmitTables(tables ...CreatedTable) + EmitError(error) + Close() +} + +type chanTableSink struct { + outCh chan<- []CreatedTable + errCh chan<- error +} + +func (sink chanTableSink) EmitTables(tables ...CreatedTable) { + sink.outCh <- tables +} + +func (sink chanTableSink) EmitError(err error) { + sink.errCh <- err +} + +func (sink chanTableSink) Close() { + // ErrCh may has multi sender part, don't close it. + close(sink.outCh) +} + // ContextManager is the struct to manage a TiKV 'context' for restore. // Batcher will call Enter when any table should be restore on batch, // so you can do some prepare work here(e.g. set placement rules for online restore). @@ -136,7 +160,7 @@ type BatchSender interface { // PutSink sets the sink of this sender, user to this interface promise // call this function at least once before first call to `RestoreBatch`. // TODO abstract the sink type - PutSink(outCh chan<- []CreatedTable, errCh chan<- error) + PutSink(sink TableSink) // RestoreBatch will send the restore request. RestoreBatch(ranges DrainResult) Close() @@ -147,16 +171,16 @@ type tikvSender struct { updateCh glue.Progress rejectStoreMap map[uint64]bool - outCh atomic.Value - errCh atomic.Value - inCh chan<- DrainResult + sink TableSink + inCh chan<- DrainResult wg *sync.WaitGroup } -func (b *tikvSender) PutSink(outCh chan<- []CreatedTable, errCh chan<- error) { - b.outCh.Store(outCh) - b.errCh.Store(errCh) +func (b *tikvSender) PutSink(sink TableSink) { + // don't worry about visibility, since we will call this before first call to + // RestoreBatch, which is a sync point. + b.sink = sink } func (b *tikvSender) RestoreBatch(ranges DrainResult) { @@ -205,7 +229,7 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, eg, ectx := errgroup.WithContext(ctx) defer func() { if err := eg.Wait(); err != nil { - b.errCh.Load().(chan<- error) <- err + b.sink.EmitError(err) } b.wg.Done() close(next) @@ -237,7 +261,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul defer func() { log.Debug("restore worker closed") b.wg.Done() - close(b.outCh.Load().(chan<- []CreatedTable)) + b.sink.Close() }() for { select { @@ -249,7 +273,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul } files := result.Files() if err := b.client.RestoreFiles(files, result.RewriteRules, b.rejectStoreMap, b.updateCh); err != nil { - b.errCh.Load().(chan<- error) <- err + b.sink.EmitError(err) return } @@ -257,7 +281,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul ZapRanges(result.Ranges), zap.Int("file count", len(files)), ) - b.outCh.Load().(chan<- []CreatedTable) <- result.BlankTablesAfterSend + b.sink.EmitTables(result.BlankTablesAfterSend...) 
} } } From f24b5abceef4d1489a0790a600c047c651b72f94 Mon Sep 17 00:00:00 2001 From: yujuncen Date: Fri, 17 Jul 2020 17:31:26 +0800 Subject: [PATCH 07/20] restore: change some log level remove reject stores sometime always grow too huge and provides little useful information, move it to DEBUG. add a log when any file is fully restored, which is useful troubleshooting when some restore goes slow. Signed-off-by: Hillium --- pkg/restore/client.go | 7 ++++++- pkg/restore/import.go | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 2f457aa63..c2dbb54d8 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -656,7 +656,12 @@ func (rc *Client) RestoreFiles( fileReplica := file rc.workerPool.Apply( func() { - defer wg.Done() + fileStart := time.Now() + defer func() { + log.Info("import file done", utils.ZapFile(fileReplica), + zap.Duration("take", time.Since(fileStart))) + wg.Done() + }() select { case <-rc.ctx.Done(): errCh <- rc.ctx.Err() diff --git a/pkg/restore/import.go b/pkg/restore/import.go index e1c7e9a5b..9d86d9ff0 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -222,7 +222,7 @@ func (importer *FileImporter) Import( if needReject { // TODO remove when TiFlash support restore startTime := time.Now() - log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) + log.Debug("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) for _, region := range regionInfos { if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { log.Error("waiting for removing rejected stores failed", @@ -230,7 +230,7 @@ func (importer *FileImporter) Import( return errors.New("waiting for removing rejected stores failed") } } - log.Info("waiting for removing rejected stores done", + log.Debug("waiting for removing rejected stores done", zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime))) needReject = false } From 683dbdf99aad92abe23e5b49f7228a44589306cf Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 11:27:33 +0800 Subject: [PATCH 08/20] restore: add log when file backed up Signed-off-by: Hillium --- pkg/restore/client.go | 4 +++- pkg/restore/pipeline_items.go | 27 +++++++++------------------ pkg/utils/logging.go | 19 +++++++++++++++++++ 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index c2dbb54d8..9269b6702 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -635,7 +635,9 @@ func (rc *Client) RestoreFiles( elapsed := time.Since(start) if err == nil { log.Info("Restore Files", - zap.Int("files", len(files)), zap.Duration("take", elapsed)) + zap.Int("files count", len(files)), + zap.Duration("take", elapsed), + zap.Object("files", utils.Files(files))) summary.CollectSuccessUnit("files", len(files), elapsed) } }() diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 83d813848..9217ca88e 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -6,8 +6,6 @@ import ( "context" "sync" - "golang.org/x/sync/errgroup" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -225,12 +223,7 @@ func NewTiKVSender( func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) { defer log.Debug("split worker closed") - pool := utils.NewWorkerPool(splitConcurrency, "split & scatter") 
- eg, ectx := errgroup.WithContext(ctx) defer func() { - if err := eg.Wait(); err != nil { - b.sink.EmitError(err) - } b.wg.Done() close(next) }() @@ -242,17 +235,15 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, if !ok { return } - pool.ApplyOnErrorGroup(eg, func() error { - if err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil { - log.Error("failed on split range", - zap.Any("ranges", ranges), - zap.Error(err), - ) - return err - } - next <- result - return nil - }) + if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil { + log.Error("failed on split range", + zap.Any("ranges", ranges), + zap.Error(err), + ) + b.sink.EmitError(err) + return + } + next <- result } } } diff --git a/pkg/utils/logging.go b/pkg/utils/logging.go index aa68c7b1e..e9e4c5cc9 100644 --- a/pkg/utils/logging.go +++ b/pkg/utils/logging.go @@ -85,6 +85,20 @@ func (keys zapArrayMarshalKeysMixIn) MarshalLogArray(enc zapcore.ArrayEncoder) e return nil } +type Files []*backup.File + +func (files Files) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + totalKVs := uint64(0) + totalSize := uint64(0) + for _, file := range files { + totalKVs += file.GetTotalKvs() + totalSize += file.GetTotalBytes() + } + encoder.AddUint64("totalKVs", totalKVs) + encoder.AddUint64("totalBytes", totalSize) + return nil +} + // WrapKey wrap a key as a Stringer that can print proper upper hex format. func WrapKey(key []byte) fmt.Stringer { return kv.Key(key) @@ -114,3 +128,8 @@ func ZapFile(file *backup.File) zapcore.Field { func ZapSSTMeta(sstMeta *import_sstpb.SSTMeta) zapcore.Field { return zap.Object("sstMeta", zapMarshalSSTMetaMixIn{sstMeta}) } + +// ZapFiles make the zap field for a set of file. 
+func ZapFiles(files []*backup.File) zapcore.Field { + return zap.Object("files", Files(files)) +} From 9946b3e444078088ba402544a1663820e76fbdb9 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 11:34:07 +0800 Subject: [PATCH 09/20] restore: unexport some private fields Signed-off-by: Hillium --- pkg/restore/client.go | 5 ++--- pkg/utils/logging.go | 11 ++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 9269b6702..af4a9b3b9 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -634,10 +634,9 @@ func (rc *Client) RestoreFiles( defer func() { elapsed := time.Since(start) if err == nil { - log.Info("Restore Files", - zap.Int("files count", len(files)), + log.Info("Restore files", zap.Duration("take", elapsed), - zap.Object("files", utils.Files(files))) + utils.ZapFiles(files)) summary.CollectSuccessUnit("files", len(files), elapsed) } }() diff --git a/pkg/utils/logging.go b/pkg/utils/logging.go index e9e4c5cc9..18491c63a 100644 --- a/pkg/utils/logging.go +++ b/pkg/utils/logging.go @@ -85,17 +85,18 @@ func (keys zapArrayMarshalKeysMixIn) MarshalLogArray(enc zapcore.ArrayEncoder) e return nil } -type Files []*backup.File +type files []*backup.File -func (files Files) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (fs files) MarshalLogObject(encoder zapcore.ObjectEncoder) error { totalKVs := uint64(0) totalSize := uint64(0) - for _, file := range files { + for _, file := range fs { totalKVs += file.GetTotalKvs() totalSize += file.GetTotalBytes() } encoder.AddUint64("totalKVs", totalKVs) encoder.AddUint64("totalBytes", totalSize) + encoder.AddInt("totalFileCount", len(fs)) return nil } @@ -130,6 +131,6 @@ func ZapSSTMeta(sstMeta *import_sstpb.SSTMeta) zapcore.Field { } // ZapFiles make the zap field for a set of file. -func ZapFiles(files []*backup.File) zapcore.Field { - return zap.Object("files", Files(files)) +func ZapFiles(fs []*backup.File) zapcore.Field { + return zap.Object("fs", files(fs)) } From 7b34f2be416ab1023a5748ed3e99a4430383507b Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 17:26:11 +0800 Subject: [PATCH 10/20] restore: grow the restore channel size Signed-off-by: Hillium --- pkg/restore/pipeline_items.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 9217ca88e..a77e9f057 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -19,7 +19,7 @@ import ( const ( defaultBatcherOutputChannelSize = 1024 - splitConcurrency = 1 + midChBufferSize = 8 ) // TableSink is the 'sink' of restored data by a sender. @@ -157,7 +157,6 @@ func Exhaust(ec <-chan error) []error { type BatchSender interface { // PutSink sets the sink of this sender, user to this interface promise // call this function at least once before first call to `RestoreBatch`. - // TODO abstract the sink type PutSink(sink TableSink) // RestoreBatch will send the restore request. 
RestoreBatch(ranges DrainResult) @@ -205,7 +204,7 @@ func NewTiKVSender( } } inCh := make(chan DrainResult, 1) - midCh := make(chan DrainResult, splitConcurrency) + midCh := make(chan DrainResult, midChBufferSize) sender := &tikvSender{ client: cli, From b457a96b6ce7f6266b7345d3cfa84aa4cab266bc Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 17:45:10 +0800 Subject: [PATCH 11/20] restore: grow all channel size Signed-off-by: Hillium --- pkg/restore/batcher.go | 4 ++-- pkg/restore/pipeline_items.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 398a38c86..c19f5f619 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -96,7 +96,7 @@ func NewBatcher( manager ContextManager, errCh chan<- error, ) (*Batcher, <-chan CreatedTable) { - output := make(chan CreatedTable, defaultBatcherOutputChannelSize) + output := make(chan CreatedTable, defaultChannelSize) sendChan := make(chan SendType, 2) b := &Batcher{ rewriteRules: EmptyRewriteRule(), @@ -111,7 +111,7 @@ func NewBatcher( } b.everythingIsDone.Add(2) go b.sendWorker(ctx, sendChan) - restoredTables := make(chan []CreatedTable, 8) + restoredTables := make(chan []CreatedTable, defaultChannelSize) go b.contextCleaner(ctx, restoredTables) sink := chanTableSink{restoredTables, errCh} sender.PutSink(sink) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index a77e9f057..531249a29 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -18,8 +18,7 @@ import ( ) const ( - defaultBatcherOutputChannelSize = 1024 - midChBufferSize = 8 + defaultChannelSize = 1024 ) // TableSink is the 'sink' of restored data by a sender. @@ -203,8 +202,8 @@ func NewTiKVSender( rejectStoreMap[store.GetId()] = true } } - inCh := make(chan DrainResult, 1) - midCh := make(chan DrainResult, midChBufferSize) + inCh := make(chan DrainResult, defaultChannelSize) + midCh := make(chan DrainResult, defaultChannelSize) sender := &tikvSender{ client: cli, From c0fe56cb1f427b9dba89bd501c37ae672a6a4ddb Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 20 Jul 2020 19:38:48 +0800 Subject: [PATCH 12/20] restore: move some synchronous logic to async, call leave on tables when exit Signed-off-by: Hillium --- pkg/restore/batcher.go | 13 +++++-------- pkg/restore/batcher_test.go | 9 ++++++++- pkg/restore/pipeline_items.go | 22 ++++++++++++++++++---- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index c19f5f619..aa8f5b6ef 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -65,6 +65,7 @@ func (b *Batcher) Len() int { // contextCleaner is the worker goroutine that cleaning the 'context' // (e.g. make regions leave restore mode). func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { + defer b.manager.Close(ctx) defer b.everythingIsDone.Done() for { select { @@ -158,11 +159,7 @@ func (b *Batcher) joinAutoCommitWorker() { func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { sendUntil := func(lessOrEqual int) { for b.Len() > lessOrEqual { - err := b.Send(ctx) - if err != nil { - b.sendErr <- err - return - } + b.Send(ctx) } } @@ -308,7 +305,7 @@ func (b *Batcher) drainRanges() DrainResult { // Send sends all pending requests in the batcher. // returns tables sent FULLY in the current batch. 
-func (b *Batcher) Send(ctx context.Context) error { +func (b *Batcher) Send(ctx context.Context) { drainResult := b.drainRanges() tbs := drainResult.TablesToSend ranges := drainResult.Ranges @@ -318,10 +315,10 @@ func (b *Batcher) Send(ctx context.Context) error { ) // Leave is called at b.contextCleaner if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil { - return err + b.sendErr <- err + return } b.sender.RestoreBatch(drainResult) - return nil } func (b *Batcher) sendIfFull() { diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go index 072cd31b0..1f7b4816b 100644 --- a/pkg/restore/batcher_test.go +++ b/pkg/restore/batcher_test.go @@ -70,6 +70,13 @@ func newDrySender() *drySender { type recordCurrentTableManager map[int64]bool +func (manager recordCurrentTableManager) Close(ctx context.Context) { + if len(manager) > 0 { + log.Panic("When closing, there are still some tables doesn't be sent", + zap.Any("tables", manager)) + } +} + func newMockManager() recordCurrentTableManager { return make(recordCurrentTableManager) } @@ -88,7 +95,7 @@ func (manager recordCurrentTableManager) Leave(_ context.Context, tables []resto return errors.Errorf("Table %d is removed before added", t.Table.ID) } log.Info("leaving", zap.Int64("table ID", t.Table.ID)) - manager[t.Table.ID] = false + delete(manager, t.Table.ID) } return nil } diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 531249a29..c25ba765a 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -54,6 +54,9 @@ type ContextManager interface { Enter(ctx context.Context, tables []CreatedTable) error // Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works). Leave(ctx context.Context, tables []CreatedTable) error + // Close closes the context manager, sometimes when the manager is 'killed' and should do some cleanup + // it would be call. + Close(ctx context.Context) } // NewBRContextManager makes a BR context manager, that is, @@ -63,7 +66,7 @@ func NewBRContextManager(client *Client) ContextManager { return &brContextManager{ client: client, - hasTable: make(map[int64]bool), + hasTable: make(map[int64]CreatedTable), } } @@ -71,17 +74,25 @@ type brContextManager struct { client *Client // This 'set' of table ID allow us handle each table just once. 
- hasTable map[int64]bool + hasTable map[int64]CreatedTable +} + +func (manager *brContextManager) Close(ctx context.Context) { + tbls := make([]*model.TableInfo, 0, len(manager.hasTable)) + for _, tbl := range manager.hasTable { + tbls = append(tbls, tbl.Table) + } + splitPostWork(ctx, manager.client, tbls) } func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error { placementRuleTables := make([]*model.TableInfo, 0, len(tables)) for _, tbl := range tables { - if !manager.hasTable[tbl.Table.ID] { + if _, ok := manager.hasTable[tbl.Table.ID]; !ok { placementRuleTables = append(placementRuleTables, tbl.Table) } - manager.hasTable[tbl.Table.ID] = true + manager.hasTable[tbl.Table.ID] = tbl } return splitPrepareWork(ctx, manager.client, placementRuleTables) @@ -96,6 +107,9 @@ func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTabl splitPostWork(ctx, manager.client, placementRuleTables) log.Info("restore table done", ZapTables(tables)) + for _, tbl := range placementRuleTables { + delete(manager.hasTable, tbl.ID) + } return nil } From 6df57bd820c94318f4a47048963e73d44742d123 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 24 Jul 2020 14:43:18 +0800 Subject: [PATCH 13/20] restore: fix conflicted merge --- pkg/restore/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 56e35da3b..37de625c3 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -649,10 +649,11 @@ func (rc *Client) RestoreFiles( fileReplica := file rc.workerPool.ApplyOnErrorGroup(eg, func() error { - defer func() { + fileStart := time.Now() + defer func() { log.Info("import file done", utils.ZapFile(fileReplica), zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() + updateCh.Inc() }() return rc.fileImporter.Import(ectx, fileReplica, rejectStoreMap, rewriteRules) }) @@ -808,7 +809,6 @@ func (rc *Client) GoValidateChecksum( if err := wg.Wait(); err != nil { errCh <- err } - summary.CollectDuration("restore checksum", elapsed) outCh <- struct{}{} close(outCh) }() From 8de1774194e20a0763c648db234e269487a64100 Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 3 Sep 2020 13:16:01 +0800 Subject: [PATCH 14/20] restore: do clean work in a not closed context Signed-off-by: Hillium --- pkg/restore/batcher.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index aa8f5b6ef..08e49dd06 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -65,7 +65,17 @@ func (b *Batcher) Len() int { // contextCleaner is the worker goroutine that cleaning the 'context' // (e.g. make regions leave restore mode). func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { - defer b.manager.Close(ctx) + defer func() { + if ctx.Err() != nil { + timeout := 5 * time.Second + log.Info("restore canceled, cleaning in a context with timeout", + zap.Stringer("timeout", timeout)) + limitedCtx, _ := context.WithTimeout(context.Background(), timeout) + b.manager.Close(limitedCtx) + } else { + b.manager.Close(ctx) + } + }() defer b.everythingIsDone.Done() for { select { @@ -156,6 +166,7 @@ func (b *Batcher) joinAutoCommitWorker() { } // sendWorker is the 'worker' that send all ranges to TiKV. +// TODO since all operations are asynchronous now, it's possible to remove this worker. 
func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { sendUntil := func(lessOrEqual int) { for b.Len() > lessOrEqual { From 79a249e183642c55271cc3e902068beec44b0e6e Mon Sep 17 00:00:00 2001 From: Hillium Date: Thu, 3 Sep 2020 13:20:14 +0800 Subject: [PATCH 15/20] restore: fix CI Signed-off-by: Hillium --- pkg/restore/batcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go index 08e49dd06..4a7431ce4 100644 --- a/pkg/restore/batcher.go +++ b/pkg/restore/batcher.go @@ -70,7 +70,8 @@ func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTab timeout := 5 * time.Second log.Info("restore canceled, cleaning in a context with timeout", zap.Stringer("timeout", timeout)) - limitedCtx, _ := context.WithTimeout(context.Background(), timeout) + limitedCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() b.manager.Close(limitedCtx) } else { b.manager.Close(ctx) From 6a29f7f10ce4132b5228e08acca7e21821fc745c Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 18 Sep 2020 10:47:57 +0800 Subject: [PATCH 16/20] restore: remove tiflash-relative code Signed-off-by: Hillium --- pkg/restore/client.go | 115 +--------------------------------- pkg/restore/pipeline_items.go | 17 +++-- 2 files changed, 9 insertions(+), 123 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 5a5c37b1f..8780dd75c 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -13,7 +13,6 @@ import ( "strconv" "time" - "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -27,7 +26,6 @@ import ( "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" "github.com/tikv/pd/server/schedule/placement" - "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -64,27 +62,20 @@ type Client struct { // TODO Remove this field or replace it with a []*DB, // since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution. // And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition. - // Which is dirty: why we need DBs from different sources? + // This is dirty: why we need DBs from different sources? // By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`, // along with them in some private functions. // Before you do it, you can firstly read discussions at // https://github.com/pingcap/br/pull/377#discussion_r446594501, - // this probably isn't as easy and it seems like (however, not hard, too :D) + // this probably isn't as easy as it seems like (however, not hard, too :D) db *DB rateLimit uint64 isOnline bool noSchema bool hasSpeedLimited bool - // Those fields should be removed after we have FULLY supportted TiFlash. - // we place this field here to make a 'good' memory align, but mainly make golang-ci happy :) - tiFlashRecordUpdated bool restoreStores []uint64 - // tables that has TiFlash and those TiFlash have been removed, should be written to disk. - // Those fields should be removed after we have FULLY supportted TiFlash. 
- tablesRemovedTiFlash []*backup.Schema - storage storage.ExternalStorage backend *backup.StorageBackend switchModeInterval time.Duration @@ -480,107 +471,6 @@ func (rc *Client) createTablesWithDBPool(ctx context.Context, return eg.Wait() } -// makeTiFlashOfTableRecord make a 'record' repsenting TiFlash of a table that has been removed. -// We doesn't record table ID here because when restore TiFlash replicas, -// we use `ALTER TABLE db.tbl SET TIFLASH_REPLICA = xxx` DDL, instead of use some internal TiDB API. -func makeTiFlashOfTableRecord(table *utils.Table, replica int) (*backup.Schema, error) { - tableData, err := json.Marshal(table.Info) - if err != nil { - return nil, errors.Trace(err) - } - dbData, err := json.Marshal(table.Db) - if err != nil { - return nil, errors.Trace(err) - } - result := &backup.Schema{ - Db: dbData, - Table: tableData, - Crc64Xor: table.Crc64Xor, - TotalKvs: table.TotalKvs, - TotalBytes: table.TotalBytes, - TiflashReplicas: uint32(replica), - } - return result, nil -} - -// RemoveTiFlashOfTable removes TiFlash replica of some table, -// returns the removed count of TiFlash nodes. -// TODO: save the removed TiFlash information into disk. -// TODO: remove this after tiflash supports restore. -func (rc *Client) RemoveTiFlashOfTable(table CreatedTable, rule []placement.Rule) (int, error) { - if rule := utils.SearchPlacementRule(table.Table.ID, rule, placement.Learner); rule != nil { - if rule.Count > 0 { - log.Info("remove TiFlash of table", zap.Int64("table ID", table.Table.ID), zap.Int("count", rule.Count)) - err := multierr.Combine( - rc.db.AlterTiflashReplica(rc.ctx, table.OldTable, 0), - rc.removeTiFlashOf(table.OldTable, rule.Count), - rc.flushTiFlashRecord(), - ) - if err != nil { - return 0, errors.Trace(err) - } - return rule.Count, nil - } - } - return 0, nil -} - -func (rc *Client) removeTiFlashOf(table *utils.Table, replica int) error { - tableRecord, err := makeTiFlashOfTableRecord(table, replica) - if err != nil { - return err - } - rc.tablesRemovedTiFlash = append(rc.tablesRemovedTiFlash, tableRecord) - rc.tiFlashRecordUpdated = true - return nil -} - -func (rc *Client) flushTiFlashRecord() error { - // Today nothing to do :D - if !rc.tiFlashRecordUpdated { - return nil - } - - // should we make a deep copy here? - // currently, write things directly to backup meta is OK since there seems nobody uses it. - // But would it be better if we don't do it? - rc.backupMeta.Schemas = rc.tablesRemovedTiFlash - backupMetaData, err := proto.Marshal(rc.backupMeta) - if err != nil { - return errors.Trace(err) - } - backendURL := storage.FormatBackendURL(rc.backend) - log.Info("update backup meta", zap.Stringer("path", &backendURL)) - err = rc.storage.Write(rc.ctx, utils.SavedMetaFile, backupMetaData) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// RecoverTiFlashOfTable recovers TiFlash replica of some table. -// TODO: remove this after tiflash supports restore. -func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error { - if table.TiFlashReplicas > 0 { - err := rc.db.AlterTiflashReplica(rc.ctx, table, table.TiFlashReplicas) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -// RecoverTiFlashReplica recovers all the tiflash replicas of a table -// TODO: remove this after tiflash supports restore. 
-func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error { - for _, table := range tables { - if err := rc.RecoverTiFlashOfTable(table); err != nil { - return err - } - } - return nil -} - // ExecDDLs executes the queries of the ddl jobs. func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { // Sort the ddl jobs by schema version in ascending order. @@ -622,7 +512,6 @@ func (rc *Client) setSpeedLimit() error { func (rc *Client) RestoreFiles( files []*backup.File, rewriteRules *RewriteRules, - rejectStoreMap map[uint64]bool, updateCh glue.Progress, ) (err error) { start := time.Now() diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index abd215bd0..094a74c29 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -176,9 +176,8 @@ type BatchSender interface { } type tikvSender struct { - client *Client - updateCh glue.Progress - rejectStoreMap map[uint64]bool + client *Client + updateCh glue.Progress sink TableSink inCh chan<- DrainResult @@ -202,17 +201,15 @@ func NewTiKVSender( cli *Client, updateCh glue.Progress, ) (BatchSender, error) { - rejectStoreMap := make(map[uint64]bool) inCh := make(chan DrainResult, defaultChannelSize) midCh := make(chan DrainResult, defaultChannelSize) sender := &tikvSender{ - client: cli, - updateCh: updateCh, - rejectStoreMap: rejectStoreMap, - inCh: inCh, - wg: new(sync.WaitGroup), + client: cli, + updateCh: updateCh, + inCh: inCh, + wg: new(sync.WaitGroup), } sender.wg.Add(2) @@ -263,7 +260,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul return } files := result.Files() - if err := b.client.RestoreFiles(files, result.RewriteRules, b.rejectStoreMap, b.updateCh); err != nil { + if err := b.client.RestoreFiles(files, result.RewriteRules, b.updateCh); err != nil { b.sink.EmitError(err) return } From 8aee5d62460694d64b70480fcc457a86ecc6abaa Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 18 Sep 2020 11:10:54 +0800 Subject: [PATCH 17/20] fix CI Signed-off-by: Hillium --- pkg/restore/pipeline_items.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 094a74c29..4acca7905 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -201,7 +201,6 @@ func NewTiKVSender( cli *Client, updateCh glue.Progress, ) (BatchSender, error) { - inCh := make(chan DrainResult, defaultChannelSize) midCh := make(chan DrainResult, defaultChannelSize) From 461f40c88c522613ce3fd7961943a2a913c324b4 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 18 Sep 2020 12:31:26 +0800 Subject: [PATCH 18/20] restore: fix a malformed printf Signed-off-by: Hillium --- pkg/restore/util.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 746649423..15067f75b 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -514,8 +514,8 @@ type tableSliceArrayMixIn []CreatedTable func (ss tableSliceArrayMixIn) MarshalLogArray(encoder zapcore.ArrayEncoder) error { for _, s := range ss { encoder.AppendString(fmt.Sprintf("%s.%s", - utils.EncloseName(s.Table.Name.String()), - utils.EncloseName(s.OldTable.Db.Name.String()))) + utils.EncloseName(s.OldTable.Db.Name.String()), + utils.EncloseName(s.OldTable.Info.Name.String()))) } return nil } From 038e37d71cfc9a6d6f71f8f503d11ab9f6d47ce0 Mon Sep 17 00:00:00 2001 From: Hillium Date: Mon, 21 Sep 2020 13:32:09 +0800 Subject: [PATCH 19/20] restore: pass context to RestoreFiles --- 
pkg/restore/pipeline_items.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go index 4acca7905..7dcc02792 100644 --- a/pkg/restore/pipeline_items.go +++ b/pkg/restore/pipeline_items.go @@ -259,7 +259,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResul return } files := result.Files() - if err := b.client.RestoreFiles(files, result.RewriteRules, b.updateCh); err != nil { + if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil { b.sink.EmitError(err) return } From 1cb094ac8d64f0d4e61d9328317bbbb71098fc65 Mon Sep 17 00:00:00 2001 From: Hillium Date: Fri, 9 Oct 2020 17:42:01 +0800 Subject: [PATCH 20/20] restore: reset unexpectedly moved to outside Signed-off-by: Hillium --- pkg/restore/import.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 2f78fa8c7..b58be38bd 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -338,9 +338,9 @@ func (importer *FileImporter) Import( zap.Error(errIngest)) return errIngest } - summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) } + summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes) return nil }, newImportSSTBackoffer()) return err
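// After the whole series, a caller wires the pipelined restore roughly as below. This
// is a sketch adapted from the task/restore.go snippets above, not code from the
// patches; the exact NewTiKVSender arguments and the surrounding setup (client,
// updateCh, errCh, batchSize) are assumed.
func runPipelinedRestore(ctx context.Context, client *Client, updateCh glue.Progress,
	errCh chan error, batchSize int, forceSendInterval time.Duration) (<-chan CreatedTable, error) {
	sender, err := NewTiKVSender(ctx, client, updateCh)
	if err != nil {
		return nil, err
	}
	manager := NewBRContextManager(client)
	// Batcher -> splitWorker -> restoreWorker -> sink -> contextCleaner -> output.
	batcher, afterRestoreStream := NewBatcher(ctx, sender, manager, errCh)
	batcher.SetThreshold(batchSize)
	batcher.EnableAutoCommit(ctx, forceSendInterval)
	// Tables received from afterRestoreStream have been split, restored, and have left
	// 'restore mode', so per-table checksum can start as soon as each one arrives.
	return afterRestoreStream, nil
}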