Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, lightning: integrate operator for global sort #46734

Merged
merged 35 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b82492e
lightning: add ExternalEngineConfig to create an external engine
lance6716 Sep 4, 2023
ba62aee
add two more config
lance6716 Sep 4, 2023
fc7e903
fix UT
lance6716 Sep 4, 2023
3a5da7a
fix bazel
lance6716 Sep 5, 2023
1ede83e
fix lint
lance6716 Sep 5, 2023
0c60d4c
add UT
lance6716 Sep 5, 2023
faae105
fix sec lint
lance6716 Sep 5, 2023
549d0da
fix bazel
lance6716 Sep 5, 2023
2c6257b
address comment
lance6716 Sep 5, 2023
ea7494d
have to use failpoint
lance6716 Sep 5, 2023
7f7bef5
fix CI
lance6716 Sep 5, 2023
4a89257
address comment
lance6716 Sep 6, 2023
f6de222
switch to failpoint
lance6716 Sep 7, 2023
820326d
Merge branch 'master' of github.com:pingcap/tidb into external-config
lance6716 Sep 7, 2023
83ec65d
ddl, lightning: integrate operator for global sort
tangenta Sep 7, 2023
6f098c4
update bazel
tangenta Sep 7, 2023
be3e412
use jobID as external storage prefix
tangenta Sep 7, 2023
feb25b1
Merge remote-tracking branch 'upstream/master' into global-sort-operator
tangenta Sep 7, 2023
8791caa
fix file prefix and linter
tangenta Sep 8, 2023
e093425
fix linter
tangenta Sep 8, 2023
7ecb83f
use the same writer for each index
tangenta Sep 11, 2023
c7fee41
fix GetRegionSplitSizeKeys
tangenta Sep 11, 2023
37d9fd4
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Sep 11, 2023
45d24c9
use one write worker for global sort
tangenta Sep 11, 2023
c06da77
remove unnecessary changes
tangenta Sep 11, 2023
395bd5f
unregister backend ctx after merge sort
tangenta Sep 11, 2023
7849e2e
ddl: get all file names from subtask meta
tangenta Sep 11, 2023
ba083ae
address comments
tangenta Sep 11, 2023
5aa5e09
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Sep 11, 2023
5c209b5
apply tidb_cloud_storage_uri
tangenta Sep 11, 2023
56a7378
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Sep 12, 2023
271d1fd
fix (#127)
ywqzzy Sep 12, 2023
d9e6bce
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Sep 12, 2023
a19bd15
collect min max key correctly
tangenta Sep 12, 2023
5ab3433
add comment
tangenta Sep 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
Expand Down Expand Up @@ -55,7 +52,6 @@ go_test(
flaky = True,
shard_count = 31,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
Expand Down
12 changes: 7 additions & 5 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"context"
"path"
"slices"
"strconv"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -62,12 +62,14 @@ func TestIter(t *testing.T) {
SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
SetPropSizeDistance(uint64(rand.Intn(50)+1)).
SetPropKeysDistance(uint64(rand.Intn(10)+1)).
Build(store, "/subtask", i)
Build(store, "/subtask", strconv.Itoa(i))
kvStart := i * 100
kvEnd := (i + 1) * 100
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
require.NoError(t, err)
_, err = w.Close(ctx)
for j := kvStart; j < kvEnd; j++ {
err := w.WriteRow(ctx, kvPairs[j].Key, kvPairs[j].Val, nil)
require.NoError(t, err)
}
err := w.Close(ctx)
require.NoError(t, err)
}

Expand Down
11 changes: 4 additions & 7 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
type KeyValueStore struct {
dataWriter storage.ExternalFileWriter

rc *rangePropertiesCollector
ctx context.Context
writerID int
seq int
offset uint64
rc *rangePropertiesCollector
ctx context.Context
seq int
offset uint64
}

// NewKeyValueStore creates a new KeyValueStore. The data will be written to the
Expand All @@ -39,14 +38,12 @@ func NewKeyValueStore(
ctx context.Context,
dataWriter storage.ExternalFileWriter,
rangePropertiesCollector *rangePropertiesCollector,
writerID int,
seq int,
) (*KeyValueStore, error) {
kvStore := &KeyValueStore{
dataWriter: dataWriter,
ctx: ctx,
rc: rangePropertiesCollector,
writerID: writerID,
seq: seq,
}
return kvStore, nil
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
}
rc.reset()
initRC := *rc
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
require.NoError(t, err)

require.Equal(t, &initRC, rc)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
propKeysDist: 100,
}
rc.reset()
kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2)
kvStore, err = NewKeyValueStore(ctx, writer, rc, 2)
require.NoError(t, err)
err = kvStore.AddKeyValue(k1, v1)
require.NoError(t, err)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestKVReadWrite(t *testing.T) {
propKeysDist: 2,
}
rc.reset()
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
require.NoError(t, err)

kvCnt := rand.Intn(10) + 10
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type mergeIter[T heapElem, R sortedReader[T]] struct {
}

// readerOpenerFn is a function that opens a sorted reader.
type readerOpenerFn[T heapElem, R sortedReader[T]] func(ctx context.Context) (*R, error)
type readerOpenerFn[T heapElem, R sortedReader[T]] func() (*R, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use a pointer to an interface as the return type? @lance6716

Copy link
Contributor

@lance6716 lance6716 Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R can be a struct who implements sortedReader[T], not an interface. For example, R is kvReaderProxy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ugly to have pointers to an interface.

We should unify on one approach. And in this case, the implementer should know the consequence of doing it, i.e. the struct might be copied around.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember one problem is that the zero value of R may be ready to use, so I need nil to distinguish them. Will check it tomorrow.

Copy link
Contributor

@lance6716 lance6716 Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can change *R to R later, and also let mergeIter use `readers []R, readerClosed []bool` instead of `readers []*R`, because R may not be "comparable".


// newMergeIter creates a merge iterator for multiple sorted reader opener
// functions.
Expand All @@ -104,12 +104,12 @@ func newMergeIter[
}

// Open readers in parallel.
wg, wgCtx := errgroup.WithContext(ctx)
wg := errgroup.Group{}
for i, f := range readerOpeners {
i := i
f := f
wg.Go(func() error {
rd, err := f(wgCtx)
rd, err := f()
switch err {
case nil:
case io.EOF:
Expand Down Expand Up @@ -278,7 +278,7 @@ func NewMergeKVIter(

for i := range paths {
i := i
readerOpeners = append(readerOpeners, func(ctx context.Context) (*kvReaderProxy, error) {
readerOpeners = append(readerOpeners, func() (*kvReaderProxy, error) {
rd, err := newKVReader(ctx, paths[i], exStorage, pathsStartOffset[i], readBufferSize)
if err != nil {
return nil, err
Expand Down Expand Up @@ -351,7 +351,7 @@ func NewMergePropIter(
readerOpeners := make([]readerOpenerFn[*rangeProperty, statReaderProxy], 0, len(paths))
for i := range paths {
i := i
readerOpeners = append(readerOpeners, func(ctx context.Context) (*statReaderProxy, error) {
readerOpeners = append(readerOpeners, func() (*statReaderProxy, error) {
rd, err := newStatsReader(ctx, exStorage, paths[i], 4096)
if err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/external/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestMergeKVIter(t *testing.T) {
propKeysDist: 2,
}
rc.reset()
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestOneUpstream(t *testing.T) {
propKeysDist: 2,
}
rc.reset()
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestCorruptContent(t *testing.T) {
propKeysDist: 2,
}
rc.reset()
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
Expand Down Expand Up @@ -254,7 +254,7 @@ func generateMockFileReader() *kvReader {
propKeysDist: 2,
}
rc.reset()
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func BenchmarkValueT(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
rd := generateMockFileReader()
opener := func(ctx context.Context) (*kvReaderProxy, error) {
opener := func() (*kvReaderProxy, error) {
return &kvReaderProxy{r: rd}, nil
}
it, err := newMergeIter[kvPair, kvReaderProxy](ctx, []readerOpenerFn[kvPair, kvReaderProxy]{opener})
Expand Down Expand Up @@ -323,7 +323,7 @@ func BenchmarkPointerT(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
rd := generateMockFileReader()
opener := func(ctx context.Context) (*kvReaderPointerProxy, error) {
opener := func() (*kvReaderPointerProxy, error) {
return &kvReaderPointerProxy{r: rd}, nil
}
it, err := newMergeIter[*kvPair, kvReaderPointerProxy](ctx, []readerOpenerFn[*kvPair, kvReaderPointerProxy]{opener})
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/external/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestOnlyOneGroup(t *testing.T) {
SetMemorySizeLimit(15).
SetPropSizeDistance(1).
SetPropKeysDistance(1).
Build(memStore, subDir, 5)
Build(memStore, subDir, "5")

dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, [][]byte{{1}, {2}}, [][]byte{{1}, {2}})
require.NoError(t, err)
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestRangeSplitterStrictCase(t *testing.T) {
SetMemorySizeLimit(15). // slightly larger than len("key01") + len("value01")
SetPropSizeDistance(1).
SetPropKeysDistance(1).
Build(memStore, subDir, 1)
Build(memStore, subDir, "1")
keys1 := [][]byte{
[]byte("key01"), []byte("key11"), []byte("key21"),
}
Expand All @@ -196,7 +196,7 @@ func TestRangeSplitterStrictCase(t *testing.T) {
SetMemorySizeLimit(15).
SetPropSizeDistance(1).
SetPropKeysDistance(1).
Build(memStore, subDir, 2)
Build(memStore, subDir, "2")
keys2 := [][]byte{
[]byte("key02"), []byte("key12"), []byte("key22"),
}
Expand All @@ -212,7 +212,7 @@ func TestRangeSplitterStrictCase(t *testing.T) {
SetMemorySizeLimit(15).
SetPropSizeDistance(1).
SetPropKeysDistance(1).
Build(memStore, subDir, 3)
Build(memStore, subDir, "3")
keys3 := [][]byte{
[]byte("key03"), []byte("key13"), []byte("key23"),
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestExactlyKeyNum(t *testing.T) {
SetMemorySizeLimit(15).
SetPropSizeDistance(1).
SetPropKeysDistance(1).
Build(memStore, subDir, 5)
Build(memStore, subDir, "5")

dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir, keys, values)
require.NoError(t, err)
Expand Down
18 changes: 6 additions & 12 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sort"
"strings"

kv2 "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -170,7 +168,7 @@ func MockExternalEngine(
SetMemorySizeLimit(128).
SetPropSizeDistance(32).
SetPropKeysDistance(4).
Build(storage, "/mock-test", 0)
Build(storage, "/mock-test", "0")
return MockExternalEngineWithWriter(storage, writer, subDir, keys, values)
}

Expand All @@ -184,17 +182,13 @@ func MockExternalEngineWithWriter(
values [][]byte,
) (dataFiles []string, statsFiles []string, err error) {
ctx := context.Background()
kvs := make([]common.KvPair, len(keys))
for i := range keys {
kvs[i].Key = keys[i]
kvs[i].Val = values[i]
}
rows := kv2.MakeRowsFromKvPairs(kvs)
err = writer.AppendRows(ctx, nil, rows)
if err != nil {
return nil, nil, err
err := writer.WriteRow(ctx, keys[i], values[i], nil)
if err != nil {
return nil, nil, err
}
}
_, err = writer.Close(ctx)
err = writer.Close(ctx)
if err != nil {
return nil, nil, err
}
Expand Down
61 changes: 34 additions & 27 deletions br/pkg/lightning/backend/external/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"context"
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -126,37 +124,45 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, "/subtask", 0)
kvPairs := make([]common.KvPair, 0, 30)
Build(store, "/subtask", "0")

keys := make([][]byte, 0, 30)
values := make([][]byte, 0, 30)
for i := 0; i < 30; i++ {
kvPairs = append(kvPairs, common.KvPair{
Key: []byte{byte(i)},
Val: []byte{byte(i)},
})
keys = append(keys, []byte{byte(i)})
values = append(values, []byte{byte(i)})
}
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w.Close(ctx)

for i, key := range keys {
err := w.WriteRow(ctx, key, values[i], nil)
require.NoError(t, err)
}
err := w.Close(ctx)
require.NoError(t, err)

w2 := NewWriterBuilder().
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, "/subtask", 3)
err = w2.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
Build(store, "/subtask", "3")
for i, key := range keys {
err := w2.WriteRow(ctx, key, values[i], nil)
require.NoError(t, err)
}
require.NoError(t, err)
_, err = w2.Close(ctx)
err = w2.Close(ctx)
require.NoError(t, err)

w3 := NewWriterBuilder().
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, "/subtask", 12)
err = w3.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w3.Close(ctx)
Build(store, "/subtask", "12")
for i, key := range keys {
err := w3.WriteRow(ctx, key, values[i], nil)
require.NoError(t, err)
}
err = w3.Close(ctx)
require.NoError(t, err)

dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
Expand All @@ -180,17 +186,18 @@ func TestCleanUpFiles(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, "/subtask", 0)
kvPairs := make([]common.KvPair, 0, 30)
Build(store, "/subtask", "0")
keys := make([][]byte, 0, 30)
values := make([][]byte, 0, 30)
for i := 0; i < 30; i++ {
kvPairs = append(kvPairs, common.KvPair{
Key: []byte{byte(i)},
Val: []byte{byte(i)},
})
keys = append(keys, []byte{byte(i)})
values = append(values, []byte{byte(i)})
}
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w.Close(ctx)
for i, key := range keys {
err := w.WriteRow(ctx, key, values[i], nil)
require.NoError(t, err)
}
err := w.Close(ctx)
require.NoError(t, err)

dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
Expand Down
Loading