From f9a6e471921f0b3b33d63f2c4c3f5122c702a44f Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Tue, 29 Nov 2022 13:51:59 +0800 Subject: [PATCH] planner: support push part of order property down to the partition table (#36108) ref pingcap/tidb#26166 --- br/pkg/backup/client.go | 6 +- br/pkg/checksum/executor_test.go | 2 +- br/pkg/lightning/backend/local/duplicate.go | 66 ++++---- ddl/db_partition_test.go | 2 +- distsql/request_builder.go | 114 +++++++++----- distsql/request_builder_test.go | 28 ++-- executor/admin.go | 15 +- executor/builder.go | 40 +++-- executor/distsql.go | 24 +-- executor/index_merge_reader.go | 4 +- executor/partition_table_test.go | 145 +++++++++++++++++- executor/table_reader.go | 12 +- kv/BUILD.bazel | 1 + kv/kv.go | 145 +++++++++++++++++- .../testdata/integration_suite_in.json | 2 +- .../testdata/integration_suite_out.json | 13 +- planner/core/find_best_task.go | 1 + planner/core/fragment.go | 4 +- planner/core/integration_partition_test.go | 12 +- planner/core/partition_pruner_test.go | 14 +- planner/core/physical_plans.go | 7 +- planner/core/task.go | 138 +++++++++++++++++ .../core/testdata/partition_pruner_out.json | 38 ++--- store/copr/batch_coprocessor.go | 3 +- store/copr/copr_test/coprocessor_test.go | 8 +- store/copr/coprocessor.go | 30 +++- tablecodec/tablecodec.go | 27 ++++ testkit/result.go | 5 + 28 files changed, 719 insertions(+), 187 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 7614ca78e52c7..865e7fa2f3078 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -290,10 +290,12 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) { ranges = ranger.FullIntRange(false) } + retRanges := make([]kv.KeyRange, 0, 1+len(tbl.Indices)) kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil) if err != nil { return nil, errors.Trace(err) } + retRanges = kvRanges.AppendSelfTo(retRanges) for _, index := range tbl.Indices { if index.State != model.StatePublic { @@ -304,9 +306,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) { if err != nil { return nil, errors.Trace(err) } - kvRanges = append(kvRanges, idxRanges...) + retRanges = idxRanges.AppendSelfTo(retRanges) } - return kvRanges, nil + return retRanges, nil } // BuildBackupRangeAndSchema gets KV range and schema of tables. diff --git a/br/pkg/checksum/executor_test.go b/br/pkg/checksum/executor_test.go index adcaed9c314f9..876103bc055a2 100644 --- a/br/pkg/checksum/executor_test.go +++ b/br/pkg/checksum/executor_test.go @@ -104,7 +104,7 @@ func TestChecksum(t *testing.T) { first = false ranges, err := backup.BuildTableRanges(tableInfo3) require.NoError(t, err) - require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges) + require.Equalf(t, ranges[:1], req.KeyRanges.FirstPartitionRange(), "%v", req.KeyRanges.FirstPartitionRange()) } return nil })) diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index b2858a8456f36..25bc7fabf514e 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -211,7 +211,7 @@ func physicalTableIDs(tableInfo *model.TableInfo) []int64 { } // tableHandleKeyRanges returns all key ranges associated with the tableInfo. 
-func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error) { +func tableHandleKeyRanges(tableInfo *model.TableInfo) (*tidbkv.KeyRanges, error) { ranges := ranger.FullIntRange(false) if tableInfo.IsCommonHandle { ranges = ranger.FullRange() @@ -221,18 +221,9 @@ func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error) } // tableIndexKeyRanges returns all key ranges associated with the tableInfo and indexInfo. -func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) ([]tidbkv.KeyRange, error) { +func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) (*tidbkv.KeyRanges, error) { tableIDs := physicalTableIDs(tableInfo) - //nolint: prealloc - var keyRanges []tidbkv.KeyRange - for _, tid := range tableIDs { - partitionKeysRanges, err := distsql.IndexRangesToKVRanges(nil, tid, indexInfo.ID, ranger.FullRange(), nil) - if err != nil { - return nil, errors.Trace(err) - } - keyRanges = append(keyRanges, partitionKeysRanges...) - } - return keyRanges, nil + return distsql.IndexRangesToKVRangesForTables(nil, tableIDs, indexInfo.ID, ranger.FullRange(), nil) } // DupKVStream is a streaming interface for collecting duplicate key-value pairs. @@ -561,14 +552,20 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) { if err != nil { return nil, errors.Trace(err) } - tasks := make([]dupTask, 0, len(keyRanges)) - for _, kr := range keyRanges { - tableID := tablecodec.DecodeTableID(kr.StartKey) - tasks = append(tasks, dupTask{ - KeyRange: kr, - tableID: tableID, - }) + tasks := make([]dupTask, 0, keyRanges.TotalRangeNum()*(1+len(m.tbl.Meta().Indices))) + putToTaskFunc := func(ranges []tidbkv.KeyRange) { + if len(ranges) == 0 { + return + } + tid := tablecodec.DecodeTableID(ranges[0].StartKey) + for _, r := range ranges { + tasks = append(tasks, dupTask{ + KeyRange: r, + tableID: tid, + }) + } } + keyRanges.ForEachPartition(putToTaskFunc) for _, indexInfo := range m.tbl.Meta().Indices { if indexInfo.State != model.StatePublic { continue @@ -577,14 +574,7 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) { if err != nil { return nil, errors.Trace(err) } - for _, kr := range keyRanges { - tableID := tablecodec.DecodeTableID(kr.StartKey) - tasks = append(tasks, dupTask{ - KeyRange: kr, - tableID: tableID, - indexInfo: indexInfo, - }) - } + keyRanges.ForEachPartition(putToTaskFunc) } return tasks, nil } @@ -598,15 +588,19 @@ func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) { if err != nil { return nil, errors.Trace(err) } - tasks := make([]dupTask, 0, len(keyRanges)) - for _, kr := range keyRanges { - tableID := tablecodec.DecodeTableID(kr.StartKey) - tasks = append(tasks, dupTask{ - KeyRange: kr, - tableID: tableID, - indexInfo: indexInfo, - }) - } + tasks := make([]dupTask, 0, keyRanges.TotalRangeNum()) + keyRanges.ForEachPartition(func(ranges []tidbkv.KeyRange) { + if len(ranges) == 0 { + return + } + tid := tablecodec.DecodeTableID(ranges[0].StartKey) + for _, r := range ranges { + tasks = append(tasks, dupTask{ + KeyRange: r, + tableID: tid, + }) + } + }) return tasks, nil } return nil, nil diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index e5ad2aa2bbfec..d714ed716f9f9 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1409,7 +1409,7 @@ func TestAlterTableDropPartitionByList(t *testing.T) { );`) tk.MustExec(`insert into t values (1),(3),(5),(null)`) tk.MustExec(`alter table t drop partition p1`) - tk.MustQuery("select * from 
t").Sort().Check(testkit.Rows("1", "5", "")) + tk.MustQuery("select * from t order by id").Check(testkit.Rows("", "1", "5")) ctx := tk.Session() is := domain.GetDomain(ctx).InfoSchema() tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 4a8b3ddfeab13..a293c4d10963e 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -20,7 +20,6 @@ import ( "sort" "sync/atomic" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/ddl/placement" @@ -71,6 +70,9 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { if err != nil { builder.err = err } + if builder.Request.KeyRanges == nil { + builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil) + } return &builder.Request, builder.err } @@ -86,7 +88,7 @@ func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBu // br refers it, so have to keep it. func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { if builder.err == nil { - builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb) + builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(TableRangesToKVRanges(tid, tableRanges, fb)) } return builder } @@ -112,7 +114,9 @@ func (builder *RequestBuilder) SetIndexRangesForTables(sc *stmtctx.StatementCont // SetHandleRanges sets "KeyRanges" for "kv.Request" by converting table handle range // "ranges" to "KeyRanges" firstly. func (builder *RequestBuilder) SetHandleRanges(sc *stmtctx.StatementContext, tid int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { - return builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, ranges, fb) + builder = builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, ranges, fb) + builder.err = builder.Request.KeyRanges.SetToNonPartitioned() + return builder } // SetHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting table handle range @@ -127,14 +131,17 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon // SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles // "handles" to "KeyRanges" firstly. func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder { - builder.Request.KeyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles) + var keyRanges []kv.KeyRange + keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles) + builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) return builder } // SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges. // handles in slice must be kv.PartitionHandle. func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder { - builder.Request.KeyRanges = PartitionHandlesToKVRanges(handles) + keyRanges := PartitionHandlesToKVRanges(handles) + builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) return builder } @@ -183,10 +190,22 @@ func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest // SetKeyRanges sets "KeyRanges" for "kv.Request". 
func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBuilder { + builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) + return builder +} + +// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request". +func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder { builder.Request.KeyRanges = keyRanges return builder } +// SetPartitionKeyRanges sets the "KeyRanges" for "kv.Request" on partitioned table cases. +func (builder *RequestBuilder) SetPartitionKeyRanges(keyRanges [][]kv.KeyRange) *RequestBuilder { + builder.Request.KeyRanges = kv.NewPartitionedKeyRanges(keyRanges) + return builder +} + // SetStartTS sets "StartTS" for "kv.Request". func (builder *RequestBuilder) SetStartTS(startTS uint64) *RequestBuilder { builder.Request.StartTs = startTS @@ -318,13 +337,12 @@ func (builder *RequestBuilder) verifyTxnScope() error { return nil } visitPhysicalTableID := make(map[int64]struct{}) - for _, keyRange := range builder.Request.KeyRanges { - tableID := tablecodec.DecodeTableID(keyRange.StartKey) - if tableID > 0 { - visitPhysicalTableID[tableID] = struct{}{} - } else { - return errors.New("requestBuilder can't decode tableID from keyRange") - } + tids, err := tablecodec.VerifyTableIDForRanges(builder.Request.KeyRanges) + if err != nil { + return err + } + for _, tid := range tids { + visitPhysicalTableID[tid] = struct{}{} } for phyTableID := range visitPhysicalTableID { @@ -376,7 +394,7 @@ func (builder *RequestBuilder) SetClosestReplicaReadAdjuster(chkFn kv.CoprReques } // TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables. -func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { +func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) (*kv.KeyRanges, error) { if !isCommonHandle { return tablesRangesToKVRanges(tid, ranges, fb), nil } @@ -387,14 +405,18 @@ func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCo // Note this function should not be exported, but currently // br refers to it, so have to keep it. func TableRangesToKVRanges(tid int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { - return tablesRangesToKVRanges([]int64{tid}, ranges, fb) + if len(ranges) == 0 { + return []kv.KeyRange{} + } + return tablesRangesToKVRanges([]int64{tid}, ranges, fb).FirstPartitionRange() } // tablesRangesToKVRanges converts table ranges to "KeyRange". -func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { +func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) *kv.KeyRanges { if fb == nil || fb.Hist == nil { return tableRangesToKVRangesWithoutSplit(tids, ranges) } + // The following codes are deprecated since the feedback is deprecated. 
krs := make([]kv.KeyRange, 0, len(ranges)) feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { @@ -420,20 +442,23 @@ func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics } } fb.StoreRanges(feedbackRanges) - return krs + return kv.NewNonParitionedKeyRanges(krs) } -func tableRangesToKVRangesWithoutSplit(tids []int64, ranges []*ranger.Range) []kv.KeyRange { - krs := make([]kv.KeyRange, 0, len(ranges)*len(tids)) +func tableRangesToKVRangesWithoutSplit(tids []int64, ranges []*ranger.Range) *kv.KeyRanges { + krs := make([][]kv.KeyRange, len(tids)) + for i := range krs { + krs[i] = make([]kv.KeyRange, 0, len(ranges)) + } for _, ran := range ranges { low, high := encodeHandleKey(ran) - for _, tid := range tids { + for i, tid := range tids { startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) - krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) + krs[i] = append(krs[i], kv.KeyRange{StartKey: startKey, EndKey: endKey}) } } - return krs + return kv.NewPartitionedKeyRanges(krs) } func encodeHandleKey(ran *ranger.Range) ([]byte, []byte) { @@ -587,27 +612,33 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { } // IndexRangesToKVRanges converts index ranges to "KeyRange". -func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { +func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) (*kv.KeyRanges, error) { return IndexRangesToKVRangesWithInterruptSignal(sc, tid, idxID, ranges, fb, nil, nil) } // IndexRangesToKVRangesWithInterruptSignal converts index ranges to "KeyRange". // The process can be interrupted by set `interruptSignal` to true. -func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { - return indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal) +func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) (*kv.KeyRanges, error) { + keyRanges, err := indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal) + if err != nil { + return nil, err + } + err = keyRanges.SetToNonPartitioned() + return keyRanges, err } // IndexRangesToKVRangesForTables converts indexes ranges to "KeyRange". -func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { +func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) (*kv.KeyRanges, error) { return indexRangesToKVRangesForTablesWithInterruptSignal(sc, tids, idxID, ranges, fb, nil, nil) } // IndexRangesToKVRangesForTablesWithInterruptSignal converts indexes ranges to "KeyRange". // The process can be interrupted by set `interruptSignal` to true. 
-func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { +func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) (*kv.KeyRanges, error) { if fb == nil || fb.Hist == nil { return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges, memTracker, interruptSignal) } + // The following code is non maintained since the feedback deprecated. feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { low, high, err := EncodeIndexKey(sc, ran) @@ -642,11 +673,11 @@ func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementCont } } fb.StoreRanges(feedbackRanges) - return krs, nil + return kv.NewNonParitionedKeyRanges(krs), nil } // CommonHandleRangesToKVRanges converts common handle ranges to "KeyRange". -func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ranges []*ranger.Range) ([]kv.KeyRange, error) { +func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ranges []*ranger.Range) (*kv.KeyRanges, error) { rans := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { low, high, err := EncodeIndexKey(sc, ran) @@ -656,20 +687,23 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra rans = append(rans, &ranger.Range{LowVal: []types.Datum{types.NewBytesDatum(low)}, HighVal: []types.Datum{types.NewBytesDatum(high)}, LowExclude: false, HighExclude: true, Collators: collate.GetBinaryCollatorSlice(1)}) } - krs := make([]kv.KeyRange, 0, len(rans)) + krs := make([][]kv.KeyRange, len(tids)) + for i := range krs { + krs[i] = make([]kv.KeyRange, 0, len(ranges)) + } for _, ran := range rans { low, high := ran.LowVal[0].GetBytes(), ran.HighVal[0].GetBytes() if ran.LowExclude { low = kv.Key(low).PrefixNext() } ran.LowVal[0].SetBytes(low) - for _, tid := range tids { + for i, tid := range tids { startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) - krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) + krs[i] = append(krs[i], kv.KeyRange{StartKey: startKey, EndKey: endKey}) } } - return krs, nil + return kv.NewPartitionedKeyRanges(krs), nil } // VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation. 
@@ -691,8 +725,12 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc return true } -func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { - krs := make([]kv.KeyRange, 0, len(ranges)) +func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) (*kv.KeyRanges, error) { + krs := make([][]kv.KeyRange, len(tids)) + for i := range krs { + krs[i] = make([]kv.KeyRange, 0, len(ranges)) + } + const checkSignalStep = 8 var estimatedMemUsage int64 // encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to @@ -705,13 +743,13 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx if i == 0 { estimatedMemUsage += int64(cap(low) + cap(high)) } - for _, tid := range tids { + for j, tid := range tids { startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low) endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high) if i == 0 { estimatedMemUsage += int64(cap(startKey)) + int64(cap(endKey)) } - krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) + krs[j] = append(krs[j], kv.KeyRange{StartKey: startKey, EndKey: endKey}) } if i%checkSignalStep == 0 { if i == 0 && memTracker != nil { @@ -719,11 +757,11 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx memTracker.Consume(estimatedMemUsage) } if interruptSignal != nil && interruptSignal.Load().(bool) { - return nil, nil + return kv.NewPartitionedKeyRanges(nil), nil } } } - return krs, nil + return kv.NewPartitionedKeyRanges(krs), nil } // EncodeIndexKey gets encoded keys containing low and high diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 2ffde4a512c0d..fa55229e36fa5 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -192,8 +192,8 @@ func TestIndexRangesToKVRanges(t *testing.T) { actual, err := IndexRangesToKVRanges(new(stmtctx.StatementContext), 12, 15, ranges, nil) require.NoError(t, err) - for i := range actual { - require.Equal(t, expect[i], actual[i]) + for i := range actual.FirstPartitionRange() { + require.Equal(t, expect[i], actual.FirstPartitionRange()[i]) } } @@ -242,7 +242,7 @@ func TestRequestBuilder1(t *testing.T) { Tp: 103, StartTs: 0x0, Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0}, - KeyRanges: []kv.KeyRange{ + KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{ { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3}, @@ -263,7 +263,7 @@ func TestRequestBuilder1(t *testing.T) { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23}, }, - }, + }), Cacheable: true, KeepOrder: false, Desc: false, @@ -325,7 +325,7 @@ func TestRequestBuilder2(t *testing.T) { Tp: 103, StartTs: 0x0, Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0}, - KeyRanges: []kv.KeyRange{ + KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{ { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 
0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3}, @@ -346,7 +346,7 @@ func TestRequestBuilder2(t *testing.T) { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23}, }, - }, + }), Cacheable: true, KeepOrder: false, Desc: false, @@ -378,7 +378,7 @@ func TestRequestBuilder3(t *testing.T) { Tp: 103, StartTs: 0x0, Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0}, - KeyRanges: []kv.KeyRange{ + KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{ { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, @@ -395,7 +395,7 @@ func TestRequestBuilder3(t *testing.T) { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65}, }, - }, + }), Cacheable: true, KeepOrder: false, Desc: false, @@ -444,7 +444,7 @@ func TestRequestBuilder4(t *testing.T) { Tp: 103, StartTs: 0x0, Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0}, - KeyRanges: keyRanges, + KeyRanges: kv.NewNonParitionedKeyRanges(keyRanges), Cacheable: true, KeepOrder: false, Desc: false, @@ -491,7 +491,7 @@ func TestRequestBuilder5(t *testing.T) { Tp: 104, StartTs: 0x0, Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0}, - KeyRanges: keyRanges, + KeyRanges: kv.NewNonParitionedKeyRanges(keyRanges), KeepOrder: true, Desc: false, Concurrency: 15, @@ -520,7 +520,7 @@ func TestRequestBuilder6(t *testing.T) { Tp: 105, StartTs: 0x0, Data: []uint8{0x10, 0x0, 0x18, 0x0}, - KeyRanges: keyRanges, + KeyRanges: kv.NewNonParitionedKeyRanges(keyRanges), KeepOrder: false, Desc: false, Concurrency: concurrency, @@ -557,6 +557,7 @@ func TestRequestBuilder7(t *testing.T) { Tp: 0, StartTs: 0x0, KeepOrder: false, + KeyRanges: kv.NewNonParitionedKeyRanges(nil), Desc: false, Concurrency: concurrency, IsolationLevel: 0, @@ -583,6 +584,7 @@ func TestRequestBuilder8(t *testing.T) { Tp: 0, StartTs: 0x0, Data: []uint8(nil), + KeyRanges: kv.NewNonParitionedKeyRanges(nil), Concurrency: variable.DefDistSQLScanConcurrency, IsolationLevel: 0, Priority: 0, @@ -635,8 +637,8 @@ func TestIndexRangesToKVRangesWithFbs(t *testing.T) { EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5}, }, } - for i := 0; i < len(actual); i++ { - require.Equal(t, expect[i], actual[i]) + for i := 0; i < len(actual.FirstPartitionRange()); i++ { + require.Equal(t, expect[i], actual.FirstPartitionRange()[i]) } } diff --git a/executor/admin.go b/executor/admin.go index 1a0f5579281cc..a0484ce957b30 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -265,10 +265,11 @@ func (e *RecoverIndexExec) buildTableScan(ctx context.Context, txn kv.Transactio return nil, err } var builder distsql.RequestBuilder - builder.KeyRanges, err = buildRecoverIndexKeyRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalID, 
startHandle) + keyRanges, err := buildRecoverIndexKeyRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalID, startHandle) if err != nil { return nil, err } + builder.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) kvReq, err := builder. SetDAGRequest(dagPB). SetStartTS(txn.StartTS()). @@ -737,7 +738,16 @@ func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transactio sc := e.ctx.GetSessionVars().StmtCtx var builder distsql.RequestBuilder ranges := ranger.FullRange() - kvReq, err := builder.SetIndexRanges(sc, e.physicalID, e.index.Meta().ID, ranges). + keyRanges, err := distsql.IndexRangesToKVRanges(sc, e.physicalID, e.index.Meta().ID, ranges, nil) + if err != nil { + return nil, err + } + err = keyRanges.SetToNonPartitioned() + if err != nil { + return nil, err + } + keyRanges.FirstPartitionRange()[0].StartKey = kv.Key(e.lastIdxKey).PrefixNext() + kvReq, err := builder.SetWrappedKeyRanges(keyRanges). SetDAGRequest(dagPB). SetStartTS(txn.StartTS()). SetKeepOrder(true). @@ -748,7 +758,6 @@ func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transactio return nil, err } - kvReq.KeyRanges[0].StartKey = kv.Key(e.lastIdxKey).PrefixNext() kvReq.Concurrency = 1 result, err := distsql.Select(ctx, e.ctx, kvReq, e.getIdxColTypes(), statistics.NewQueryFeedback(0, nil, 0, false)) if err != nil { diff --git a/executor/builder.go b/executor/builder.go index 3d015849aa5ef..01bf8496fe77b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -4262,32 +4262,37 @@ type kvRangeBuilderFromRangeAndPartition struct { } func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) { - ret := make([][]kv.KeyRange, 0, len(h.partitions)) + ret := make([][]kv.KeyRange, len(h.partitions)) pids := make([]int64, 0, len(h.partitions)) - for _, p := range h.partitions { + for i, p := range h.partitions { pid := p.GetPhysicalID() + pids = append(pids, pid) meta := p.Meta() + if len(ranges) == 0 { + continue + } kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil) if err != nil { return nil, nil, err } - pids = append(pids, pid) - ret = append(ret, kvRange) + ret[i] = kvRange.AppendSelfTo(ret[i]) } return pids, ret, nil } -func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) { - //nolint: prealloc - var ret []kv.KeyRange - for _, p := range h.partitions { +func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) { + ret := make([][]kv.KeyRange, len(h.partitions)) + if len(ranges) == 0 { + return ret, nil + } + for i, p := range h.partitions { pid := p.GetPhysicalID() meta := p.Meta() kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil) if err != nil { return nil, err } - ret = append(ret, kvRange...) + ret[i] = kvRange.AppendSelfTo(ret[i]) } return ret, nil } @@ -4334,7 +4339,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T if err != nil { return nil, err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) 
+ e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges) e.resultHandler = &tableResultHandler{} result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { @@ -4357,6 +4362,8 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex } else { b.SetTableHandles(getPhysicalTableID(e.table), handles) } + } else { + b.SetKeyRanges(nil) } return builder.buildTableReaderBase(ctx, e, b) } @@ -4545,6 +4552,9 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) { kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents)) + if len(ranges) == 0 { + return []kv.KeyRange{}, nil + } lastPos := len(ranges[0].LowVal) - 1 sc := ctx.GetSessionVars().StmtCtx tmpDatumRanges := make([]*ranger.Range, 0, len(lookUpContents)) @@ -4557,7 +4567,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l } if cwc == nil { // Index id is -1 means it's a common handle. - var tmpKvRanges []kv.KeyRange + var tmpKvRanges *kv.KeyRanges var err error if indexID == -1 { tmpKvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, ranges) @@ -4567,7 +4577,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l if err != nil { return nil, err } - kvRanges = append(kvRanges, tmpKvRanges...) + kvRanges = tmpKvRanges.AppendSelfTo(kvRanges) continue } nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row) @@ -4604,9 +4614,11 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l } // Index id is -1 means it's a common handle. if indexID == -1 { - return distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges) + tmpKeyRanges, err := distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges) + return tmpKeyRanges.FirstPartitionRange(), err } - return distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil, memTracker, interruptSignal) + tmpKeyRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil, memTracker, interruptSignal) + return tmpKeyRanges.FirstPartitionRange(), err } func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor { diff --git a/executor/distsql.go b/executor/distsql.go index 0cef7e66d441e..966a4a09ce357 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -243,11 +243,18 @@ func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error return err } +// TODO: cleanup this method. 
func (e *IndexReaderExecutor) buildKeyRanges(sc *stmtctx.StatementContext, ranges []*ranger.Range, physicalID int64) ([]kv.KeyRange, error) { + var ( + rRanges *kv.KeyRanges + err error + ) if e.index.ID == -1 { - return distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges) + rRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges) + } else { + rRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback) } - return distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback) + return rRanges.FirstPartitionRange(), err } // Open implements the Executor Open interface. @@ -458,9 +465,6 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { sc := e.ctx.GetSessionVars().StmtCtx if e.partitionTableMode { - if e.keepOrder { // this case should be prevented by the optimizer - return errors.New("invalid execution plan: cannot keep order when accessing a partition table by IndexLookUpReader") - } e.feedback.Invalidate() // feedback for partition tables is not ready e.partitionKVRanges = make([][]kv.KeyRange, 0, len(e.prunedPartitions)) for _, p := range e.prunedPartitions { @@ -472,7 +476,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if e.partitionRangeMap != nil && e.partitionRangeMap[physicalID] != nil { ranges = e.partitionRangeMap[physicalID] } - var kvRange []kv.KeyRange + var kvRange *kv.KeyRanges if e.index.ID == -1 { kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges) } else { @@ -481,15 +485,17 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if err != nil { return err } - e.partitionKVRanges = append(e.partitionKVRanges, kvRange) + e.partitionKVRanges = append(e.partitionKVRanges, kvRange.FirstPartitionRange()) } } else { physicalID := getPhysicalTableID(e.table) + var kvRanges *kv.KeyRanges if e.index.ID == -1 { - e.kvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges) + kvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges) } else { - e.kvRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback) + kvRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback) } + e.kvRanges = kvRanges.FirstPartitionRange() } return err } diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 0e7eb394710fd..09e0885dee23b 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -194,7 +194,7 @@ func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (rang if err != nil { return nil, err } - keyRanges := append(firstKeyRanges, secondKeyRanges...) + keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...) 
ranges = append(ranges, keyRanges) continue } @@ -202,7 +202,7 @@ func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (rang if err != nil { return nil, err } - ranges = append(ranges, keyRange) + ranges = append(ranges, keyRange.FirstPartitionRange()) } return ranges, nil } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 50bb68a7b5235..b2ba37634a8a4 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -84,7 +84,7 @@ partition p2 values less than (10))`) // Table reader: one partition tk.MustQuery("select * from pt where c > 8").Check(testkit.Rows("9 9")) // Table reader: more than one partition - tk.MustQuery("select * from pt where c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9")) + tk.MustQuery("select * from pt where c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9")) // Index reader tk.MustQuery("select c from pt").Sort().Check(testkit.Rows("0", "2", "4", "6", "7", "9", "")) @@ -96,7 +96,7 @@ partition p2 values less than (10))`) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt").Sort().Check(testkit.Rows("0 0", "2 2", "4 4", "6 6", "7 7", "9 9", " ")) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 4 and c > 10").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c > 8").Check(testkit.Rows("9 9")) - tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9")) + tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9")) // Index Merge tk.MustExec("set @@tidb_enable_index_merge = 1") @@ -377,14 +377,67 @@ func TestOrderByandLimit(t *testing.T) { // regular table tk.MustExec("create table tregular(a int, b int, index idx_a(a))") + // range partition table with int pk + tk.MustExec(`create table trange_intpk(a int primary key, b int) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table with int pk + tk.MustExec("create table thash_intpk(a int primary key, b int) partition by hash(a) partitions 4;") + + // regular table with int pk + tk.MustExec("create table tregular_intpk(a int primary key, b int)") + + // range partition table with clustered index + tk.MustExec(`create table trange_clustered(a int, b int, primary key(a, b) clustered) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table with clustered index + tk.MustExec("create table thash_clustered(a int, b int, primary key(a, b) clustered) partition by hash(a) partitions 4;") + + // regular table with clustered index + tk.MustExec("create table tregular_clustered(a int, b int, primary key(a, b) clustered)") + // generate some random data to be inserted vals := make([]string, 0, 2000) for i := 0; i < 2000; i++ { vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000))) } + + dedupValsA := make([]string, 0, 2000) + dedupMapA := make(map[int]struct{}, 2000) + for i := 0; i < 2000; i++ { + valA := rand.Intn(1100) + if _, ok := dedupMapA[valA]; ok { + continue + } + dedupValsA = append(dedupValsA, fmt.Sprintf("(%v, %v)", valA, rand.Intn(2000))) + dedupMapA[valA] = struct{}{} + } + + dedupValsAB := make([]string, 0, 2000) + dedupMapAB := make(map[string]struct{}, 2000) 
+ for i := 0; i < 2000; i++ { + val := fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000)) + if _, ok := dedupMapAB[val]; ok { + continue + } + dedupValsAB = append(dedupValsAB, val) + dedupMapAB[val] = struct{}{} + } + tk.MustExec("insert into trange values " + strings.Join(vals, ",")) tk.MustExec("insert into thash values " + strings.Join(vals, ",")) tk.MustExec("insert into tregular values " + strings.Join(vals, ",")) + tk.MustExec("insert into trange_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into thash_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into tregular_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into trange_clustered values " + strings.Join(dedupValsAB, ",")) + tk.MustExec("insert into thash_clustered values " + strings.Join(dedupValsAB, ",")) + tk.MustExec("insert into tregular_clustered values " + strings.Join(dedupValsAB, ",")) // test indexLookUp for i := 0; i < 100; i++ { @@ -398,6 +451,29 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test indexLookUp with order property pushed down. + for i := 0; i < 100; i++ { + // explain select * from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select * from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + // Since we only use order by a not order by a, b, the result is not stable when we read both a and b. + // We cut the max element so that the result can be stable. + maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0] + queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y) + queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y) + queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) + } + // test tableReader for i := 0; i < 100; i++ { // explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used @@ -410,6 +486,51 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test tableReader with order property pushed down. 
+ for i := 0; i < 100; i++ { + // explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed + require.False(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartition).Sort().Check(regularResult) + tk.MustQuery(queryHashPartition).Sort().Check(regularResult) + + // test int pk + // To be simplified, we only read column a. + queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange_intpk use index(primary) where a > %v order by a limit %v", x, y) + queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash_intpk use index(primary) where a > %v order by a limit %v", x, y) + queryRegular = fmt.Sprintf("select a from tregular_intpk where a > %v order by a limit %v", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult = tk.MustQuery(queryRegular).Rows() + tk.MustQuery(queryRangePartition).Check(regularResult) + tk.MustQuery(queryHashPartition).Check(regularResult) + + // test clustered index + queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange_clustered use index(primary) where a > %v order by a, b limit %v;", x, y) + queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash_clustered use index(primary) where a > %v order by a, b limit %v;", x, y) + queryRegular = fmt.Sprintf("select * from tregular_clustered where a > %v order by a, b limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed + require.True(t, tk.HasPlan(queryHashPartition, "TopN")) + regularResult = tk.MustQuery(queryRegular).Rows() + tk.MustQuery(queryRangePartition).Check(regularResult) + tk.MustQuery(queryHashPartition).Check(regularResult) + } + // test indexReader for i := 0; i < 100; i++ { // explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used @@ -422,6 +543,24 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test indexReader with order 
property pushed down. + for i := 0; i < 100; i++ { + // explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select a from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y) + queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y) + queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "IndexReader")) // check if indexReader is used + require.True(t, tk.HasPlan(queryHashPartition, "IndexReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartition).Sort().Check(regularResult) + tk.MustQuery(queryHashPartition).Sort().Check(regularResult) + } + // test indexMerge for i := 0; i < 100; i++ { // explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used @@ -2834,7 +2973,7 @@ partition p1 values less than (7), partition p2 values less than (10))`) tk.MustExec("alter table p add unique idx(id)") tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)") - tk.MustQuery("select id from p use index (idx)").Check(testkit.Rows("1", "3", "5", "7")) + tk.MustQuery("select id from p use index (idx) order by id").Check(testkit.Rows("1", "3", "5", "7")) } func TestGlobalIndexDoubleRead(t *testing.T) { diff --git a/executor/table_reader.go b/executor/table_reader.go index ce5a10b125754..984212dcf7328 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -61,7 +61,7 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con } type kvRangeBuilder interface { - buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) + buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) } @@ -205,13 +205,13 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { if err != nil { return err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges) if len(secondPartRanges) != 0 { kvReq, err = e.buildKVReq(ctx, secondPartRanges) if err != nil { return err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges) } return nil } @@ -314,10 +314,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, err } - slices.SortFunc(kvReq.KeyRanges, func(i, j kv.KeyRange) bool { + kvReq.KeyRanges.SortByFunc(func(i, j kv.KeyRange) bool { return bytes.Compare(i.StartKey, j.StartKey) < 0 }) - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) 
+ e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges) result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { @@ -409,7 +409,7 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R if err != nil { return nil, err } - reqBuilder = builder.SetKeyRanges(kvRange) + reqBuilder = builder.SetPartitionKeyRanges(kvRange) } else { reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback) } diff --git a/kv/BUILD.bazel b/kv/BUILD.bazel index 32dd9f1474179..992d99d382e42 100644 --- a/kv/BUILD.bazel +++ b/kv/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], ) diff --git a/kv/kv.go b/kv/kv.go index 72e8111f0343d..8263746093a5c 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -15,6 +15,7 @@ package kv import ( + "bytes" "context" "crypto/tls" "time" @@ -33,6 +34,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + "golang.org/x/exp/slices" ) // UnCommitIndexKVFlag uses to indicate the index key/value is no need to commit. @@ -335,13 +337,148 @@ func (t StoreType) Name() string { return "unspecified" } +// KeyRanges wrap the ranges for partitioned table cases. +// We might send ranges from different in the one request. +type KeyRanges struct { + ranges [][]KeyRange + + isPartitioned bool +} + +// NewPartitionedKeyRanges constructs a new RequestRange for partitioned table. +func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges { + return &KeyRanges{ + ranges: ranges, + isPartitioned: true, + } +} + +// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table. +func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges { + return &KeyRanges{ + ranges: [][]KeyRange{ranges}, + isPartitioned: false, + } +} + +// FirstPartitionRange returns the the result of first range. +// We may use some func to generate ranges for both partitioned table and non partitioned table. +// This method provides a way to fallback to non-partitioned ranges. +func (rr *KeyRanges) FirstPartitionRange() []KeyRange { + if len(rr.ranges) == 0 { + return []KeyRange{} + } + return rr.ranges[0] +} + +// SetToNonPartitioned set the status to non-partitioned. +func (rr *KeyRanges) SetToNonPartitioned() error { + if len(rr.ranges) > 1 { + return errors.Errorf("you want to change the partitioned ranges to non-partitioned ranges") + } + rr.isPartitioned = false + return nil +} + +// AppendSelfTo appends itself to another slice. +func (rr *KeyRanges) AppendSelfTo(ranges []KeyRange) []KeyRange { + for _, r := range rr.ranges { + ranges = append(ranges, r...) + } + return ranges +} + +// SortByFunc sorts each partition's ranges. +// Since the ranges are sorted in most cases, we check it first. +func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) { + if !slices.IsSortedFunc(rr.ranges, func(i, j []KeyRange) bool { + // A simple short-circuit since the empty range actually won't make anything wrong. 
+ if len(i) == 0 || len(j) == 0 { + return true + } + return sortFunc(i[0], j[0]) + }) { + slices.SortFunc(rr.ranges, func(i, j []KeyRange) bool { + if len(i) == 0 { + return true + } + if len(j) == 0 { + return false + } + return sortFunc(i[0], j[0]) + }) + } + for i := range rr.ranges { + if !slices.IsSortedFunc(rr.ranges[i], sortFunc) { + slices.SortFunc(rr.ranges[i], sortFunc) + } + } +} + +// ForEachPartitionWithErr runs the func for each partition with an error check. +func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) { + for i := range rr.ranges { + err = theFunc(rr.ranges[i]) + if err != nil { + return err + } + } + return nil +} + +// ForEachPartition runs the func for each partition without error check. +func (rr *KeyRanges) ForEachPartition(theFunc func([]KeyRange)) { + for i := range rr.ranges { + theFunc(rr.ranges[i]) + } +} + +// PartitionNum returns how many partition is involved in the ranges. +func (rr *KeyRanges) PartitionNum() int { + return len(rr.ranges) +} + +// IsFullySorted checks whether the ranges are sorted inside partition and each partition is also sorated. +func (rr *KeyRanges) IsFullySorted() bool { + sortedByPartition := slices.IsSortedFunc(rr.ranges, func(i, j []KeyRange) bool { + // A simple short-circuit since the empty range actually won't make anything wrong. + if len(i) == 0 || len(j) == 0 { + return true + } + return bytes.Compare(i[0].StartKey, j[0].StartKey) < 0 + }) + if !sortedByPartition { + return false + } + for _, ranges := range rr.ranges { + if !slices.IsSortedFunc(ranges, func(i, j KeyRange) bool { + return bytes.Compare(i.StartKey, j.StartKey) < 0 + }) { + return false + } + } + return true +} + +// TotalRangeNum returns how many ranges there are. +func (rr *KeyRanges) TotalRangeNum() int { + ret := 0 + for _, r := range rr.ranges { + ret += len(r) + } + return ret +} + // Request represents a kv request. type Request struct { // Tp is the request type. - Tp int64 - StartTs uint64 - Data []byte - KeyRanges []KeyRange + Tp int64 + StartTs uint64 + Data []byte + + // KeyRanges makes sure that the request is sent first by partition then by region. + // When the table is small, it's possible that multiple partitions are in the same region. + KeyRanges *KeyRanges // For PartitionTableScan used by tiflash. 
PartitionIDAndRanges []PartitionIDAndRanges diff --git a/planner/cascades/testdata/integration_suite_in.json b/planner/cascades/testdata/integration_suite_in.json index 569cb12860ac3..5533e6c672fcb 100644 --- a/planner/cascades/testdata/integration_suite_in.json +++ b/planner/cascades/testdata/integration_suite_in.json @@ -142,7 +142,7 @@ { "name": "TestCascadePlannerHashedPartTable", "cases": [ - "select * from pt1" + "select * from pt1 order by a" ] }, { diff --git a/planner/cascades/testdata/integration_suite_out.json b/planner/cascades/testdata/integration_suite_out.json index e8d98a41ec557..262a825256e41 100644 --- a/planner/cascades/testdata/integration_suite_out.json +++ b/planner/cascades/testdata/integration_suite_out.json @@ -1198,17 +1198,18 @@ "Name": "TestCascadePlannerHashedPartTable", "Cases": [ { - "SQL": "select * from pt1", + "SQL": "select * from pt1 order by a", "Plan": [ - "TableReader_5 10000.00 root partition:all data:TableFullScan_6", - "└─TableFullScan_6 10000.00 cop[tikv] table:pt1 keep order:false, stats:pseudo" + "Sort_11 10000.00 root test.pt1.a", + "└─TableReader_9 10000.00 root partition:all data:TableFullScan_10", + " └─TableFullScan_10 10000.00 cop[tikv] table:pt1 keep order:false, stats:pseudo" ], "Result": [ - "4 40", "1 10", - "5 50", "2 20", - "3 30" + "3 30", + "4 40", + "5 50" ] } ] diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index dca8a704994d3..8596746822b23 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2289,6 +2289,7 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper physicalTableID: ds.physicalTableID, tblColHists: ds.TblColHists, pkIsHandleCol: ds.getPKIsHandleCol(), + constColsByCond: path.ConstCols, prop: prop, }.Init(ds.ctx, ds.blockOffset) statsTbl := ds.statisticTable diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 5dfa93186826f..c6aec17f21e6d 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -406,7 +406,7 @@ func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *Physi return nil, nil, errors.Trace(err) } partitionIDAndRanges[i].ID = pid - partitionIDAndRanges[i].KeyRanges = kvRanges + partitionIDAndRanges[i].KeyRanges = kvRanges.FirstPartitionRange() allPartitionsIDs[i] = pid } return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil @@ -417,5 +417,5 @@ func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *Physica if err != nil { return nil, errors.Trace(err) } - return &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}, nil + return &kv.MPPBuildTasksRequest{KeyRanges: kvRanges.FirstPartitionRange()}, nil } diff --git a/planner/core/integration_partition_test.go b/planner/core/integration_partition_test.go index 7823f18474ad1..f1b915b66d038 100644 --- a/planner/core/integration_partition_test.go +++ b/planner/core/integration_partition_test.go @@ -1458,12 +1458,12 @@ func TestRangeColumnsExpr(t *testing.T) { "TableReader 1.14 root partition:p5,p12 data:Selection", "└─Selection 1.14 cop[tikv] in(rce.t.a, 4, 14), in(rce.t.b, NULL, 10)", " └─TableFullScan 21.00 cop[tikv] table:t keep order:false")) - tk.MustQuery(`select * from tref where a in (4,14) and b in (null,10)`).Check(testkit.Rows( - "4 10 3", - "14 10 4")) - tk.MustQuery(`select * from t where a in (4,14) and b in (null,10)`).Check(testkit.Rows( - "4 10 3", - "14 10 4")) + tk.MustQuery(`select * from tref where a in (4,14) and b in 
(null,10)`).Sort().Check(testkit.Rows(
+		"14 10 4",
+		"4 10 3"))
+	tk.MustQuery(`select * from t where a in (4,14) and b in (null,10)`).Sort().Check(testkit.Rows(
+		"14 10 4",
+		"4 10 3"))
 	tk.MustQuery(`explain format = 'brief' select * from t where a in (4,14) and (b in (11,10) OR b is null)`).Check(testkit.Rows(
 		"TableReader 3.43 root partition:p1,p5,p6,p11,p12 data:Selection",
 		"└─Selection 3.43 cop[tikv] in(rce.t.a, 4, 14), or(in(rce.t.b, 11, 10), isnull(rce.t.b))",
diff --git a/planner/core/partition_pruner_test.go b/planner/core/partition_pruner_test.go
index 84d51524c682a..f037d7fe887e7 100644
--- a/planner/core/partition_pruner_test.go
+++ b/planner/core/partition_pruner_test.go
@@ -312,15 +312,15 @@ func TestListPartitionPruner(t *testing.T) {
 	for i, tt := range input {
 		testdata.OnRecord(func() {
 			output[i].SQL = tt
-			output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
+			output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows())
 			output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + tt).Rows())
 		})
 		tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...))
-		result := tk.MustQuery(tt)
+		result := tk.MustQuery(tt).Sort()
 		result.Check(testkit.Rows(output[i].Result...))
 		// If the query doesn't specified the partition, compare the result with normal table
 		if !strings.Contains(tt, "partition(") {
-			result.Check(tk2.MustQuery(tt).Rows())
+			result.Check(tk2.MustQuery(tt).Sort().Rows())
 			valid = true
 		}
 		require.True(t, valid)
@@ -393,7 +393,7 @@ func TestListColumnsPartitionPruner(t *testing.T) {
 		indexPlanTree := testdata.ConvertRowsToStrings(indexPlan.Rows())
 		testdata.OnRecord(func() {
 			output[i].SQL = tt.SQL
-			output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt.SQL).Rows())
+			output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt.SQL).Sort().Rows())
 			// Test for table without index.
 			output[i].Plan = planTree
 			// Test for table with index.
@@ -408,14 +408,14 @@ func TestListColumnsPartitionPruner(t *testing.T) {
 		checkPrunePartitionInfo(t, tt.SQL, tt.Pruner, indexPlanTree)
 
 		// compare the result.
-		result := tk.MustQuery(tt.SQL)
+		result := tk.MustQuery(tt.SQL).Sort()
 		idxResult := tk1.MustQuery(tt.SQL)
-		result.Check(idxResult.Rows())
+		result.Check(idxResult.Sort().Rows())
 		result.Check(testkit.Rows(output[i].Result...))
 
 		// If the query doesn't specified the partition, compare the result with normal table
 		if !strings.Contains(tt.SQL, "partition(") {
-			result.Check(tk2.MustQuery(tt.SQL).Rows())
+			result.Check(tk2.MustQuery(tt.SQL).Sort().Rows())
 			valid = true
 		}
 	}
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 5146e63e42c25..ced23204f639d 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -677,7 +677,12 @@ type PhysicalIndexScan struct {
 	// tblColHists contains all columns before pruning, which are used to calculate row-size
 	tblColHists   *statistics.HistColl
 	pkIsHandleCol *expression.Column
-	prop          *property.PhysicalProperty
+
+	// constColsByCond records which index columns are constant under the access conditions.
+	// e.g. with index (a, b, c) and filters a = 1 and b = 2, columns a and b are the constant part.
+	constColsByCond []bool
+
+	prop *property.PhysicalProperty
 }
 
 // Clone implements PhysicalPlan interface.
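To make the role of `constColsByCond` concrete: `pushTopNDownToDynamicPartition` (added in the next file) lets an ORDER BY skip over index columns that the access conditions pin to constants. A simplified, self-contained sketch of that matching rule, with plain strings instead of `expression.Column` and prefix lengths ignored; this is an illustration, not the planner's code:

```go
package example

// orderMatchesIndex reports whether sortCols can reuse the order of an index
// whose columns are idxCols, given that constCols[i] marks index columns made
// constant by the access conditions (e.g. a = 1 on index (a, b, c)).
func orderMatchesIndex(idxCols []string, constCols []bool, sortCols []string) bool {
	idxPos := 0
	for _, sc := range sortCols {
		found := false
		for ; idxPos < len(idxCols); idxPos++ {
			if idxCols[idxPos] == sc {
				found = true
				idxPos++
				break
			}
			// Only constant index columns may be skipped over.
			if idxPos >= len(constCols) || !constCols[idxPos] {
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}
```

With index (a, b, c) and filter a = 1, `orderMatchesIndex([]string{"a", "b", "c"}, []bool{true, false, false}, []string{"b", "c"})` returns true, so ORDER BY b, c can keep the index order; without the constant-column information it would return false.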
diff --git a/planner/core/task.go b/planner/core/task.go
index c446b4ea27696..ac3d10a323a7b 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -24,11 +24,13 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/charset"
+	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/planner/property"
 	"github.com/pingcap/tidb/planner/util"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/collate"
@@ -959,6 +961,10 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
 	}
 	needPushDown := len(cols) > 0
 	if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
+		newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
+		if changed {
+			return newTask
+		}
 		// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
 		// push it to table plan.
 		var pushedDownTopN *PhysicalTopN
@@ -978,6 +984,138 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
 	return attachPlan2Task(p, rootTask)
 }
 
+// pushTopNDownToDynamicPartition is a temporary solution for partition tables. It does the same thing as DataSource's isMatchProp.
+// We need a more capable read strategy in the execution phase so that we can achieve Limit(TiDB)->Reader(TiDB)->Limit(TiKV/TiFlash)->Scan(TiKV/TiFlash).
+// Until that is done, this logic keeps the order property when reading from TiKV, so that the order of the index can speed up the query.
+// Here we change the execution plan to TopN(TiDB)->Reader(TiDB)->Limit(TiKV)->Scan(TiKV). (TiFlash is not supported.)
+func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bool) {
+	copTsk = copTsk.copy().(*copTask)
+	if len(copTsk.rootTaskConds) > 0 {
+		return nil, false
+	}
+	colsProp, ok := GetPropByOrderByItems(p.ByItems)
+	if !ok {
+		return nil, false
+	}
+	allSameOrder, isDesc := colsProp.AllSameOrder()
+	if !allSameOrder {
+		return nil, false
+	}
+	checkIndexMatchProp := func(idxCols []*expression.Column, idxColLens []int, constColsByCond []bool, colsProp *property.PhysicalProperty) bool {
+		// If there are more by-items than index columns, we cannot push down, because the index cannot keep the required order.
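Why pushing only a `Limit` of `Offset + Count` below the reader is safe while the final `TopN` stays in TiDB: each partition returns its rows in index order, so keeping the first `Offset + Count` rows from every partition cannot drop a row that belongs in the global result. A toy model of that argument, with plain sorted int slices standing in for partitions; illustrative only, not the executor's code:

```go
package example

import "sort"

// topNOverLimitedPartitions mimics TopN(TiDB) -> Limit(TiKV) -> Scan(TiKV):
// each already-sorted partition is cut to offset+count rows (the pushed-down
// Limit), then the coordinator merges them and applies the real offset/count.
func topNOverLimitedPartitions(parts [][]int, offset, count int) []int {
	merged := make([]int, 0)
	for _, p := range parts {
		n := offset + count
		if n > len(p) {
			n = len(p)
		}
		merged = append(merged, p[:n]...) // pushed-down Limit per partition
	}
	sort.Ints(merged) // the final TopN kept in TiDB
	if offset >= len(merged) {
		return nil
	}
	end := offset + count
	if end > len(merged) {
		end = len(merged)
	}
	return merged[offset:end]
}
```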
+ if len(idxCols) < len(colsProp.SortItems) { + return false + } + idxPos := 0 + for _, byItem := range colsProp.SortItems { + found := false + for ; idxPos < len(idxCols); idxPos++ { + if idxColLens[idxPos] == types.UnspecifiedLength && idxCols[idxPos].Equal(p.SCtx(), byItem.Col) { + found = true + idxPos++ + break + } + if len(constColsByCond) == 0 || idxPos > len(constColsByCond) || !constColsByCond[idxPos] { + found = false + break + } + } + if !found { + return false + } + } + return true + } + var ( + idxScan *PhysicalIndexScan + tblScan *PhysicalTableScan + tblInfo *model.TableInfo + err error + ) + if copTsk.indexPlan != nil { + copTsk.indexPlan, err = copTsk.indexPlan.Clone() + if err != nil { + return nil, false + } + finalIdxScanPlan := copTsk.indexPlan + for len(finalIdxScanPlan.Children()) > 0 && finalIdxScanPlan.Children()[0] != nil { + finalIdxScanPlan = finalIdxScanPlan.Children()[0] + } + idxScan = finalIdxScanPlan.(*PhysicalIndexScan) + tblInfo = idxScan.Table + } + if copTsk.tablePlan != nil { + copTsk.tablePlan, err = copTsk.tablePlan.Clone() + if err != nil { + return nil, false + } + finalTblScanPlan := copTsk.tablePlan + for len(finalTblScanPlan.Children()) > 0 { + finalTblScanPlan = finalTblScanPlan.Children()[0] + } + tblScan = finalTblScanPlan.(*PhysicalTableScan) + tblInfo = tblScan.Table + } + + pi := tblInfo.GetPartitionInfo() + if pi == nil { + return nil, false + } + if pi.Type == model.PartitionTypeList { + return nil, false + } + + if !copTsk.indexPlanFinished { + // If indexPlan side isn't finished, there's no selection on the table side. + + propMatched := checkIndexMatchProp(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp) + if !propMatched { + return nil, false + } + + idxScan.Desc = isDesc + childProfile := copTsk.plan().statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copTsk.indexPlan.Schema()) + copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) + } else if copTsk.indexPlan == nil { + if tblScan.HandleCols == nil { + return nil, false + } + + if tblScan.HandleCols.IsInt() { + pk := tblScan.HandleCols.GetCol(0) + if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx(), pk) { + return nil, false + } + } else { + idxCols, idxColLens := expression.IndexInfo2PrefixCols(tblScan.Columns, tblScan.Schema().Columns, tables.FindPrimaryIndex(tblScan.Table)) + matched := checkIndexMatchProp(idxCols, idxColLens, nil, colsProp) + if !matched { + return nil, false + } + } + tblScan.Desc = isDesc + childProfile := copTsk.plan().statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copTsk.tablePlan.Schema()) + copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) + } else { + return nil, false + } + + rootTask := copTsk.convertToRootTask(p.ctx) + return attachPlan2Task(p, rootTask), true +} + func (p *PhysicalProjection) attach2Task(tasks ...task) task { t := tasks[0].copy() if cop, ok := t.(*copTask); ok { diff --git a/planner/core/testdata/partition_pruner_out.json b/planner/core/testdata/partition_pruner_out.json index 3206c3dc5853d..0da53482a34cc 100644 --- a/planner/core/testdata/partition_pruner_out.json +++ b/planner/core/testdata/partition_pruner_out.json @@ 
-847,9 +847,9 @@ { "SQL": "select * from t7 where a is null or a > 0 order by a;", "Result": [ - "", "1", - "2" + "2", + "" ], "Plan": [ "Sort 3343.33 root test_partition.t7.a", @@ -866,8 +866,8 @@ { "SQL": "select * from t1 order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -876,7 +876,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -1341,8 +1341,8 @@ { "SQL": "select * from t1 where a = 1 or true order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -1351,7 +1351,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -1973,13 +1973,13 @@ "SQL": "select * from t1 where a < 3 or b > 4", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "5 5 5", "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 5548.89 root partition:p0,p1 data:Selection", @@ -2059,11 +2059,11 @@ "SQL": "select * from t1 where (a<=1 and b<=1) or (a >=6 and b>=6)", "Result": [ "1 1 1", + "10 10 10", "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 2092.85 root partition:p0,p1 data:Selection", @@ -2080,6 +2080,7 @@ "SQL": "select * from t1 where a <= 100 and b <= 100", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2087,8 +2088,7 @@ "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 1104.45 root partition:p0,p1 data:Selection", @@ -2126,10 +2126,10 @@ { "SQL": "select * from t1 left join t2 on true where (t1.a <=1 or t1.a <= 3 and (t1.b >=3 and t1.b <= 5)) and (t2.a >= 6 and t2.a <= 8) and t2.b>=7 and t2.id>=7 order by t1.id,t1.a", "Result": [ - "1 1 1 8 8 8", "1 1 1 7 7 7", - "3 3 3 8 8 8", - "3 3 3 7 7 7" + "1 1 1 8 8 8", + "3 3 3 7 7 7", + "3 3 3 8 8 8" ], "Plan": [ "Sort 93855.70 root test_partition.t1.id, test_partition.t1.a", @@ -2326,8 +2326,8 @@ { "SQL": "select * from t1 where a = 3 or true order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2336,7 +2336,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -2463,6 +2463,7 @@ "SQL": "select * from t1 where (a >= 1 and a <= 6) or (a>=3 and b >=3)", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2470,8 +2471,7 @@ "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 1333.33 root partition:p0,p1 data:Selection", diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index bfd3bbcc94fdd..5f6e435028e3b 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -695,7 +695,8 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V } tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs) } else { - ranges := NewKeyRanges(req.KeyRanges) + // TODO: merge the if branch. 
+ ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange()) tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0) } diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index f92db7ba7c334..208f2e2bd2190 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -43,7 +43,7 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { req := &kv.Request{ Tp: kv.ReqTypeDAG, - KeyRanges: copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z"), + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow}, Concurrency: 15, } @@ -57,7 +57,7 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { req = &kv.Request{ Tp: kv.ReqTypeDAG, - KeyRanges: copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z"), + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), FixedRowCountHint: []int{1, 1, 3, 3}, Concurrency: 15, } @@ -72,7 +72,7 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { // cross-region long range req = &kv.Request{ Tp: kv.ReqTypeDAG, - KeyRanges: copr.BuildKeyRanges("a", "z"), + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")), FixedRowCountHint: []int{10}, Concurrency: 15, } @@ -86,7 +86,7 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { req = &kv.Request{ Tp: kv.ReqTypeDAG, - KeyRanges: copr.BuildKeyRanges("a", "z"), + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")), FixedRowCountHint: []int{copr.CopSmallTaskRow + 1}, Concurrency: 15, } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 982e981f24c79..fdff94a09717d 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -15,7 +15,6 @@ package copr import ( - "bytes" "context" "fmt" "math" @@ -53,7 +52,6 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" - "golang.org/x/exp/slices" ) var coprCacheCounterEvict = tidbmetrics.DistSQLCoprCacheCounter.WithLabelValues("evict") @@ -121,10 +119,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) { if req.Paging.Enable { - isSorted := slices.IsSortedFunc(req.KeyRanges, func(i, j kv.KeyRange) bool { - return bytes.Compare(i.StartKey, j.StartKey) < 0 - }) - if !isSorted { + if !req.KeyRanges.IsFullySorted() { logutil.BgLogger().Fatal("distsql request key range not sorted!") } } @@ -138,8 +133,27 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars }) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) - ranges := NewKeyRanges(req.KeyRanges) - tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb) + var ( + tasks []*copTask + err error + ) + buildTaskFunc := func(ranges []kv.KeyRange) error { + keyRanges := NewKeyRanges(ranges) + tasksFromRanges, err := buildCopTasks(bo, c.store.GetRegionCache(), keyRanges, req, eventCb) + if err != nil { + return err + } + if len(tasks) == 0 { + tasks = tasksFromRanges + return nil + } + tasks = append(tasks, tasksFromRanges...) + return nil + } + // Here we build the task by partition, not directly by region. 
+	// This is because TiDB may merge multiple small partitions into one region, which breaks some assumptions.
+	// Keeping the tasks split by partition is safer.
+	err = req.KeyRanges.ForEachPartitionWithErr(buildTaskFunc)
 	reqType := "null"
 	if req.ClosestReplicaReadAdjuster != nil {
 		reqType = "miss"
 	}
diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go
index e45576b9d0674..c2d98f5a2b17e 100644
--- a/tablecodec/tablecodec.go
+++ b/tablecodec/tablecodec.go
@@ -1627,3 +1627,30 @@ func IndexKVIsUnique(value []byte) bool {
 	segs := SplitIndexValue(value)
 	return segs.IntHandle != nil || segs.CommonHandle != nil
 }
+
+// VerifyTableIDForRanges verifies that the table ID can be decoded from every given range and that each partition's ranges belong to a single table.
+func VerifyTableIDForRanges(keyRanges *kv.KeyRanges) ([]int64, error) {
+	tids := make([]int64, 0, keyRanges.PartitionNum())
+	collectFunc := func(ranges []kv.KeyRange) error {
+		if len(ranges) == 0 {
+			return nil
+		}
+		tid := DecodeTableID(ranges[0].StartKey)
+		if tid <= 0 {
+			return errors.New("incorrect keyRange is constructed")
+		}
+		tids = append(tids, tid)
+		for i := 1; i < len(ranges); i++ {
+			tmpTID := DecodeTableID(ranges[i].StartKey)
+			if tmpTID <= 0 {
+				return errors.New("incorrect keyRange is constructed")
+			}
+			if tid != tmpTID {
+				return errors.Errorf("using ranges of multiple partitions as a single table's ranges")
+			}
+		}
+		return nil
+	}
+	err := keyRanges.ForEachPartitionWithErr(collectFunc)
+	return tids, err
+}
diff --git a/testkit/result.go b/testkit/result.go
index 0f7ad0ce53cbc..210d32d4c57b9 100644
--- a/testkit/result.go
+++ b/testkit/result.go
@@ -49,6 +49,11 @@ func (res *Result) Check(expected [][]interface{}) {
 	res.require.Equal(needBuff.String(), resBuff.String(), res.comment)
 }
 
+// AddComment adds an extra comment to the Result's output.
+func (res *Result) AddComment(c string) {
+	res.comment += "\n" + c
+}
+
 // CheckWithFunc asserts the result match the expected results in the way `f` specifies.
 func (res *Result) CheckWithFunc(expected [][]interface{}, f func([]string, []interface{}) bool) {
 	res.require.Equal(len(res.rows), len(expected), res.comment+"\nResult length mismatch")
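A hedged usage sketch for `VerifyTableIDForRanges` above: before building coprocessor tasks, a caller can check that every range decodes to a consistent table ID and collect one ID per partition. `kv.NewNonParitionedKeyRanges` is the constructor already used by the coprocessor tests in this patch; the wrapper function and package here are illustrative only:

```go
package example

import (
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/tablecodec"
)

// tableIDOf validates a flat (non-partitioned) set of key ranges and returns
// the single table ID they belong to.
func tableIDOf(ranges []kv.KeyRange) (int64, error) {
	krs := kv.NewNonParitionedKeyRanges(ranges)
	// VerifyTableIDForRanges returns one table ID per partition; for a
	// non-partitioned KeyRanges that is at most one element.
	tids, err := tablecodec.VerifyTableIDForRanges(krs)
	if err != nil {
		return 0, err
	}
	if len(tids) == 0 {
		return 0, nil
	}
	return tids[0], nil
}
```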