Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable DELETE concurrency (#23055) #23059

Merged
merged 1 commit into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
# to cache snapshotting.
# max-concurrent-compactions = 0

# MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard.
# The default is 1, and should be left unchanged for most users.
# max-concurrent-deletes = 1

# CompactThroughput is the rate limit in bytes per second that we
# will allow TSM compactions to write to disk. Note that short bursts are allowed
# to happen at a possibly larger value, set by CompactThroughputBurst
Expand Down
12 changes: 12 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
// write-ahead log file will compact into an index file.
DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB

// DefaultMaxConcurrentDeletes is the default number of concurrent DELETE calls on a shard.
DefaultMaxConcurrentDeletes = 1

// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache in the TSI index.
DefaultSeriesIDSetCacheSize = 100

Expand Down Expand Up @@ -138,6 +141,10 @@ type Config struct {
// be compacted less frequently, store more series in-memory, and provide higher write throughput.
MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`

// MaxConcurrentDeletes is the maximum number of simultaneous DELETE calls on a shard
// The default is 1, which was the previous hard-coded value.
MaxConcurrentDeletes int `toml:"max-concurrent-deletes"`

// SeriesIDSetCacheSize is the number items that can be cached within the TSI index. TSI caching can help
// with query performance when the same tag key/value predicates are commonly used on queries.
// Setting series-id-set-cache-size to 0 disables the cache.
Expand Down Expand Up @@ -178,6 +185,7 @@ func NewConfig() Config {
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
MaxConcurrentDeletes: DefaultMaxConcurrentDeletes,

MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize,
Expand All @@ -201,6 +209,10 @@ func (c *Config) Validate() error {
return errors.New("max-concurrent-compactions must be non-negative")
}

if c.MaxConcurrentDeletes <= 0 {
return errors.New("max-concurrent-deletes must be positive")
}

if c.SeriesIDSetCacheSize < 0 {
return errors.New("series-id-set-cache-size must be non-negative")
}
Expand Down
8 changes: 4 additions & 4 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,9 +1000,9 @@ func (s *Store) DeleteMeasurement(database, name string) error {
epochs := s.epochsForShards(shards)
s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)
limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()
Expand Down Expand Up @@ -1391,9 +1391,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
epochs := s.epochsForShards(shards)
s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)
limit := limiter.NewFixed(s.EngineOptions.Config.MaxConcurrentDeletes)

return s.walkShards(shards, func(sh *Shard) error {
// Determine list of measurements from sources.
Expand Down