Skip to content

Commit

Permalink
Merge pull request #8770 from influxdata/js-reduce-walk-refs-memory-u…
Browse files Browse the repository at this point in the history
…sage

Reduce how long it takes to walk the varrefs in an expression
  • Loading branch information
jsternberg authored Aug 31, 2017
2 parents 732a0c2 + 466fc90 commit 0ef033f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- [#8755](https://github.com/influxdata/influxdb/pull/8755): Fix race condition accessing `seriesByID` map.
- [#8766](https://github.com/influxdata/influxdb/pull/8766): Fix deadlock when calling `SeriesIDsAllOrByExpr`
- [#8638](https://github.com/influxdata/influxdb/issues/8638): Fix `influx_inspect export` so it skips missing files.
- [#8770](https://github.com/influxdata/influxdb/pull/8770): Reduce how long it takes to walk the varrefs in an expression.

## v1.3.4 [unreleased]

Expand Down Expand Up @@ -195,6 +196,7 @@ The following new configuration options are available.
- [#8470](https://github.com/influxdata/influxdb/issues/8470): index file fd leak in tsi branch
- [#8468](https://github.com/influxdata/influxdb/pull/8468): Fix TSI non-contiguous compaction panic.
- [#8500](https://github.com/influxdata/influxdb/issues/8500): InfluxDB goes unresponsive

## v1.2.4 [2017-05-08]

### Bugfixes
Expand Down
42 changes: 23 additions & 19 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,29 +1803,33 @@ func walkNames(exp Expr) []string {

// walkRefs will walk the Expr and return the var refs used.
func walkRefs(exp Expr) []VarRef {
switch expr := exp.(type) {
case *VarRef:
return []VarRef{*expr}
case *Call:
a := make([]VarRef, 0, len(expr.Args))
for _, expr := range expr.Args {
if ref, ok := expr.(*VarRef); ok {
a = append(a, *ref)
refs := make(map[VarRef]struct{})
var walk func(exp Expr)
walk = func(exp Expr) {
switch expr := exp.(type) {
case *VarRef:
refs[*expr] = struct{}{}
case *Call:
for _, expr := range expr.Args {
if ref, ok := expr.(*VarRef); ok {
refs[*ref] = struct{}{}
}
}
case *BinaryExpr:
walk(expr.LHS)
walk(expr.RHS)
case *ParenExpr:
walk(expr.Expr)
}
return a
case *BinaryExpr:
lhs := walkRefs(expr.LHS)
rhs := walkRefs(expr.RHS)
ret := make([]VarRef, 0, len(lhs)+len(rhs))
ret = append(ret, lhs...)
ret = append(ret, rhs...)
return ret
case *ParenExpr:
return walkRefs(expr.Expr)
}
walk(exp)

return nil
// Turn the map into a slice.
a := make([]VarRef, 0, len(refs))
for ref := range refs {
a = append(a, ref)
}
return a
}

// ExprNames returns a list of non-"time" field names from an expression.
Expand Down
19 changes: 19 additions & 0 deletions influxql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,3 +1630,22 @@ func (fm *FieldMapper) MapType(m *influxql.Measurement, field string) influxql.D
}
return influxql.Unknown
}

// BenchmarkExprNames benchmarks how long it takes to run ExprNames.
func BenchmarkExprNames(b *testing.B) {
exprs := make([]string, 100)
for i := range exprs {
exprs[i] = fmt.Sprintf("host = 'server%02d'", i)
}
condition := MustParseExpr(strings.Join(exprs, " OR "))

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
refs := influxql.ExprNames(condition)
if have, want := refs, []influxql.VarRef{{Val: "host"}}; !reflect.DeepEqual(have, want) {
b.Fatalf("unexpected expression names: have=%s want=%s", have, want)
}
}
}
11 changes: 3 additions & 8 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1798,20 +1798,15 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, name string, t *que

// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, seriesKeys []string, t *query.TagSet, filters []influxql.Expr, opt query.IteratorOptions) ([]query.Iterator, error) {
conditionFields := make([]influxql.VarRef, len(influxql.ExprNames(opt.Condition)))

itrs := make([]query.Iterator, 0, len(seriesKeys))
for i, seriesKey := range seriesKeys {
fields := 0
var conditionFields []influxql.VarRef
if filters[i] != nil {
// Retrieve non-time fields from this series filter and filter out tags.
for _, f := range influxql.ExprNames(filters[i]) {
conditionFields[fields] = f
fields++
}
conditionFields = influxql.ExprNames(filters[i])
}

itr, err := e.createVarRefSeriesIterator(ref, name, seriesKey, t, filters[i], conditionFields[:fields], opt)
itr, err := e.createVarRefSeriesIterator(ref, name, seriesKey, t, filters[i], conditionFields, opt)
if err != nil {
return itrs, err
} else if itr == nil {
Expand Down

0 comments on commit 0ef033f

Please sign in to comment.