Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global sort: bench new merge step #49595

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 87 additions & 14 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,14 @@ func createEvenlyDistributedFiles(
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) int {
) (int, kv.Key, kv.Key) {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)

value := make([]byte, 100)
kvCnt := 0
var minKey, maxKey kv.Key
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetBlockSize(10 * 1024 * 1024).
Expand All @@ -598,6 +599,13 @@ func createEvenlyDistributedFiles(
totalSize := 0
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
if len(minKey) == 0 && len(maxKey) == 0 {
minKey = []byte(key)
maxKey = []byte(key)
} else {
minKey = BytesMin(minKey, []byte(key))
maxKey = BytesMax(maxKey, []byte(key))
}
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.AssertNoError(err)
keyIdx += fileCount
Expand All @@ -607,7 +615,7 @@ func createEvenlyDistributedFiles(
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return kvCnt
return kvCnt, minKey, maxKey
}

func readMergeIter(t *testing.T, s *readTestSuite) {
Expand Down Expand Up @@ -656,7 +664,7 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
subDir := "evenly_distributed"
store := openTestingStorage(t)

kvCnt := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
kvCnt, _, _ := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
Expand Down Expand Up @@ -722,14 +730,15 @@ func createAscendingFiles(
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) int {
) (int, kv.Key, kv.Key) {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)

keyIdx := 0
value := make([]byte, 100)
kvCnt := 0
var minKey, maxKey kv.Key
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
Expand All @@ -740,18 +749,25 @@ func createAscendingFiles(
)

totalSize := 0
var key string
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
key = fmt.Sprintf("key_%09d", keyIdx)
if i == 0 && totalSize == 0 {
minKey = []byte(key)
}
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.AssertNoError(err)
keyIdx++
totalSize += len(key) + len(value)
kvCnt++
}
if i == fileCount-1 {
maxKey = []byte(key)
}
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return kvCnt
return kvCnt, minKey, maxKey
}

var (
Expand Down Expand Up @@ -784,12 +800,12 @@ func TestReadMergeIterWithoutCheckHotspot(t *testing.T) {

func testCompareReaderWithContent(
t *testing.T,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key),
fn func(t *testing.T, suite *readTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
if !*skipCreate {
kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
kvCnt, _, _ = createFn(store, *fileSize, *fileCount, *objectPrefix)
}
fileIdx := 0
var (
Expand Down Expand Up @@ -951,7 +967,55 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
}
elapsed := time.Since(now)
t.Logf(
"merge speed for %d bytes in %s, speed: %.2f MB/s",
"merge speed for %d bytes in %s with %d concurrency, speed: %.2f MB/s",
totalSize.Load(),
elapsed,
s.concurrency,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
)
}

func newMergeStep(t *testing.T, s *mergeTestSuite) {
ctx := context.Background()
datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)

mergeOutput := "merge_output"
totalSize := atomic.NewUint64(0)
onClose := func(s *WriterSummary) {
totalSize.Add(s.TotalSize)
}
if s.beforeMerge != nil {
s.beforeMerge()
}

now := time.Now()
err = MergeOverlappingFilesV2(
ctx,
datas,
stats,
s.store,
s.minKey,
s.maxKey.Next(),
int64(5*size.MB),
mergeOutput,
"test",
DefaultBlockSize,
8*1024,
1*size.MB,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
)

intest.AssertNoError(err)
if s.afterMerge != nil {
s.afterMerge()
}
elapsed := time.Since(now)
t.Logf(
"new merge speed for %d bytes in %s, speed: %.2f MB/s",
totalSize.Load(),
elapsed,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
Expand All @@ -960,13 +1024,14 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {

func testCompareMergeWithContent(
t *testing.T,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
concurrency int,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key),
fn func(t *testing.T, suite *mergeTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
var minKey, maxKey kv.Key
if !*skipCreate {
kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
kvCnt, minKey, maxKey = createFn(store, *fileSize, *fileCount, *objectPrefix)
}

fileIdx := 0
Expand All @@ -988,7 +1053,7 @@ func testCompareMergeWithContent(
suite := &mergeTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: *concurrency,
concurrency: concurrency,
memoryLimit: *memoryLimit,
beforeMerge: beforeTest,
afterMerge: afterTest,
Expand All @@ -1002,6 +1067,14 @@ func testCompareMergeWithContent(
}

func TestMergeBench(t *testing.T) {
testCompareMergeWithContent(t, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 1, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 1, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 2, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 2, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 4, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 4, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 8, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
}
Loading