Skip to content

Commit

Permalink
global sort: bench new merge step (#49595)
Browse files Browse the repository at this point in the history
ref #48952
  • Loading branch information
ywqzzy authored Dec 20, 2023
1 parent 18d6980 commit d84df2b
Showing 1 changed file with 87 additions and 14 deletions.
101 changes: 87 additions & 14 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,14 @@ func createEvenlyDistributedFiles(
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) int {
) (int, kv.Key, kv.Key) {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)

value := make([]byte, 100)
kvCnt := 0
var minKey, maxKey kv.Key
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetBlockSize(10 * 1024 * 1024).
Expand All @@ -598,6 +599,13 @@ func createEvenlyDistributedFiles(
totalSize := 0
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
if len(minKey) == 0 && len(maxKey) == 0 {
minKey = []byte(key)
maxKey = []byte(key)
} else {
minKey = BytesMin(minKey, []byte(key))
maxKey = BytesMax(maxKey, []byte(key))
}
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.AssertNoError(err)
keyIdx += fileCount
Expand All @@ -607,7 +615,7 @@ func createEvenlyDistributedFiles(
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return kvCnt
return kvCnt, minKey, maxKey
}

func readMergeIter(t *testing.T, s *readTestSuite) {
Expand Down Expand Up @@ -656,7 +664,7 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
subDir := "evenly_distributed"
store := openTestingStorage(t)

kvCnt := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
kvCnt, _, _ := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
Expand Down Expand Up @@ -722,14 +730,15 @@ func createAscendingFiles(
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) int {
) (int, kv.Key, kv.Key) {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)

keyIdx := 0
value := make([]byte, 100)
kvCnt := 0
var minKey, maxKey kv.Key
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
Expand All @@ -740,18 +749,25 @@ func createAscendingFiles(
)

totalSize := 0
var key string
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
key = fmt.Sprintf("key_%09d", keyIdx)
if i == 0 && totalSize == 0 {
minKey = []byte(key)
}
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.AssertNoError(err)
keyIdx++
totalSize += len(key) + len(value)
kvCnt++
}
if i == fileCount-1 {
maxKey = []byte(key)
}
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return kvCnt
return kvCnt, minKey, maxKey
}

var (
Expand Down Expand Up @@ -784,12 +800,12 @@ func TestReadMergeIterWithoutCheckHotspot(t *testing.T) {

func testCompareReaderWithContent(
t *testing.T,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key),
fn func(t *testing.T, suite *readTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
if !*skipCreate {
kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
kvCnt, _, _ = createFn(store, *fileSize, *fileCount, *objectPrefix)
}
fileIdx := 0
var (
Expand Down Expand Up @@ -951,7 +967,55 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
}
elapsed := time.Since(now)
t.Logf(
"merge speed for %d bytes in %s, speed: %.2f MB/s",
"merge speed for %d bytes in %s with %d concurrency, speed: %.2f MB/s",
totalSize.Load(),
elapsed,
s.concurrency,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
)
}

func newMergeStep(t *testing.T, s *mergeTestSuite) {
ctx := context.Background()
datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)

mergeOutput := "merge_output"
totalSize := atomic.NewUint64(0)
onClose := func(s *WriterSummary) {
totalSize.Add(s.TotalSize)
}
if s.beforeMerge != nil {
s.beforeMerge()
}

now := time.Now()
err = MergeOverlappingFilesV2(
ctx,
datas,
stats,
s.store,
s.minKey,
s.maxKey.Next(),
int64(5*size.MB),
mergeOutput,
"test",
DefaultBlockSize,
8*1024,
1*size.MB,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
)

intest.AssertNoError(err)
if s.afterMerge != nil {
s.afterMerge()
}
elapsed := time.Since(now)
t.Logf(
"new merge speed for %d bytes in %s, speed: %.2f MB/s",
totalSize.Load(),
elapsed,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
Expand All @@ -960,13 +1024,14 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {

func testCompareMergeWithContent(
t *testing.T,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
concurrency int,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key),
fn func(t *testing.T, suite *mergeTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
var minKey, maxKey kv.Key
if !*skipCreate {
kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
kvCnt, minKey, maxKey = createFn(store, *fileSize, *fileCount, *objectPrefix)
}

fileIdx := 0
Expand All @@ -988,7 +1053,7 @@ func testCompareMergeWithContent(
suite := &mergeTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: *concurrency,
concurrency: concurrency,
memoryLimit: *memoryLimit,
beforeMerge: beforeTest,
afterMerge: afterTest,
Expand All @@ -1002,6 +1067,14 @@ func testCompareMergeWithContent(
}

func TestMergeBench(t *testing.T) {
testCompareMergeWithContent(t, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 1, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 1, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 2, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 2, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 4, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 4, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 8, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, mergeStep)
testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
}

0 comments on commit d84df2b

Please sign in to comment.