external/test: compare the performance of merge iter (#48560)
lance6716 authored Nov 14, 2023
1 parent 50237bd commit 032c8e5
Showing 2 changed files with 225 additions and 8 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
],
embed = [":external"],
flaky = True,
-shard_count = 45,
+shard_count = 46,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
@@ -89,6 +89,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
],
)
230 changes: 223 additions & 7 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -18,14 +18,17 @@ import (
"context"
"flag"
"fmt"
"io"
"os"
"runtime/pprof"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"golang.org/x/sync/errgroup"
)

var testingStorageURI = flag.String("testing-storage-uri", "", "the URI of the storage used for testing")
@@ -93,8 +96,7 @@ func (s *ascendingKeySource) outputSize() int {
return s.totalSize
}

-type testSuite struct {
-t *testing.T
+type writeTestSuite struct {
store storage.ExternalStorage
source kvSource
memoryLimit int
@@ -103,7 +105,7 @@ type testSuite struct {
afterWriterClose func()
}

-func writePlainFile(s *testSuite) {
+func writePlainFile(s *writeTestSuite) {
ctx := context.Background()
buf := make([]byte, s.memoryLimit)
offset := 0
@@ -139,7 +141,7 @@ func writePlainFile(s *testSuite) {
}
}

-func writeExternalFile(s *testSuite) {
+func writeExternalFile(s *writeTestSuite) {
ctx := context.Background()
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(s.memoryLimit))
@@ -164,7 +166,7 @@ func writeExternalFile(s *testSuite) {
}
}

-func TestCompare(t *testing.T) {
+func TestCompareWriter(t *testing.T) {
store := openTestingStorage(t)
source := newAscendingKeySource(20, 100, 10000000)
memoryLimit := 64 * 1024 * 1024
@@ -195,8 +197,7 @@ func TestCompare(t *testing.T) {
pprof.StopCPUProfile()
}

-suite := &testSuite{
-t: t,
+suite := &writeTestSuite{
store: store,
source: source,
memoryLimit: memoryLimit,
@@ -214,3 +215,218 @@ func TestCompare(t *testing.T) {
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
}

type readTestSuite struct {
store storage.ExternalStorage
totalKVCnt int
concurrency int
memoryLimit int
beforeCreateReader func()
beforeReaderClose func()
afterReaderClose func()
}

func readFileSequential(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

buf := make([]byte, s.memoryLimit)
if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for i, file := range files {
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
}
intest.Assert(err == io.EOF)
if i == len(files)-1 {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
}
err = reader.Close()
intest.Assert(err == nil)
}
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func readFileConcurrently(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

conc := min(s.concurrency, len(files))
var eg errgroup.Group
eg.SetLimit(conc)
var once sync.Once

if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for _, file := range files {
eg.Go(func() error {
buf := make([]byte, s.memoryLimit/conc)
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
}
intest.Assert(err == io.EOF)
once.Do(func() {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
})
err = reader.Close()
intest.Assert(err == nil)
return nil
})
}
err = eg.Wait()
intest.Assert(err == nil)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func createEvenlyDistributedFiles(
t *testing.T,
fileSize, fileCount int,
) (storage.ExternalStorage, int) {
store := openTestingStorage(t)
ctx := context.Background()

files, statFiles, err := GetAllFileNames(ctx, store, "evenly_distributed")
intest.Assert(err == nil)
err = store.DeleteFiles(ctx, files)
intest.Assert(err == nil)
err = store.DeleteFiles(ctx, statFiles)
intest.Assert(err == nil)

value := make([]byte, 100)
kvCnt := 0
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
writer := builder.Build(
store,
"evenly_distributed",
fmt.Sprintf("%d", i),
)

keyIdx := i
totalSize := 0
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.Assert(err == nil)
keyIdx += fileCount
totalSize += len(key) + len(value)
kvCnt++
}
err := writer.Close(ctx)
intest.Assert(err == nil)
}
return store, kvCnt
}

func readMergeIter(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

if s.beforeCreateReader != nil {
s.beforeCreateReader()
}

readBufSize := s.memoryLimit / len(files)
zeroOffsets := make([]uint64, len(files))
iter, err := NewMergeKVIter(ctx, files, zeroOffsets, s.store, readBufSize, false)
intest.Assert(err == nil)

kvCnt := 0
for iter.Next() {
kvCnt++
if kvCnt == s.totalKVCnt/2 {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
}
}
intest.Assert(kvCnt == s.totalKVCnt)
err = iter.Close()
intest.Assert(err == nil)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func TestCompareReader(t *testing.T) {
fileSize := 50 * 1024 * 1024
fileCnt := 24
store, kvCnt := createEvenlyDistributedFiles(t, fileSize, fileCnt)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
now time.Time
elapsed time.Duration
file *os.File
err error
)
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
err = pprof.StartCPUProfile(file)
intest.Assert(err == nil)
now = time.Now()
}
beforeClose := func() {
file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
// check the heap profile to verify that the memory usage is as expected
err = pprof.WriteHeapProfile(file)
intest.Assert(err == nil)
}
afterClose := func() {
elapsed = time.Since(now)
pprof.StopCPUProfile()
}

suite := &readTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: 100,
memoryLimit: memoryLimit,
beforeCreateReader: beforeTest,
beforeReaderClose: beforeClose,
afterReaderClose: afterClose,
}
readFileSequential(suite)
t.Logf(
"sequential read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readFileConcurrently(suite)
t.Logf(
"concurrent read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readMergeIter(suite)
t.Logf(
"merge iter read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)
}
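
A possible way to invoke the new comparison tests locally, as a sketch: the bucket and prefix in the URI below are placeholders, not values defined by this commit, and any backend accepted by br/pkg/storage should work for the -testing-storage-uri flag declared at the top of bench_test.go.

go test ./br/pkg/lightning/backend/external/ -run 'TestCompareWriter|TestCompareReader' -v -args -testing-storage-uri 's3://your-bucket/test-prefix'

The cpu-profile-N.prof and heap-profile-N.prof files written to the working directory can then be inspected with go tool pprof to compare CPU time and memory usage across the sequential, concurrent, and merge-iterator read paths.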
