Skip to content

Commit

Permalink
Support dynamic paths for put and cut op (#4795)
Browse files Browse the repository at this point in the history
Closes #4555
  • Loading branch information
mattnibs authored Oct 25, 2023
1 parent 88f40ed commit ef9695f
Show file tree
Hide file tree
Showing 22 changed files with 506 additions and 237 deletions.
36 changes: 31 additions & 5 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,41 @@ func (b *Builder) compileDotExpr(dot *dag.Dot) (expr.Evaluator, error) {
return expr.NewDotExpr(b.zctx(), record, dot.RHS), nil
}

func compileLval(e dag.Expr) (field.Path, error) {
if this, ok := e.(*dag.This); ok {
return field.Path(this.Path), nil
func (b *Builder) compileLval(e dag.Expr) (*expr.Lval, error) {
switch e := e.(type) {
case *dag.BinaryExpr:
if e.Op != "[" {
return nil, fmt.Errorf("internal error: invalid lval %#v", e)
}
lhs, err := b.compileLval(e.LHS)
if err != nil {
return nil, err
}
rhs, err := b.compileExpr(e.RHS)
if err != nil {
return nil, err
}
lhs.Elems = append(lhs.Elems, expr.NewExprLvalElem(b.zctx(), rhs))
return lhs, nil
case *dag.Dot:
lhs, err := b.compileLval(e.LHS)
if err != nil {
return nil, err
}
lhs.Elems = append(lhs.Elems, &expr.StaticLvalElem{Name: e.RHS})
return lhs, nil
case *dag.This:
var elems []expr.LvalElem
for _, elem := range e.Path {
elems = append(elems, &expr.StaticLvalElem{Name: elem})
}
return expr.NewLval(elems), nil
}
return nil, errors.New("invalid expression on lhs of assignment")
return nil, fmt.Errorf("internal error: invalid lval %#v", e)
}

func (b *Builder) compileAssignment(node *dag.Assignment) (expr.Assignment, error) {
lhs, err := compileLval(node.LHS)
lhs, err := b.compileLval(node.LHS)
if err != nil {
return expr.Assignment{}, err
}
Expand Down
8 changes: 4 additions & 4 deletions compiler/kernel/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func (b *Builder) compileAggAssignment(assignment dag.Assignment) (field.Path, *
if !ok {
return nil, nil, errors.New("aggregator is not an aggregation expression")
}
lhs, err := compileLval(assignment.LHS)
if err != nil {
return nil, nil, fmt.Errorf("lhs of aggregation: %w", err)
this, ok := assignment.LHS.(*dag.This)
if !ok {
return nil, nil, fmt.Errorf("internal error: aggregator assignment LHS is not a static path: %#v", assignment.LHS)
}
m, err := b.compileAgg(aggAST)
return lhs, m, err
return this.Path, m, err
}

func (b *Builder) compileAgg(agg *dag.Agg) (*expr.Aggregator, error) {
Expand Down
40 changes: 7 additions & 33 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return nil, err
}
lhs, rhs := splitAssignments(assignments)
cutter, err := expr.NewCutter(b.octx.Zctx, lhs, rhs)
if err != nil {
return nil, err
}
cutter := expr.NewCutter(b.octx.Zctx, lhs, rhs)
if v.Quiet {
cutter.Quiet()
}
Expand Down Expand Up @@ -174,36 +171,13 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
if err != nil {
return nil, err
}
putter, err := expr.NewPutter(b.octx.Zctx, clauses)
if err != nil {
return nil, err
}
putter := expr.NewPutter(b.octx.Zctx, clauses)
return op.NewApplier(b.octx, parent, putter), nil
case *dag.Rename:
var srcs, dsts field.List
for _, fa := range v.Args {
dst, err := compileLval(fa.LHS)
if err != nil {
return nil, err
}
// We call CompileLval on the RHS because renames are
// restricted to dotted field name expressions.
src, err := compileLval(fa.RHS)
if err != nil {
return nil, err
}
if len(dst) != len(src) {
return nil, fmt.Errorf("cannot rename %s to %s", src, dst)
}
// Check that the prefixes match and, if not, report first place
// that they don't.
for i := 0; i <= len(src)-2; i++ {
if src[i] != dst[i] {
return nil, fmt.Errorf("cannot rename %s to %s (differ in %s vs %s)", src, dst, src[i], dst[i])
}
}
dsts = append(dsts, dst)
srcs = append(srcs, src)
for _, a := range v.Args {
dsts = append(dsts, a.LHS.(*dag.This).Path)
srcs = append(srcs, a.RHS.(*dag.This).Path)
}
renamer := expr.NewRenamer(b.octx.Zctx, srcs, dsts)
return op.NewApplier(b.octx, parent, renamer), nil
Expand Down Expand Up @@ -388,9 +362,9 @@ func (b *Builder) compileAssignments(assignments []dag.Assignment) ([]expr.Assig
return keys, nil
}

func splitAssignments(assignments []expr.Assignment) (field.List, []expr.Evaluator) {
func splitAssignments(assignments []expr.Assignment) ([]*expr.Lval, []expr.Evaluator) {
n := len(assignments)
lhs := make(field.List, 0, n)
lhs := make([]*expr.Lval, 0, n)
rhs := make([]expr.Evaluator, 0, n)
for _, a := range assignments {
lhs = append(lhs, a.LHS)
Expand Down
95 changes: 50 additions & 45 deletions compiler/semantic/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,10 @@ func (a *analyzer) semExprs(in []ast.Expr) ([]dag.Expr, error) {
return exprs, nil
}

func (a *analyzer) semAssignments(assignments []ast.Assignment, summarize bool) ([]dag.Assignment, error) {
func (a *analyzer) semAssignments(assignments []ast.Assignment) ([]dag.Assignment, error) {
out := make([]dag.Assignment, 0, len(assignments))
for _, e := range assignments {
a, err := a.semAssignment(e, summarize)
a, err := a.semAssignment(e)
if err != nil {
return nil, err
}
Expand All @@ -541,64 +541,69 @@ func (a *analyzer) semAssignments(assignments []ast.Assignment, summarize bool)
return out, nil
}

func (a *analyzer) semAssignment(assign ast.Assignment, summarize bool) (dag.Assignment, error) {
func (a *analyzer) semAssignment(assign ast.Assignment) (dag.Assignment, error) {
rhs, err := a.semExpr(assign.RHS)
if err != nil {
return dag.Assignment{}, fmt.Errorf("rhs of assignment expression: %w", err)
}
if _, ok := rhs.(*dag.Agg); ok {
summarize = true
return dag.Assignment{}, fmt.Errorf("right-hand side of assignment: %w", err)
}
var lhs dag.Expr
if assign.LHS != nil {
lhs, err = a.semExpr(assign.LHS)
if assign.LHS == nil {
path, err := deriveLHSPath(rhs)
if err != nil {
return dag.Assignment{}, fmt.Errorf("lhs of assigment expression: %w", err)
return dag.Assignment{}, err
}
} else if call, ok := assign.RHS.(*ast.Call); ok {
path := []string{call.Name}
switch call.Name {
lhs = &dag.This{Kind: "This", Path: path}
} else if lhs, err = a.semExpr(assign.LHS); err != nil {
return dag.Assignment{}, fmt.Errorf("left-hand side of assignment: %w", err)
}
if !isLval(lhs) {
return dag.Assignment{}, errors.New("illegal left-hand side of assignment")
}
if this, ok := lhs.(*dag.This); ok && len(this.Path) == 0 {
return dag.Assignment{}, errors.New("cannot assign to 'this'")
}
return dag.Assignment{Kind: "Assignment", LHS: lhs, RHS: rhs}, nil
}

func isLval(e dag.Expr) bool {
switch e := e.(type) {
case *dag.BinaryExpr:
return e.Op == "[" && isLval(e.LHS)
case *dag.Dot:
return isLval(e.LHS)
case *dag.This:
return true
}
return false
}

func deriveLHSPath(rhs dag.Expr) ([]string, error) {
var path []string
switch rhs := rhs.(type) {
case *dag.Call:
path = []string{rhs.Name}
switch rhs.Name {
case "every":
// If LHS is nil and the call is every() make the LHS field ts since
// field ts assumed with every.
path = []string{"ts"}
case "quiet":
if len(call.Args) > 0 {
if p, ok := rhs.(*dag.Call).Args[0].(*dag.This); ok {
path = p.Path
if len(rhs.Args) > 0 {
if this, ok := rhs.Args[0].(*dag.This); ok {
path = this.Path
}
}
}
lhs = &dag.This{Kind: "This", Path: path}
} else if agg, ok := assign.RHS.(*ast.Agg); ok {
lhs = &dag.This{Kind: "This", Path: []string{agg.Name}}
} else if v, ok := rhs.(*dag.Var); ok {
lhs = &dag.This{Kind: "This", Path: []string{v.Name}}
} else {
lhs, err = a.semExpr(assign.RHS)
if err != nil {
return dag.Assignment{}, errors.New("assignment name could not be inferred from rhs expression")
}
}
if summarize {
// Summarize always outputs its results as new records of "this"
// so if we have an "as" that overrides "this", we just
// convert it back to a local this.
if dot, ok := lhs.(*dag.Dot); ok {
if v, ok := dot.LHS.(*dag.Var); ok && v.Name == "this" {
lhs = &dag.This{Kind: "This", Path: []string{dot.RHS}}
}
}
}
// Make sure we have a valid lval for lhs.
this, ok := lhs.(*dag.This)
if !ok {
return dag.Assignment{}, errors.New("illegal left-hand side of assignment")
}
if len(this.Path) == 0 {
return dag.Assignment{}, errors.New("cannot assign to 'this'")
case *dag.Agg:
path = []string{rhs.Name}
case *dag.Var:
path = []string{rhs.Name}
case *dag.This:
path = rhs.Path
default:
return nil, errors.New("cannot infer field from expression")
}
return dag.Assignment{Kind: "Assignment", LHS: lhs, RHS: rhs}, nil
return path, nil
}

func (a *analyzer) semFields(exprs []ast.Expr) ([]dag.Expr, error) {
Expand Down
Loading

0 comments on commit ef9695f

Please sign in to comment.