Skip to content

Commit

Permalink
planner: support push part of order property down to the partition ta…
Browse files Browse the repository at this point in the history
…ble (#36108)

ref #26166
  • Loading branch information
winoros authored Nov 29, 2022
1 parent e618d86 commit f9a6e47
Show file tree
Hide file tree
Showing 28 changed files with 719 additions and 187 deletions.
6 changes: 4 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
ranges = ranger.FullIntRange(false)
}

retRanges := make([]kv.KeyRange, 0, 1+len(tbl.Indices))
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
retRanges = kvRanges.AppendSelfTo(retRanges)

for _, index := range tbl.Indices {
if index.State != model.StatePublic {
Expand All @@ -304,9 +306,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
if err != nil {
return nil, errors.Trace(err)
}
kvRanges = append(kvRanges, idxRanges...)
retRanges = idxRanges.AppendSelfTo(retRanges)
}
return kvRanges, nil
return retRanges, nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestChecksum(t *testing.T) {
first = false
ranges, err := backup.BuildTableRanges(tableInfo3)
require.NoError(t, err)
require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges)
require.Equalf(t, ranges[:1], req.KeyRanges.FirstPartitionRange(), "%v", req.KeyRanges.FirstPartitionRange())
}
return nil
}))
Expand Down
66 changes: 30 additions & 36 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func physicalTableIDs(tableInfo *model.TableInfo) []int64 {
}

// tableHandleKeyRanges returns all key ranges associated with the tableInfo.
func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error) {
func tableHandleKeyRanges(tableInfo *model.TableInfo) (*tidbkv.KeyRanges, error) {
ranges := ranger.FullIntRange(false)
if tableInfo.IsCommonHandle {
ranges = ranger.FullRange()
Expand All @@ -221,18 +221,9 @@ func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error)
}

// tableIndexKeyRanges returns all key ranges associated with the tableInfo and indexInfo.
func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) ([]tidbkv.KeyRange, error) {
func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) (*tidbkv.KeyRanges, error) {
tableIDs := physicalTableIDs(tableInfo)
//nolint: prealloc
var keyRanges []tidbkv.KeyRange
for _, tid := range tableIDs {
partitionKeysRanges, err := distsql.IndexRangesToKVRanges(nil, tid, indexInfo.ID, ranger.FullRange(), nil)
if err != nil {
return nil, errors.Trace(err)
}
keyRanges = append(keyRanges, partitionKeysRanges...)
}
return keyRanges, nil
return distsql.IndexRangesToKVRangesForTables(nil, tableIDs, indexInfo.ID, ranger.FullRange(), nil)
}

// DupKVStream is a streaming interface for collecting duplicate key-value pairs.
Expand Down Expand Up @@ -561,14 +552,20 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
tasks := make([]dupTask, 0, len(keyRanges))
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
})
tasks := make([]dupTask, 0, keyRanges.TotalRangeNum()*(1+len(m.tbl.Meta().Indices)))
putToTaskFunc := func(ranges []tidbkv.KeyRange) {
if len(ranges) == 0 {
return
}
tid := tablecodec.DecodeTableID(ranges[0].StartKey)
for _, r := range ranges {
tasks = append(tasks, dupTask{
KeyRange: r,
tableID: tid,
})
}
}
keyRanges.ForEachPartition(putToTaskFunc)
for _, indexInfo := range m.tbl.Meta().Indices {
if indexInfo.State != model.StatePublic {
continue
Expand All @@ -577,14 +574,7 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
indexInfo: indexInfo,
})
}
keyRanges.ForEachPartition(putToTaskFunc)
}
return tasks, nil
}
Expand All @@ -598,15 +588,19 @@ func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
tasks := make([]dupTask, 0, len(keyRanges))
for _, kr := range keyRanges {
tableID := tablecodec.DecodeTableID(kr.StartKey)
tasks = append(tasks, dupTask{
KeyRange: kr,
tableID: tableID,
indexInfo: indexInfo,
})
}
tasks := make([]dupTask, 0, keyRanges.TotalRangeNum())
keyRanges.ForEachPartition(func(ranges []tidbkv.KeyRange) {
if len(ranges) == 0 {
return
}
tid := tablecodec.DecodeTableID(ranges[0].StartKey)
for _, r := range ranges {
tasks = append(tasks, dupTask{
KeyRange: r,
tableID: tid,
})
}
})
return tasks, nil
}
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func TestAlterTableDropPartitionByList(t *testing.T) {
);`)
tk.MustExec(`insert into t values (1),(3),(5),(null)`)
tk.MustExec(`alter table t drop partition p1`)
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1", "5", "<nil>"))
tk.MustQuery("select * from t order by id").Check(testkit.Rows("<nil>", "1", "5"))
ctx := tk.Session()
is := domain.GetDomain(ctx).InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
Expand Down
Loading

0 comments on commit f9a6e47

Please sign in to comment.