Skip to content

Commit

Permalink
Merge branch 'master' into intersect_index_merge
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Nov 29, 2022
2 parents 5cfbe24 + 9689b47 commit b1a2ef7
Show file tree
Hide file tree
Showing 54 changed files with 2,055 additions and 1,011 deletions.
14 changes: 7 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) --remote_download_minimal \
//... --//build:with_nogo_flag=true
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
Expand All @@ -442,27 +442,27 @@ bazel_golangcilinter:
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.golangci.yaml

bazel_brietest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/brietest/...

bazel_pessimistictest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/pessimistictest/...

bazel_sessiontest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/sessiontest/...

bazel_statisticstest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/statisticstest/...

bazel_txntest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/txntest/...

bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/addindextest/...

bazel_lint: bazel_prepare
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
ranges = ranger.FullIntRange(false)
}

retRanges := make([]kv.KeyRange, 0, 1+len(tbl.Indices))
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
retRanges = kvRanges.AppendSelfTo(retRanges)

for _, index := range tbl.Indices {
if index.State != model.StatePublic {
Expand All @@ -304,9 +306,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
if err != nil {
return nil, errors.Trace(err)
}
kvRanges = append(kvRanges, idxRanges...)
retRanges = idxRanges.AppendSelfTo(retRanges)
}
return kvRanges, nil
return retRanges, nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestChecksum(t *testing.T) {
first = false
ranges, err := backup.BuildTableRanges(tableInfo3)
require.NoError(t, err)
require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges)
require.Equalf(t, ranges[:1], req.KeyRanges.FirstPartitionRange(), "%v", req.KeyRanges.FirstPartitionRange())
}
return nil
}))
Expand Down
66 changes: 30 additions & 36 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func physicalTableIDs(tableInfo *model.TableInfo) []int64 {
}

// tableHandleKeyRanges returns all key ranges associated with the tableInfo.
func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error) {
func tableHandleKeyRanges(tableInfo *model.TableInfo) (*tidbkv.KeyRanges, error) {
ranges := ranger.FullIntRange(false)
if tableInfo.IsCommonHandle {
ranges = ranger.FullRange()
Expand All @@ -221,18 +221,9 @@ func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error)
}

// tableIndexKeyRanges returns all key ranges associated with the tableInfo and indexInfo.
func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) ([]tidbkv.KeyRange, error) {
func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) (*tidbkv.KeyRanges, error) {
tableIDs := physicalTableIDs(tableInfo)
//nolint: prealloc
var keyRanges []tidbkv.KeyRange
for _, tid := range tableIDs {
partitionKeysRanges, err := distsql.IndexRangesToKVRanges(nil, tid, indexInfo.ID, ranger.FullRange(), nil)
if err != nil {
return nil, errors.Trace(err)
}
keyRanges = append(keyRanges, partitionKeysRanges...)
}
return keyRanges, nil
return distsql.IndexRangesToKVRangesForTables(nil, tableIDs, indexInfo.ID, ranger.FullRange(), nil)
}

// DupKVStream is a streaming interface for collecting duplicate key-value pairs.
Expand Down Expand Up @@ -561,14 +552,20 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
tasks := make([]dupTask, 0, len(keyRanges))
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
})
tasks := make([]dupTask, 0, keyRanges.TotalRangeNum()*(1+len(m.tbl.Meta().Indices)))
putToTaskFunc := func(ranges []tidbkv.KeyRange) {
if len(ranges) == 0 {
return
}
tid := tablecodec.DecodeTableID(ranges[0].StartKey)
for _, r := range ranges {
tasks = append(tasks, dupTask{
KeyRange: r,
tableID: tid,
})
}
}
keyRanges.ForEachPartition(putToTaskFunc)
for _, indexInfo := range m.tbl.Meta().Indices {
if indexInfo.State != model.StatePublic {
continue
Expand All @@ -577,14 +574,7 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
indexInfo: indexInfo,
})
}
keyRanges.ForEachPartition(putToTaskFunc)
}
return tasks, nil
}
Expand All @@ -598,15 +588,19 @@ func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
tasks := make([]dupTask, 0, len(keyRanges))
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
indexInfo: indexInfo,
})
}
tasks := make([]dupTask, 0, keyRanges.TotalRangeNum())
keyRanges.ForEachPartition(func(ranges []tidbkv.KeyRange) {
if len(ranges) == 0 {
return
}
tid := tablecodec.DecodeTableID(ranges[0].StartKey)
for _, r := range ranges {
tasks = append(tasks, dupTask{
KeyRange: r,
tableID: tid,
})
}
})
return tasks, nil
}
return nil, nil
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,13 @@ func (rng StringifyRange) String() string {
sb.WriteString(")")
return sb.String()
}

// StringifyMany returns an array marshaler for a slice of stringers.
func StringifyMany[T fmt.Stringer](items []T) zapcore.ArrayMarshaler {
return zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
for _, item := range items {
ae.AppendString(item.String())
}
return nil
})
}
6 changes: 2 additions & 4 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
"models.go",
"prefix_scanner.go",
"regioniter.go",
"tsheap.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/streamhelper",
visibility = ["//visibility:public"],
Expand All @@ -21,6 +20,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//config",
"//kv",
Expand All @@ -29,7 +29,6 @@ go_library(
"//util/mathutil",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto",
"@com_github_google_btree//:btree",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand All @@ -44,7 +43,6 @@ go_library(
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

Expand All @@ -56,7 +54,6 @@ go_test(
"basic_lib_for_test.go",
"integration_test.go",
"regioniter_test.go",
"tsheap_test.go",
],
flaky = True,
race = "on",
Expand All @@ -68,6 +65,7 @@ go_test(
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
"//tablecodec",
Expand Down
Loading

0 comments on commit b1a2ef7

Please sign in to comment.