diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go
index c123b9d..d285597 100644
--- a/coordinator/points_writer.go
+++ b/coordinator/points_writer.go
@@ -591,7 +591,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
 		// If the write returned an error, continue to the next response
 		if result.Err != nil {
 			atomic.AddInt64(&w.stats.WriteErr, 1)
-			w.Logger.Info("Write failed", zap.Uint64("node_id", result.Owner.NodeID), zap.Uint64("shard_id", shard.ID), zap.Error(result.Err))
+			w.Logger.Warn("Write failed", zap.Uint64("node_id", result.Owner.NodeID), zap.Uint64("shard_id", shard.ID), zap.Error(result.Err))
 
 			if result.Err.Error() == hh.ErrHintedHandoffQueueNotEmpty.Error() || result.Err.Error() == hh.ErrQueueBlocked.Error() {
 				continue
diff --git a/tsdb/store.go b/tsdb/store.go
index f4f8595..00b98a2 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -72,6 +72,38 @@ func (d *databaseState) removeIndexType(indexType string) {
 // hasMultipleIndexTypes returns true if the database has multiple index types.
 func (d *databaseState) hasMultipleIndexTypes() bool { return d != nil && len(d.indexTypes) > 1 }
 
+// shardErrorMap records, per shard ID, the error from the most recent failed
+// attempt to open that shard. All access is serialized by mu.
+type shardErrorMap struct {
+	mu          sync.Mutex
+	shardErrors map[uint64]error
+}
+
+// setShardOpenError records the outcome of opening shard shardID. A nil err
+// clears any recorded failure; a non-nil err is stored wrapped in an
+// *ErrPreviousShardFail.
+func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) {
+	se.mu.Lock()
+	defer se.mu.Unlock()
+	if err == nil {
+		delete(se.shardErrors, shardID)
+		return
+	}
+	// Allocate lazily so the zero value is usable: assigning into a nil map panics.
+	if se.shardErrors == nil {
+		se.shardErrors = make(map[uint64]error)
+	}
+	se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
+}
+
+// shardError returns the error recorded for shardID and whether one exists.
+func (se *shardErrorMap) shardError(shardID uint64) (error, bool) {
+	se.mu.Lock()
+	defer se.mu.Unlock()
+	oldErr, hasErr := se.shardErrors[shardID]
+	return oldErr, hasErr
+}
+
 // Store manages shards and indexes for databases.
 type Store struct {
 	mu sync.RWMutex
@@ -88,6 +120,9 @@ type Store struct {
 	// This prevents new shards from being created while old ones are being deleted.
pendingShardDeletes map[uint64]struct{} + // Maintains a set of shards that failed to open + badShards shardErrorMap + // Epoch tracker helps serialize writes and deletes that may conflict. It // is stored by shard. epochs map[uint64]*epochTracker @@ -112,6 +137,7 @@ func NewStore(path string) *Store { sfiles: make(map[string]*SeriesFile), indexes: make(map[string]interface{}), pendingShardDeletes: make(map[uint64]struct{}), + badShards: shardErrorMap{shardErrors: make(map[uint64]error)}, epochs: make(map[uint64]*epochTracker), EngineOptions: NewEngineOptions(), Logger: logger, @@ -400,9 +426,9 @@ func (s *Store) loadShards() error { shard.CompactionDisabled = s.EngineOptions.CompactionDisabled shard.WithLogger(s.baseLogger) - err = shard.Open() + err = s.OpenShard(shard, false) if err != nil { - log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err)) + log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)} return } @@ -558,6 +584,42 @@ func (s *Store) Shard(id uint64) *Shard { return sh } +type ErrPreviousShardFail struct { + error +} + +func (e ErrPreviousShardFail) Unwrap() error { + return e.error +} + +func (e ErrPreviousShardFail) Is(err error) bool { + _, sOk := err.(ErrPreviousShardFail) + _, pOk := err.(*ErrPreviousShardFail) + return sOk || pOk +} + +func (e ErrPreviousShardFail) Error() string { + return e.error.Error() +} + +func (s *Store) OpenShard(sh *Shard, force bool) error { + if sh == nil { + return errors.New("cannot open nil shard") + } + oldErr, bad := s.badShards.shardError(sh.ID()) + if force || !bad { + err := sh.Open() + s.badShards.setShardOpenError(sh.ID(), err) + return err + } else { + return oldErr + } +} + +func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) { + s.badShards.setShardOpenError(shardID, err) +} + // Shards returns a list of shards by id. 
func (s *Store) Shards(ids []uint64) []*Shard { s.mu.RLock() @@ -651,7 +713,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en shard.WithLogger(s.baseLogger) shard.EnableOnOpen = enabled - if err := shard.Open(); err != nil { + if err := s.OpenShard(shard, false); err != nil { return err } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 8b78a02..52ba842 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/inmem" "github.com/influxdata/influxql" + "github.com/stretchr/testify/require" ) // Ensure the store can delete a retention policy and all shards under @@ -143,6 +144,31 @@ func TestStore_CreateShard(t *testing.T) { } } +func TestStore_BadShard(t *testing.T) { + const errStr = "a shard open error" + indexes := tsdb.RegisteredIndexes() + for _, idx := range indexes { + func() { + s := MustOpenStore(idx) + defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) + + sh := tsdb.NewTempShard(idx) + err := s.OpenShard(sh.Shard, false) + require.NoError(t, err, "opening temp shard") + defer require.NoError(t, sh.Close(), "closing temporary shard") + + s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr)) + err2 := s.OpenShard(sh.Shard, false) + require.Error(t, err2, "no error opening bad shard") + require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2) + require.EqualError(t, err2, "opening shard previously failed with: "+errStr) + + // This should succeed with the force (and because opening an open shard automatically succeeds) + require.NoError(t, s.OpenShard(sh.Shard, true), "forced re-opening previously failing shard") + }() + } +} + func TestStore_CreateMixedShards(t *testing.T) { t.Parallel() @@ -2048,7 +2074,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err 
:= sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return } @@ -2133,7 +2159,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return } @@ -2224,7 +2250,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return }