Skip to content

Commit

Permalink
Support the WHERE clause in outer queries with subqueries
Browse files Browse the repository at this point in the history
  • Loading branch information
jsternberg committed Jan 21, 2017
1 parent bcdb0a7 commit d17566d
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 2 deletions.
236 changes: 236 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2403,6 +2403,65 @@ type floatDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type floatFilterIterator struct {
input FloatIterator
cond Expr
opt IteratorOptions
m map[string]interface{}
}

func newFloatFilterIterator(input FloatIterator, cond Expr, opt IteratorOptions) FloatIterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
switch n := n.(type) {
case *BinaryExpr:
if n.LHS.String() == "time" {
return &BooleanLiteral{Val: true}
}
}
return n
})

cond, _ = n.(Expr)
if cond == nil {
return input
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
return input
}

return &floatFilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}

func (itr *floatFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *floatFilterIterator) Close() error { return itr.input.Close() }

func (itr *floatFilterIterator) Next() (*FloatPoint, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}

for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}

if !EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}

// newFloatDedupeIterator returns a new instance of floatDedupeIterator.
func newFloatDedupeIterator(input FloatIterator) *floatDedupeIterator {
return &floatDedupeIterator{
Expand Down Expand Up @@ -4860,6 +4919,65 @@ type integerDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type integerFilterIterator struct {
input IntegerIterator
cond Expr
opt IteratorOptions
m map[string]interface{}
}

func newIntegerFilterIterator(input IntegerIterator, cond Expr, opt IteratorOptions) IntegerIterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
switch n := n.(type) {
case *BinaryExpr:
if n.LHS.String() == "time" {
return &BooleanLiteral{Val: true}
}
}
return n
})

cond, _ = n.(Expr)
if cond == nil {
return input
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
return input
}

return &integerFilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}

func (itr *integerFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *integerFilterIterator) Close() error { return itr.input.Close() }

func (itr *integerFilterIterator) Next() (*IntegerPoint, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}

for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}

if !EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}

// newIntegerDedupeIterator returns a new instance of integerDedupeIterator.
func newIntegerDedupeIterator(input IntegerIterator) *integerDedupeIterator {
return &integerDedupeIterator{
Expand Down Expand Up @@ -7302,6 +7420,65 @@ type stringDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type stringFilterIterator struct {
input StringIterator
cond Expr
opt IteratorOptions
m map[string]interface{}
}

func newStringFilterIterator(input StringIterator, cond Expr, opt IteratorOptions) StringIterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
switch n := n.(type) {
case *BinaryExpr:
if n.LHS.String() == "time" {
return &BooleanLiteral{Val: true}
}
}
return n
})

cond, _ = n.(Expr)
if cond == nil {
return input
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
return input
}

return &stringFilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}

func (itr *stringFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *stringFilterIterator) Close() error { return itr.input.Close() }

func (itr *stringFilterIterator) Next() (*StringPoint, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}

for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}

if !EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}

// newStringDedupeIterator returns a new instance of stringDedupeIterator.
func newStringDedupeIterator(input StringIterator) *stringDedupeIterator {
return &stringDedupeIterator{
Expand Down Expand Up @@ -9744,6 +9921,65 @@ type booleanDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type booleanFilterIterator struct {
input BooleanIterator
cond Expr
opt IteratorOptions
m map[string]interface{}
}

func newBooleanFilterIterator(input BooleanIterator, cond Expr, opt IteratorOptions) BooleanIterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
switch n := n.(type) {
case *BinaryExpr:
if n.LHS.String() == "time" {
return &BooleanLiteral{Val: true}
}
}
return n
})

cond, _ = n.(Expr)
if cond == nil {
return input
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
return input
}

return &booleanFilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}

func (itr *booleanFilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *booleanFilterIterator) Close() error { return itr.input.Close() }

func (itr *booleanFilterIterator) Next() (*BooleanPoint, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}

for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}

if !EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}

// newBooleanDedupeIterator returns a new instance of booleanDedupeIterator.
func newBooleanDedupeIterator(input BooleanIterator) *booleanDedupeIterator {
return &booleanDedupeIterator{
Expand Down
59 changes: 59 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,65 @@ type {{$k.name}}DedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type {{$k.name}}FilterIterator struct {
input {{$k.Name}}Iterator
cond Expr
opt IteratorOptions
m map[string]interface{}
}

func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond Expr, opt IteratorOptions) {{$k.Name}}Iterator {
// Strip out time conditions from the WHERE clause.
// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
n := RewriteFunc(CloneExpr(cond), func(n Node) Node {
switch n := n.(type) {
case *BinaryExpr:
if n.LHS.String() == "time" {
return &BooleanLiteral{Val: true}
}
}
return n
})

cond, _ = n.(Expr)
if cond == nil {
return input
} else if n, ok := cond.(*BooleanLiteral); ok && n.Val {
return input
}

return &{{$k.name}}FilterIterator{
input: input,
cond: cond,
opt: opt,
m: make(map[string]interface{}),
}
}

func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}FilterIterator) Close() error { return itr.input.Close() }

func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
for {
p, err := itr.input.Next()
if err != nil || p == nil {
return nil, err
}

for i, ref := range itr.opt.Aux {
itr.m[ref.Val] = p.Aux[i]
}
for k, v := range p.Tags.KeyValues() {
itr.m[k] = v
}

if !EvalBool(itr.cond, itr.m) {
continue
}
return p, nil
}
}

// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
return &{{$k.name}}DedupeIterator{
Expand Down
22 changes: 22 additions & 0 deletions influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,28 @@ func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator {
}
}

// NewFilterIterator returns an iterator that filters the points based on the
// condition. This iterator is not nearly as efficient as filtering points
// within the query engine and is only used when filtering subqueries.
func NewFilterIterator(input Iterator, cond Expr, opt IteratorOptions) Iterator {
if input == nil {
return nil
}

switch input := input.(type) {
case FloatIterator:
return newFloatFilterIterator(input, cond, opt)
case IntegerIterator:
return newIntegerFilterIterator(input, cond, opt)
case StringIterator:
return newStringFilterIterator(input, cond, opt)
case BooleanIterator:
return newBooleanFilterIterator(input, cond, opt)
default:
panic(fmt.Sprintf("unsupported filter iterator type: %T", input))
}
}

// NewDedupeIterator returns an iterator that only outputs unique points.
// This iterator maintains a serialized copy of each row so it is inefficient
// to use on large datasets. It is intended for small datasets such as meta queries.
Expand Down
Loading

0 comments on commit d17566d

Please sign in to comment.