From 032c8e599c71f1322115680fdba52fedde625725 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 14 Nov 2023 11:12:44 +0800 Subject: [PATCH] external/test: compare the performance of merge iter (#48560) --- br/pkg/lightning/backend/external/BUILD.bazel | 3 +- .../lightning/backend/external/bench_test.go | 230 +++++++++++++++++- 2 files changed, 225 insertions(+), 8 deletions(-) diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index a9f2f06abde9b..a4390fbf03242 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -65,7 +65,7 @@ go_test( ], embed = [":external"], flaky = True, - shard_count = 45, + shard_count = 46, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", @@ -89,6 +89,7 @@ go_test( "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_stretchr_testify//require", "@org_golang_x_exp//rand", + "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", ], ) diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 55b7fea3ea303..798a81eae8025 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -18,14 +18,17 @@ import ( "context" "flag" "fmt" + "io" "os" "runtime/pprof" + "sync" "testing" "time" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/intest" + "golang.org/x/sync/errgroup" ) var testingStorageURI = flag.String("testing-storage-uri", "", "the URI of the storage used for testing") @@ -93,8 +96,7 @@ func (s *ascendingKeySource) outputSize() int { return s.totalSize } -type testSuite struct { - t *testing.T +type writeTestSuite struct { store storage.ExternalStorage source kvSource memoryLimit int @@ -103,7 +105,7 @@ type testSuite struct { afterWriterClose func() } -func writePlainFile(s *testSuite) { +func writePlainFile(s *writeTestSuite) { ctx := context.Background() buf := make([]byte, s.memoryLimit) offset := 0 @@ -139,7 +141,7 @@ func writePlainFile(s *testSuite) { } } -func writeExternalFile(s *testSuite) { +func writeExternalFile(s *writeTestSuite) { ctx := context.Background() builder := NewWriterBuilder(). SetMemorySizeLimit(uint64(s.memoryLimit)) @@ -164,7 +166,7 @@ func writeExternalFile(s *testSuite) { } } -func TestCompare(t *testing.T) { +func TestCompareWriter(t *testing.T) { store := openTestingStorage(t) source := newAscendingKeySource(20, 100, 10000000) memoryLimit := 64 * 1024 * 1024 @@ -195,8 +197,7 @@ func TestCompare(t *testing.T) { pprof.StopCPUProfile() } - suite := &testSuite{ - t: t, + suite := &writeTestSuite{ store: store, source: source, memoryLimit: memoryLimit, @@ -214,3 +215,218 @@ func TestCompare(t *testing.T) { writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024 t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed) } + +type readTestSuite struct { + store storage.ExternalStorage + totalKVCnt int + concurrency int + memoryLimit int + beforeCreateReader func() + beforeReaderClose func() + afterReaderClose func() +} + +func readFileSequential(s *readTestSuite) { + ctx := context.Background() + files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed") + intest.Assert(err == nil) + + buf := make([]byte, s.memoryLimit) + if s.beforeCreateReader != nil { + s.beforeCreateReader() + } + for i, file := range files { + reader, err := s.store.Open(ctx, file, nil) + intest.Assert(err == nil) + _, err = reader.Read(buf) + for err == nil { + _, err = reader.Read(buf) + } + intest.Assert(err == io.EOF) + if i == len(files)-1 { + if s.beforeReaderClose != nil { + s.beforeReaderClose() + } + } + err = reader.Close() + intest.Assert(err == nil) + } + if s.afterReaderClose != nil { + s.afterReaderClose() + } +} + +func readFileConcurrently(s *readTestSuite) { + ctx := context.Background() + files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed") + intest.Assert(err == nil) + + conc := min(s.concurrency, len(files)) + var eg errgroup.Group + eg.SetLimit(conc) + var once sync.Once + + if s.beforeCreateReader != nil { + s.beforeCreateReader() + } + for _, file := range files { + eg.Go(func() error { + buf := make([]byte, s.memoryLimit/conc) + reader, err := s.store.Open(ctx, file, nil) + intest.Assert(err == nil) + _, err = reader.Read(buf) + for err == nil { + _, err = reader.Read(buf) + } + intest.Assert(err == io.EOF) + once.Do(func() { + if s.beforeReaderClose != nil { + s.beforeReaderClose() + } + }) + err = reader.Close() + intest.Assert(err == nil) + return nil + }) + } + err = eg.Wait() + intest.Assert(err == nil) + if s.afterReaderClose != nil { + s.afterReaderClose() + } +} + +func createEvenlyDistributedFiles( + t *testing.T, + fileSize, fileCount int, +) (storage.ExternalStorage, int) { + store := openTestingStorage(t) + ctx := context.Background() + + files, statFiles, err := GetAllFileNames(ctx, store, "evenly_distributed") + intest.Assert(err == nil) + err = store.DeleteFiles(ctx, files) + intest.Assert(err == nil) + err = store.DeleteFiles(ctx, statFiles) + intest.Assert(err == nil) + + value := make([]byte, 100) + kvCnt := 0 + for i := 0; i < fileCount; i++ { + builder := NewWriterBuilder(). + SetMemorySizeLimit(uint64(float64(fileSize) * 1.1)) + writer := builder.Build( + store, + "evenly_distributed", + fmt.Sprintf("%d", i), + ) + + keyIdx := i + totalSize := 0 + for totalSize < fileSize { + key := fmt.Sprintf("key_%09d", keyIdx) + err := writer.WriteRow(ctx, []byte(key), value, nil) + intest.Assert(err == nil) + keyIdx += fileCount + totalSize += len(key) + len(value) + kvCnt++ + } + err := writer.Close(ctx) + intest.Assert(err == nil) + } + return store, kvCnt +} + +func readMergeIter(s *readTestSuite) { + ctx := context.Background() + files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed") + intest.Assert(err == nil) + + if s.beforeCreateReader != nil { + s.beforeCreateReader() + } + + readBufSize := s.memoryLimit / len(files) + zeroOffsets := make([]uint64, len(files)) + iter, err := NewMergeKVIter(ctx, files, zeroOffsets, s.store, readBufSize, false) + intest.Assert(err == nil) + + kvCnt := 0 + for iter.Next() { + kvCnt++ + if kvCnt == s.totalKVCnt/2 { + if s.beforeReaderClose != nil { + s.beforeReaderClose() + } + } + } + intest.Assert(kvCnt == s.totalKVCnt) + err = iter.Close() + intest.Assert(err == nil) + if s.afterReaderClose != nil { + s.afterReaderClose() + } +} + +func TestCompareReader(t *testing.T) { + fileSize := 50 * 1024 * 1024 + fileCnt := 24 + store, kvCnt := createEvenlyDistributedFiles(t, fileSize, fileCnt) + memoryLimit := 64 * 1024 * 1024 + fileIdx := 0 + var ( + now time.Time + elapsed time.Duration + file *os.File + err error + ) + beforeTest := func() { + fileIdx++ + file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx)) + intest.Assert(err == nil) + err = pprof.StartCPUProfile(file) + intest.Assert(err == nil) + now = time.Now() + } + beforeClose := func() { + file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx)) + intest.Assert(err == nil) + // check heap profile to see the memory usage is expected + err = pprof.WriteHeapProfile(file) + intest.Assert(err == nil) + } + afterClose := func() { + elapsed = time.Since(now) + pprof.StopCPUProfile() + } + + suite := &readTestSuite{ + store: store, + totalKVCnt: kvCnt, + concurrency: 100, + memoryLimit: memoryLimit, + beforeCreateReader: beforeTest, + beforeReaderClose: beforeClose, + afterReaderClose: afterClose, + } + readFileSequential(suite) + t.Logf( + "sequential read speed for %d bytes: %.2f MB/s", + fileSize*fileCnt, + float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024, + ) + + readFileConcurrently(suite) + t.Logf( + "concurrent read speed for %d bytes: %.2f MB/s", + fileSize*fileCnt, + float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024, + ) + + readMergeIter(suite) + t.Logf( + "merge iter read speed for %d bytes: %.2f MB/s", + fileSize*fileCnt, + float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024, + ) +}