Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back port PR #8716 #8722

Merged
merged 4 commits into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [#8677](https://github.com/influxdata/influxdb/issues/8677): Fix backups when snapshot is empty.
- [#8706](https://github.com/influxdata/influxdb/pull/8706): Cursor leak, resulting in an accumulation of `.tsm.tmp` files after compactions.
- [#8713](https://github.com/influxdata/influxdb/issues/8713): Deadlock when dropping measurement and writing
- [#8716](https://github.com/influxdata/influxdb/pull/8716): Ensure inputs are closed on error. Add runtime GC finalizer as additional guard to close iterators

### Features

Expand Down
9 changes: 4 additions & 5 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,7 @@ func (e *Engine) CreateIterator(measurement string, opt influxql.IteratorOptions
if err != nil {
return nil, err
}
return influxql.Iterators(itrs).Merge(opt)
return newMergeFinalizerIterator(itrs, opt)
}
}

Expand All @@ -1570,14 +1570,14 @@ func (e *Engine) CreateIterator(measurement string, opt influxql.IteratorOptions
} else if len(inputs) == 0 {
return nil, nil
}
return influxql.Iterators(inputs).Merge(opt)
return newMergeFinalizerIterator(inputs, opt)
}

itrs, err := e.createVarRefIterator(measurement, opt)
if err != nil {
return nil, err
}
return influxql.Iterators(itrs).Merge(opt)
return newMergeFinalizerIterator(itrs, opt)
}

func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt influxql.IteratorOptions) ([]influxql.Iterator, error) {
Expand Down Expand Up @@ -1631,6 +1631,7 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt

itr, err := influxql.NewCallIterator(input, opt)
if err != nil {
influxql.Iterators(inputs).Close()
return err
}
inputs[i] = itr
Expand Down Expand Up @@ -1754,8 +1755,6 @@ func (e *Engine) createTagSetIterators(ref *influxql.VarRef, name string, t *inf
group.keys = t.SeriesKeys[i*n:]
group.filters = t.Filters[i*n:]
}

group.itrs = make([]influxql.Iterator, 0, len(group.keys))
}

// Read series groups in parallel.
Expand Down
61 changes: 61 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tsm1

import (
"fmt"
"runtime"
"sort"
"sync"

Expand Down Expand Up @@ -115,6 +116,21 @@ func (c *bufCursor) nextAt(seek int64) interface{} {
// amortize the cost of using a mutex when updating stats.
const statsBufferCopyIntervalN = 100

type floatFinalizerIterator struct {
influxql.FloatIterator
}

func newFloatFinalizerIterator(inner influxql.FloatIterator) *floatFinalizerIterator {
itr := &floatFinalizerIterator{inner}
runtime.SetFinalizer(itr, (*floatFinalizerIterator).Close)
return itr
}

func (itr *floatFinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.FloatIterator.Close()
}

type floatIterator struct {
cur floatCursor
aux []cursorAt
Expand Down Expand Up @@ -552,6 +568,21 @@ func (c *floatNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EO
func (c *floatNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) }
func (c *floatNilLiteralCursor) nextAt(seek int64) interface{} { return (*float64)(nil) }

type integerFinalizerIterator struct {
influxql.IntegerIterator
}

func newIntegerFinalizerIterator(inner influxql.IntegerIterator) *integerFinalizerIterator {
itr := &integerFinalizerIterator{inner}
runtime.SetFinalizer(itr, (*integerFinalizerIterator).Close)
return itr
}

func (itr *integerFinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.IntegerIterator.Close()
}

type integerIterator struct {
cur integerCursor
aux []cursorAt
Expand Down Expand Up @@ -989,6 +1020,21 @@ func (c *integerNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.
func (c *integerNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) nextAt(seek int64) interface{} { return (*int64)(nil) }

type stringFinalizerIterator struct {
influxql.StringIterator
}

func newStringFinalizerIterator(inner influxql.StringIterator) *stringFinalizerIterator {
itr := &stringFinalizerIterator{inner}
runtime.SetFinalizer(itr, (*stringFinalizerIterator).Close)
return itr
}

func (itr *stringFinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.StringIterator.Close()
}

type stringIterator struct {
cur stringCursor
aux []cursorAt
Expand Down Expand Up @@ -1426,6 +1472,21 @@ func (c *stringNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.E
func (c *stringNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) }
func (c *stringNilLiteralCursor) nextAt(seek int64) interface{} { return (*string)(nil) }

type booleanFinalizerIterator struct {
influxql.BooleanIterator
}

func newBooleanFinalizerIterator(inner influxql.BooleanIterator) *booleanFinalizerIterator {
itr := &booleanFinalizerIterator{inner}
runtime.SetFinalizer(itr, (*booleanFinalizerIterator).Close)
return itr
}

func (itr *booleanFinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.BooleanIterator.Close()
}

type booleanIterator struct {
cur booleanCursor
aux []cursorAt
Expand Down
16 changes: 16 additions & 0 deletions tsdb/engine/tsm1/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"sort"
"fmt"
"runtime"
"sync"

"github.com/influxdata/influxdb/influxql"
Expand Down Expand Up @@ -111,6 +112,21 @@ const statsBufferCopyIntervalN = 100

{{range .}}

type {{.name}}FinalizerIterator struct {
influxql.{{.Name}}Iterator
}

func new{{.Name}}FinalizerIterator(inner influxql.{{.Name}}Iterator) *{{.name}}FinalizerIterator {
itr := &{{.name}}FinalizerIterator{inner}
runtime.SetFinalizer(itr, (*{{.name}}FinalizerIterator).Close)
return itr
}

func (itr *{{.name}}FinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.{{.Name}}Iterator.Close()
}

type {{.name}}Iterator struct {
cur {{.name}}Cursor
aux []cursorAt
Expand Down
35 changes: 35 additions & 0 deletions tsdb/engine/tsm1/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,38 @@ func (c cursorsAt) close() {
cur.close()
}
}

// newMergeFinalizerIterator creates a new Merge iterator from the inputs. If the call to Merge succeeds,
// the resulting Iterator will be wrapped in a finalizer iterator.
// If Merge returns an error, the inputs will be closed.
func newMergeFinalizerIterator(inputs []influxql.Iterator, opt influxql.IteratorOptions) (influxql.Iterator, error) {
itr, err := influxql.Iterators(inputs).Merge(opt)
if err != nil {
influxql.Iterators(inputs).Close()
return nil, err
}
return newFinalizerIterator(itr), nil
}

// newFinalizerIterator creates a new iterator that installs a runtime finalizer
// to ensure close is eventually called if the iterator is garbage collected.
// This additional guard attempts to protect against clients of CreateIterator not
// correctly closing them and leaking cursors.
func newFinalizerIterator(itr influxql.Iterator) influxql.Iterator {
if itr == nil {
return nil
}

switch inner := itr.(type) {
case influxql.FloatIterator:
return newFloatFinalizerIterator(inner)
case influxql.IntegerIterator:
return newIntegerFinalizerIterator(inner)
case influxql.StringIterator:
return newStringFinalizerIterator(inner)
case influxql.BooleanIterator:
return newBooleanFinalizerIterator(inner)
default:
panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr))
}
}