From 3c87f524edcf440319c77ae8bbd4d027cec27634 Mon Sep 17 00:00:00 2001 From: WeblWabl Date: Wed, 16 Oct 2024 10:09:15 -0500 Subject: [PATCH] feat(logging): Add startup logging for shard counts (#25378) * feat(tsdb): Adds shard opening progress checks to startup This PR adds a check to see how many shards are remaining vs how many shards are opened. This change displays the percent completed too. closes influxdata/feature-requests#476 --- cmd/influxd/run/command.go | 4 + cmd/influxd/run/server.go | 14 ++ cmd/influxd/run/startup_logger.go | 32 ++++ tsdb/store.go | 297 +++++++++++++++++++----------- tsdb/store_test.go | 293 ++++++++++++++++++++++------- 5 files changed, 462 insertions(+), 178 deletions(-) create mode 100644 cmd/influxd/run/startup_logger.go diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index cfb7e67b153..6f30536bf5a 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error { s.Logger = cmd.Logger s.CPUProfile = options.CPUProfile s.MemProfile = options.MemProfile + + sl := NewStartupProgressLogger(s.Logger) + s.SetStartupMetrics(sl) + if err := s.Open(); err != nil { return fmt.Errorf("open server: %s", err) } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index e8b747b4659..c0340450271 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -65,6 +65,11 @@ type BuildInfo struct { Time string } +type StartupProgress interface { + AddShard() + CompletedShard() +} + // Server represents a container for the metadata and storage data and services. // It is built using a Config and it manages the startup and shutdown of all // services in the proper order. @@ -96,6 +101,8 @@ type Server struct { Monitor *monitor.Monitor + StartupProgressMetrics StartupProgress + // Server reporting and registration reportingDisabled bool @@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) { s.MuxLogger = tcp.MuxLogger(w) } +func (s *Server) SetStartupMetrics(sp StartupProgress) { + s.StartupProgressMetrics = sp +} + func (s *Server) appendMonitorService() { s.Services = append(s.Services, s.Monitor) } @@ -465,6 +476,9 @@ func (s *Server) Open() error { s.MetaClient.WithLogger(s.Logger) } s.TSDBStore.WithLogger(s.Logger) + + s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics) + if s.config.Data.QueryLogEnabled { s.QueryExecutor.WithLogger(s.Logger) } else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries { diff --git a/cmd/influxd/run/startup_logger.go b/cmd/influxd/run/startup_logger.go new file mode 100644 index 00000000000..e471deff07f --- /dev/null +++ b/cmd/influxd/run/startup_logger.go @@ -0,0 +1,32 @@ +package run + +import ( + "fmt" + "sync/atomic" + + "go.uber.org/zap" +) + +type StartupProgressLogger struct { + shardsCompleted atomic.Uint64 + shardsTotal atomic.Uint64 + logger *zap.Logger +} + +func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger { + return &StartupProgressLogger{ + logger: logger, + } +} + +func (s *StartupProgressLogger) AddShard() { + s.shardsTotal.Add(1) +} + +func (s *StartupProgressLogger) CompletedShard() { + shardsCompleted := s.shardsCompleted.Add(1) + totalShards := s.shardsTotal.Load() + + percentShards := float64(shardsCompleted) / float64(totalShards) * 100 + s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards)) +} diff --git a/tsdb/store.go b/tsdb/store.go 
index 74ad52c36b7..5020c9abdc1 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -54,6 +54,12 @@ const SeriesFileDirectory = "_series" // databaseState keeps track of the state of a database. type databaseState struct{ indexTypes map[string]int } +// struct to hold the result of opening each reader in a goroutine +type shardResponse struct { + s *Shard + err error +} + // addIndexType records that the database has a shard with the given index type. func (d *databaseState) addIndexType(indexType string) { if d.indexTypes == nil { @@ -135,6 +141,11 @@ type Store struct { baseLogger *zap.Logger Logger *zap.Logger + startupProgressMetrics interface { + AddShard() + CompletedShard() + } + closing chan struct{} wg sync.WaitGroup opened bool @@ -167,6 +178,13 @@ func (s *Store) WithLogger(log *zap.Logger) { } } +func (s *Store) WithStartupMetrics(sp interface { + AddShard() + CompletedShard() +}) { + s.startupProgressMetrics = sp +} + // Statistics returns statistics for period monitoring. func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() @@ -310,12 +328,6 @@ func (s *Store) Open() error { } func (s *Store) loadShards() error { - // res holds the result from opening each shard in a goroutine - type res struct { - s *Shard - err error - } - // Limit the number of concurrent TSM files to be opened to the number of cores. s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0)) @@ -363,9 +375,8 @@ func (s *Store) loadShards() error { log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open") defer logEnd() + shardLoaderWg := new(sync.WaitGroup) t := limiter.NewFixed(runtime.GOMAXPROCS(0)) - resC := make(chan *res) - var n int // Determine how many shards we need to open by checking the store path. dbDirs, err := os.ReadDir(s.path) @@ -373,126 +384,155 @@ func (s *Store) loadShards() error { return err } - for _, db := range dbDirs { - dbPath := filepath.Join(s.path, db.Name()) - if !db.IsDir() { - log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) - continue - } + walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error { + for _, db := range dbDirs { + rpDirs, err := s.getRetentionPolicyDirs(db, log) + if err != nil { + return err + } else if rpDirs == nil { + continue + } - if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { - log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) - continue - } + // Load series file. + sfile, err := s.openSeriesFile(db.Name()) + if err != nil { + return err + } - // Load series file. - sfile, err := s.openSeriesFile(db.Name()) - if err != nil { - return err - } + // Retrieve database index. + idx, err := s.createIndexIfNotExists(db.Name()) + if err != nil { + return err + } - // Retrieve database index. - idx, err := s.createIndexIfNotExists(db.Name()) - if err != nil { - return err + for _, rp := range rpDirs { + shardDirs, err := s.getShards(rp, db, log) + if err != nil { + return err + } else if shardDirs == nil { + continue + } + + for _, sh := range shardDirs { + // Series file should not be in a retention policy but skip just in case. 
+ if sh.Name() == SeriesFileDirectory { + log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) + continue + } + + if err := fn(sfile, idx, sh, db, rp); err != nil { + return err + } + } + } } - // Load each retention policy within the database directory. - rpDirs, err := os.ReadDir(dbPath) + return nil + } + + // We use `rawShardCount` as a buffer size for channel creation below. + // If there is no startupProgressMetrics count then this will be 0 creating a + // zero buffer channel. + rawShardCount := 0 + if s.startupProgressMetrics != nil { + err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { + rawShardCount++ + s.startupProgressMetrics.AddShard() + return nil + }) if err != nil { return err } + } - for _, rp := range rpDirs { - rpPath := filepath.Join(s.path, db.Name(), rp.Name()) - if !rp.IsDir() { - log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) - continue - } + shardResC := make(chan *shardResponse, rawShardCount) + err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { + shardLoaderWg.Add(1) - // The .series directory is not a retention policy. - if rp.Name() == SeriesFileDirectory { - continue - } + go func(db, rp, sh string) { + defer shardLoaderWg.Done() - if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) { - log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter")) - continue - } + t.Take() + defer t.Release() + + start := time.Now() + path := filepath.Join(s.path, db, rp, sh) + walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) - shardDirs, err := os.ReadDir(rpPath) + // Shard file names are numeric shardIDs + shardID, err := strconv.ParseUint(sh, 10, 64) if err != nil { - return err + log.Info("invalid shard ID found at path", zap.String("path", path)) + shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + return } - for _, sh := range shardDirs { - // Series file should not be in a retention policy but skip just in case. - if sh.Name() == SeriesFileDirectory { - log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) - continue + if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { + log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) + shardResC <- &shardResponse{} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() } + return + } - n++ - go func(db, rp, sh string) { - t.Take() - defer t.Release() + // Copy options and assign shared index. + opt := s.EngineOptions + opt.InmemIndex = idx - start := time.Now() - path := filepath.Join(s.path, db, rp, sh) - walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) + // Provide an implementation of the ShardIDSets + opt.SeriesIDSets = shardSet{store: s, db: db} - // Shard file names are numeric shardIDs - shardID, err := strconv.ParseUint(sh, 10, 64) - if err != nil { - log.Info("invalid shard ID found at path", zap.String("path", path)) - resC <- &res{err: fmt.Errorf("%s is not a valid ID. 
Skipping shard.", sh)} - return - } + // Existing shards should continue to use inmem index. + if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { + opt.IndexVersion = InmemIndexName + } - if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { - log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) - resC <- &res{} - return - } + // Open engine. + shard := NewShard(shardID, path, walPath, sfile, opt) - // Copy options and assign shared index. - opt := s.EngineOptions - opt.InmemIndex = idx + // Disable compactions, writes and queries until all shards are loaded + shard.EnableOnOpen = false + shard.CompactionDisabled = s.EngineOptions.CompactionDisabled + shard.WithLogger(s.baseLogger) - // Provide an implementation of the ShardIDSets - opt.SeriesIDSets = shardSet{store: s, db: db} + err = s.OpenShard(shard, false) + if err != nil { + log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) + shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + return + } - // Existing shards should continue to use inmem index. - if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { - opt.IndexVersion = InmemIndexName - } + shardResC <- &shardResponse{s: shard} + log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + }(db.Name(), rp.Name(), sh.Name()) - // Open engine. - shard := NewShard(shardID, path, walPath, sfile, opt) + return nil + }) - // Disable compactions, writes and queries until all shards are loaded - shard.EnableOnOpen = false - shard.CompactionDisabled = s.EngineOptions.CompactionDisabled - shard.WithLogger(s.baseLogger) + if err := s.enableShards(shardLoaderWg, shardResC); err != nil { + return err + } - err = s.OpenShard(shard, false) - if err != nil { - log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) - resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} - return - } + return nil +} - resC <- &res{s: shard} - log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) - }(db.Name(), rp.Name(), sh.Name()) - } - } - } +func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error { + go func() { + wg.Wait() + close(resC) + }() - // Gather results of opening shards concurrently, keeping track of how - // many databases we are managing. - for i := 0; i < n; i++ { - res := <-resC + for res := range resC { if res.s == nil || res.err != nil { continue } @@ -503,7 +543,6 @@ func (s *Store) loadShards() error { } s.databases[res.s.database].addIndexType(res.s.IndexType()) } - close(resC) // Check if any databases are running multiple index types. for db, state := range s.databases { @@ -1182,12 +1221,8 @@ func byIndexType(name string) ShardPredicate { // concurrent use. If any of the functions return an error, the first error is // returned. 
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { - // struct to hold the result of opening each reader in a goroutine - type res struct { - err error - } - resC := make(chan res) + resC := make(chan shardResponse, len(shards)) var n int for _, sh := range shards { @@ -1195,11 +1230,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { go func(sh *Shard) { if err := fn(sh); err != nil { - resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)} + resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)} return } - resC <- res{} + resC <- shardResponse{} }(sh) } @@ -2367,3 +2402,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error { } return nil } + +func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + dbPath := filepath.Join(s.path, db.Name()) + if !db.IsDir() { + log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { + log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) + return nil, nil + } + + // Load each retention policy within the database directory. + rpDirs, err := os.ReadDir(dbPath) + if err != nil { + return nil, err + } + + return rpDirs, nil +} + +func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name()) + if !rpDir.IsDir() { + log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + // The .series directory is not a retention policy. + if rpDir.Name() == SeriesFileDirectory { + return nil, nil + } + + if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) { + log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter")) + return nil, nil + } + + shardDirs, err := os.ReadDir(rpPath) + if err != nil { + return nil, err + } + + return shardDirs, nil +} diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 3c2010db513..91f2ae85fff 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "go.uber.org/zap/zaptest" + "log" "math" "math/rand" "os" @@ -19,7 +21,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/internal" - "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/deep" "github.com/influxdata/influxdb/pkg/slices" @@ -36,7 +37,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -87,7 +88,7 @@ func TestStore_DeleteRetentionPolicy(t *testing.T) { } // Reopen other shard and check it still exists. - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Error(err) } else if sh := s.Shard(3); sh == nil { t.Errorf("shard 3 does not exist") @@ -112,7 +113,7 @@ func TestStore_CreateShard(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. 
@@ -130,7 +131,7 @@ func TestStore_CreateShard(t *testing.T) {
 	}
 
 	// Reopen shard and recheck.
-	if err := s.Reopen(); err != nil {
+	if err := s.Reopen(t); err != nil {
 		t.Fatal(err)
 	} else if sh := s.Shard(1); sh == nil {
 		t.Fatalf("expected shard(1)")
@@ -144,12 +145,102 @@ func TestStore_CreateShard(t *testing.T) {
 	}
 }
 
+// Ensure that shard loading progress is reported during store startup.
+func TestStore_StartupShardProgress(t *testing.T) {
+	t.Parallel()
+
+	test := func(index string) {
+		s := MustOpenStore(t, index)
+		defer s.Close()
+
+		// Create a new shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
+		sh := s.Shard(1)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
+		sh = s.Shard(2)
+		require.NotNil(t, sh)
+
+		msl := &mockStartupLogger{}
+
+		// Reopen the store with progress metrics and recheck.
+		require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+		// Equality check to make sure shards are always added prior to
+		// completion being called. This test opens two shards, both of
+		// which load successfully.
+		require.Equal(t, msl.shardTracker, []string{
+			"shard-add",
+			"shard-add",
+			"shard-complete",
+			"shard-complete",
+		})
+	}
+
+	for _, index := range tsdb.RegisteredIndexes() {
+		t.Run(index, func(t *testing.T) { test(index) })
+	}
+}
+
+// Ensure that shard loading still accounts for bad shards. We still want these to show up
+// during the initial shard loading even though they are in an error state.
+func TestStore_BadShardLoading(t *testing.T) {
+	t.Parallel()
+
+	test := func(index string) {
+		s := MustOpenStore(t, index)
+		defer s.Close()
+
+		// Create a new shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 1, true))
+		sh := s.Shard(1)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 2, true))
+		sh = s.Shard(2)
+		require.NotNil(t, sh)
+
+		// Create another shard and verify that it exists.
+		require.NoError(t, s.CreateShard("db0", "rp0", 3, true))
+		sh = s.Shard(3)
+		require.NotNil(t, sh)
+
+		s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
+		err2 := s.OpenShard(s.Shard(sh.ID()), false)
+		require.Error(t, err2, "no error opening bad shard")
+
+		msl := &mockStartupLogger{}
+
+		// Reopen the store with progress metrics and recheck.
+		require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+		// Equality check to make sure shards are always added prior to
+		// completion being called. This test opens 3 total shards - 1 shard
+		// fails, but we still want to track that it was attempted to be opened.
+ require.Equal(t, msl.shardTracker, []string{ + "shard-add", + "shard-add", + "shard-add", + "shard-complete", + "shard-complete", + "shard-complete", + }) + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + } +} + func TestStore_BadShard(t *testing.T) { const errStr = "a shard open error" indexes := tsdb.RegisteredIndexes() for _, idx := range indexes { func() { - s := MustOpenStore(idx) + s := MustOpenStore(t, idx) defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) sh := tsdb.NewTempShard(idx) @@ -175,7 +266,7 @@ func TestStore_CreateMixedShards(t *testing.T) { t.Parallel() test := func(index1 string, index2 string) { - s := MustOpenStore(index1) + s := MustOpenStore(t, index1) defer s.Close() // Create a new shard and verify that it exists. @@ -187,7 +278,7 @@ func TestStore_CreateMixedShards(t *testing.T) { s.EngineOptions.IndexVersion = index2 s.index = index2 - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } @@ -199,7 +290,7 @@ func TestStore_CreateMixedShards(t *testing.T) { } // Reopen shard and recheck. - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard(1)") @@ -231,7 +322,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) { t.Parallel() test := func(index1 string, index2 string) { - s := MustOpenStore(index1) + s := MustOpenStore(t, index1) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -242,7 +333,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) { s.EngineOptions.IndexVersion = index2 s.index = index2 - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } @@ -276,7 +367,7 @@ func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -340,7 +431,7 @@ func TestStore_WriteMixedShards(t *testing.T) { t.Parallel() test := func(index1 string, index2 string) { - s := MustOpenStore(index1) + s := MustOpenStore(t, index1) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -351,7 +442,7 @@ func TestStore_WriteMixedShards(t *testing.T) { s.EngineOptions.IndexVersion = index2 s.index = index2 - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { t.Fatal(err) } @@ -413,7 +504,7 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() if err := s.DeleteSeries("db0", nil, nil); err != nil { @@ -431,7 +522,7 @@ func TestStore_DeleteSeries_MultipleSources(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() if err := s.CreateShard("db0", "rp0", 1, true); err != nil { @@ -461,7 +552,7 @@ func TestStore_DeleteShard(t *testing.T) { t.Parallel() test := func(index string) error { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. 
@@ -493,7 +584,7 @@ func TestStore_DeleteShard(t *testing.T) { s.MustWriteToShardString(3, "cpu,serverb=b v=1") // Reopen the store and check all shards still exist - if err := s.Reopen(); err != nil { + if err := s.Reopen(t); err != nil { return err } for i := uint64(1); i <= 3; i++ { @@ -550,7 +641,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create a new shard and verify that it exists. @@ -578,7 +669,7 @@ func TestStore_Open(t *testing.T) { t.Parallel() test := func(index string) { - s := NewStore(index) + s := NewStore(t, index) defer s.Close() if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil { @@ -621,9 +712,14 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) { t.Parallel() test := func(index string) { - s := NewStore(index) + s := NewStore(t, index) defer s.Close() + // Ensure the directory exists before creating the file. + if err := os.MkdirAll(s.Path(), 0777); err != nil { + t.Fatal(err) + } + // Create a file instead of a directory for a database. if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil { t.Fatal(err) @@ -647,7 +743,7 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) { t.Parallel() test := func(index string) { - s := NewStore(index) + s := NewStore(t, index) defer s.Close() // Create an RP file instead of a directory. @@ -677,7 +773,7 @@ func TestStore_Open_InvalidShard(t *testing.T) { t.Parallel() test := func(index string) { - s := NewStore(index) + s := NewStore(t, index) defer s.Close() // Create a non-numeric shard file. @@ -707,7 +803,7 @@ func TestShards_CreateIterator(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -792,7 +888,7 @@ func TestStore_NewReadersBlocked(t *testing.T) { test := func(index string) { t.Helper() - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardInUse := func(shardID uint64) bool { @@ -865,7 +961,7 @@ func TestStore_NewReadersBlocked(t *testing.T) { // Ensure the store can backup a shard and another store can restore it. func TestStore_BackupRestoreShard(t *testing.T) { test := func(index string) { - s0, s1 := MustOpenStore(index), MustOpenStore(index) + s0, s1 := MustOpenStore(t, index), MustOpenStore(t, index) defer s0.Close() defer s1.Close() @@ -876,7 +972,7 @@ func TestStore_BackupRestoreShard(t *testing.T) { `cpu value=3 20`, ) - if err := s0.Reopen(); err != nil { + if err := s0.Reopen(t); err != nil { t.Fatal(err) } @@ -946,7 +1042,7 @@ func TestStore_Shard_SeriesN(t *testing.T) { t.Parallel() test := func(index string) error { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard with data. @@ -982,7 +1078,7 @@ func TestStore_MeasurementNames_Deduplicate(t *testing.T) { t.Parallel() test := func(index string) { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard with data. 
@@ -1083,7 +1179,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) { } test := func(index string) { - store := NewStore(index) + store := NewStore(t, index) if err := store.Open(); err != nil { panic(err) } @@ -1148,7 +1244,7 @@ func TestStore_Cardinality_Unique(t *testing.T) { } test := func(index string) { - store := NewStore(index) + store := NewStore(t, index) store.EngineOptions.Config.MaxSeriesPerDatabase = 0 if err := store.Open(); err != nil { panic(err) @@ -1230,7 +1326,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { } test := func(index string) { - store := NewStore(index) + store := NewStore(t, index) store.EngineOptions.Config.MaxSeriesPerDatabase = 0 if err := store.Open(); err != nil { panic(err) @@ -1250,7 +1346,7 @@ func TestStore_MetaQuery_Timeout(t *testing.T) { } test := func(index string) { - store := NewStore(index) + store := NewStore(t, index) store.EngineOptions.Config.MaxSeriesPerDatabase = 0 if err := store.Open(); err != nil { panic(err) @@ -1442,7 +1538,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) { } test := func(index string) error { - store := NewStore(index) + store := NewStore(t, index) store.EngineOptions.Config.MaxSeriesPerDatabase = 0 if err := store.Open(); err != nil { panic(err) @@ -1467,7 +1563,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) { t.Skip("Skipping test in short, race and appveyor mode.") } - store := NewStore("inmem") + store := NewStore(t, "inmem") store.EngineOptions.Config.MaxSeriesPerDatabase = 100000 if err := store.Open(); err != nil { panic(err) @@ -1581,7 +1677,7 @@ func TestStore_Sketches(t *testing.T) { } test := func(index string) error { - store := MustOpenStore(index) + store := MustOpenStore(t, index) defer store.Close() // Generate point data to write to the shards. @@ -1610,7 +1706,7 @@ func TestStore_Sketches(t *testing.T) { } // Reopen the store. - if err := store.Reopen(); err != nil { + if err := store.Reopen(t); err != nil { return err } @@ -1643,7 +1739,7 @@ func TestStore_Sketches(t *testing.T) { } // Reopen the store. - if err := store.Reopen(); err != nil { + if err := store.Reopen(t); err != nil { return err } @@ -1779,7 +1875,7 @@ func TestStore_TagValues(t *testing.T) { } setup := func(index string) (*Store, []uint64) { // returns shard ids - s := MustOpenStore(index) + s := MustOpenStore(t, index) fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d cpu1%[1]d,host=nofoo value=1 %[4]d @@ -1830,7 +1926,7 @@ func TestStore_Measurements_Auth(t *testing.T) { t.Parallel() test := func(index string) error { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -1919,7 +2015,7 @@ func TestStore_TagKeys_Auth(t *testing.T) { t.Parallel() test := func(index string) error { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. @@ -2017,7 +2113,7 @@ func TestStore_TagValues_Auth(t *testing.T) { t.Parallel() test := func(index string) error { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() // Create shard #0 with data. 
@@ -2148,7 +2244,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues { func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -2233,7 +2329,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -2324,7 +2420,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { - s := MustOpenStore(index) + s := MustOpenStore(t, index) defer s.Close() shardN := 10 @@ -2428,7 +2524,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { for _, index := range tsdb.RegisteredIndexes() { - store := NewStore(index) + store := NewStore(b, index) if err := store.Open(); err != nil { panic(err) } @@ -2459,7 +2555,7 @@ func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen( func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) { var store *Store setup := func(index string) error { - store := MustOpenStore(index) + store := MustOpenStore(b, index) // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) @@ -2530,7 +2626,7 @@ func BenchmarkStore_TagValues(b *testing.B) { } setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids - s := NewStore(index) + s := NewStore(b, index) if err := s.Open(); err != nil { panic(err) } @@ -2632,23 +2728,46 @@ func BenchmarkStore_TagValues(b *testing.B) { // Store is a test wrapper for tsdb.Store. type Store struct { *tsdb.Store - index string + path string + index string + walPath string + opts []StoreOption } -// NewStore returns a new instance of Store with a temporary path. -func NewStore(index string) *Store { - path, err := os.MkdirTemp("", "influxdb-tsdb-") - if err != nil { - panic(err) +type StoreOption func(s *Store) error + +func WithStartupMetrics(sm *mockStartupLogger) StoreOption { + return func(s *Store) error { + s.WithStartupMetrics(sm) + return nil } +} - s := &Store{Store: tsdb.NewStore(path), index: index} +// NewStore returns a new instance of Store with a temporary path. +func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store { + tb.Helper() + + // The WAL directory must not be rooted under the data path. Otherwise reopening + // the store will generate series indices for the WAL directories. 
+ rootPath := tb.TempDir() + path := filepath.Join(rootPath, "data") + walPath := filepath.Join(rootPath, "wal") + + s := &Store{ + Store: tsdb.NewStore(path), + path: path, + index: index, + walPath: walPath, + opts: opts, + } s.EngineOptions.IndexVersion = index - s.EngineOptions.Config.WALDir = filepath.Join(path, "wal") + s.EngineOptions.Config.WALDir = walPath s.EngineOptions.Config.TraceLoggingEnabled = true + s.WithLogger(zaptest.NewLogger(tb)) - if testing.Verbose() { - s.WithLogger(logger.New(os.Stdout)) + for _, o := range s.opts { + err := o(s) + require.NoError(tb, err) } return s @@ -2656,8 +2775,8 @@ func NewStore(index string) *Store { // MustOpenStore returns a new, open Store using the specified index, // at a temporary path. -func MustOpenStore(index string) *Store { - s := NewStore(index) +func MustOpenStore(tb testing.TB, index string, opts ...StoreOption) *Store { + s := NewStore(tb, index, opts...) if err := s.Open(); err != nil { panic(err) @@ -2665,27 +2784,44 @@ func MustOpenStore(index string) *Store { return s } -// Reopen closes and reopens the store as a new store. -func (s *Store) Reopen() error { - if err := s.Store.Close(); err != nil { - return err +func (s *Store) Reopen(tb testing.TB, newOpts ...StoreOption) error { + tb.Helper() + + if s.Store != nil { + if err := s.Store.Close(); err != nil { + return err + } } - s.Store = tsdb.NewStore(s.Path()) + s.Store = tsdb.NewStore(s.path) s.EngineOptions.IndexVersion = s.index - s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") + s.EngineOptions.Config.WALDir = s.walPath s.EngineOptions.Config.TraceLoggingEnabled = true + s.WithLogger(zaptest.NewLogger(tb)) + if len(newOpts) > 0 { + s.opts = newOpts + } - if testing.Verbose() { - s.WithLogger(logger.New(os.Stdout)) + for _, o := range s.opts { + err := o(s) + require.NoError(tb, err) } + return s.Store.Open() } // Close closes the store and removes the underlying data. func (s *Store) Close() error { - defer os.RemoveAll(s.Path()) - return s.Store.Close() + defer func(path string) { + err := os.RemoveAll(path) + if err != nil { + log.Fatal(err) + } + }(s.path) + if s.Store != nil { + return s.Store.Close() + } + return nil } // MustCreateShardWithData creates a shard and writes line protocol data to it. @@ -2754,3 +2890,20 @@ func dirExists(path string) bool { } return !os.IsNotExist(err) } + +type mockStartupLogger struct { + shardTracker []string + mu sync.Mutex +} + +func (m *mockStartupLogger) AddShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-add")) + m.mu.Unlock() +} + +func (m *mockStartupLogger) CompletedShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, fmt.Sprintf("shard-complete")) + m.mu.Unlock() +}
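
Taken together, the hunks above wire a small progress hook through three layers: cmd/influxd/run/command.go constructs a StartupProgressLogger and hands it to the server via SetStartupMetrics, server.go forwards it to the TSDB store through WithStartupMetrics, and tsdb/store.go calls AddShard once for every shard directory it discovers and then CompletedShard after each open attempt. The sketch below is a minimal, self-contained illustration of that two-pass counting pattern using only the standard library; progressLogger, fakeStore, and their loadShards loop are hypothetical stand-ins for run.StartupProgressLogger and tsdb.Store, not code from this patch.

package main

import (
	"fmt"
	"sync/atomic"
)

// StartupProgress mirrors the interface added to cmd/influxd/run/server.go:
// the store reports each discovered shard and each completed open attempt.
type StartupProgress interface {
	AddShard()
	CompletedShard()
}

// progressLogger is a simplified stand-in for run.StartupProgressLogger,
// printing to stdout instead of logging through zap.
type progressLogger struct {
	completed atomic.Uint64
	total     atomic.Uint64
}

func (p *progressLogger) AddShard() { p.total.Add(1) }

func (p *progressLogger) CompletedShard() {
	done := p.completed.Add(1)
	total := p.total.Load()
	pct := float64(done) / float64(total) * 100
	fmt.Printf("Finished loading shard, current progress %.1f%% shards (%d / %d).\n", pct, done, total)
}

// fakeStore is a hypothetical stand-in for tsdb.Store showing the shape of
// loadShards after this patch: count every shard first, then report a
// completion after every open attempt.
type fakeStore struct {
	shards   []string
	progress StartupProgress
}

func (s *fakeStore) WithStartupMetrics(sp StartupProgress) { s.progress = sp }

func (s *fakeStore) loadShards() {
	if s.progress != nil {
		for range s.shards {
			s.progress.AddShard() // first pass: establish the total up front
		}
	}
	for range s.shards {
		// ... open the shard here ...
		if s.progress != nil {
			s.progress.CompletedShard() // second pass: report each attempt
		}
	}
}

func main() {
	store := &fakeStore{shards: []string{"1", "2", "3"}}
	store.WithStartupMetrics(&progressLogger{})
	store.loadShards()
}

The denominator is fixed by the counting pass before any shard is opened, and CompletedShard fires on every attempt, including filtered, invalid, and failed shards, which is what TestStore_BadShardLoading asserts: three shard-add entries followed by three shard-complete entries even though one shard fails to open.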