feat: configurable DELETE concurrency (#23055)
Currently, deletions of series or measurements are
serialized. This feature adds max-concurrent-deletes
to the [data] section of the configuration file. Any
positive integer is legal; the default of 1 preserves
the current behavior.

closes influxdata/influxdb#23054
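
For illustration, the new setting would appear in the [data] section of the
configuration file roughly as follows; the value 4 is an arbitrary example,
not a recommendation:

[data]
  # Any positive integer is accepted; 1 (the default) keeps deletes
  # serialized as before. The value 4 here is only an example.
  max-concurrent-deletes = 4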
davidby-influx authored and chengshiwen committed Sep 1, 2024
1 parent b241a01 commit 7c46d50
Showing 3 changed files with 21 additions and 4 deletions.
4 changes: 4 additions & 0 deletions etc/config.sample.toml
@@ -127,6 +127,10 @@
# to cache snapshotting.
# max-concurrent-compactions = 0

# MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard
# The default is 1, and should be left unchanged for most users
# MaxConcurrentDeletes = 1

# CompactThroughput is the rate limit in bytes per second that we
# will allow TSM compactions to write to disk. Note that short bursts are allowed
# to happen at a possibly larger value, set by CompactThroughputBurst
13 changes: 13 additions & 0 deletions tsdb/config.go
@@ -67,6 +67,9 @@ const (
// write-ahead log file will compact into an index file.
DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB

// DefaultMaxConcurrentDeletes is the default number of concurrent DELETE calls on a shard.
DefaultMaxConcurrentDeletes = 1

// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache in the TSI index.
DefaultSeriesIDSetCacheSize = 100

@@ -131,6 +134,10 @@ type Config struct {
// be compacted less frequently, store more series in-memory, and provide higher write throughput.
MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`

// MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard
// The default is 1, which was the previous hard-coded value.
MaxConcurrentDeletes int `toml:"max-concurrent-deletes"`

// SeriesIDSetCacheSize is the number items that can be cached within the TSI index. TSI caching can help
// with query performance when the same tag key/value predicates are commonly used on queries.
// Setting series-id-set-cache-size to 0 disables the cache.
@@ -171,6 +178,7 @@ func NewConfig() Config {
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
MaxConcurrentDeletes: DefaultMaxConcurrentDeletes,

MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize,
@@ -194,6 +202,10 @@ func (c *Config) Validate() error {
return errors.New("max-concurrent-compactions must be non-negative")
}

if c.MaxConcurrentDeletes <= 0 {
return errors.New("max-concurrent-deletes must be positive")
}

if c.SeriesIDSetCacheSize < 0 {
return errors.New("series-id-set-cache-size must be non-negative")
}
@@ -241,6 +253,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-values-per-tag": c.MaxValuesPerTag,
"max-concurrent-compactions": c.MaxConcurrentCompactions,
"max-concurrent-deletes": c.MaxConcurrentDeletes,
"max-index-log-file-size": c.MaxIndexLogFileSize,
"series-id-set-cache-size": c.SeriesIDSetCacheSize,
"series-file-max-concurrent-compactions": c.SeriesFileMaxConcurrentSnapshotCompactions,
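
A hedged usage sketch of the tsdb/config.go additions above: build a config
with NewConfig, override the new field, and let Validate reject non-positive
values. The import path and the chosen value are assumptions for
illustration, not part of the commit:

package main

import (
	"log"

	"github.com/influxdata/influxdb/tsdb" // assumed 1.x module path
)

func main() {
	cfg := tsdb.NewConfig()
	cfg.MaxConcurrentDeletes = 4 // illustrative; any positive integer passes validation

	// Per the Validate() change above, zero or negative values are rejected.
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid tsdb config: %v", err)
	}
	log.Printf("max-concurrent-deletes = %d", cfg.MaxConcurrentDeletes)
}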
8 changes: 4 additions & 4 deletions tsdb/store.go
@@ -1031,9 +1031,9 @@ func (s *Store) DeleteMeasurement(database, name string) error {
epochs := s.epochsForShards(shards)
s.mu.RUnlock()

- // Limit to 1 delete for each shard since expanding the measurement into the list
+ // Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
- limit := limiter.NewFixed(1)
+ limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()
@@ -1425,9 +1425,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
epochs := s.epochsForShards(shards)
s.mu.RUnlock()

- // Limit to 1 delete for each shard since expanding the measurement into the list
+ // Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
- limit := limiter.NewFixed(1)
+ limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)

return s.walkShards(shards, func(sh *Shard) error {
// Determine list of measurements from sources.
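
The limiter used above (limiter.NewFixed with Take/Release, as in the
store.go hunks) behaves like a counting semaphore over the per-shard delete
work. Below is a standalone sketch of that pattern under the assumption that
the limiter is channel-backed; it is illustrative only, not the actual
pkg/limiter code:

package main

import (
	"fmt"
	"sync"
)

// fixed is a counting semaphore: at most cap(f) holders at a time. It
// mirrors the Take/Release usage in store.go above.
type fixed chan struct{}

func newFixed(limit int) fixed { return make(fixed, limit) }

func (f fixed) Take()    { f <- struct{}{} }
func (f fixed) Release() { <-f }

func main() {
	// Assumed setting for illustration: up to 4 concurrent per-shard deletes.
	limit := newFixed(4)

	var wg sync.WaitGroup
	for shard := 0; shard < 16; shard++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limit.Take()
			defer limit.Release()
			fmt.Printf("deleting series on shard %d\n", id) // stand-in for the real per-shard delete
		}(shard)
	}
	wg.Wait()
}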
