external/test: compare the performance of merge iter #48560

Merged (10 commits) on Nov 14, 2023
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
@@ -89,6 +89,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
],
)
230 changes: 223 additions & 7 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -18,14 +18,17 @@ import (
"context"
"flag"
"fmt"
"io"
"os"
"runtime/pprof"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"golang.org/x/sync/errgroup"
)

var testingStorageURI = flag.String("testing-storage-uri", "", "the URI of the storage used for testing")
@@ -93,8 +96,7 @@ func (s *ascendingKeySource) outputSize() int {
return s.totalSize
}

type testSuite struct {
t *testing.T
type writeTestSuite struct {
store storage.ExternalStorage
source kvSource
memoryLimit int
@@ -103,7 +105,7 @@ type testSuite struct {
afterWriterClose func()
}

func writePlainFile(s *testSuite) {
func writePlainFile(s *writeTestSuite) {
ctx := context.Background()
buf := make([]byte, s.memoryLimit)
offset := 0
@@ -139,7 +141,7 @@ func writePlainFile(s *testSuite) {
}
}

func writeExternalFile(s *testSuite) {
func writeExternalFile(s *writeTestSuite) {
ctx := context.Background()
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(s.memoryLimit))
@@ -164,7 +166,7 @@ func writeExternalFile(s *testSuite) {
}
}

func TestCompare(t *testing.T) {
func TestCompareWriter(t *testing.T) {
store := openTestingStorage(t)
source := newAscendingKeySource(20, 100, 10000000)
memoryLimit := 64 * 1024 * 1024
Expand Down Expand Up @@ -195,8 +197,7 @@ func TestCompare(t *testing.T) {
pprof.StopCPUProfile()
}

suite := &testSuite{
t: t,
suite := &writeTestSuite{
store: store,
source: source,
memoryLimit: memoryLimit,
@@ -214,3 +215,218 @@ func TestCompare(t *testing.T) {
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
}

type readTestSuite struct {
store storage.ExternalStorage
totalKVCnt int
concurrency int
memoryLimit int
beforeCreateReader func()
beforeReaderClose func()
afterReaderClose func()
}

func readFileSequential(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

buf := make([]byte, s.memoryLimit)
if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for i, file := range files {
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
}
intest.Assert(err == io.EOF)
if i == len(files)-1 {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
}
err = reader.Close()
intest.Assert(err == nil)
}
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func readFileConcurrently(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

conc := min(s.concurrency, len(files))
var eg errgroup.Group
eg.SetLimit(conc)
var once sync.Once

if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for _, file := range files {
file := file // avoid loop-variable capture in the goroutine (needed before Go 1.22)
eg.Go(func() error {
buf := make([]byte, s.memoryLimit/conc)
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
}
intest.Assert(err == io.EOF)
once.Do(func() {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
})
err = reader.Close()
intest.Assert(err == nil)
return nil
})
}
err = eg.Wait()
intest.Assert(err == nil)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func createEvenlyDistributedFiles(
t *testing.T,
fileSize, fileCount int,
) (storage.ExternalStorage, int) {
store := openTestingStorage(t)
ctx := context.Background()

files, statFiles, err := GetAllFileNames(ctx, store, "evenly_distributed")
intest.Assert(err == nil)
err = store.DeleteFiles(ctx, files)
intest.Assert(err == nil)
err = store.DeleteFiles(ctx, statFiles)
intest.Assert(err == nil)

value := make([]byte, 100)
kvCnt := 0
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
writer := builder.Build(
store,
"evenly_distributed",
fmt.Sprintf("%d", i),
)

keyIdx := i
totalSize := 0
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.Assert(err == nil)
keyIdx += fileCount
totalSize += len(key) + len(value)
kvCnt++
}
err := writer.Close(ctx)
intest.Assert(err == nil)
}
return store, kvCnt
}

func readMergeIter(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)

if s.beforeCreateReader != nil {
s.beforeCreateReader()
}

readBufSize := s.memoryLimit / len(files)
zeroOffsets := make([]uint64, len(files))
iter, err := NewMergeKVIter(ctx, files, zeroOffsets, s.store, readBufSize, false)
intest.Assert(err == nil)

kvCnt := 0
for iter.Next() {
kvCnt++
if kvCnt == s.totalKVCnt/2 {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
}
}
}
intest.Assert(kvCnt == s.totalKVCnt)
err = iter.Close()
intest.Assert(err == nil)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
}

func TestCompareReader(t *testing.T) {
fileSize := 50 * 1024 * 1024
fileCnt := 24
store, kvCnt := createEvenlyDistributedFiles(t, fileSize, fileCnt)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
now time.Time
elapsed time.Duration
file *os.File
err error
)
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
err = pprof.StartCPUProfile(file)
intest.Assert(err == nil)
now = time.Now()
}
beforeClose := func() {
file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
// check the heap profile to see whether the memory usage is as expected
err = pprof.WriteHeapProfile(file)
intest.Assert(err == nil)
}
afterClose := func() {
elapsed = time.Since(now)
pprof.StopCPUProfile()
}

suite := &readTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: 100,
memoryLimit: memoryLimit,
beforeCreateReader: beforeTest,
beforeReaderClose: beforeClose,
afterReaderClose: afterClose,
}
readFileSequential(suite)
t.Logf(
"sequential read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readFileConcurrently(suite)
t.Logf(
"concurrent read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readMergeIter(suite)
t.Logf(
"merge iter read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)
}
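
Note (reviewer sketch, not part of this PR's diff): the three read paths above share the same timing and logging boilerplate, so the comparison in TestCompareReader could also be written table-driven. A minimal sketch, assuming it lives in the same external test package and that totalBytes and elapsed mirror the values TestCompareReader already computes via its hooks:

// runReadComparison is a hypothetical helper, not code from this PR.
// It runs each read path once and logs its throughput.
func runReadComparison(t *testing.T, suite *readTestSuite, totalBytes int, elapsed *time.Duration) {
	cases := []struct {
		name string
		read func(*readTestSuite)
	}{
		{"sequential read", readFileSequential},
		{"concurrent read", readFileConcurrently},
		{"merge iter read", readMergeIter},
	}
	for _, c := range cases {
		// each read path ends by calling afterReaderClose, which is assumed to update *elapsed
		c.read(suite)
		t.Logf("%s speed for %d bytes: %.2f MB/s",
			c.name, totalBytes,
			float64(totalBytes)/elapsed.Seconds()/1024/1024)
	}
}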