Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fully clean up partially opened TSI #23430

Merged
merged 3 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/mmap/mmap_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ package mmap
import (
"os"
"syscall"

errors2 "github.com/influxdata/influxdb/pkg/errors"
)

// Map memory-maps a file.
func Map(path string, sz int64) ([]byte, error) {
func Map(path string, sz int64) (data []byte, err error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
defer errors2.Capture(&err, f.Close)()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


fi, err := f.Stat()
if err != nil {
Expand All @@ -32,7 +34,7 @@ func Map(path string, sz int64) ([]byte, error) {
sz = fi.Size()
}

data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
data, err = syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ func (fs *FileSet) bytes() int {

// Close closes all the files in the file set.
func (fs FileSet) Close() error {
var err error
lesam marked this conversation as resolved.
Show resolved Hide resolved
for _, f := range fs.files {
if e := f.Close(); e != nil && err == nil {
err = e
}
}
return err
return Files(fs.files).Close()
}

// Retain adds a reference count to all files.
Expand Down Expand Up @@ -456,6 +450,16 @@ func (a Files) IDs() []int {
return ids
}

func (a Files) Close() error {
var err error
for _, f := range a {
if e := f.Close(); e != nil && err == nil {
err = e
}
}
return err
}

// fileSetSeriesIDIterator attaches a fileset to an iterator that is released on close.
type fileSetSeriesIDIterator struct {
once sync.Once
Expand Down
45 changes: 23 additions & 22 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// IndexName is the name of the index.
Expand Down Expand Up @@ -252,7 +253,7 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
}

// Open opens the index.
func (i *Index) Open() error {
func (i *Index) Open() (rErr error) {
i.mu.Lock()
defer i.mu.Unlock()

Expand All @@ -277,33 +278,21 @@ func (i *Index) Open() error {
i.partitions[j] = p
}

defer i.cleanUpFail(&rErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to defer this only after the Wait on errGroup - otherwise we could accidentally add a return and run cleanUpFail concurrently with the partition opening, which would be bad.

As I understand it, the partition type assumes Close is never called concurrently with Open.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix pushed.


// Open all the Partitions in parallel.
partitionN := len(i.partitions)
n := i.availableThreads()

// Store results.
errC := make(chan error, partitionN)

// Run fn on each partition using a fixed number of goroutines.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func(k int) {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= partitionN {
return // No more work.
}
err := i.partitions[idx].Open()
errC <- err
}
}(k)
g := new(errgroup.Group)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@lesam lesam Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully agree (edit: with the linked article) but it is nice to use the pattern where it fits.

g.SetLimit(n)
for idx := 0; idx < partitionN; idx++ {
g.Go(i.partitions[idx].Open)
}

// Check for error
for i := 0; i < partitionN; i++ {
if err := <-errC; err != nil {
return err
}
err := g.Wait()
if err != nil {
return err
}

// Refresh cached sketches.
Expand All @@ -319,6 +308,18 @@ func (i *Index) Open() error {
return nil
}

func (i *Index) cleanUpFail(err *error) {
if nil != *err {
for _, p := range i.partitions {
if (p != nil) && p.IsOpen() {
if e := p.Close(); e != nil {
i.logger.Warn("Failed to clean up partition")
}
}
}
}
}

// Compact requests a compaction of partitions.
func (i *Index) Compact() {
i.mu.Lock()
Expand Down
25 changes: 25 additions & 0 deletions tsdb/index/tsi1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"regexp"
Expand All @@ -14,8 +15,10 @@ import (
"time"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/stretchr/testify/require"
)

// Bloom filter settings used in tests.
Expand Down Expand Up @@ -208,6 +211,28 @@ func TestIndex_DropMeasurement(t *testing.T) {
})
}

func TestIndex_OpenFail(t *testing.T) {
idx := NewDefaultIndex()
require.NoError(t, idx.Open())
idx.Index.Close()
// mess up the index:
tslPath := path.Join(idx.Index.Path(), "3", "L0-00000001.tsl")
tslFile, err := os.OpenFile(tslPath, os.O_RDWR, 0666)
require.NoError(t, err)
require.NoError(t, tslFile.Truncate(0))
// write poisonous TSL file - first byte doesn't matter, remaining bytes are an invalid uvarint
_, err = tslFile.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
require.NoError(t, err)
require.NoError(t, tslFile.Close())
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
require.EqualError(t, idx.Index.Open(), "parsing binary-encoded uint64 value failed; binary.Uvarint() returned -11")
// ensure each partition is closed:
for i := 0; i < int(idx.Index.PartitionN); i++ {
assert.Equal(t, idx.Index.PartitionAt(i).FileN(), 0)
}
require.NoError(t, idx.Close())
}

func TestIndex_Open(t *testing.T) {
// Opening a fresh index should set the MANIFEST version to current version.
idx := NewDefaultIndex()
Expand Down
24 changes: 17 additions & 7 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ type Partition struct {
// NewPartition returns a new instance of Partition.
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
seriesIDSet: tsdb.NewSeriesIDSet(),

closing: make(chan struct{}),
path: path,
sfile: sfile,
seriesIDSet: tsdb.NewSeriesIDSet(),
fileSet: &FileSet{},
MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
MaxLogFileAge: tsdb.DefaultCompactFullWriteColdDuration,

Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Partition) bytes() int {
var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")

// Open opens the partition.
func (p *Partition) Open() error {
func (p *Partition) Open() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()

Expand Down Expand Up @@ -190,6 +190,12 @@ func (p *Partition) Open() error {

// Open each file in the manifest.
var files []File
defer func() {
if rErr != nil {
Files(files).Close()
}
}()

for _, filename := range m.Files {
switch filepath.Ext(filename) {
case LogFileExt:
Expand Down Expand Up @@ -230,7 +236,7 @@ func (p *Partition) Open() error {
}
}

// Build series existance set.
// Build series existence set.
if err := p.buildSeriesSet(); err != nil {
return err
}
Expand All @@ -244,6 +250,10 @@ func (p *Partition) Open() error {
return nil
}

func (p *Partition) IsOpen() bool {
return p.opened
}

// openLogFile opens a log file and appends it to the index.
func (p *Partition) openLogFile(path string) (*LogFile, error) {
f := NewLogFile(p.sfile, path)
Expand Down