From 6461ee2ea8ef79a96a90f9df4b7b1068644985f7 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Tue, 7 Jun 2022 08:37:00 -0700 Subject: [PATCH] fix: do not rename files on mmap failure (#23396) If NewTSMReader() fails because mmap fails, do not rename the file, because the error is probably caused by vm.max_map_count being too low closes https://github.com/influxdata/influxdb/issues/23172 (cherry picked from commit ec412f793b0fbb0d4c1ee35ebbb1fbbc69baabf0) closes https://github.com/influxdata/influxdb/issues/23413 --- tsdb/engine/tsm1/engine.go | 3 +- tsdb/engine/tsm1/file_store.go | 42 ++++--- tsdb/engine/tsm1/file_store_internal_test.go | 109 +++++++++++++++++++ tsdb/engine/tsm1/file_store_test.go | 38 +++++++ tsdb/engine/tsm1/reader.go | 31 +++++- 5 files changed, 202 insertions(+), 21 deletions(-) create mode 100644 tsdb/engine/tsm1/file_store_internal_test.go diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 7ff8b074729..d2f178c4aff 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -216,12 +216,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay) } - fs := NewFileStore(path) + fs := NewFileStore(path, WithMadviseWillNeed(opt.Config.TSMWillNeed)) fs.openLimiter = opt.OpenLimiter if opt.FileStoreObserver != nil { fs.WithObserver(opt.FileStoreObserver) } - fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed cache := NewCache(uint64(opt.Config.CacheMaxMemorySize)) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index b950c7dd838..2bedecd27f5 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -178,9 +178,8 @@ type FileStore struct { currentGeneration int dir string - files []TSMFile - tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files. - openLimiter limiter.Fixed // limit the number of concurrent opening TSM files. + files []TSMFile + openLimiter limiter.Fixed // limit the number of concurrent opening TSM files. logger *zap.Logger // Logger to be used for important messages traceLogger *zap.Logger // Logger to be used when trace-logging is on. @@ -196,6 +195,8 @@ type FileStore struct { obs tsdb.FileStoreObserver copyFiles bool + + readerOptions []tsmReaderOption } // FileStat holds information about a TSM file on disk. @@ -232,7 +233,7 @@ func (f FileStat) ContainsKey(key []byte) bool { } // NewFileStore returns a new instance of FileStore based on the given directory. -func NewFileStore(dir string) *FileStore { +func NewFileStore(dir string, options ...tsmReaderOption) *FileStore { logger := zap.NewNop() fs := &FileStore{ dir: dir, @@ -248,6 +249,7 @@ func NewFileStore(dir string) *FileStore { obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, copyFiles: runtime.GOOS == "windows", + readerOptions: options, } fs.purger.fileStore = fs return fs @@ -554,26 +556,36 @@ func (f *FileStore) Open() error { defer f.openLimiter.Release() start := time.Now() - df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed)) + df, err := NewTSMReader(file, f.readerOptions...) f.logger.Info("Opened file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Duration("duration", time.Since(start))) - // If we are unable to read a TSM file then log the error, rename - // the file, and continue loading the shard without it. + // If we are unable to read a TSM file then log the error. if err != nil { - f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) file.Close() - if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { - f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) - readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)} + if errors.Is(err, MmapError{}) { + // An MmapError may indicate we have insufficient + // handles for the mmap call, in which case the file should + // be left untouched, and the vm.max_map_count be raised. + f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low", + zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) + readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)} + return + } else { + // If the file is corrupt, rename it and + // continue loading the shard without it. + f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) + if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { + f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) + readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)} + return + } + readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)} return } - readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)} - return } - df.WithObserver(f.obs) readerC <- &res{r: df} }(i, file) @@ -793,7 +805,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF } } - tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed)) + tsm, err := NewTSMReader(fd, f.readerOptions...) if err != nil { if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { diff --git a/tsdb/engine/tsm1/file_store_internal_test.go b/tsdb/engine/tsm1/file_store_internal_test.go new file mode 100644 index 00000000000..e5db2d29bff --- /dev/null +++ b/tsdb/engine/tsm1/file_store_internal_test.go @@ -0,0 +1,109 @@ +package tsm1 + +import ( + "github.com/influxdata/influxdb/tsdb" +) + +var TestMmapInitFailOption = func(err error) tsmReaderOption { + return func(r *TSMReader) { + r.accessor = &badBlockAccessor{error: err} + } +} + +type badBlockAccessor struct { + error +} + +func (b *badBlockAccessor) init() (*indirectIndex, error) { + return nil, b.error +} + +func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) rename(path string) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) path() string { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) close() error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) free() error { + //TODO implement me + panic("implement me") +} diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 71a89a7ed1e..b4386735548 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/stretchr/testify/assert" ) func TestFileStore_Read(t *testing.T) { @@ -2375,6 +2376,43 @@ func TestFileStore_Open(t *testing.T) { } } +func TestFileStore_OpenFail(t *testing.T) { + var err error + dir := MustTempDir() + defer func() { assert.NoError(t, os.RemoveAll(dir), "failed to remove temporary directory") }() + + // Create a TSM file... + data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}} + + files, err := newFileDir(dir, data) + if err != nil { + fatal(t, "creating test files", err) + } + assert.Equal(t, 1, len(files)) + f := files[0] + + const mmapErrMsg = "mmap failure in test" + const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg + // With an mmap failure, the files should all be left where they are, because they are not corrupt + openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg))) + assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure") + + // With a non-mmap failure, the file failing to open should be moved aside + const otherErrMsg = "some Random Init Failure" + openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg)) + assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure") + assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure") +} + +func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) { + fs := tsm1.NewFileStore(dir, tsm1.TestMmapInitFailOption(initErr)) + err := fs.Open() + assert.Error(t, err) + assert.Contains(t, err.Error(), fullErrMsg) + defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }() + assert.Equal(t, 0, fs.Count(), "file count mismatch") +} + func TestFileStore_Remove(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 733e3f48d9b..a4071eee828 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -218,6 +218,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { } } +// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure // NewTSMReader returns a new TSMReader from the given file. func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { t := &TSMReader{} @@ -231,9 +232,11 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { } t.size = stat.Size() t.lastModified = stat.ModTime().UnixNano() - t.accessor = &mmapAccessor{ - f: f, - mmapWillNeed: t.madviseWillNeed, + if t.accessor == nil { + t.accessor = &mmapAccessor{ + f: f, + mmapWillNeed: t.madviseWillNeed, + } } index, err := t.accessor.init() @@ -1341,6 +1344,24 @@ func verifyVersion(r io.Reader) error { return nil } +type MmapError struct { + error +} + +func (m *MmapError) Unwrap() error { + return m.error +} + +func (m MmapError) Is(e error) bool { + _, oks := e.(MmapError) + _, okp := e.(*MmapError) + return oks || okp +} + +func NewMmapError(e error) MmapError { + return MmapError{error: e} +} + func (m *mmapAccessor) init() (*indirectIndex, error) { m.mu.Lock() defer m.mu.Unlock() @@ -1366,7 +1387,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) { m.b, err = mmap(m.f, 0, int(stat.Size())) if err != nil { - return nil, err + // Wrap the error to let callers know this was an error + // from mmap, and may indicate vm.max_map_count is too low + return nil, NewMmapError(err) } if len(m.b) < 8 { return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")