diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index a110a4836fe80..146612259cb5e 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1862,6 +1862,7 @@ func ApplyKVFilesWithBatchMethod( batchCount int, batchSize uint64, applyFunc func(files []*backuppb.DataFileInfo, kvCount int64, size uint64), + applyWg *sync.WaitGroup, ) error { var ( tableMapFiles = make(map[int64]*FilesInTable) @@ -1940,6 +1941,7 @@ func ApplyKVFilesWithBatchMethod( } } + applyWg.Wait() for _, fwt := range tableMapFiles { for _, fs := range fwt.regionMapFiles { for _, d := range fs.deleteFiles { @@ -1970,6 +1972,7 @@ func ApplyKVFilesWithSingelMethod( ctx context.Context, files LogIter, applyFunc func(file []*backuppb.DataFileInfo, kvCount int64, size uint64), + applyWg *sync.WaitGroup, ) error { deleteKVFiles := make([]*backuppb.DataFileInfo, 0) @@ -1986,6 +1989,7 @@ func ApplyKVFilesWithSingelMethod( applyFunc([]*backuppb.DataFileInfo{f}, f.GetNumberOfEntries(), f.GetLength()) } + applyWg.Wait() log.Info("restore delete files", zap.Int("count", len(deleteKVFiles))) for _, file := range deleteKVFiles { f := file @@ -2025,6 +2029,7 @@ func (rc *Client) RestoreKVFiles( ctx = opentracing.ContextWithSpan(ctx, span1) } + var applyWg sync.WaitGroup eg, ectx := errgroup.WithContext(ctx) applyFunc := func(files []*backuppb.DataFileInfo, kvCount int64, size uint64) { if len(files) == 0 { @@ -2043,8 +2048,10 @@ func (rc *Client) RestoreKVFiles( log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId)) skipFile += len(files) } else { + applyWg.Add(1) rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) { fileStart := time.Now() + defer applyWg.Done() defer func() { onProgress(int64(len(files))) updateStats(uint64(kvCount), size) @@ -2067,9 +2074,9 @@ func (rc *Client) RestoreKVFiles( rc.workerPool.ApplyOnErrorGroup(eg, func() error { if supportBatch { - err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) + err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc, &applyWg) } else { - err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) + err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc, &applyWg) } return errors.Trace(err) }) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index ae943a96f276b..72c4a9397ba96 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -1139,6 +1139,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) { Type: backuppb.FileType_Put, }, } + var applyWg sync.WaitGroup applyFunc := func( files []*backuppb.DataFileInfo, kvCount int64, @@ -1155,6 +1156,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) { context.TODO(), iter.FromSlice(ds), applyFunc, + &applyWg, ) require.Equal(t, totalKVCount, int64(15)) @@ -1209,6 +1211,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) { RegionId: 1, }, } + var applyWg sync.WaitGroup applyFunc := func( files []*backuppb.DataFileInfo, kvCount int64, @@ -1229,6 +1232,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) { batchCount, batchSize, applyFunc, + &applyWg, ) require.Equal(t, runCount, 3) @@ -1297,6 +1301,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) { RegionId: 1, }, } + var applyWg sync.WaitGroup applyFunc := func( files []*backuppb.DataFileInfo, kvCount int64, @@ -1317,6 +1322,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) { batchCount, batchSize, applyFunc, + &applyWg, ) require.Equal(t, runCount, 4) @@ -1379,6 +1385,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) { RegionId: 3, }, } + var applyWg sync.WaitGroup applyFunc := func( files []*backuppb.DataFileInfo, kvCount int64, @@ -1399,6 +1406,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) { batchCount, batchSize, applyFunc, + &applyWg, ) require.Equal(t, totalKVCount, int64(25)) @@ -1459,6 +1467,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) { TableId: 2, }, } + var applyWg sync.WaitGroup applyFunc := func( files []*backuppb.DataFileInfo, kvCount int64, @@ -1479,6 +1488,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) { batchCount, batchSize, applyFunc, + &applyWg, ) require.Equal(t, runCount, 4) @@ -1494,6 +1504,92 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) { ) } +func TestApplyKVFilesWithBatchMethod5(t *testing.T) { + var lock sync.Mutex + types := make([]backuppb.FileType, 0) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + TableId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 100, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, + } + var applyWg sync.WaitGroup + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + if len(files) == 0 { + return + } + applyWg.Add(1) + go func() { + defer applyWg.Done() + if files[0].Type == backuppb.FileType_Put { + time.Sleep(time.Second) + } + lock.Lock() + types = append(types, files[0].Type) + lock.Unlock() + }() + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + 2, + 1500, + applyFunc, + &applyWg, + ) + + applyWg.Wait() + require.Equal(t, backuppb.FileType_Delete, types[len(types)-1]) + + types = make([]backuppb.FileType, 0) + restore.ApplyKVFilesWithSingelMethod( + context.TODO(), + iter.FromSlice(ds), + applyFunc, + &applyWg, + ) + + applyWg.Wait() + require.Equal(t, backuppb.FileType_Delete, types[len(types)-1]) +} + func TestCheckNewCollationEnable(t *testing.T) { caseList := []struct { backupMeta *backuppb.BackupMeta