Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved shard deletion (#24602) #24893

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
return ErrShardNotFound
}

// Remove the shard from Store so it's not returned to callers requesting
// Remove the shard from Store, so it's not returned to callers requesting
// shards. Also mark that this shard is currently being deleted in a separate
// map so that we do not have to retain the global store lock while deleting
// files.
Expand Down Expand Up @@ -840,7 +840,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
defer s.mu.Unlock()
delete(s.epochs, shardID)
delete(s.pendingShardDeletes, shardID)
s.databases[db].removeIndexType(sh.IndexType())
}()

// Get the shard's local bitset of series IDs.
Expand All @@ -851,16 +850,23 @@ func (s *Store) DeleteShard(shardID uint64) error {

ss := index.SeriesIDSet()

s.walkShards(shards, func(sh *Shard) error {
err = s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
s.Logger.Error("cannot find shard index", zap.Uint64("shard_id", sh.ID()), zap.Error(err))
return err
}

ss.Diff(index.SeriesIDSet())
return nil
})

if err != nil {
// We couldn't get the index for a shard. Rather than deleting series which may
// exist in that shard as well as in the current shard, we stop the current deletion
return err
}

// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
Expand All @@ -872,7 +878,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
var keyBuf []byte // Series key buffer.
var name []byte
var tagsBuf models.Tags // Buffer for tags container.
var err error
var errs []error

ss.ForEach(func(id uint64) {
skey := sfile.SeriesKey(id) // Series File series key
Expand All @@ -881,22 +887,32 @@ func (s *Store) DeleteShard(shardID uint64) error {
}

name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf)
keyBuf = keyBuf[:0]
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
if err = index.DropSeriesGlobal(keyBuf); err != nil {
return
if tmpErr := index.DropSeriesGlobal(keyBuf); tmpErr != nil {
sfile.Logger.Error(
"cannot drop series",
zap.Uint64("series_id", id),
zap.String("key", string(keyBuf)),
zap.Error(tmpErr))
errs = append(errs, tmpErr)
}
})

if err != nil {
return err
if len(errs) != 0 {
return errors.Join(errs...)
}
}

ss.ForEach(func(id uint64) {
sfile.DeleteSeriesID(id)
if err := sfile.DeleteSeriesID(id); err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.Error(err))
}
})
}

}

// enter the epoch tracker
Expand All @@ -916,9 +932,13 @@ func (s *Store) DeleteShard(shardID uint64) error {
// Remove the on-disk shard data.
if err := os.RemoveAll(sh.path); err != nil {
return err
} else if err = os.RemoveAll(sh.walPath); err != nil {
return err
} else {
// Remove index type from the database on success
s.databases[db].removeIndexType(sh.IndexType())
return nil
}

return os.RemoveAll(sh.walPath)
}

// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
Expand Down