Skip to content

Commit

Permalink
zq: Remove dual input join (#4689)
Browse files Browse the repository at this point in the history
Remove functionality from zq where if a program starts with a headless
join and is run with two inputs, the first file is the left part of the
join while the second is the right side.
  • Loading branch information
mattnibs authored Jun 29, 2023
1 parent 8bf604c commit c40f238
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 75 deletions.
33 changes: 4 additions & 29 deletions compiler/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,10 @@ func (f *fsCompiler) NewQuery(octx *op.Context, seq ast.Seq, readers []zio.Reade
if err != nil {
return nil, err
}
if isJoin(seq) {
if len(readers) != 2 {
return nil, errors.New("join operator requires two inputs")
}
if len(job.readers) != 2 {
return nil, errors.New("internal error: join expected by semantic analyzer")
}
job.readers[0].Readers = readers[0:1]
job.readers[1].Readers = readers[1:2]
} else if len(readers) == 0 {
if len(readers) == 0 {
// If there's no reader but the DAG wants an input, then
// flag an error.
if len(job.readers) != 0 {
if job.reader != nil {
return nil, errors.New("no input specified: use a command-line file or a Zed source operator")
}
} else {
Expand All @@ -47,13 +38,10 @@ func (f *fsCompiler) NewQuery(octx *op.Context, seq ast.Seq, readers []zio.Reade
// TBD: we could have such a configuration is a composite
// from command includes a "pass" operator, but we can add this later.
// See issue #2640.
if len(job.readers) == 0 {
if job.reader == nil {
return nil, errors.New("redundant inputs specified: use either command-line files or a Zed source operator")
}
if len(job.readers) != 1 {
return nil, errors.New("Zed query requires a single input path")
}
job.readers[0].Readers = readers
job.reader.Readers = readers
}
return optimizeAndBuild(job)
}
Expand All @@ -66,19 +54,6 @@ func (*fsCompiler) NewLakeDeleteQuery(octx *op.Context, program ast.Seq, head *l
panic("NewLakeDeleteQuery called on compiler.fsCompiler")
}

func isJoin(seq ast.Seq) bool {
if len(seq) == 0 {
return false
}
switch op := seq[0].(type) {
case *ast.Join:
return true
case *ast.Scope:
return isJoin(op.Body)
}
return false
}

func optimizeAndBuild(job *Job) (*runtime.Query, error) {
// Call optimize to possible push down a filter predicate into the
// kernel.Reader so that the zng scanner can do boyer-moore.
Expand Down
28 changes: 7 additions & 21 deletions compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Job struct {
builder *kernel.Builder
optimizer *optimizer.Optimizer
outputs []zbuf.Puller
readers []*kernel.Reader
reader *kernel.Reader
puller zbuf.Puller
entry dag.Seq
}
Expand All @@ -40,7 +40,7 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
if len(seq) == 0 {
return nil, errors.New("internal error: AST seq cannot be empty")
}
from, readers, err := buildFrom(seq[0], head)
from, reader, err := buildFrom(seq[0], head)
if err != nil {
return nil, err
}
Expand All @@ -55,38 +55,24 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
octx: octx,
builder: kernel.NewBuilder(octx, src),
optimizer: optimizer.New(octx.Context, src),
readers: readers,
reader: reader,
entry: entry,
}, nil
}

func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, []*kernel.Reader, error) {
var readers []*kernel.Reader
func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, *kernel.Reader, error) {
var from *ast.From
switch op := op.(type) {
case *ast.From:
// Already have an entry point with From. Do nothing.
return nil, nil, nil
case *ast.Join:
readers = []*kernel.Reader{{}, {}}
trunk0 := ast.Trunk{
Kind: "Trunk",
Source: readers[0],
}
trunk1 := ast.Trunk{
Kind: "Trunk",
Source: readers[1],
}
return &ast.From{
Kind: "From",
Trunks: []ast.Trunk{trunk0, trunk1},
}, readers, nil
case *ast.Scope:
if len(op.Body) == 0 {
return nil, nil, errors.New("internal error: scope op has empty body")
}
return buildFrom(op.Body[0], head)
default:
var readers *kernel.Reader
trunk := ast.Trunk{Kind: "Trunk"}
if head != nil {
// For the lakes, if there is no from operator, then
Expand All @@ -102,8 +88,8 @@ func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, []*kernel.Reade
},
}
} else {
readers = []*kernel.Reader{{}}
trunk.Source = readers[0]
readers = &kernel.Reader{}
trunk.Source = readers
}
from = &ast.From{
Kind: "From",
Expand Down
2 changes: 1 addition & 1 deletion compiler/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (l *lakeCompiler) NewLakeQuery(octx *op.Context, program ast.Seq, paralleli
if err != nil {
return nil, err
}
if len(job.readers) != 0 {
if job.reader != nil {
return nil, errors.New("query must include a 'from' operator")
}
if err := job.Optimize(); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions compiler/reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package compiler

import (
"errors"
"fmt"

"github.com/brimdata/zed/compiler/ast"
Expand Down Expand Up @@ -29,12 +30,12 @@ func CompileWithSortKey(octx *op.Context, seq ast.Seq, r zio.Reader, sortKey ord
if err != nil {
return nil, err
}
readers := job.readers
if len(readers) != 1 {
return nil, fmt.Errorf("CompileWithSortKey: Zed program expected %d readers", len(readers))
reader := job.reader
if reader == nil {
return nil, errors.New("CompileWithSortKey: Zed program expected a reader")
}
readers[0].Readers = []zio.Reader{r}
readers[0].SortKey = sortKey
reader.Readers = []zio.Reader{r}
reader.SortKey = sortKey
return optimizeAndBuild(job)
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/op/join/ztests/empty-inner.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
script: |
zq -z 'left join on a=b hit:=sc | sort a' A.zson C.zson
zq -z 'left join (file C.zson) on a=b hit:=sc' A.zson
inputs:
- name: A.zson
Expand Down
8 changes: 4 additions & 4 deletions runtime/op/join/ztests/expr.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
script: |
zq -z 'left join on s b' A.zson B.zson
zq -z 'left join (file B.zson) on s b' A.zson
echo ===
zq -z 'left join on s=(lower(s)) b' A.zson B.zson
zq -z 'left join (file B.zson) on s=(lower(s)) b' A.zson
echo ===
zq -z 'left join on (lower(s))=(lower(s)) b' A.zson B.zson
zq -z 'left join (file B.zson) on (lower(s))=(lower(s)) b' A.zson
echo ===
zq -z 'left join on s' A.zson B.zson
zq -z 'left join (file B.zson) on s' A.zson
inputs:
- name: A.zson
Expand Down
2 changes: 1 addition & 1 deletion runtime/op/join/ztests/join-union.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
script: |
zq -z 'inner join on a=b' a.zson b.zson
zq -z 'inner join (file b.zson) on a=b' a.zson
inputs:
- name: a.zson
data: |
Expand Down
8 changes: 4 additions & 4 deletions runtime/op/join/ztests/kinds.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
script: |
echo === ANTI ===
zq -z 'anti join on a=b | sort a' A.zson B.zson
zq -z 'anti join (file B.zson) on a=b | sort a' A.zson
echo === LEFT ===
zq -z 'left join on a=b hit:=sb | sort a' A.zson B.zson
zq -z 'left join (file B.zson) on a=b hit:=sb | sort a' A.zson
echo === INNER ===
zq -z 'inner join on a=b hit:=sb | sort a' A.zson B.zson
zq -z 'inner join (file B.zson) on a=b hit:=sb | sort a' A.zson
echo === RIGHT ===
zq -z 'right join on b=c hit:=sb | sort c' B.zson C.zson
zq -z 'right join (file C.zson) on b=c hit:=sb | sort c' B.zson
inputs:
- name: A.zson
Expand Down
2 changes: 1 addition & 1 deletion runtime/ztests/parallel-err.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ inputs:
outputs:
- name: stderr
data: |
join operator requires two inputs
join requires two upstream parallel query paths
2 changes: 1 addition & 1 deletion runtime/ztests/parallel-stdin.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
script: |
zq -z 'join on a=b b:=b' - B.zson
zq -z 'join (file B.zson) on a=b b:=b' -
inputs:
- name: stdin
Expand Down
2 changes: 1 addition & 1 deletion runtime/ztests/parallel.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
script: |
zq -z 'join on a=b b:=b' A.zson B.zson
zq -z 'join (file B.zson) on a=b b:=b' A.zson
inputs:
- name: A.zson
Expand Down
17 changes: 11 additions & 6 deletions zfmt/ztests/join.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
script: |
zc -C "join on x=x p:=a"
zc -C "join (file test.zson) on x=x p:=a"
echo ===
zc -C -s "join on x=x p:=a"
zc -C -s "join (file test.zson) on x=x p:=a"
outputs:
- name: stdout
data: |
join on x=x p:=a
join (
from (
file test.zson
)
) on x=x p:=a
===
fork (
reader
| fork (
=>
reader
pass
=>
reader
file test.zson
)
| join on x=x p:=a

0 comments on commit c40f238

Please sign in to comment.