Skip to content

Commit

Permalink
Limit delete to run one shard at a time
Browse files Browse the repository at this point in the history
There was a change to speed up deleting and dropping measurements
that executed the deletes in parallel for all shards at once. #7015

When TSI was merged in #7618, the series keys passed into Shard.DeleteMeasurement
were removed and were expanded lower down.  This causes memory to blow up
when a delete across many shards occurs as we now expand the set of series
keys N times instead of just once as before.

While running the deletes in parallel would be ideal, there have been a number
of optimizations in the delete path that make running deletes serially pretty
good.  This change just limits the concurrency of the deletes which keeps memory
more stable.
  • Loading branch information
jwilder committed Jul 25, 2017
1 parent fe1167c commit b72094b
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,13 @@ func (s *Store) DeleteMeasurement(database, name string) error {
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()

if err := sh.DeleteMeasurement([]byte(name)); err != nil {
return err
}
Expand Down Expand Up @@ -834,6 +840,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
s.mu.RLock()
defer s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)

return s.walkShards(shards, func(sh *Shard) error {
// Determine list of measurements from sources.
// Use all measurements if no FROM clause was provided.
Expand All @@ -852,6 +862,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
sort.Strings(names)

limit.Take()
defer limit.Release()

// Find matching series keys for each measurement.
var keys [][]byte
for _, name := range names {
Expand Down

0 comments on commit b72094b

Please sign in to comment.