Skip to content

Commit

Permalink
executor: fix the error that is raised when executing the window func…
Browse files Browse the repository at this point in the history
…tion with range type frame (#46927)

close #46298
  • Loading branch information
xzhangxian1008 authored Sep 19, 2023
1 parent 8f0b9d5 commit b3ec110
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 5 deletions.
9 changes: 8 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4962,6 +4962,8 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
exec.orderByCols = orderByCols
exec.expectedCmpResult = cmpResult
exec.isRangeFrame = true
exec.start.InitCompareCols(b.ctx, exec.orderByCols)
exec.end.InitCompareCols(b.ctx, exec.orderByCols)
}
}
return exec
Expand All @@ -4984,14 +4986,19 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
if len(v.OrderBy) > 0 && v.OrderBy[0].Desc {
cmpResult = 1
}
processor = &rangeFrameWindowProcessor{
tmpProcessor := &rangeFrameWindowProcessor{
windowFuncs: windowFuncs,
partialResults: partialResults,
start: v.Frame.Start,
end: v.Frame.End,
orderByCols: orderByCols,
expectedCmpResult: cmpResult,
}

tmpProcessor.start.InitCompareCols(b.ctx, orderByCols)
tmpProcessor.end.InitCompareCols(b.ctx, orderByCols)

processor = tmpProcessor
}
return &WindowExec{BaseExecutor: base,
processor: processor,
Expand Down
4 changes: 2 additions & 2 deletions executor/pipelined_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (e *PipelinedWindowExec) getStart(ctx sessionctx.Context) (uint64, error) {
var res int64
var err error
for i := range e.orderByCols {
res, _, err = e.start.CmpFuncs[i](ctx, e.orderByCols[i], e.start.CalcFuncs[i], e.getRow(start), e.getRow(e.curRowIdx))
res, _, err = e.start.CmpFuncs[i](ctx, e.start.CompareCols[i], e.start.CalcFuncs[i], e.getRow(start), e.getRow(e.curRowIdx))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (e *PipelinedWindowExec) getEnd(ctx sessionctx.Context) (uint64, error) {
var res int64
var err error
for i := range e.orderByCols {
res, _, err = e.end.CmpFuncs[i](ctx, e.end.CalcFuncs[i], e.orderByCols[i], e.getRow(e.curRowIdx), e.getRow(end))
res, _, err = e.end.CmpFuncs[i](ctx, e.end.CalcFuncs[i], e.end.CompareCols[i], e.getRow(e.curRowIdx), e.getRow(end))
if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (p *rangeFrameWindowProcessor) getStartOffset(ctx sessionctx.Context, rows
var res int64
var err error
for i := range p.orderByCols {
res, _, err = p.start.CmpFuncs[i](ctx, p.orderByCols[i], p.start.CalcFuncs[i], rows[p.lastStartOffset], rows[p.curRowIdx])
res, _, err = p.start.CmpFuncs[i](ctx, p.start.CompareCols[i], p.start.CalcFuncs[i], rows[p.lastStartOffset], rows[p.curRowIdx])
if err != nil {
return 0, err
}
Expand All @@ -425,7 +425,7 @@ func (p *rangeFrameWindowProcessor) getEndOffset(ctx sessionctx.Context, rows []
var res int64
var err error
for i := range p.orderByCols {
res, _, err = p.end.CmpFuncs[i](ctx, p.end.CalcFuncs[i], p.orderByCols[i], rows[p.curRowIdx], rows[p.lastEndOffset])
res, _, err = p.end.CmpFuncs[i](ctx, p.end.CalcFuncs[i], p.end.CompareCols[i], rows[p.curRowIdx], rows[p.lastEndOffset])
if err != nil {
return 0, err
}
Expand Down
12 changes: 12 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5281,3 +5281,15 @@ func TestIssue45033(t *testing.T) {
from t3 alias3) alias4
where alias4.c2 = alias2.alias_col1);`).Check(testkit.Rows("0"))
}

func TestIssue46298(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test.first_range;")
tk.MustExec("create table test.first_range(p int not null, o tinyint not null, v int not null);")
tk.MustExec("insert into test.first_range (p, o, v) values (0, 0, 0), (1, 1, 1), (1, 2, 2), (1, 4, 4), (1, 8, 8), (2, 0, 0), (2, 3, 3), (2, 10, 10), (2, 13, 13), (2, 15, 15), (3, 1, 1), (3, 3, 3), (3, 5, 5), (3, 9, 9), (3, 15, 15), (3, 20, 20), (3, 31, 31);")
tk.MustQuery("select *, first_value(v) over (partition by p order by o range between 3.1 preceding and 2.9 following) as a from test.first_range;")
tk.MustExec(`set @@tidb_enable_pipelined_window_function=0`)
tk.MustQuery("select *, first_value(v) over (partition by p order by o range between 3.1 preceding and 2.9 following) as a from test.first_range;")
}
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6823,6 +6823,7 @@ func (b *PlanBuilder) buildWindowFunctionFrameBound(_ context.Context, spec *ast
if err != nil {
return nil, err
}

bound.CmpFuncs[0] = expression.GetCmpFunction(b.ctx, orderByItems[0].Col, bound.CalcFuncs[0])
return bound, nil
}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2034,6 +2034,8 @@ type FrameBound struct {
// We will build the date_add or date_sub functions for frames like `INTERVAL '2:30' MINUTE_SECOND FOLLOWING`,
// and plus or minus for frames like `1 preceding`.
CalcFuncs []expression.Expression
// Sometimes we need to cast order by column to a specific type when frame type is range
CompareCols []expression.Expression
// CmpFuncs is used to decide whether one row is included in the current frame.
CmpFuncs []expression.CompareFunc
}
Expand All @@ -2052,6 +2054,23 @@ func (fb *FrameBound) Clone() *FrameBound {
return cloned
}

// InitCompareCols will init CompareCols
func (fb *FrameBound) InitCompareCols(ctx sessionctx.Context, orderByCols []*expression.Column) {
if len(fb.CalcFuncs) > 0 {
fb.CompareCols = make([]expression.Expression, len(orderByCols))
if fb.CalcFuncs[0].GetType().EvalType() != orderByCols[0].GetType().EvalType() {
fb.CompareCols[0], _ = expression.NewFunctionBase(ctx, ast.Cast, fb.CalcFuncs[0].GetType(), orderByCols[0])

// As compare column has been converted, compare function should also be changed
fb.CmpFuncs[0] = expression.GetCmpFunction(ctx, fb.CompareCols[0], fb.CalcFuncs[0])
} else {
for i, col := range orderByCols {
fb.CompareCols[i] = col
}
}
}
}

// LogicalWindow represents a logical window function plan.
type LogicalWindow struct {
logicalSchemaProducer
Expand Down

0 comments on commit b3ec110

Please sign in to comment.