From 94a8ac40ede30c7f90da331e8b9c114e6c5cf1e5 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Mon, 25 Mar 2024 17:15:31 -0700 Subject: [PATCH] fix: improved shard deletion (#24602) Avoid unnecessarily deleting series from the series file Try harder to delete series from InMem indices Log all errors on shard deletion Closes https://github.com/influxdata/influxdb/issues/24834 (cherry picked from commit 8ff06d5a92ca3a0a34b31bfa9d37e6cff72a58a3) closes https://github.com/influxdata/influxdb/issues/24835 --- tsdb/store.go | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/tsdb/store.go b/tsdb/store.go index 956c4468d3d..992f4735127 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -811,7 +811,7 @@ func (s *Store) DeleteShard(shardID uint64) error { return ErrShardNotFound } - // Remove the shard from Store so it's not returned to callers requesting + // Remove the shard from Store, so it's not returned to callers requesting // shards. Also mark that this shard is currently being deleted in a separate // map so that we do not have to retain the global store lock while deleting // files. @@ -840,7 +840,6 @@ func (s *Store) DeleteShard(shardID uint64) error { defer s.mu.Unlock() delete(s.epochs, shardID) delete(s.pendingShardDeletes, shardID) - s.databases[db].removeIndexType(sh.IndexType()) }() // Get the shard's local bitset of series IDs. @@ -851,9 +850,10 @@ func (s *Store) DeleteShard(shardID uint64) error { ss := index.SeriesIDSet() - s.walkShards(shards, func(sh *Shard) error { + err = s.walkShards(shards, func(sh *Shard) error { index, err := sh.Index() if err != nil { + s.Logger.Error("cannot find shard index", zap.Uint64("shard_id", sh.ID()), zap.Error(err)) return err } @@ -861,6 +861,12 @@ func (s *Store) DeleteShard(shardID uint64) error { return nil }) + if err != nil { + // We couldn't get the index for a shard. Rather than deleting series which may + // exist in that shard as well as in the current shard, we stop the current deletion + return err + } + // Remove any remaining series in the set from the series file, as they don't // exist in any of the database's remaining shards. if ss.Cardinality() > 0 { @@ -872,7 +878,7 @@ func (s *Store) DeleteShard(shardID uint64) error { var keyBuf []byte // Series key buffer. var name []byte var tagsBuf models.Tags // Buffer for tags container. - var err error + var errs []error ss.ForEach(func(id uint64) { skey := sfile.SeriesKey(id) // Series File series key @@ -881,22 +887,32 @@ func (s *Store) DeleteShard(shardID uint64) error { } name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf) + keyBuf = keyBuf[:0] keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf) - if err = index.DropSeriesGlobal(keyBuf); err != nil { - return + if tmpErr := index.DropSeriesGlobal(keyBuf); tmpErr != nil { + sfile.Logger.Error( + "cannot drop series", + zap.Uint64("series_id", id), + zap.String("key", string(keyBuf)), + zap.Error(tmpErr)) + errs = append(errs, tmpErr) } }) - - if err != nil { - return err + if len(errs) != 0 { + return errors.Join(errs...) } } ss.ForEach(func(id uint64) { - sfile.DeleteSeriesID(id) + if err := sfile.DeleteSeriesID(id); err != nil { + sfile.Logger.Error( + "cannot delete series in shard", + zap.Uint64("series_id", id), + zap.Uint64("shard_id", shardID), + zap.Error(err)) + } }) } - } // enter the epoch tracker @@ -916,9 +932,13 @@ func (s *Store) DeleteShard(shardID uint64) error { // Remove the on-disk shard data. if err := os.RemoveAll(sh.path); err != nil { return err + } else if err = os.RemoveAll(sh.walPath); err != nil { + return err + } else { + // Remove index type from the database on success + s.databases[db].removeIndexType(sh.IndexType()) + return nil } - - return os.RemoveAll(sh.walPath) } // DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.