diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index f20c909..631f44f 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -127,6 +127,10 @@
   # to cache snapshotting.
   # max-concurrent-compactions = 0
 
+  # MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard.
+  # The default is 1 and should be left unchanged for most users.
+  # max-concurrent-deletes = 1
+
   # CompactThroughput is the rate limit in bytes per second that we
   # will allow TSM compactions to write to disk. Note that short bursts are allowed
   # to happen at a possibly larger value, set by CompactThroughputBurst
diff --git a/tsdb/config.go b/tsdb/config.go
index 025a847..b990e4a 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -67,6 +67,9 @@ const (
 	// write-ahead log file will compact into an index file.
 	DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB
 
+	// DefaultMaxConcurrentDeletes is the default number of concurrent DELETE calls on a shard.
+	DefaultMaxConcurrentDeletes = 1
+
 	// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache in the TSI index.
 	DefaultSeriesIDSetCacheSize = 100
 
@@ -131,6 +134,10 @@ type Config struct {
 	// be compacted less frequently, store more series in-memory, and provide higher write throughput.
 	MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`
 
+	// MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard.
+	// The default is 1, which was the previous hard-coded value.
+	MaxConcurrentDeletes int `toml:"max-concurrent-deletes"`
+
 	// SeriesIDSetCacheSize is the number items that can be cached within the TSI index. TSI caching can help
 	// with query performance when the same tag key/value predicates are commonly used on queries.
 	// Setting series-id-set-cache-size to 0 disables the cache.
@@ -171,6 +178,7 @@ func NewConfig() Config {
 		MaxSeriesPerDatabase:     DefaultMaxSeriesPerDatabase,
 		MaxValuesPerTag:          DefaultMaxValuesPerTag,
 		MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
+		MaxConcurrentDeletes:     DefaultMaxConcurrentDeletes,
 		MaxIndexLogFileSize:      toml.Size(DefaultMaxIndexLogFileSize),
 		SeriesIDSetCacheSize:     DefaultSeriesIDSetCacheSize,
 
@@ -194,6 +202,10 @@ func (c *Config) Validate() error {
 		return errors.New("max-concurrent-compactions must be non-negative")
 	}
 
+	if c.MaxConcurrentDeletes <= 0 {
+		return errors.New("max-concurrent-deletes must be positive")
+	}
+
 	if c.SeriesIDSetCacheSize < 0 {
 		return errors.New("series-id-set-cache-size must be non-negative")
 	}
@@ -241,6 +253,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
 		"max-series-per-database":                c.MaxSeriesPerDatabase,
 		"max-values-per-tag":                     c.MaxValuesPerTag,
 		"max-concurrent-compactions":             c.MaxConcurrentCompactions,
+		"max-concurrent-deletes":                 c.MaxConcurrentDeletes,
 		"max-index-log-file-size":                c.MaxIndexLogFileSize,
 		"series-id-set-cache-size":               c.SeriesIDSetCacheSize,
 		"series-file-max-concurrent-compactions": c.SeriesFileMaxConcurrentSnapshotCompactions,
diff --git a/tsdb/store.go b/tsdb/store.go
index e301aa5..ce3b9ad 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -1031,9 +1031,9 @@ func (s *Store) DeleteMeasurement(database, name string) error {
 	epochs := s.epochsForShards(shards)
 	s.mu.RUnlock()
 
-	// Limit to 1 delete for each shard since expanding the measurement into the list
+	// Limit deletes for each shard since expanding the measurement into the list
 	// of series keys can be very memory intensive if run concurrently.
-	limit := limiter.NewFixed(1)
+	limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
 	return s.walkShards(shards, func(sh *Shard) error {
 		limit.Take()
 		defer limit.Release()
@@ -1425,9 +1425,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
 	epochs := s.epochsForShards(shards)
 	s.mu.RUnlock()
 
-	// Limit to 1 delete for each shard since expanding the measurement into the list
+	// Limit deletes for each shard since expanding the measurement into the list
 	// of series keys can be very memory intensive if run concurrently.
-	limit := limiter.NewFixed(1)
+	limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
 
 	return s.walkShards(shards, func(sh *Shard) error {
 		// Determine list of measurements from sources.
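
For reviewers, a rough picture of what the new knob controls: `limiter.NewFixed(n)` hands out `n` slots, `Take` blocks until a slot is free, and `Release` returns it, so at most `n` per-shard deletes expand series keys at the same time. The sketch below only illustrates those semantics and is not InfluxDB's actual `pkg/limiter` code; the `fixed`/`newFixed` names and the buffered-channel approach are assumptions, and the config excerpt in the comment assumes the setting sits under the existing `[data]` section as `max-concurrent-deletes`.

```go
package main

import (
	"fmt"
	"sync"
)

// fixed is an illustrative stand-in for a fixed-slot limiter: Take blocks
// until one of n slots is free, Release returns the slot. InfluxDB's
// limiter.Fixed is assumed to behave roughly like this buffered channel.
type fixed chan struct{}

func newFixed(n int) fixed { return make(fixed, n) }

func (f fixed) Take()    { f <- struct{}{} }
func (f fixed) Release() { <-f }

func main() {
	// Hypothetical influxdb.conf excerpt raising the limit from its default of 1:
	//
	//   [data]
	//     max-concurrent-deletes = 2
	limit := newFixed(2)

	var wg sync.WaitGroup
	for shard := 0; shard < 8; shard++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limit.Take() // at most 2 shard deletes proceed concurrently
			defer limit.Release()
			fmt.Printf("deleting series on shard %d\n", id) // stand-in for the per-shard delete work
		}(shard)
	}
	wg.Wait()
}
```

With the default of 1, behavior is identical to the old hard-coded `limiter.NewFixed(1)`, so existing deployments are unaffected unless they raise the setting.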