Skip to content

Commit

Permalink
executor: put sort files into one folder (#48351)
Browse files Browse the repository at this point in the history
ref #47733
  • Loading branch information
xzhangxian1008 authored Nov 7, 2023
1 parent d3f6415 commit 633d223
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ go_library(
"shuffle.go",
"simple.go",
"slow_query.go",
"sort.go",
"split.go",
"stmtsummary.go",
"table_reader.go",
Expand Down Expand Up @@ -127,6 +126,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/mppcoordmanager",
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/infoschema",
Expand Down Expand Up @@ -349,7 +349,6 @@ go_test(
"shuffle_test.go",
"slow_query_sql_test.go",
"slow_query_test.go",
"sort_test.go",
"split_test.go",
"stale_txn_test.go",
"stmtsummary_test.go",
Expand Down Expand Up @@ -381,6 +380,7 @@ go_test(
"//pkg/executor/importer",
"//pkg/executor/internal/builder",
"//pkg/executor/internal/exec",
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/infoschema",
Expand Down
13 changes: 7 additions & 6 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/aggfuncs"
"github.com/pingcap/tidb/pkg/executor/aggregate"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -1646,20 +1647,20 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, innerDS, outerDS *mockDataSource,

var leftExec, rightExec exec.Executor
if sorted {
leftSortExec := &SortExec{
leftSortExec := &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(tc.ctx, innerDS.Schema(), 3, innerDS),
ByItems: make([]*util.ByItems, 0, len(tc.innerJoinKeyIdx)),
schema: innerDS.Schema(),
ExecSchema: innerDS.Schema(),
}
for _, key := range innerJoinKeys {
leftSortExec.ByItems = append(leftSortExec.ByItems, &util.ByItems{Expr: key})
}
leftExec = leftSortExec

rightSortExec := &SortExec{
rightSortExec := &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(tc.ctx, outerDS.Schema(), 4, outerDS),
ByItems: make([]*util.ByItems, 0, len(tc.outerJoinKeyIdx)),
schema: outerDS.Schema(),
ExecSchema: outerDS.Schema(),
}
for _, key := range outerJoinKeys {
rightSortExec.ByItems = append(rightSortExec.ByItems, &util.ByItems{Expr: key})
Expand Down Expand Up @@ -1917,10 +1918,10 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) {
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
executor := &SortExec{
executor := &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 4, dataSource),
ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.Schema(),
ExecSchema: dataSource.Schema(),
}
for _, idx := range cas.orderByIdx {
executor.ByItems = append(executor.ByItems, &util.ByItems{Expr: cas.columns()[idx]})
Expand Down
17 changes: 9 additions & 8 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/internal/vecgroupchecker"
"github.com/pingcap/tidb/pkg/executor/lockstats"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -1297,15 +1298,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) exec.Executor {
optimizerTraceTarget: v.OptimizerTraceTarget,
}
if t.format == plannercore.TraceFormatLog && !t.optimizerTrace {
return &SortExec{
return &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), t),
ByItems: []*plannerutil.ByItems{
{Expr: &expression.Column{
Index: 0,
RetType: types.NewFieldType(mysql.TypeTimestamp),
}},
},
schema: v.Schema(),
ExecSchema: v.Schema(),
}
}
return t
Expand Down Expand Up @@ -2245,10 +2246,10 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) exec.Executor {
if b.err != nil {
return nil
}
sortExec := SortExec{
sortExec := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
ByItems: v.ByItems,
schema: v.Schema(),
ExecSchema: v.Schema(),
}
executor_metrics.ExecutorCounterSortExec.Inc()
return &sortExec
Expand All @@ -2259,15 +2260,15 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor {
if b.err != nil {
return nil
}
sortExec := SortExec{
sortExec := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
ByItems: v.ByItems,
schema: v.Schema(),
ExecSchema: v.Schema(),
}
executor_metrics.ExecutorCounterTopNExec.Inc()
return &TopNExec{
return &sortexec.TopNExec{
SortExec: sortExec,
limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset},
Limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset},
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/aggregate"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/pdhelper"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -102,12 +103,12 @@ var (
_ exec.Executor = &ShowDDLExec{}
_ exec.Executor = &ShowDDLJobsExec{}
_ exec.Executor = &ShowDDLJobQueriesExec{}
_ exec.Executor = &SortExec{}
_ exec.Executor = &sortexec.SortExec{}
_ exec.Executor = &aggregate.StreamAggExec{}
_ exec.Executor = &TableDualExec{}
_ exec.Executor = &TableReaderExecutor{}
_ exec.Executor = &TableScanExec{}
_ exec.Executor = &TopNExec{}
_ exec.Executor = &sortexec.TopNExec{}
_ exec.Executor = &UnionExec{}
_ exec.Executor = &FastCheckTableExec{}

Expand Down
41 changes: 21 additions & 20 deletions pkg/executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/aggfuncs"
"github.com/pingcap/tidb/pkg/executor/aggregate"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
Expand Down Expand Up @@ -290,10 +291,10 @@ func TestSortSpillDisk(t *testing.T) {
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exe := &SortExec{
exe := &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource),
ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.Schema(),
ExecSchema: dataSource.Schema(),
}
for _, idx := range cas.orderByIdx {
exe.ByItems = append(exe.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]})
Expand All @@ -311,9 +312,9 @@ func TestSortSpillDisk(t *testing.T) {
}
}
// Test only 1 partition and all data in memory.
require.Len(t, exe.partitionList, 1)
require.Equal(t, false, exe.partitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.partitionList[0].NumRow())
require.Len(t, exe.PartitionList, 1)
require.Equal(t, false, exe.PartitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.PartitionList[0].NumRow())
err = exe.Close()
require.NoError(t, err)

Expand All @@ -334,16 +335,16 @@ func TestSortSpillDisk(t *testing.T) {
// Now spilling is in parallel.
// Maybe the second add() will called before spilling, depends on
// Golang goroutine scheduling. So the result has two possibilities.
if len(exe.partitionList) == 2 {
require.Len(t, exe.partitionList, 2)
require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, true, exe.partitionList[1].AlreadySpilledSafeForTest())
require.Equal(t, 1024, exe.partitionList[0].NumRow())
require.Equal(t, 1024, exe.partitionList[1].NumRow())
if len(exe.PartitionList) == 2 {
require.Len(t, exe.PartitionList, 2)
require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, true, exe.PartitionList[1].AlreadySpilledSafeForTest())
require.Equal(t, 1024, exe.PartitionList[0].NumRow())
require.Equal(t, 1024, exe.PartitionList[1].NumRow())
} else {
require.Len(t, exe.partitionList, 1)
require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.partitionList[0].NumRow())
require.Len(t, exe.PartitionList, 1)
require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.PartitionList[0].NumRow())
}

err = exe.Close()
Expand All @@ -363,9 +364,9 @@ func TestSortSpillDisk(t *testing.T) {
}
}
// Test only 1 partition but spill disk.
require.Len(t, exe.partitionList, 1)
require.Equal(t, true, exe.partitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.partitionList[0].NumRow())
require.Len(t, exe.PartitionList, 1)
require.Equal(t, true, exe.PartitionList[0].AlreadySpilledSafeForTest())
require.Equal(t, 2048, exe.PartitionList[0].NumRow())
err = exe.Close()
require.NoError(t, err)

Expand All @@ -385,10 +386,10 @@ func TestSortSpillDisk(t *testing.T) {
ndvs: cas.ndvs,
}
dataSource = buildMockDataSource(opt)
exe = &SortExec{
exe = &sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(cas.ctx, dataSource.Schema(), 0, dataSource),
ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.Schema(),
ExecSchema: dataSource.Schema(),
}
for _, idx := range cas.orderByIdx {
exe.ByItems = append(exe.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]})
Expand All @@ -406,7 +407,7 @@ func TestSortSpillDisk(t *testing.T) {
}
}
// Don't spill too many partitions.
require.True(t, len(exe.partitionList) <= 4)
require.True(t, len(exe.PartitionList) <= 4)
err = exe.Close()
require.NoError(t, err)
}
13 changes: 7 additions & 6 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -278,10 +279,10 @@ func TestSortRequiredRows(t *testing.T) {
}

func buildSortExec(sctx sessionctx.Context, byItems []*util.ByItems, src exec.Executor) exec.Executor {
sortExec := SortExec{
sortExec := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(sctx, src.Schema(), 0, src),
ByItems: byItems,
schema: src.Schema(),
ExecSchema: src.Schema(),
}
return &sortExec
}
Expand Down Expand Up @@ -385,14 +386,14 @@ func TestTopNRequiredRows(t *testing.T) {
}

func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*util.ByItems, src exec.Executor) exec.Executor {
sortExec := SortExec{
sortExec := sortexec.SortExec{
BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src),
ByItems: byItems,
schema: src.Schema(),
ExecSchema: src.Schema(),
}
return &TopNExec{
return &sortexec.TopNExec{
SortExec: sortExec,
limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
Limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
}
}

Expand Down
48 changes: 48 additions & 0 deletions pkg/executor/sortexec/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sortexec",
srcs = ["sort.go"],
importpath = "github.com/pingcap/tidb/pkg/executor/sortexec",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/internal/exec",
"//pkg/expression",
"//pkg/planner/core",
"//pkg/planner/util",
"//pkg/sessionctx/variable",
"//pkg/util/chunk",
"//pkg/util/disk",
"//pkg/util/memory",
"//pkg/util/sqlkiller",
"@com_github_pingcap_failpoint//:failpoint",
],
)

go_test(
name = "sort_exec_test",
timeout = "short",
srcs = ["sort_test.go"],
flaky = True,
deps = [
"//pkg/config",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)

go_test(
name = "sortexec_test",
srcs = ["sort_test.go"],
deps = [
"//pkg/config",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 633d223

Please sign in to comment.