Skip to content

Commit

Permalink
fix: copy names from mmapped memory before closing iterator (influxda…
Browse files Browse the repository at this point in the history
…ta#22040)

This fix ensures that memory-mapped files are not released
before pointers into them are copied into heap memory.
MeasurementNamesByExpr() and MeasurementNamesByPredicate() can
cause panics by copying memory from mmapped files that have been
released. The functions they call use iterators to files which
are closed (releasing the mmapped files) before the memory is
safely copied to the heap.

closes influxdata#22000
  • Loading branch information
davidby-influx authored and chengshiwen committed Aug 27, 2024
1 parent 80c8f58 commit 0370f63
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ v1.8.11 [unreleased]
- [#21666](https://github.com/influxdata/influxdb/pull/21666): fix: do not panic on cleaning up failed iterators
- [#21792](https://github.com/influxdata/influxdb/pull/21792): fix: error instead of panic for statement rewrite failure
- [#21795](https://github.com/influxdata/influxdb/pull/21795): fix: show shards gives empty expiry time for inf duration shards
- [#22040](https://github.com/influxdata/influxdb/pull/22040): fix: copy names from mmapped memory before closing iterator

v1.8.10 [2021-10-11]
-------------------
Expand Down
129 changes: 101 additions & 28 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,11 @@ func (a MeasurementIterators) Close() (err error) {
return err
}

type MeasurementSliceIterator interface {
MeasurementIterator
UnderlyingSlice() [][]byte
}

type measurementSliceIterator struct {
names [][]byte
}
Expand All @@ -962,6 +967,37 @@ func (itr *measurementSliceIterator) Next() (name []byte, err error) {
return name, nil
}

func (itr *measurementSliceIterator) UnderlyingSlice() [][]byte {
return itr.names
}

// fileMeasurementSliceIterator is designed to allow a tag value slice
// iterator to use memory from a memory-mapped file, pinning it
// with the underlying file iterators
type fileMeasurementSliceIterator struct {
measurementSliceIterator
fileIterators MeasurementIterators
}

func (itr *fileMeasurementSliceIterator) Close() error {
e1 := itr.fileIterators.Close()
e2 := itr.measurementSliceIterator.Close()
if e1 != nil {
return e1
} else {
return e2
}
}

func newFileMeasurementSliceIterator(names [][]byte, itrs MeasurementIterators) *fileMeasurementSliceIterator {
return &fileMeasurementSliceIterator{
measurementSliceIterator: measurementSliceIterator{
names: names,
},
fileIterators: itrs,
}
}

// MergeMeasurementIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
Expand Down Expand Up @@ -1314,17 +1350,24 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet {

// MeasurementNamesByExpr returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) {
release := is.SeriesFile.Retain()
defer release()

// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByExpr(auth, expr)
itr, err := is.measurementNamesByExpr(auth, expr)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
return slices.CopyChunkedByteSlices(names, 1000), nil
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}

itr, err := is.measurementIterator()
Expand All @@ -1333,10 +1376,14 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()

// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
Expand All @@ -1354,7 +1401,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx
return slices.CopyChunkedByteSlices(names, 1000), nil
}

func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
Expand Down Expand Up @@ -1394,20 +1441,22 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx
return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)

case influxql.OR, influxql.AND:

lhs, err := is.measurementNamesByExpr(auth, e.LHS)
if err != nil {
return nil, err
}

rhs, err := is.measurementNamesByExpr(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}

mis := MeasurementIterators{lhs, rhs}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil

default:
return nil, fmt.Errorf("invalid tag comparison operator")
Expand All @@ -1416,24 +1465,24 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx
case *influxql.ParenExpr:
return is.measurementNamesByExpr(auth, e.Expr)
default:
return nil, fmt.Errorf("%#v", expr)
return nil, fmt.Errorf("invalid measurement expression %#v", expr)
}
}

// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
itr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()

var names [][]byte
for {
e, err := itr.Next()
if err != nil {
itr.Close()
return nil, err
} else if e == nil {
break
Expand All @@ -1456,24 +1505,31 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op in
}
}
bytesutil.Sort(names)
return names, nil
return newFileMeasurementSliceIterator(names, MeasurementIterators{itr}), nil
}

// MeasurementNamesByPredicate returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (_ [][]byte, err error) {
release := is.SeriesFile.Retain()
defer release()

// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByPredicate(auth, expr)
itr, err := is.measurementNamesByPredicate(auth, expr)
if err != nil {
return nil, err
}
return slices.CopyChunkedByteSlices(names, 1000), nil
if itr != nil {
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
}
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}

itr, err := is.measurementIterator()
Expand All @@ -1482,10 +1538,14 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()

// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
Expand All @@ -1503,7 +1563,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i
return slices.CopyChunkedByteSlices(names, 1000), nil
}

func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
Expand Down Expand Up @@ -1547,16 +1607,17 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i
if err != nil {
return nil, err
}

rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}
mis := MeasurementIterators{lhs, rhs}

if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil

default:
return nil, fmt.Errorf("invalid tag comparison operator")
Expand All @@ -1569,16 +1630,21 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i
}
}

func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true

mitr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()

// valEqual determines if the provided []byte is equal to the tag value
// to be filtered on.
Expand Down Expand Up @@ -1693,19 +1759,25 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf
}

bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}

func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true

mitr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()

var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error)
switch op {
Expand Down Expand Up @@ -1756,7 +1828,8 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op
}

bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}

// measurementAuthorizedSeries determines if the measurement contains a series
Expand Down

0 comments on commit 0370f63

Please sign in to comment.