From d84df2b42ebb99d5d3e40066edc0a6136088717d Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Wed, 20 Dec 2023 13:27:23 +0800 Subject: [PATCH] global sort: bench new merge step (#49595) ref pingcap/tidb#48952 --- .../lightning/backend/external/bench_test.go | 101 +++++++++++++++--- 1 file changed, 87 insertions(+), 14 deletions(-) diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index fe7cfcd6327b1..58fe2fa150199 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -577,13 +577,14 @@ func createEvenlyDistributedFiles( store storage.ExternalStorage, fileSize, fileCount int, subDir string, -) int { +) (int, kv.Key, kv.Key) { ctx := context.Background() cleanOldFiles(ctx, store, "/"+subDir) value := make([]byte, 100) kvCnt := 0 + var minKey, maxKey kv.Key for i := 0; i < fileCount; i++ { builder := NewWriterBuilder(). SetBlockSize(10 * 1024 * 1024). @@ -598,6 +599,13 @@ func createEvenlyDistributedFiles( totalSize := 0 for totalSize < fileSize { key := fmt.Sprintf("key_%09d", keyIdx) + if len(minKey) == 0 && len(maxKey) == 0 { + minKey = []byte(key) + maxKey = []byte(key) + } else { + minKey = BytesMin(minKey, []byte(key)) + maxKey = BytesMax(maxKey, []byte(key)) + } err := writer.WriteRow(ctx, []byte(key), value, nil) intest.AssertNoError(err) keyIdx += fileCount @@ -607,7 +615,7 @@ func createEvenlyDistributedFiles( err := writer.Close(ctx) intest.AssertNoError(err) } - return kvCnt + return kvCnt, minKey, maxKey } func readMergeIter(t *testing.T, s *readTestSuite) { @@ -656,7 +664,7 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) { subDir := "evenly_distributed" store := openTestingStorage(t) - kvCnt := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir) + kvCnt, _, _ := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir) memoryLimit := 64 * 1024 * 1024 fileIdx := 0 var ( @@ -722,7 +730,7 @@ func createAscendingFiles( store storage.ExternalStorage, fileSize, fileCount int, subDir string, -) int { +) (int, kv.Key, kv.Key) { ctx := context.Background() cleanOldFiles(ctx, store, "/"+subDir) @@ -730,6 +738,7 @@ func createAscendingFiles( keyIdx := 0 value := make([]byte, 100) kvCnt := 0 + var minKey, maxKey kv.Key for i := 0; i < fileCount; i++ { builder := NewWriterBuilder(). SetMemorySizeLimit(uint64(float64(fileSize) * 1.1)) @@ -740,18 +749,25 @@ func createAscendingFiles( ) totalSize := 0 + var key string for totalSize < fileSize { - key := fmt.Sprintf("key_%09d", keyIdx) + key = fmt.Sprintf("key_%09d", keyIdx) + if i == 0 && totalSize == 0 { + minKey = []byte(key) + } err := writer.WriteRow(ctx, []byte(key), value, nil) intest.AssertNoError(err) keyIdx++ totalSize += len(key) + len(value) kvCnt++ } + if i == fileCount-1 { + maxKey = []byte(key) + } err := writer.Close(ctx) intest.AssertNoError(err) } - return kvCnt + return kvCnt, minKey, maxKey } var ( @@ -784,12 +800,12 @@ func TestReadMergeIterWithoutCheckHotspot(t *testing.T) { func testCompareReaderWithContent( t *testing.T, - createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int, + createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key), fn func(t *testing.T, suite *readTestSuite)) { store := openTestingStorage(t) kvCnt := 0 if !*skipCreate { - kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix) + kvCnt, _, _ = createFn(store, *fileSize, *fileCount, *objectPrefix) } fileIdx := 0 var ( @@ -951,7 +967,55 @@ func mergeStep(t *testing.T, s *mergeTestSuite) { } elapsed := time.Since(now) t.Logf( - "merge speed for %d bytes in %s, speed: %.2f MB/s", + "merge speed for %d bytes in %s with %d concurrency, speed: %.2f MB/s", + totalSize.Load(), + elapsed, + s.concurrency, + float64(totalSize.Load())/elapsed.Seconds()/1024/1024, + ) +} + +func newMergeStep(t *testing.T, s *mergeTestSuite) { + ctx := context.Background() + datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir) + intest.AssertNoError(err) + + mergeOutput := "merge_output" + totalSize := atomic.NewUint64(0) + onClose := func(s *WriterSummary) { + totalSize.Add(s.TotalSize) + } + if s.beforeMerge != nil { + s.beforeMerge() + } + + now := time.Now() + err = MergeOverlappingFilesV2( + ctx, + datas, + stats, + s.store, + s.minKey, + s.maxKey.Next(), + int64(5*size.MB), + mergeOutput, + "test", + DefaultBlockSize, + 8*1024, + 1*size.MB, + 8*1024, + onClose, + s.concurrency, + s.mergeIterHotspot, + ) + + intest.AssertNoError(err) + if s.afterMerge != nil { + s.afterMerge() + } + elapsed := time.Since(now) + t.Logf( + "new merge speed for %d bytes in %s, speed: %.2f MB/s", totalSize.Load(), elapsed, float64(totalSize.Load())/elapsed.Seconds()/1024/1024, @@ -960,13 +1024,14 @@ func mergeStep(t *testing.T, s *mergeTestSuite) { func testCompareMergeWithContent( t *testing.T, - createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int, + concurrency int, + createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key), fn func(t *testing.T, suite *mergeTestSuite)) { store := openTestingStorage(t) kvCnt := 0 var minKey, maxKey kv.Key if !*skipCreate { - kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix) + kvCnt, minKey, maxKey = createFn(store, *fileSize, *fileCount, *objectPrefix) } fileIdx := 0 @@ -988,7 +1053,7 @@ func testCompareMergeWithContent( suite := &mergeTestSuite{ store: store, totalKVCnt: kvCnt, - concurrency: *concurrency, + concurrency: concurrency, memoryLimit: *memoryLimit, beforeMerge: beforeTest, afterMerge: afterTest, @@ -1002,6 +1067,14 @@ func testCompareMergeWithContent( } func TestMergeBench(t *testing.T) { - testCompareMergeWithContent(t, createAscendingFiles, mergeStep) - testCompareMergeWithContent(t, createEvenlyDistributedFiles, mergeStep) + testCompareMergeWithContent(t, 1, createAscendingFiles, mergeStep) + testCompareMergeWithContent(t, 1, createEvenlyDistributedFiles, mergeStep) + testCompareMergeWithContent(t, 2, createAscendingFiles, mergeStep) + testCompareMergeWithContent(t, 2, createEvenlyDistributedFiles, mergeStep) + testCompareMergeWithContent(t, 4, createAscendingFiles, mergeStep) + testCompareMergeWithContent(t, 4, createEvenlyDistributedFiles, mergeStep) + testCompareMergeWithContent(t, 8, createAscendingFiles, mergeStep) + testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, mergeStep) + testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep) + testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep) }