Skip to content

Commit

Permalink
planner/core: stabilize the execution order of window function… (#11125)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and zz-jason committed Jul 8, 2019
1 parent c352e0d commit 63b5225
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
63 changes: 62 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"math"
"math/bits"
"reflect"
"sort"
"strings"
"unicode"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/opcode"
Expand Down Expand Up @@ -3117,11 +3119,70 @@ func (b *PlanBuilder) buildWindowFunctionFrame(spec *ast.WindowSpec, orderByItem
return frame, err
}

func getAllByItems(itemsBuf []*ast.ByItem, spec *ast.WindowSpec) []*ast.ByItem {
itemsBuf = itemsBuf[:0]
if spec.PartitionBy != nil {
itemsBuf = append(itemsBuf, spec.PartitionBy.Items...)
}
if spec.OrderBy != nil {
itemsBuf = append(itemsBuf, spec.OrderBy.Items...)
}
return itemsBuf
}

func restoreByItemText(item *ast.ByItem) string {
var sb strings.Builder
ctx := format.NewRestoreCtx(0, &sb)
err := item.Expr.Restore(ctx)
if err != nil {
return ""
}
return sb.String()
}

func compareItems(lItems []*ast.ByItem, rItems []*ast.ByItem) bool {
minLen := mathutil.Min(len(lItems), len(rItems))
for i := 0; i < minLen; i++ {
res := strings.Compare(restoreByItemText(lItems[i]), restoreByItemText(rItems[i]))
if res != 0 {
return res < 0
}
res = compareBool(lItems[i].Desc, rItems[i].Desc)
if res != 0 {
return res < 0
}
}
return len(lItems) < len(rItems)
}

type windowFuncs struct {
spec *ast.WindowSpec
funcs []*ast.WindowFuncExpr
}

// sortWindowSpecs sorts the window specifications by reversed alphabetical order, then we could add less `Sort` operator
// in physical plan because the window functions with the same partition by and order by clause will be at near places.
func sortWindowSpecs(groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr) []windowFuncs {
windows := make([]windowFuncs, 0, len(groupedFuncs))
for spec, funcs := range groupedFuncs {
windows = append(windows, windowFuncs{spec, funcs})
}
lItemsBuf := make([]*ast.ByItem, 0, 4)
rItemsBuf := make([]*ast.ByItem, 0, 4)
sort.SliceStable(windows, func(i, j int) bool {
lItemsBuf = getAllByItems(lItemsBuf, windows[i].spec)
rItemsBuf = getAllByItems(rItemsBuf, windows[j].spec)
return !compareItems(lItemsBuf, rItemsBuf)
})
return windows
}

func (b *PlanBuilder) buildWindowFunctions(p LogicalPlan, groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, map[*ast.WindowFuncExpr]int, error) {
args := make([]ast.ExprNode, 0, 4)
windowMap := make(map[*ast.WindowFuncExpr]int)
for spec, funcs := range groupedFuncs {
for _, window := range sortWindowSpecs(groupedFuncs) {
args = args[:0]
spec, funcs := window.spec, window.funcs
for _, windowFunc := range funcs {
args = append(args, windowFunc.Args...)
}
Expand Down
5 changes: 5 additions & 0 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2313,6 +2313,11 @@ func (s *testPlanSuite) TestWindowFunction(c *C) {
sql: "delete from t order by (sum(a) over())",
result: "[planner:3593]You cannot use the window function 'sum' in this context.'",
},
{
// The best execution order should be (a,c), (a, b, c), (a, b), (), it requires only 2 sort operations.
sql: "select sum(a) over (partition by a order by b), sum(b) over (order by a, b, c), sum(c) over(partition by a order by c), sum(d) over() from t",
result: "TableReader(Table(t))->Sort->Window(sum(cast(test.t.c)) over(partition by test.t.a order by test.t.c asc range between unbounded preceding and current row))->Sort->Window(sum(cast(test.t.b)) over(order by test.t.a asc, test.t.b asc, test.t.c asc range between unbounded preceding and current row))->Window(sum(cast(test.t.a)) over(partition by test.t.a order by test.t.b asc range between unbounded preceding and current row))->Window(sum(cast(test.t.d)) over())->Projection",
},
}

s.Parser.EnableWindowFunc(true)
Expand Down

0 comments on commit 63b5225

Please sign in to comment.