From 497b9e3a5f791f7f9c5e1e87b049f9b74aa1afef Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Thu, 13 Jan 2022 11:04:57 -0800 Subject: [PATCH] feat: configurable DELETE concurrency (#23055) Currently, deletion of series or measurements are serialized. This new feature will add max-concurrent-deletes to the [data] section of the configuration file. Legal values are any positive number, defaulting to 1, the current behavior. closes https://github.com/influxdata/influxdb/issues/23054 (cherry picked from commit eb3bc7069f9b30b7c3ba03856caf10c6c116a5d9) --- etc/config.sample.toml | 4 ++++ tsdb/config.go | 12 ++++++++++++ tsdb/store.go | 8 ++++---- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e926a1c3861..b32fdd2da85 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -111,6 +111,10 @@ # to cache snapshotting. # max-concurrent-compactions = 0 + # MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard + # The default is 1, and should be left unchanged for most users + # MaxConcurrentDeletes = 1 + # CompactThroughput is the rate limit in bytes per second that we # will allow TSM compactions to write to disk. Note that short bursts are allowed # to happen at a possibly larger value, set by CompactThroughputBurst diff --git a/tsdb/config.go b/tsdb/config.go index ef73e8b7043..feb4927783a 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -67,6 +67,9 @@ const ( // write-ahead log file will compact into an index file. DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB + // DefaultMaxConcurrentDeletes is the default number of concurrent DELETE calls on a shard. + DefaultMaxConcurrentDeletes = 1 + // DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache in the TSI index. DefaultSeriesIDSetCacheSize = 100 @@ -138,6 +141,10 @@ type Config struct { // be compacted less frequently, store more series in-memory, and provide higher write throughput. MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"` + // MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard + // The default is 1, which was the previous hard-coded value. + MaxConcurrentDeletes int `toml:"max-concurrent-deletes"` + // SeriesIDSetCacheSize is the number items that can be cached within the TSI index. TSI caching can help // with query performance when the same tag key/value predicates are commonly used on queries. // Setting series-id-set-cache-size to 0 disables the cache. @@ -178,6 +185,7 @@ func NewConfig() Config { MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, MaxValuesPerTag: DefaultMaxValuesPerTag, MaxConcurrentCompactions: DefaultMaxConcurrentCompactions, + MaxConcurrentDeletes: DefaultMaxConcurrentDeletes, MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize), SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize, @@ -201,6 +209,10 @@ func (c *Config) Validate() error { return errors.New("max-concurrent-compactions must be non-negative") } + if c.MaxConcurrentDeletes <= 0 { + return errors.New("max-concurrent-deletes must be positive") + } + if c.SeriesIDSetCacheSize < 0 { return errors.New("series-id-set-cache-size must be non-negative") } diff --git a/tsdb/store.go b/tsdb/store.go index bce30a37d6f..3d1b39952bf 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1000,9 +1000,9 @@ func (s *Store) DeleteMeasurement(database, name string) error { epochs := s.epochsForShards(shards) s.mu.RUnlock() - // Limit to 1 delete for each shard since expanding the measurement into the list + // Limit deletes for each shard since expanding the measurement into the list // of series keys can be very memory intensive if run concurrently. - limit := limiter.NewFixed(1) + limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes) return s.walkShards(shards, func(sh *Shard) error { limit.Take() defer limit.Release() @@ -1391,9 +1391,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi epochs := s.epochsForShards(shards) s.mu.RUnlock() - // Limit to 1 delete for each shard since expanding the measurement into the list + // Limit deletes for each shard since expanding the measurement into the list // of series keys can be very memory intensive if run concurrently. - limit := limiter.NewFixed(1) + limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes) return s.walkShards(shards, func(sh *Shard) error { // Determine list of measurements from sources.