From 585468f6f90b2f537910e915e7ff5154f7d3c6c3 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Nov 2023 15:01:47 +0800 Subject: [PATCH 1/3] init --- pkg/executor/BUILD.bazel | 4 +- pkg/executor/benchmark_test.go | 13 +++--- pkg/executor/builder.go | 17 ++++---- pkg/executor/executor.go | 5 ++- pkg/executor/executor_pkg_test.go | 41 +++++++++--------- pkg/executor/executor_required_rows_test.go | 13 +++--- pkg/executor/sort_exec/BUILD.bazel | 35 +++++++++++++++ pkg/executor/{ => sort_exec}/sort.go | 48 ++++++++++----------- pkg/executor/{ => sort_exec}/sort_test.go | 2 +- 9 files changed, 109 insertions(+), 69 deletions(-) create mode 100644 pkg/executor/sort_exec/BUILD.bazel rename pkg/executor/{ => sort_exec}/sort.go (93%) rename pkg/executor/{ => sort_exec}/sort_test.go (99%) diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index ab0dd7d31e72e..b0cac939a774f 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -77,7 +77,6 @@ go_library( "shuffle.go", "simple.go", "slow_query.go", - "sort.go", "split.go", "stmtsummary.go", "table_reader.go", @@ -127,6 +126,7 @@ go_library( "//pkg/executor/lockstats", "//pkg/executor/metrics", "//pkg/executor/mppcoordmanager", + "//pkg/executor/sort_exec", "//pkg/expression", "//pkg/expression/aggregation", "//pkg/infoschema", @@ -349,7 +349,6 @@ go_test( "shuffle_test.go", "slow_query_sql_test.go", "slow_query_test.go", - "sort_test.go", "split_test.go", "stale_txn_test.go", "stmtsummary_test.go", @@ -381,6 +380,7 @@ go_test( "//pkg/executor/importer", "//pkg/executor/internal/builder", "//pkg/executor/internal/exec", + "//pkg/executor/sort_exec", "//pkg/expression", "//pkg/expression/aggregation", "//pkg/infoschema", diff --git a/pkg/executor/benchmark_test.go b/pkg/executor/benchmark_test.go index 458ca9250c0f2..967b9cdb732a1 100644 --- a/pkg/executor/benchmark_test.go +++ b/pkg/executor/benchmark_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggfuncs" "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/executor/sort_exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/parser/ast" @@ -1646,20 +1647,20 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, innerDS, outerDS *mockDataSource, var leftExec, rightExec exec.Executor if sorted { - leftSortExec := &SortExec{ + leftSortExec := &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(tc.ctx, innerDS.Schema(), 3, innerDS), ByItems: make([]*util.ByItems, 0, len(tc.innerJoinKeyIdx)), - schema: innerDS.Schema(), + ExecSchema: innerDS.Schema(), } for _, key := range innerJoinKeys { leftSortExec.ByItems = append(leftSortExec.ByItems, &util.ByItems{Expr: key}) } leftExec = leftSortExec - rightSortExec := &SortExec{ + rightSortExec := &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(tc.ctx, outerDS.Schema(), 4, outerDS), ByItems: make([]*util.ByItems, 0, len(tc.outerJoinKeyIdx)), - schema: outerDS.Schema(), + ExecSchema: outerDS.Schema(), } for _, key := range outerJoinKeys { rightSortExec.ByItems = append(rightSortExec.ByItems, &util.ByItems{Expr: key}) @@ -1917,10 +1918,10 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) { ndvs: cas.ndvs, } dataSource := buildMockDataSource(opt) - executor := &SortExec{ + executor := &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 4, dataSource), ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)), - schema: dataSource.Schema(), + ExecSchema: dataSource.Schema(), } for _, idx := range cas.orderByIdx { executor.ByItems = append(executor.ByItems, &util.ByItems{Expr: cas.columns()[idx]}) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index a980eb1d37fe5..1bf4193dd1dc4 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/internal/vecgroupchecker" "github.com/pingcap/tidb/pkg/executor/lockstats" executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics" + "github.com/pingcap/tidb/pkg/executor/sort_exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/infoschema" @@ -1297,7 +1298,7 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) exec.Executor { optimizerTraceTarget: v.OptimizerTraceTarget, } if t.format == plannercore.TraceFormatLog && !t.optimizerTrace { - return &SortExec{ + return &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), t), ByItems: []*plannerutil.ByItems{ {Expr: &expression.Column{ @@ -1305,7 +1306,7 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) exec.Executor { RetType: types.NewFieldType(mysql.TypeTimestamp), }}, }, - schema: v.Schema(), + ExecSchema: v.Schema(), } } return t @@ -2245,10 +2246,10 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) exec.Executor { if b.err != nil { return nil } - sortExec := SortExec{ + sortExec := sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), ByItems: v.ByItems, - schema: v.Schema(), + ExecSchema: v.Schema(), } executor_metrics.ExecutorCounterSortExec.Inc() return &sortExec @@ -2259,15 +2260,15 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor { if b.err != nil { return nil } - sortExec := SortExec{ + sortExec := sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), ByItems: v.ByItems, - schema: v.Schema(), + ExecSchema: v.Schema(), } executor_metrics.ExecutorCounterTopNExec.Inc() - return &TopNExec{ + return &sort_exec.TopNExec{ SortExec: sortExec, - limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset}, + Limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset}, } } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index a1817c916b7b5..eecc14d158b06 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/pdhelper" + "github.com/pingcap/tidb/pkg/executor/sort_exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -102,12 +103,12 @@ var ( _ exec.Executor = &ShowDDLExec{} _ exec.Executor = &ShowDDLJobsExec{} _ exec.Executor = &ShowDDLJobQueriesExec{} - _ exec.Executor = &SortExec{} + _ exec.Executor = &sort_exec.SortExec{} _ exec.Executor = &aggregate.StreamAggExec{} _ exec.Executor = &TableDualExec{} _ exec.Executor = &TableReaderExecutor{} _ exec.Executor = &TableScanExec{} - _ exec.Executor = &TopNExec{} + _ exec.Executor = &sort_exec.TopNExec{} _ exec.Executor = &UnionExec{} _ exec.Executor = &FastCheckTableExec{} diff --git a/pkg/executor/executor_pkg_test.go b/pkg/executor/executor_pkg_test.go index 45a98caba9258..162f26f3df515 100644 --- a/pkg/executor/executor_pkg_test.go +++ b/pkg/executor/executor_pkg_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggfuncs" "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/executor/sort_exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" plannerutil "github.com/pingcap/tidb/pkg/planner/util" @@ -290,10 +291,10 @@ func TestSortSpillDisk(t *testing.T) { ndvs: cas.ndvs, } dataSource := buildMockDataSource(opt) - exe := &SortExec{ + exe := &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource), ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)), - schema: dataSource.Schema(), + ExecSchema: dataSource.Schema(), } for _, idx := range cas.orderByIdx { exe.ByItems = append(exe.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]}) @@ -311,9 +312,9 @@ func TestSortSpillDisk(t *testing.T) { } } // Test only 1 partition and all data in memory. - require.Len(t, exe.partitionList, 1) - require.Equal(t, false, exe.partitionList[0].AlreadySpilledSafeForTest()) - require.Equal(t, 2048, exe.partitionList[0].NumRow()) + require.Len(t, exe.PartitionList, 1) + require.Equal(t, false, exe.PartitionList[0].AlreadySpilledSafeForTest()) + require.Equal(t, 2048, exe.PartitionList[0].NumRow()) err = exe.Close() require.NoError(t, err) @@ -334,16 +335,16 @@ func TestSortSpillDisk(t *testing.T) { // Now spilling is in parallel. // Maybe the second add() will called before spilling, depends on // Golang goroutine scheduling. So the result has two possibilities. - if len(exe.partitionList) == 2 { - require.Len(t, exe.partitionList, 2) - require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest()) - require.Equal(t, true, exe.partitionList[1].AlreadySpilledSafeForTest()) - require.Equal(t, 1024, exe.partitionList[0].NumRow()) - require.Equal(t, 1024, exe.partitionList[1].NumRow()) + if len(exe.PartitionList) == 2 { + require.Len(t, exe.PartitionList, 2) + require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest()) + require.Equal(t, true, exe.PartitionList[1].AlreadySpilledSafeForTest()) + require.Equal(t, 1024, exe.PartitionList[0].NumRow()) + require.Equal(t, 1024, exe.PartitionList[1].NumRow()) } else { - require.Len(t, exe.partitionList, 1) - require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest()) - require.Equal(t, 2048, exe.partitionList[0].NumRow()) + require.Len(t, exe.PartitionList, 1) + require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest()) + require.Equal(t, 2048, exe.PartitionList[0].NumRow()) } err = exe.Close() @@ -363,9 +364,9 @@ func TestSortSpillDisk(t *testing.T) { } } // Test only 1 partition but spill disk. - require.Len(t, exe.partitionList, 1) - require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest()) - require.Equal(t, 2048, exe.partitionList[0].NumRow()) + require.Len(t, exe.PartitionList, 1) + require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest()) + require.Equal(t, 2048, exe.PartitionList[0].NumRow()) err = exe.Close() require.NoError(t, err) @@ -385,10 +386,10 @@ func TestSortSpillDisk(t *testing.T) { ndvs: cas.ndvs, } dataSource = buildMockDataSource(opt) - exe = &SortExec{ + exe = &sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource), ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)), - schema: dataSource.Schema(), + ExecSchema: dataSource.Schema(), } for _, idx := range cas.orderByIdx { exe.ByItems = append(exe.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]}) @@ -406,7 +407,7 @@ func TestSortSpillDisk(t *testing.T) { } } // Don't spill too many partitions. - require.True(t, len(exe.partitionList) <= 4) + require.True(t, len(exe.PartitionList) <= 4) err = exe.Close() require.NoError(t, err) } diff --git a/pkg/executor/executor_required_rows_test.go b/pkg/executor/executor_required_rows_test.go index a22f15e2494c6..034f47ae40fef 100644 --- a/pkg/executor/executor_required_rows_test.go +++ b/pkg/executor/executor_required_rows_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/executor/sort_exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/parser/ast" @@ -278,10 +279,10 @@ func TestSortRequiredRows(t *testing.T) { } func buildSortExec(sctx sessionctx.Context, byItems []*util.ByItems, src exec.Executor) exec.Executor { - sortExec := SortExec{ + sortExec := sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(sctx, src.Schema(), 0, src), ByItems: byItems, - schema: src.Schema(), + ExecSchema: src.Schema(), } return &sortExec } @@ -385,14 +386,14 @@ func TestTopNRequiredRows(t *testing.T) { } func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*util.ByItems, src exec.Executor) exec.Executor { - sortExec := SortExec{ + sortExec := sort_exec.SortExec{ BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src), ByItems: byItems, - schema: src.Schema(), + ExecSchema: src.Schema(), } - return &TopNExec{ + return &sort_exec.TopNExec{ SortExec: sortExec, - limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)}, + Limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)}, } } diff --git a/pkg/executor/sort_exec/BUILD.bazel b/pkg/executor/sort_exec/BUILD.bazel new file mode 100644 index 0000000000000..1f9546905373e --- /dev/null +++ b/pkg/executor/sort_exec/BUILD.bazel @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "sort_exec", + srcs = ["sort.go"], + importpath = "github.com/pingcap/tidb/pkg/executor/sort_exec", + visibility = ["//visibility:public"], + deps = [ + "//pkg/executor/internal/exec", + "//pkg/expression", + "//pkg/planner/core", + "//pkg/planner/util", + "//pkg/sessionctx/variable", + "//pkg/util/chunk", + "//pkg/util/disk", + "//pkg/util/memory", + "//pkg/util/sqlkiller", + "@com_github_pingcap_failpoint//:failpoint", + ], +) + +go_test( + name = "sort_exec_test", + timeout = "short", + srcs = ["sort_test.go"], + flaky = True, + deps = [ + "//pkg/config", + "//pkg/sessionctx/variable", + "//pkg/testkit", + "//pkg/util", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/executor/sort.go b/pkg/executor/sort_exec/sort.go similarity index 93% rename from pkg/executor/sort.go rename to pkg/executor/sort_exec/sort.go index 31652778001c3..63e3022b4ee57 100644 --- a/pkg/executor/sort.go +++ b/pkg/executor/sort_exec/sort.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package executor +package sort_exec import ( "container/heap" @@ -36,10 +36,10 @@ import ( type SortExec struct { exec.BaseExecutor - ByItems []*util.ByItems - Idx int - fetched bool - schema *expression.Schema + ByItems []*util.ByItems + Idx int + fetched bool + ExecSchema *expression.Schema // keyColumns is the column index of the by items. keyColumns []int @@ -51,8 +51,8 @@ type SortExec struct { memTracker *memory.Tracker diskTracker *disk.Tracker - // partitionList is the chunks to store row values for partitions. Every partition is a sorted list. - partitionList []*chunk.SortedRowContainer + // PartitionList is the chunks to store row values for partitions. Every partition is a sorted list. + PartitionList []*chunk.SortedRowContainer // multiWayMerge uses multi-way merge for spill disk. // The multi-way merge algorithm can refer to https://en.wikipedia.org/wiki/K-way_merge_algorithm @@ -63,13 +63,13 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { - for _, container := range e.partitionList { + for _, container := range e.PartitionList { err := container.Close() if err != nil { return err } } - e.partitionList = e.partitionList[:0] + e.PartitionList = e.PartitionList[:0] if e.rowChunks != nil { e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed()) @@ -97,7 +97,7 @@ func (e *SortExec) Open(ctx context.Context) error { e.diskTracker = memory.NewTracker(e.ID(), -1) e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) } - e.partitionList = e.partitionList[:0] + e.PartitionList = e.PartitionList[:0] return e.Children(0).Open(ctx) } @@ -121,16 +121,16 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error { e.fetched = true } - if len(e.partitionList) == 0 { + if len(e.PartitionList) == 0 { return nil } - if len(e.partitionList) > 1 { + if len(e.PartitionList) > 1 { if err := e.externalSorting(req); err != nil { return err } } else { - for !req.IsFull() && e.Idx < e.partitionList[0].NumRow() { - _, _, err := e.partitionList[0].GetSortedRowAndAlwaysAppendToChunk(e.Idx, req) + for !req.IsFull() && e.Idx < e.PartitionList[0].NumRow() { + _, _, err := e.PartitionList[0].GetSortedRowAndAlwaysAppendToChunk(e.Idx, req) if err != nil { return err } @@ -142,11 +142,11 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *SortExec) externalSorting(req *chunk.Chunk) (err error) { if e.multiWayMerge == nil { - e.multiWayMerge = &multiWayMerge{e.lessRow, e.compressRow, make([]partitionPointer, 0, len(e.partitionList))} - for i := 0; i < len(e.partitionList); i++ { + e.multiWayMerge = &multiWayMerge{e.lessRow, e.compressRow, make([]partitionPointer, 0, len(e.PartitionList))} + for i := 0; i < len(e.PartitionList); i++ { chk := chunk.New(exec.RetTypes(e), 1, 1) - row, _, err := e.partitionList[i].GetSortedRowAndAlwaysAppendToChunk(0, chk) + row, _, err := e.PartitionList[i].GetSortedRowAndAlwaysAppendToChunk(0, chk) if err != nil { return err } @@ -160,12 +160,12 @@ func (e *SortExec) externalSorting(req *chunk.Chunk) (err error) { req.AppendRow(partitionPtr.row) partitionPtr.consumed++ partitionPtr.chk.Reset() - if partitionPtr.consumed >= e.partitionList[partitionPtr.partitionID].NumRow() { + if partitionPtr.consumed >= e.PartitionList[partitionPtr.partitionID].NumRow() { heap.Remove(e.multiWayMerge, 0) continue } - partitionPtr.row, _, err = e.partitionList[partitionPtr.partitionID]. + partitionPtr.row, _, err = e.PartitionList[partitionPtr.partitionID]. GetSortedRowAndAlwaysAppendToChunk(partitionPtr.consumed, partitionPtr.chk) if err != nil { return err @@ -209,7 +209,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { } if err := e.rowChunks.Add(chk); err != nil { if errors.Is(err, chunk.ErrCannotAddBecauseSorted) { - e.partitionList = append(e.partitionList, e.rowChunks) + e.PartitionList = append(e.PartitionList, e.rowChunks) e.rowChunks = chunk.NewSortedRowContainer(fields, e.MaxChunkSize(), byItemsDesc, e.keyColumns, e.keyCmpFuncs) e.rowChunks.GetMemTracker().AttachTo(e.memTracker) e.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks) @@ -242,7 +242,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { if err != nil { return err } - e.partitionList = append(e.partitionList, e.rowChunks) + e.PartitionList = append(e.PartitionList, e.rowChunks) } return nil } @@ -333,7 +333,7 @@ func (h *multiWayMerge) Swap(i, j int) { // Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage. type TopNExec struct { SortExec - limit *plannercore.PhysicalLimit + Limit *plannercore.PhysicalLimit totalLimit uint64 // rowChunks is the chunks to store row values. @@ -430,8 +430,8 @@ func (e *TopNExec) Open(ctx context.Context) error { func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() if !e.fetched { - e.totalLimit = e.limit.Offset + e.limit.Count - e.Idx = int(e.limit.Offset) + e.totalLimit = e.Limit.Offset + e.Limit.Count + e.Idx = int(e.Limit.Offset) err := e.loadChunksUntilTotalLimit(ctx) if err != nil { return err diff --git a/pkg/executor/sort_test.go b/pkg/executor/sort_exec/sort_test.go similarity index 99% rename from pkg/executor/sort_test.go rename to pkg/executor/sort_exec/sort_test.go index 019a52d147338..5f52e5b9183c2 100644 --- a/pkg/executor/sort_test.go +++ b/pkg/executor/sort_exec/sort_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package executor_test +package sort_exec_test import ( "bytes" From cea5d2c96961bbde0524356d7f1bb42d7c1f4634 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Nov 2023 15:25:09 +0800 Subject: [PATCH 2/3] fix ci --- pkg/executor/BUILD.bazel | 4 ++-- pkg/executor/benchmark_test.go | 8 ++++---- pkg/executor/builder.go | 10 +++++----- pkg/executor/executor.go | 6 +++--- pkg/executor/executor_pkg_test.go | 6 +++--- pkg/executor/executor_required_rows_test.go | 8 ++++---- .../{sort_exec => sortexec}/BUILD.bazel | 17 +++++++++++++++-- pkg/executor/{sort_exec => sortexec}/sort.go | 2 +- .../{sort_exec => sortexec}/sort_test.go | 2 +- 9 files changed, 38 insertions(+), 25 deletions(-) rename pkg/executor/{sort_exec => sortexec}/BUILD.bazel (68%) rename pkg/executor/{sort_exec => sortexec}/sort.go (99%) rename pkg/executor/{sort_exec => sortexec}/sort_test.go (99%) diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index b0cac939a774f..247972dd8dbef 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -126,7 +126,7 @@ go_library( "//pkg/executor/lockstats", "//pkg/executor/metrics", "//pkg/executor/mppcoordmanager", - "//pkg/executor/sort_exec", + "//pkg/executor/sortexec", "//pkg/expression", "//pkg/expression/aggregation", "//pkg/infoschema", @@ -380,7 +380,7 @@ go_test( "//pkg/executor/importer", "//pkg/executor/internal/builder", "//pkg/executor/internal/exec", - "//pkg/executor/sort_exec", + "//pkg/executor/sortexec", "//pkg/expression", "//pkg/expression/aggregation", "//pkg/infoschema", diff --git a/pkg/executor/benchmark_test.go b/pkg/executor/benchmark_test.go index 967b9cdb732a1..533faf835644e 100644 --- a/pkg/executor/benchmark_test.go +++ b/pkg/executor/benchmark_test.go @@ -31,7 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggfuncs" "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" - "github.com/pingcap/tidb/pkg/executor/sort_exec" + "github.com/pingcap/tidb/pkg/executor/sortexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/parser/ast" @@ -1647,7 +1647,7 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, innerDS, outerDS *mockDataSource, var leftExec, rightExec exec.Executor if sorted { - leftSortExec := &sort_exec.SortExec{ + leftSortExec := &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(tc.ctx, innerDS.Schema(), 3, innerDS), ByItems: make([]*util.ByItems, 0, len(tc.innerJoinKeyIdx)), ExecSchema: innerDS.Schema(), @@ -1657,7 +1657,7 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, innerDS, outerDS *mockDataSource, } leftExec = leftSortExec - rightSortExec := &sort_exec.SortExec{ + rightSortExec := &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(tc.ctx, outerDS.Schema(), 4, outerDS), ByItems: make([]*util.ByItems, 0, len(tc.outerJoinKeyIdx)), ExecSchema: outerDS.Schema(), @@ -1918,7 +1918,7 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) { ndvs: cas.ndvs, } dataSource := buildMockDataSource(opt) - executor := &sort_exec.SortExec{ + executor := &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 4, dataSource), ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)), ExecSchema: dataSource.Schema(), diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 1bf4193dd1dc4..789e75134526b 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -47,7 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/internal/vecgroupchecker" "github.com/pingcap/tidb/pkg/executor/lockstats" executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics" - "github.com/pingcap/tidb/pkg/executor/sort_exec" + "github.com/pingcap/tidb/pkg/executor/sortexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/infoschema" @@ -1298,7 +1298,7 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) exec.Executor { optimizerTraceTarget: v.OptimizerTraceTarget, } if t.format == plannercore.TraceFormatLog && !t.optimizerTrace { - return &sort_exec.SortExec{ + return &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), t), ByItems: []*plannerutil.ByItems{ {Expr: &expression.Column{ @@ -2246,7 +2246,7 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) exec.Executor { if b.err != nil { return nil } - sortExec := sort_exec.SortExec{ + sortExec := sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), ByItems: v.ByItems, ExecSchema: v.Schema(), @@ -2260,13 +2260,13 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor { if b.err != nil { return nil } - sortExec := sort_exec.SortExec{ + sortExec := sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), ByItems: v.ByItems, ExecSchema: v.Schema(), } executor_metrics.ExecutorCounterTopNExec.Inc() - return &sort_exec.TopNExec{ + return &sortexec.TopNExec{ SortExec: sortExec, Limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset}, } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index eecc14d158b06..53cf815415ca0 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -39,7 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/pdhelper" - "github.com/pingcap/tidb/pkg/executor/sort_exec" + "github.com/pingcap/tidb/pkg/executor/sortexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -103,12 +103,12 @@ var ( _ exec.Executor = &ShowDDLExec{} _ exec.Executor = &ShowDDLJobsExec{} _ exec.Executor = &ShowDDLJobQueriesExec{} - _ exec.Executor = &sort_exec.SortExec{} + _ exec.Executor = &sortexec.SortExec{} _ exec.Executor = &aggregate.StreamAggExec{} _ exec.Executor = &TableDualExec{} _ exec.Executor = &TableReaderExecutor{} _ exec.Executor = &TableScanExec{} - _ exec.Executor = &sort_exec.TopNExec{} + _ exec.Executor = &sortexec.TopNExec{} _ exec.Executor = &UnionExec{} _ exec.Executor = &FastCheckTableExec{} diff --git a/pkg/executor/executor_pkg_test.go b/pkg/executor/executor_pkg_test.go index 162f26f3df515..896f38a4211a7 100644 --- a/pkg/executor/executor_pkg_test.go +++ b/pkg/executor/executor_pkg_test.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/executor/aggfuncs" "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" - "github.com/pingcap/tidb/pkg/executor/sort_exec" + "github.com/pingcap/tidb/pkg/executor/sortexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" plannerutil "github.com/pingcap/tidb/pkg/planner/util" @@ -291,7 +291,7 @@ func TestSortSpillDisk(t *testing.T) { ndvs: cas.ndvs, } dataSource := buildMockDataSource(opt) - exe := &sort_exec.SortExec{ + exe := &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource), ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)), ExecSchema: dataSource.Schema(), @@ -386,7 +386,7 @@ func TestSortSpillDisk(t *testing.T) { ndvs: cas.ndvs, } dataSource = buildMockDataSource(opt) - exe = &sort_exec.SortExec{ + exe = &sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource), ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)), ExecSchema: dataSource.Schema(), diff --git a/pkg/executor/executor_required_rows_test.go b/pkg/executor/executor_required_rows_test.go index 034f47ae40fef..1aef3d44b7b67 100644 --- a/pkg/executor/executor_required_rows_test.go +++ b/pkg/executor/executor_required_rows_test.go @@ -24,7 +24,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/internal/exec" - "github.com/pingcap/tidb/pkg/executor/sort_exec" + "github.com/pingcap/tidb/pkg/executor/sortexec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/parser/ast" @@ -279,7 +279,7 @@ func TestSortRequiredRows(t *testing.T) { } func buildSortExec(sctx sessionctx.Context, byItems []*util.ByItems, src exec.Executor) exec.Executor { - sortExec := sort_exec.SortExec{ + sortExec := sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(sctx, src.Schema(), 0, src), ByItems: byItems, ExecSchema: src.Schema(), @@ -386,12 +386,12 @@ func TestTopNRequiredRows(t *testing.T) { } func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*util.ByItems, src exec.Executor) exec.Executor { - sortExec := sort_exec.SortExec{ + sortExec := sortexec.SortExec{ BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src), ByItems: byItems, ExecSchema: src.Schema(), } - return &sort_exec.TopNExec{ + return &sortexec.TopNExec{ SortExec: sortExec, Limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)}, } diff --git a/pkg/executor/sort_exec/BUILD.bazel b/pkg/executor/sortexec/BUILD.bazel similarity index 68% rename from pkg/executor/sort_exec/BUILD.bazel rename to pkg/executor/sortexec/BUILD.bazel index 1f9546905373e..81d3caed95b91 100644 --- a/pkg/executor/sort_exec/BUILD.bazel +++ b/pkg/executor/sortexec/BUILD.bazel @@ -1,9 +1,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "sort_exec", + name = "sortexec", srcs = ["sort.go"], - importpath = "github.com/pingcap/tidb/pkg/executor/sort_exec", + importpath = "github.com/pingcap/tidb/pkg/executor/sortexec", visibility = ["//visibility:public"], deps = [ "//pkg/executor/internal/exec", @@ -33,3 +33,16 @@ go_test( "@com_github_stretchr_testify//require", ], ) + +go_test( + name = "sortexec_test", + srcs = ["sort_test.go"], + deps = [ + "//pkg/config", + "//pkg/sessionctx/variable", + "//pkg/testkit", + "//pkg/util", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/executor/sort_exec/sort.go b/pkg/executor/sortexec/sort.go similarity index 99% rename from pkg/executor/sort_exec/sort.go rename to pkg/executor/sortexec/sort.go index 63e3022b4ee57..44d9caca299b2 100644 --- a/pkg/executor/sort_exec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sort_exec +package sortexec import ( "container/heap" diff --git a/pkg/executor/sort_exec/sort_test.go b/pkg/executor/sortexec/sort_test.go similarity index 99% rename from pkg/executor/sort_exec/sort_test.go rename to pkg/executor/sortexec/sort_test.go index 5f52e5b9183c2..29742415e1b75 100644 --- a/pkg/executor/sort_exec/sort_test.go +++ b/pkg/executor/sortexec/sort_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sort_exec_test +package sortexec_test import ( "bytes" From ee9f3fcf88a80de0363115cb48f5fb312da5b1ad Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 7 Nov 2023 18:11:09 +0800 Subject: [PATCH 3/3] fix ut --- pkg/executor/test/executor/executor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index eaa31c582ced8..10c40f7d65b70 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -2999,9 +2999,9 @@ func TestCompileOutOfMemoryQuota(t *testing.T) { } func TestSignalCheckpointForSort(t *testing.T) { - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/SignalCheckpointForSort", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SignalCheckpointForSort", `return(true)`)) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/SignalCheckpointForSort")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/sortexec/SignalCheckpointForSort")) }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/chunk/SignalCheckpointForSort", `return(true)`)) defer func() {