Skip to content

Commit

Permalink
fix: fully clean up partially opened TSI (influxdata#23430)
Browse files Browse the repository at this point in the history
When one partition in a TSI fails to open, all previously opened
partitions should be cleaned up, and the remaining partitions
should not be opened.

closes influxdata#23427
  • Loading branch information
davidby-influx authored and chengshiwen committed Aug 27, 2024
1 parent 006d18b commit ae4ef4f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 39 deletions.
8 changes: 5 additions & 3 deletions pkg/mmap/mmap_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ package mmap
import (
"os"
"syscall"

errors2 "github.com/influxdata/influxdb/pkg/errors"
)

// Map memory-maps a file.
func Map(path string, sz int64) ([]byte, error) {
func Map(path string, sz int64) (data []byte, err error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
defer errors2.Capture(&err, f.Close)()

fi, err := f.Stat()
if err != nil {
Expand All @@ -32,7 +34,7 @@ func Map(path string, sz int64) ([]byte, error) {
sz = fi.Size()
}

data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
data, err = syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ func (fs *FileSet) bytes() int {

// Close closes all the files in the file set.
//
// Every file is closed even if an earlier one fails; the first error
// encountered is returned, or nil if all files closed cleanly. The work is
// delegated to Files.Close so the file set and a bare Files slice share a
// single implementation.
func (fs FileSet) Close() error {
	return Files(fs.files).Close()
}

// Retain adds a reference count to all files.
Expand Down Expand Up @@ -492,6 +486,16 @@ func (a Files) IDs() []int {
return ids
}

// Close closes every file in a. A failure does not stop the loop: all
// remaining files are still closed. The first error encountered is returned,
// or nil when every file closed cleanly.
func (a Files) Close() error {
	var firstErr error
	for i := range a {
		closeErr := a[i].Close()
		if firstErr == nil && closeErr != nil {
			firstErr = closeErr
		}
	}
	return firstErr
}

// fileSetSeriesIDIterator attaches a fileset to an iterator that is released on close.
type fileSetSeriesIDIterator struct {
once sync.Once
Expand Down
44 changes: 22 additions & 22 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// IndexName is the name of the index.
Expand Down Expand Up @@ -241,7 +242,7 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
}

// Open opens the index.
func (i *Index) Open() error {
func (i *Index) Open() (rErr error) {
i.mu.Lock()
defer i.mu.Unlock()

Expand Down Expand Up @@ -269,29 +270,16 @@ func (i *Index) Open() error {
partitionN := len(i.partitions)
n := i.availableThreads()

// Store results.
errC := make(chan error, partitionN)

// Run fn on each partition using a fixed number of goroutines.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func(k int) {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= partitionN {
return // No more work.
}
err := i.partitions[idx].Open()
errC <- err
}
}(k)
g := new(errgroup.Group)
g.SetLimit(n)
for idx := 0; idx < partitionN; idx++ {
g.Go(i.partitions[idx].Open)
}

// Check for error
for i := 0; i < partitionN; i++ {
if err := <-errC; err != nil {
return err
}
err := g.Wait()
defer i.cleanUpFail(&rErr)
if err != nil {
return err
}

// Refresh cached sketches.
Expand All @@ -307,6 +295,18 @@ func (i *Index) Open() error {
return nil
}

// cleanUpFail closes every partition that is still open when *err is non-nil,
// and does nothing when *err is nil. Open defers it with a pointer to its
// named result so that a failed open does not leave partitions half-opened.
//
// Nil partitions and partitions that are not open are skipped. A failure to
// close a partition is logged as a warning and does not stop the remaining
// partitions from being closed; *err itself is never modified.
func (i *Index) cleanUpFail(err *error) {
	if *err == nil {
		return
	}
	for _, p := range i.partitions {
		if p == nil || !p.IsOpen() {
			continue
		}
		if e := p.Close(); e != nil {
			// Include the cause in the message; without it the warning gives
			// no hint about why the cleanup failed.
			i.logger.Warn("Failed to clean up partition: " + e.Error())
		}
	}
}

// Compact requests a compaction of partitions.
func (i *Index) Compact() {
i.mu.Lock()
Expand Down
25 changes: 25 additions & 0 deletions tsdb/index/tsi1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"regexp"
Expand All @@ -13,8 +14,10 @@ import (
"testing"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/stretchr/testify/require"
)

// Bloom filter settings used in tests.
Expand Down Expand Up @@ -207,6 +210,28 @@ func TestIndex_DropMeasurement(t *testing.T) {
})
}

// TestIndex_OpenFail checks that opening an index whose log file has been
// corrupted returns the expected parse error and leaves every partition
// reporting zero files.
func TestIndex_OpenFail(t *testing.T) {
// Open a fresh index once, then close it again so the corruption below is
// applied while the index is not open.
idx := NewDefaultIndex()
require.NoError(t, idx.Open())
idx.Index.Close()
// Corrupt one log file: the one under the "3" subdirectory of the index path
// (presumably partition 3's directory - confirm against the partition layout).
tslPath := path.Join(idx.Index.Path(), "3", "L0-00000001.tsl")
tslFile, err := os.OpenFile(tslPath, os.O_RDWR, 0666)
require.NoError(t, err)
// Drop the existing contents so the file holds only the bytes written below.
require.NoError(t, tslFile.Truncate(0))
// Write a poisonous TSL file: the first byte doesn't matter, the remaining
// bytes are an invalid uvarint. Every byte has its continuation bit set, so
// the value never terminates within 64 bits.
_, err = tslFile.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
require.NoError(t, err)
require.NoError(t, tslFile.Close())
// Re-create the index over the same path and series file; opening it must now fail.
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
// The -11 in the message is presumably binary.Uvarint's negative overflow
// count - confirm if the poison bytes above are ever changed.
require.EqualError(t, idx.Index.Open(), "parsing binary-encoded uint64 value failed; binary.Uvarint() returned -11")
// Ensure each partition was cleaned up: none may still report any files.
for i := 0; i < int(idx.Index.PartitionN); i++ {
assert.Equal(t, idx.Index.PartitionAt(i).FileN(), 0)
}
// Closing the test index after the failed open must still succeed.
require.NoError(t, idx.Close())
}

func TestIndex_Open(t *testing.T) {
// Opening a fresh index should set the MANIFEST version to current version.
idx := NewDefaultIndex()
Expand Down
24 changes: 17 additions & 7 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ type Partition struct {
// NewPartition returns a new instance of Partition.
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
seriesIDSet: tsdb.NewSeriesIDSet(),

closing: make(chan struct{}),
path: path,
sfile: sfile,
seriesIDSet: tsdb.NewSeriesIDSet(),
fileSet: &FileSet{},
MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,

// compactionEnabled: true,
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *Partition) bytes() int {
var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")

// Open opens the partition.
func (p *Partition) Open() error {
func (p *Partition) Open() (rErr error) {
p.mu.Lock()
defer p.mu.Unlock()

Expand Down Expand Up @@ -187,6 +187,12 @@ func (p *Partition) Open() error {

// Open each file in the manifest.
var files []File
defer func() {
if rErr != nil {
Files(files).Close()
}
}()

for _, filename := range m.Files {
switch filepath.Ext(filename) {
case LogFileExt:
Expand Down Expand Up @@ -231,7 +237,7 @@ func (p *Partition) Open() error {
}
}

// Build series existance set.
// Build series existence set.
if err := p.buildSeriesSet(); err != nil {
return err
}
Expand All @@ -245,6 +251,10 @@ func (p *Partition) Open() error {
return nil
}

// IsOpen reports the current value of the partition's opened flag.
// NOTE(review): the flag is read without acquiring p.mu; confirm that callers
// provide whatever synchronization they need.
func (p *Partition) IsOpen() bool {
return p.opened
}

// openLogFile opens a log file and appends it to the index.
func (p *Partition) openLogFile(path string) (*LogFile, error) {
f := NewLogFile(p.sfile, path)
Expand Down

0 comments on commit ae4ef4f

Please sign in to comment.