Skip to content

Commit

Permalink
restore: use a limited workpool (#51872)
Browse files Browse the repository at this point in the history
ref #50701, close #51621
  • Loading branch information
3pointer authored Mar 26, 2024
1 parent 240ddf8 commit b6dd179
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,16 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string)
// SetConcurrency sets the concurrency of dbs tables files.
func (rc *Client) SetConcurrency(c uint) {
log.Info("download worker pool", zap.Uint("size", c))
if rc.granularity == string(CoarseGrained) {
// we believe 32 is large enough for download worker pool.
// it won't reach the limit if sst files distribute evenly.
// when restore memory usage is still too high, we should reduce concurrencyPerStore
// to sarifice some speed to reduce memory usage.
count := uint(rc.storeCount) * rc.concurrencyPerStore * 32
log.Info("download coarse worker pool", zap.Uint("size", count))
rc.workerPool = utils.NewWorkerPool(count, "file")
return
}
rc.workerPool = utils.NewWorkerPool(c, "file")
}

Expand Down Expand Up @@ -1543,8 +1553,8 @@ LOOPFORTABLE:
// wait for download worker notified
rc.fileImporter.cond.Wait()
}
eg.Go(restoreFn)
rc.fileImporter.cond.L.Unlock()
rc.workerPool.ApplyOnErrorGroup(eg, restoreFn)
} else {
// if we are not use coarse granularity which means
// we still pipeline split & scatter regions and import sst files
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func (cfg *RestoreCommonConfig) adjust() {
func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
// TODO remove experimental tag if it's stable
flags.Bool(flagOnline, false, "(experimental) Whether online when restore")
flags.String(flagGranularity, string(restore.FineGrained), "(experimental) Whether split & scatter regions using fine-grained way during restore")
flags.Uint(flagConcurrencyPerStore, 128, "(experimental) The size of thread pool on each store that executes tasks")
flags.String(flagGranularity, string(restore.FineGrained), "Whether split & scatter regions using fine-grained way during restore")
flags.Uint(flagConcurrencyPerStore, 128, "The size of thread pool on each store that executes tasks, only enabled when `--granularity=coarse-grained`")
flags.Uint32(flagConcurrency, 128, "(deprecated) The size of thread pool on BR that executes tasks, "+
"where each task restores one SST file to TiKV")
flags.Uint64(FlagMergeRegionSizeBytes, conn.DefaultMergeRegionSizeBytes,
Expand Down

0 comments on commit b6dd179

Please sign in to comment.