From 935379d7515980e3c830551bd58e1390f52ac8f9 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 25 Jul 2017 16:20:52 -0600 Subject: [PATCH] Limit delete to run one shard at a time There was a change to speed up deleting and dropping measurements that executed the deletes in parallel for all shards at once (#7015). When TSI was merged in #7618, the series keys passed into Shard.DeleteMeasurement were removed, and the keys are now expanded lower down, once per shard. This causes memory to blow up when a delete across many shards occurs, as we now expand the set of series keys N times instead of just once as before. While running the deletes in parallel would be ideal, there have been a number of optimizations in the delete path that make running deletes serially pretty good. This change just limits the concurrency of the deletes, which keeps memory more stable. --- CHANGELOG.md | 1 + tsdb/store.go | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 463d057d76a..a3c2c9fa701 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bugfixes - [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions +- [#8630](https://github.com/influxdata/influxdb/pull/8630): Prevent excessive memory usage when dropping series ## v1.3.1 [2017-07-20] diff --git a/tsdb/store.go b/tsdb/store.go index 2cc51ac2cd0..4a6b414b45e 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -570,7 +570,13 @@ func (s *Store) DeleteMeasurement(database, name string) error { shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. 
+ limit := limiter.NewFixed(1) return s.walkShards(shards, func(sh *Shard) error { + limit.Take() + defer limit.Release() + if err := sh.DeleteMeasurement([]byte(name)); err != nil { return err } @@ -834,6 +840,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi s.mu.RLock() defer s.mu.RUnlock() + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. + limit := limiter.NewFixed(1) + return s.walkShards(shards, func(sh *Shard) error { // Determine list of measurements from sources. // Use all measurements if no FROM clause was provided. @@ -852,6 +862,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } sort.Strings(names) + limit.Take() + defer limit.Release() + // Find matching series keys for each measurement. var keys [][]byte for _, name := range names {