diff --git a/pkg/mmap/mmap_unix.go b/pkg/mmap/mmap_unix.go index e4e4cffb501..6dac341481f 100644 --- a/pkg/mmap/mmap_unix.go +++ b/pkg/mmap/mmap_unix.go @@ -10,15 +10,17 @@ package mmap import ( "os" "syscall" + + errors2 "github.com/influxdata/influxdb/pkg/errors" ) // Map memory-maps a file. -func Map(path string, sz int64) ([]byte, error) { +func Map(path string, sz int64) (data []byte, err error) { f, err := os.Open(path) if err != nil { return nil, err } - defer f.Close() + defer errors2.Capture(&err, f.Close)() fi, err := f.Stat() if err != nil { @@ -32,7 +34,7 @@ func Map(path string, sz int64) ([]byte, error) { sz = fi.Size() } - data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) + data, err = syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { return nil, err } diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 65fdc3b8e01..498af777d9b 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -46,13 +46,7 @@ func (fs *FileSet) bytes() int { // Close closes all the files in the file set. func (fs FileSet) Close() error { - var err error - for _, f := range fs.files { - if e := f.Close(); e != nil && err == nil { - err = e - } - } - return err + return Files(fs.files).Close() } // Retain adds a reference count to all files. @@ -492,6 +486,16 @@ func (a Files) IDs() []int { return ids } +func (a Files) Close() error { + var err error + for _, f := range a { + if e := f.Close(); e != nil && err == nil { + err = e + } + } + return err +} + // fileSetSeriesIDIterator attaches a fileset to an iterator that is released on close. type fileSetSeriesIDIterator struct { once sync.Once diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index c877a9dccd6..4c0b926d7df 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) // IndexName is the name of the index. @@ -241,7 +242,7 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet { } // Open opens the index. -func (i *Index) Open() error { +func (i *Index) Open() (rErr error) { i.mu.Lock() defer i.mu.Unlock() @@ -269,29 +270,16 @@ func (i *Index) Open() error { partitionN := len(i.partitions) n := i.availableThreads() - // Store results. - errC := make(chan error, partitionN) - // Run fn on each partition using a fixed number of goroutines. - var pidx uint32 // Index of maximum Partition being worked on. - for k := 0; k < n; k++ { - go func(k int) { - for { - idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. - if idx >= partitionN { - return // No more work. - } - err := i.partitions[idx].Open() - errC <- err - } - }(k) + g := new(errgroup.Group) + g.SetLimit(n) + for idx := 0; idx < partitionN; idx++ { + g.Go(i.partitions[idx].Open) } - - // Check for error - for i := 0; i < partitionN; i++ { - if err := <-errC; err != nil { - return err - } + err := g.Wait() + defer i.cleanUpFail(&rErr) + if err != nil { + return err } // Refresh cached sketches. @@ -307,6 +295,18 @@ func (i *Index) Open() error { return nil } +func (i *Index) cleanUpFail(err *error) { + if nil != *err { + for _, p := range i.partitions { + if (p != nil) && p.IsOpen() { + if e := p.Close(); e != nil { + i.logger.Warn("Failed to clean up partition") + } + } + } + } +} + // Compact requests a compaction of partitions. func (i *Index) Compact() { i.mu.Lock() diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index b607be0bc7c..f2e8565085c 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "reflect" "regexp" @@ -13,8 +14,10 @@ import ( "testing" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/testing/assert" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/tsi1" + "github.com/stretchr/testify/require" ) // Bloom filter settings used in tests. @@ -207,6 +210,28 @@ func TestIndex_DropMeasurement(t *testing.T) { }) } +func TestIndex_OpenFail(t *testing.T) { + idx := NewDefaultIndex() + require.NoError(t, idx.Open()) + idx.Index.Close() + // mess up the index: + tslPath := path.Join(idx.Index.Path(), "3", "L0-00000001.tsl") + tslFile, err := os.OpenFile(tslPath, os.O_RDWR, 0666) + require.NoError(t, err) + require.NoError(t, tslFile.Truncate(0)) + // write poisonous TSL file - first byte doesn't matter, remaining bytes are an invalid uvarint + _, err = tslFile.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) + require.NoError(t, err) + require.NoError(t, tslFile.Close()) + idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path())) + require.EqualError(t, idx.Index.Open(), "parsing binary-encoded uint64 value failed; binary.Uvarint() returned -11") + // ensure each partition is closed: + for i := 0; i < int(idx.Index.PartitionN); i++ { + assert.Equal(t, idx.Index.PartitionAt(i).FileN(), 0) + } + require.NoError(t, idx.Close()) +} + func TestIndex_Open(t *testing.T) { // Opening a fresh index should set the MANIFEST version to current version. idx := NewDefaultIndex() diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 16b7b730491..dc01568ffec 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -88,11 +88,11 @@ type Partition struct { // NewPartition returns a new instance of Partition. func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { return &Partition{ - closing: make(chan struct{}), - path: path, - sfile: sfile, - seriesIDSet: tsdb.NewSeriesIDSet(), - + closing: make(chan struct{}), + path: path, + sfile: sfile, + seriesIDSet: tsdb.NewSeriesIDSet(), + fileSet: &FileSet{}, MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize, // compactionEnabled: true, @@ -141,7 +141,7 @@ func (p *Partition) bytes() int { var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST") // Open opens the partition. -func (p *Partition) Open() error { +func (p *Partition) Open() (rErr error) { p.mu.Lock() defer p.mu.Unlock() @@ -187,6 +187,12 @@ func (p *Partition) Open() error { // Open each file in the manifest. var files []File + defer func() { + if rErr != nil { + Files(files).Close() + } + }() + for _, filename := range m.Files { switch filepath.Ext(filename) { case LogFileExt: @@ -231,7 +237,7 @@ func (p *Partition) Open() error { } } - // Build series existance set. + // Build series existence set. if err := p.buildSeriesSet(); err != nil { return err } @@ -245,6 +251,10 @@ func (p *Partition) Open() error { return nil } +func (p *Partition) IsOpen() bool { + return p.opened +} + // openLogFile opens a log file and appends it to the index. func (p *Partition) openLogFile(path string) (*LogFile, error) { f := NewLogFile(p.sfile, path)