Skip to content

Commit

Permalink
Refactor the subquery code and fix outer condition queries
Browse files Browse the repository at this point in the history
This change refactors the subquery code into a separate builder class to
help allow for more reuse and make the functions smaller and easier to
read.

The previous function that handled most of the code was too big and
impossible to reason through.

This also goes and replaces the complicated logic of aggregates that had
a subquery source with the simpler IteratorMapper. I think the overhead
from the IteratorMapper will be more, but I also believe that the actual
code is simpler and more robust to produce more accurate answers. It
might be a future project to optimize that section of code, but I don't
have any actual numbers for the efficiency of one method and I believe
accuracy and code clarity may be more important at the moment since I am
otherwise incapable of reading my own code.
  • Loading branch information
jsternberg committed Mar 1, 2017
1 parent b942f3a commit 529d8b9
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 386 deletions.
17 changes: 16 additions & 1 deletion cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4692,7 +4692,7 @@ func TestServer_Query_Subqueries(t *testing.T) {
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host"],"values":[["2000-01-01T00:00:00Z","server01"],["2000-01-01T00:00:00Z","server02"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
Expand Down Expand Up @@ -4749,6 +4749,21 @@ func TestServer_Query_Subqueries(t *testing.T) {
command: `SELECT value FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND value > 0`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT max FROM (SELECT max(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND host = 'server01'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max"],"values":[["2000-01-01T00:00:00Z",70]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND value > 0`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",40]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z' AND host =~ /server/`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",-2]]}]}]}`,
},
}...)

for i, query := range test.queries {
Expand Down
256 changes: 256 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2539,6 +2539,70 @@ type floatDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type floatIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point FloatPoint
}

func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *floatIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &floatIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: FloatPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *floatIteratorMapper) Next() (*FloatPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(float64); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = 0
itr.point.Nil = true
}
} else {
itr.point.Value = 0
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *floatIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *floatIteratorMapper) Close() error {
return itr.e.Close()
}

type floatFilterIterator struct {
input FloatIterator
cond Expr
Expand Down Expand Up @@ -5191,6 +5255,70 @@ type integerDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type integerIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point IntegerPoint
}

func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *integerIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &integerIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: IntegerPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(int64); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = 0
itr.point.Nil = true
}
} else {
itr.point.Value = 0
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *integerIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *integerIteratorMapper) Close() error {
return itr.e.Close()
}

type integerFilterIterator struct {
input IntegerIterator
cond Expr
Expand Down Expand Up @@ -7828,6 +7956,70 @@ type stringDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type stringIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point StringPoint
}

func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *stringIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &stringIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: StringPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *stringIteratorMapper) Next() (*StringPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(string); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = ""
itr.point.Nil = true
}
} else {
itr.point.Value = ""
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *stringIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *stringIteratorMapper) Close() error {
return itr.e.Close()
}

type stringFilterIterator struct {
input StringIterator
cond Expr
Expand Down Expand Up @@ -10465,6 +10657,70 @@ type booleanDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type booleanIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point BooleanPoint
}

func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *booleanIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &booleanIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: BooleanPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(bool); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = false
itr.point.Nil = true
}
} else {
itr.point.Value = false
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *booleanIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *booleanIteratorMapper) Close() error {
return itr.e.Close()
}

type booleanFilterIterator struct {
input BooleanIterator
cond Expr
Expand Down
64 changes: 64 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,70 @@ type {{$k.name}}DedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type {{$k.name}}IteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point {{$k.Name}}Point
}

func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &{{$k.name}}IteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: {{$k.Name}}Point{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.({{$k.Type}}); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *{{$k.name}}IteratorMapper) Close() error {
return itr.e.Close()
}

type {{$k.name}}FilterIterator struct {
input {{$k.Name}}Iterator
cond Expr
Expand Down
Loading

0 comments on commit 529d8b9

Please sign in to comment.