Skip to content

Commit

Permalink
zq: Call user-defined op with source (#4808)
Browse files Browse the repository at this point in the history
The change allows zq users to start a query with a call to a user-defined
op that contains a source (file) with no additional input sources.
Previously attempting to do this would result in error "redundant
inputs".
  • Loading branch information
mattnibs authored Oct 25, 2023
1 parent ef9695f commit 40639dc
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 52 deletions.
41 changes: 9 additions & 32 deletions cli/queryflags/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queryflags

import (
"context"
"flag"
"fmt"
"net/url"
Expand All @@ -9,8 +10,8 @@ import (
"github.com/brimdata/zed/cli"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/semantic"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zfmt"
"github.com/brimdata/zed/zson"
"golang.org/x/exp/slices"
)
Expand All @@ -36,11 +37,13 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool,
// and appears to start with a from or yield operator.
// Otherwise, consider it a file.
if query, err := compiler.Parse(src, f.Includes...); err == nil {
if isFrom(query) {
return nil, query, false, nil
}
if isYield(query) {
return nil, query, true, nil
if s, err := semantic.Analyze(context.Background(), query, nil, nil); err == nil {
if semantic.HasSource(s) {
return nil, query, false, nil
}
if semantic.StartsWithYield(s) {
return nil, query, true, nil
}
}
}
return nil, nil, false, fmt.Errorf("no such file: %s", src)
Expand All @@ -53,32 +56,6 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool,
return paths, query, false, nil
}

func isFrom(seq ast.Seq) bool {
if len(seq) > 0 {
switch op := seq[0].(type) {
case *ast.From:
return true
case *ast.Scope:
return isFrom(op.Body)
}
}
return false
}

func isYield(seq ast.Seq) bool {
if len(seq) > 0 {
switch op := seq[0].(type) {
case *ast.Yield:
return true
case *ast.Scope:
return isYield(op.Body)
case *ast.OpExpr:
return !zfmt.IsSearch(op.Expr) && !zfmt.IsBool(op.Expr)
}
}
return false
}

func isURLWithKnownScheme(path string, schemes ...string) bool {
u, err := url.Parse(path)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions cmd/zq/ztests/call-user-op-with-src.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
script: |
zq -z -I countfile.zed 'countfile()'
inputs:
- name: countfile.zed
data: |
op countfile(): (
file test.zson | count()
)
- name: test.zson
data: '{} {} {} {}'

outputs:
- name: stdout
data: |
4(uint64)
2 changes: 1 addition & 1 deletion compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
if len(seq) == 0 {
return nil, errors.New("internal error: AST seq cannot be empty")
}
entry, err := semantic.Analyze(octx.Context, seq, src, head)
entry, err := semantic.AnalyzeAddSource(octx.Context, seq, src, head)
if err != nil {
return nil, err
}
Expand Down
66 changes: 47 additions & 19 deletions compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakepa
if err != nil {
return nil, err
}
op, err := a.buildFrom(s[0])
return s, nil
}

// AnalyzeAddSource is the same as Analyze but it adds a default source if the
// DAG does not have one.
func AnalyzeAddSource(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) {
a := newAnalyzer(ctx, source, head)
s, err := a.semSeq(seq)
if err != nil {
return nil, err
}
if op != nil {
s.Prepend(op)
if !HasSource(s) {
if err = a.addDefaultSource(&s); err != nil {
return nil, err
}
}
return s, nil
}
Expand All @@ -49,26 +58,26 @@ func newAnalyzer(ctx context.Context, source *data.Source, head *lakeparse.Commi
}
}

func (a *analyzer) enterScope() {
a.scope = NewScope(a.scope)
}

func (a *analyzer) exitScope() {
a.scope = a.scope.parent
}

func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) {
switch op := op.(type) {
func HasSource(seq dag.Seq) bool {
switch op := seq[0].(type) {
case *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.DeleteScan:
return nil, nil
return true
case *dag.Fork:
return a.buildFrom(op.Paths[0][0])
return HasSource(op.Paths[0])
case *dag.Scope:
return a.buildFrom(op.Body[0])
return HasSource(op.Body)
}
return false
}

func (a *analyzer) addDefaultSource(seq *dag.Seq) error {
if HasSource(*seq) {
return nil
}
// No from so add a source.
if a.head == nil {
return &kernel.Reader{}, nil
seq.Prepend(&kernel.Reader{})
return nil
}
pool := &ast.Pool{
Kind: "Pool",
Expand All @@ -81,9 +90,28 @@ func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) {
}
ops, err := a.semPool(pool)
if err != nil {
return nil, err
return err
}
seq.Prepend(ops[0])
return nil
}

func StartsWithYield(seq dag.Seq) bool {
switch op := seq[0].(type) {
case *dag.Yield:
return true
case *dag.Scope:
return StartsWithYield(op.Body)
}
return ops[0], nil
return false
}

func (a *analyzer) enterScope() {
a.scope = NewScope(a.scope)
}

func (a *analyzer) exitScope() {
a.scope = a.scope.parent
}

type opDecl struct {
Expand Down

0 comments on commit 40639dc

Please sign in to comment.