Skip to content

Commit

Permalink
br: add waitgroup for delete type files (#43751) (#43796)
Browse files Browse the repository at this point in the history
close #43739
  • Loading branch information
ti-chi-bot authored May 26, 2023
1 parent 9dd1a1a commit c8f7346
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
11 changes: 9 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,7 @@ func ApplyKVFilesWithBatchMethod(
batchCount int,
batchSize uint64,
applyFunc func(files []*backuppb.DataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
var (
tableMapFiles = make(map[int64]*FilesInTable)
Expand Down Expand Up @@ -1940,6 +1941,7 @@ func ApplyKVFilesWithBatchMethod(
}
}

applyWg.Wait()
for _, fwt := range tableMapFiles {
for _, fs := range fwt.regionMapFiles {
for _, d := range fs.deleteFiles {
Expand Down Expand Up @@ -1970,6 +1972,7 @@ func ApplyKVFilesWithSingelMethod(
ctx context.Context,
files LogIter,
applyFunc func(file []*backuppb.DataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
deleteKVFiles := make([]*backuppb.DataFileInfo, 0)

Expand All @@ -1986,6 +1989,7 @@ func ApplyKVFilesWithSingelMethod(
applyFunc([]*backuppb.DataFileInfo{f}, f.GetNumberOfEntries(), f.GetLength())
}

applyWg.Wait()
log.Info("restore delete files", zap.Int("count", len(deleteKVFiles)))
for _, file := range deleteKVFiles {
f := file
Expand Down Expand Up @@ -2025,6 +2029,7 @@ func (rc *Client) RestoreKVFiles(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var applyWg sync.WaitGroup
eg, ectx := errgroup.WithContext(ctx)
applyFunc := func(files []*backuppb.DataFileInfo, kvCount int64, size uint64) {
if len(files) == 0 {
Expand All @@ -2043,8 +2048,10 @@ func (rc *Client) RestoreKVFiles(
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
skipFile += len(files)
} else {
applyWg.Add(1)
rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
fileStart := time.Now()
defer applyWg.Done()
defer func() {
onProgress(int64(len(files)))
updateStats(uint64(kvCount), size)
Expand All @@ -2067,9 +2074,9 @@ func (rc *Client) RestoreKVFiles(

rc.workerPool.ApplyOnErrorGroup(eg, func() error {
if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc, &applyWg)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc)
err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc, &applyWg)
}
return errors.Trace(err)
})
Expand Down
96 changes: 96 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
Type: backuppb.FileType_Put,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
Expand All @@ -1155,6 +1156,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
context.TODO(),
iter.FromSlice(ds),
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(15))
Expand Down Expand Up @@ -1209,6 +1211,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
Expand All @@ -1229,6 +1232,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 3)
Expand Down Expand Up @@ -1297,6 +1301,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
Expand All @@ -1317,6 +1322,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand Down Expand Up @@ -1379,6 +1385,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
RegionId: 3,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
Expand All @@ -1399,6 +1406,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(25))
Expand Down Expand Up @@ -1459,6 +1467,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
Expand All @@ -1479,6 +1488,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand All @@ -1494,6 +1504,92 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
)
}

func TestApplyKVFilesWithBatchMethod5(t *testing.T) {
var lock sync.Mutex
types := make([]backuppb.FileType, 0)
ds := []*backuppb.DataFileInfo{
{
Path: "log1",
NumberOfEntries: 5,
Length: 2000,
Cf: stream.WriteCF,
Type: backuppb.FileType_Delete,
TableId: 1,
}, {
Path: "log2",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log3",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 2,
}, {
Path: "log4",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log5",
NumberOfEntries: 5,
Length: 100,
Cf: stream.DefaultCF,
Type: backuppb.FileType_Put,
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*backuppb.DataFileInfo,
kvCount int64,
size uint64,
) {
if len(files) == 0 {
return
}
applyWg.Add(1)
go func() {
defer applyWg.Done()
if files[0].Type == backuppb.FileType_Put {
time.Sleep(time.Second)
}
lock.Lock()
types = append(types, files[0].Type)
lock.Unlock()
}()
}

restore.ApplyKVFilesWithBatchMethod(
context.TODO(),
iter.FromSlice(ds),
2,
1500,
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])

types = make([]backuppb.FileType, 0)
restore.ApplyKVFilesWithSingelMethod(
context.TODO(),
iter.FromSlice(ds),
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])
}

func TestCheckNewCollationEnable(t *testing.T) {
caseList := []struct {
backupMeta *backuppb.BackupMeta
Expand Down

0 comments on commit c8f7346

Please sign in to comment.