Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Feb 21, 2025
1 parent db77d81 commit a5fa246
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 45 deletions.
14 changes: 0 additions & 14 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,6 @@ func (f *Filter) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return f.Source.GetOrdering(ctx)
}

func (f *Filter) Compact(*plancontext.PlanningContext) (Operator, *ApplyResult) {
if len(f.Predicates) == 0 {
return f.Source, Rewrote("filter with no predicates removed")
}

other, isFilter := f.Source.(*Filter)
if !isFilter {
return f, NoRewrite
}
f.Source = other.Source
f.Predicates = append(f.Predicates, other.Predicates...)
return f, Rewrote("two filters merged into one")
}

func (f *Filter) planOffsets(ctx *plancontext.PlanningContext) Operator {
cfg := &evalengine.Config{
ResolveType: ctx.TypeForExpr,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ func (j *Join) GetOrdering(*plancontext.PlanningContext) []OrderBy {
return nil
}

func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult) {
func (j *Join) tryCompact(ctx *plancontext.PlanningContext) Operator {
if !j.JoinType.IsCommutative() {
// if we can't move tables around, we can't merge these inputs
return j, NoRewrite
return nil
}

lqg, lok := j.LHS.(*QueryGraph)
rqg, rok := j.RHS.(*QueryGraph)
if !lok || !rok {
return j, NoRewrite
return nil
}

newOp := &QueryGraph{
Expand All @@ -69,7 +69,7 @@ func (j *Join) Compact(ctx *plancontext.PlanningContext) (Operator, *ApplyResult
if j.Predicate != nil {
newOp.collectPredicate(ctx, j.Predicate)
}
return newOp, Rewrote("merge querygraphs into a single one")
return newOp
}

func createStraightJoin(ctx *plancontext.PlanningContext, join *sqlparser.JoinTableExpr, lhs, rhs Operator) Operator {
Expand Down
34 changes: 7 additions & 27 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,10 @@ func (p *Projection) Compact(ctx *plancontext.PlanningContext) (Operator, *Apply
return p.Source, Rewrote("removed projection only passing through the input")
}

switch src := p.Source.(type) {
case *Route:
return p.compactWithRoute(ctx, src)
case *ApplyJoin:
return p.compactWithJoin(ctx, src)
}
// switch src := p.Source.(type) {
// case *ApplyJoin:
// return p.compactWithJoin(ctx, src)
// }
return p, NoRewrite
}

Expand All @@ -531,6 +529,9 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App
if col.Original.As.NotEmpty() {
return p, NoRewrite
}
if len(join.Columns) == 0 {
return p, NoRewrite
}
newColumns = append(newColumns, join.Columns[colInfo])
newColumnsAST.add(join.JoinColumns.columns[colInfo])
case nil:
Expand All @@ -555,27 +556,6 @@ func (p *Projection) compactWithJoin(ctx *plancontext.PlanningContext, join *App
return join, Rewrote("remove projection from before join")
}

func (p *Projection) compactWithRoute(ctx *plancontext.PlanningContext, rb *Route) (Operator, *ApplyResult) {
ap, err := p.GetAliasedProjections()
if err != nil {
return p, NoRewrite
}

for i, col := range ap {
offset, ok := col.Info.(Offset)
if !ok || int(offset) != i {
return p, NoRewrite
}
}
columns := rb.GetColumns(ctx)

if len(columns) == len(ap) {
return rb, Rewrote("remove projection from before route")
}
rb.ResultColumns = len(columns)
return rb, NoRewrite
}

// needsEvaluation finds the expression given by this argument and checks if the inside and outside expressions match
// we can't rely on the content of the info field since it's not filled in until offset plan time
func (p *Projection) needsEvaluation(ctx *plancontext.PlanningContext, e sqlparser.Expr) bool {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/planbuilder/operators/projection_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func tryPushProjection(
case *Limit:
return Swap(p, src, "push projection under limit")
case *ApplyJoin:
op, res := p.compactWithJoin(ctx, src)
if res != NoRewrite {
return op, res
}

if p.FromAggr || !p.canPush(ctx) {
return p, NoRewrite
}
Expand Down Expand Up @@ -239,6 +244,7 @@ func pushProjectionInApplyJoin(
// we can't push down expression evaluation to the rhs if we are not sure if it will even be executed
return p, NoRewrite
}

if IsOuter(src) {
// for outer joins, we have to check that we can send down the projection to the rhs
for _, expr := range ap.GetColumns() {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,18 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App
}
src.Outer, in.Source = in, src.Outer
return src, Rewrote("push filter to outer query in subquery container")
case *Filter:
if len(in.Predicates) == 0 {
return in.Source, Rewrote("filter with no predicates removed")
}

other, isFilter := in.Source.(*Filter)
if !isFilter {
return in, NoRewrite
}
in.Source = other.Source
in.Predicates = append(in.Predicates, other.Predicates...)
return in, Rewrote("two filters merged into one")
}

return in, NoRewrite
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func pushDerived(ctx *plancontext.PlanningContext, op *Horizon) (Operator, *Appl
}

func optimizeJoin(ctx *plancontext.PlanningContext, op *Join) (Operator, *ApplyResult) {
if newOp := op.tryCompact(ctx); newOp != nil {
return newOp, Rewrote("merged query graphs")
}
return mergeOrJoin(ctx, op.LHS, op.RHS, sqlparser.SplitAndExpression(nil, op.Predicate), op.JoinType)
}

Expand Down

0 comments on commit a5fa246

Please sign in to comment.