Skip to content

Commit

Permalink
Limit delete to run one shard at a time
Browse files Browse the repository at this point in the history
There was a change to speed up deleting and dropping measurements
that executed the deletes in parallel for all shards at once. #7015

When TSI was merged in #7618, the series keys passed into Shard.DeleteMeasurement
were removed and were expanded lower down.  This causes memory to blow up
when a delete across many shards occurs as we now expand the set of series
keys N times instead of just once as before.

While running the deletes in parallel would be ideal, there have been a number
of optimizations in the delete path that make running deletes serially pretty
good.  This change just limits the concurrency of the deletes which keeps memory
more stable.
  • Loading branch information
jwilder committed Jul 27, 2017
1 parent 42b82e7 commit 935379d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes

- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions
- [#8630](https://github.com/influxdata/influxdb/pull/8630): Prevent excessive memory usage when dropping series

## v1.3.1 [2017-07-20]

Expand Down
13 changes: 13 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,13 @@ func (s *Store) DeleteMeasurement(database, name string) error {
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()

if err := sh.DeleteMeasurement([]byte(name)); err != nil {
return err
}
Expand Down Expand Up @@ -834,6 +840,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
s.mu.RLock()
defer s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)

return s.walkShards(shards, func(sh *Shard) error {
// Determine list of measurements from sources.
// Use all measurements if no FROM clause was provided.
Expand All @@ -852,6 +862,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
sort.Strings(names)

limit.Take()
defer limit.Release()

// Find matching series keys for each measurement.
var keys [][]byte
for _, name := range names {
Expand Down

0 comments on commit 935379d

Please sign in to comment.