Skip to content

Commit

Permalink
zq: Call user-defined op with source
Browse files Browse the repository at this point in the history
The change allows zq users to start a query with a call to a user-defined
op that contains a source (file) with no additional input sources.
Previously attempting to do this would result in error "redundant
inputs".
  • Loading branch information
mattnibs committed Oct 17, 2023
1 parent 24b9cc9 commit 777dc58
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 120 deletions.
41 changes: 9 additions & 32 deletions cli/queryflags/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queryflags

import (
"context"
"flag"
"fmt"
"net/url"
Expand All @@ -9,8 +10,8 @@ import (
"github.com/brimdata/zed/cli"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/semantic"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zfmt"
"github.com/brimdata/zed/zson"
"golang.org/x/exp/slices"
)
Expand All @@ -36,11 +37,13 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool,
// and appears to start with a from or yield operator.
// Otherwise, consider it a file.
if query, err := compiler.Parse(src, f.Includes...); err == nil {
if isFrom(query) {
return nil, query, false, nil
}
if isYield(query) {
return nil, query, true, nil
if a, err := semantic.NewAnalyzer(context.Background(), query, nil, nil); err == nil {
if a.HasSource() {
return nil, query, false, nil
}
if a.StartsWithYield() {
return nil, query, true, nil
}
}
}
return nil, nil, false, fmt.Errorf("no such file: %s", src)
Expand All @@ -53,32 +56,6 @@ func (f *Flags) ParseSourcesAndInputs(paths []string) ([]string, ast.Seq, bool,
return paths, query, false, nil
}

func isFrom(seq ast.Seq) bool {
if len(seq) > 0 {
switch op := seq[0].(type) {
case *ast.From:
return true
case *ast.Scope:
return isFrom(op.Body)
}
}
return false
}

func isYield(seq ast.Seq) bool {
if len(seq) > 0 {
switch op := seq[0].(type) {
case *ast.Yield:
return true
case *ast.Scope:
return isYield(op.Body)
case *ast.OpExpr:
return !zfmt.IsSearch(op.Expr) && !zfmt.IsBool(op.Expr)
}
}
return false
}

func isURLWithKnownScheme(path string, schemes ...string) bool {
u, err := url.Parse(path)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions cmd/zq/ztests/call-user-op-with-src.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
script: |
zq -z -I countfile.zed 'countfile()'
inputs:
- name: countfile.zed
data: |
op countfile(): (
file test.zson | count()
)
- name: test.zson
data: '{} {} {} {}'

outputs:
- name: stdout
data: |
4(uint64)
9 changes: 6 additions & 3 deletions compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
if len(seq) == 0 {
return nil, errors.New("internal error: AST seq cannot be empty")
}
entry, err := semantic.Analyze(octx.Context, seq, src, head)
analyzer, err := semantic.NewAnalyzer(octx.Context, seq, src, head)
if err != nil {
return nil, err
}
reader, _ := entry[0].(*kernel.Reader)
if err := analyzer.AddDefaultSource(); err != nil {
return nil, err
}
reader, _ := analyzer.Entry[0].(*kernel.Reader)
return &Job{
octx: octx,
builder: kernel.NewBuilder(octx, src),
optimizer: optimizer.New(octx.Context, src),
reader: reader,
entry: entry,
entry: analyzer.Entry,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions compiler/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ func newDeleteJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakepars
},
}},
})
entry, err := semantic.Analyze(octx.Context, seq, src, head)
a, err := semantic.NewAnalyzer(octx.Context, seq, src, head)
if err != nil {
return nil, err
}
if _, ok := entry[1].(*dag.Filter); !ok {
if _, ok := a.Entry[1].(*dag.Filter); !ok {
return nil, &InvalidDeleteWhereQuery{}
}
return &Job{
octx: octx,
builder: kernel.NewBuilder(octx, src),
optimizer: optimizer.New(octx.Context, src),
entry: entry,
entry: a.Entry,
}, nil
}
86 changes: 52 additions & 34 deletions compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,9 @@ import (
"github.com/brimdata/zed/lakeparse"
)

// Analyze performs a semantic analysis of the AST, translating it from AST
// to DAG form, resolving syntax ambiguities, and performing constant propagation.
// After semantic analysis, the DAG is ready for either optimization or compilation.
func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) {
a := newAnalyzer(ctx, source, head)
s, err := a.semSeq(seq)
if err != nil {
return nil, err
}
op, err := a.buildFrom(s[0])
if err != nil {
return nil, err
}
if op != nil {
s.Prepend(op)
}
return s, nil
}
type Analyzer struct {
Entry dag.Seq

type analyzer struct {
ctx context.Context
head *lakeparse.Commitish
opStack []*ast.OpDecl
Expand All @@ -39,36 +22,48 @@ type analyzer struct {
zctx *zed.Context
}

func newAnalyzer(ctx context.Context, source *data.Source, head *lakeparse.Commitish) *analyzer {
return &analyzer{
// Analyze performs a semantic analysis of the AST, translating it from AST
// to DAG form, resolving syntax ambiguities, and performing constant propagation.
// After semantic analysis, the DAG is ready for either optimization or compilation.
func NewAnalyzer(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (*Analyzer, error) {
a := &Analyzer{
ctx: ctx,
head: head,
source: source,
scope: NewScope(nil),
zctx: zed.NewContext(),
}
var err error
if a.Entry, err = a.semSeq(seq); err != nil {
return nil, err
}
return a, nil
}

func (a *analyzer) enterScope() {
a.scope = NewScope(a.scope)
func (a *Analyzer) HasSource() bool {
return hasSource(a.Entry[0])
}

func (a *analyzer) exitScope() {
a.scope = a.scope.parent
}

func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) {
func hasSource(op dag.Op) bool {
switch op := op.(type) {
case *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.DeleteScan:
return nil, nil
return true
case *dag.Fork:
return a.buildFrom(op.Paths[0][0])
return hasSource(op.Paths[0][0])
case *dag.Scope:
return a.buildFrom(op.Body[0])
return hasSource(op.Body[0])
}
return false
}

func (a *Analyzer) AddDefaultSource() error {
if hasSource(a.Entry[0]) {
return nil
}
// No from so add a source.
if a.head == nil {
return &kernel.Reader{}, nil
a.Entry.Prepend(&kernel.Reader{})
return nil
}
pool := &ast.Pool{
Kind: "Pool",
Expand All @@ -81,9 +76,32 @@ func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) {
}
ops, err := a.semPool(pool)
if err != nil {
return nil, err
return err
}
a.Entry.Prepend(ops[0])
return nil
}

func (a *Analyzer) StartsWithYield() bool {
return startsWithYield(a.Entry[0])
}

func startsWithYield(op dag.Op) bool {
switch op := op.(type) {
case *dag.Yield:
return true
case *dag.Scope:
return startsWithYield(op.Body[0])
}
return ops[0], nil
return false
}

func (a *Analyzer) enterScope() {
a.scope = NewScope(a.scope)
}

func (a *Analyzer) exitScope() {
a.scope = a.scope.parent
}

type opDecl struct {
Expand Down
Loading

0 comments on commit 777dc58

Please sign in to comment.