ingest: use single configuration for fetch concurrency (grafana#10156)
This replaces the two previous flags with a single one:

```
# old flags
-ingest-storage.kafka.startup-fetch-concurrency
-ingest-storage.kafka.ongoing-fetch-concurrency

# new flag
-ingest-storage.kafka.fetch-concurrency-max
```

At Grafana Labs we've been running with a single value for both. The reason for having two flags in the first place was that we couldn't find a single value that balanced throughput and latency. This was largely fixed by the change to not issue fetch requests beyond the high-water mark (HWM) (#9892), so we no longer need the complexity of switching the concurrency setting between startup and ongoing consumption.
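For deployments that use file-based configuration, the YAML keys change in the same way (the key names appear in the `pkg/storage/ingest/config.go` struct tags in this diff). A minimal migration sketch, assuming the keys live under the `ingest_storage.kafka` block and reusing the illustrative values from the dev environment changes below:

```yaml
# Before: separate startup and ongoing concurrency settings.
ingest_storage:
  kafka:
    startup_fetch_concurrency: 15
    ongoing_fetch_concurrency: 2

# After: one cap on concurrent fetch requests; 0 keeps concurrent fetching disabled.
ingest_storage:
  kafka:
    fetch_concurrency_max: 2
```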

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov authored and bjorns163 committed Dec 30, 2024
1 parent eafbc6a commit 9390a96
Showing 14 changed files with 39 additions and 355 deletions.
CHANGELOG.md (2 changes: 0 additions & 2 deletions)
@@ -45,7 +45,6 @@
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [CHANGE] Ingester: Change `-initial-delay` for circuit breakers to begin when the first request is received, rather than at breaker activation. #9842
* [CHANGE] Query-frontend: apply query pruning before query sharding instead of after. #9913
- * [CHANGE] Ingester: remove experimental flags `-ingest-storage.kafka.ongoing-records-per-fetch` and `-ingest-storage.kafka.startup-records-per-fetch`. They are removed in favour of `-ingest-storage.kafka.max-buffered-bytes`. #9906
* [CHANGE] Ingester: Replace `cortex_discarded_samples_total` label from `sample-out-of-bounds` to `sample-timestamp-too-old`. #9885
* [CHANGE] Ruler: the `/prometheus/config/v1/rules` does not return an error anymore if a rule group is missing in the object storage after been successfully returned by listing the storage, because it could have been deleted in the meanwhile. #9936
* [CHANGE] Querier: The `.` pattern in regular expressions in PromQL matches newline characters. With this change regular expressions like `.*` match strings that include `\n`. To maintain the old behaviour, you will have to change regular expressions by replacing all `.` patterns with `[^\n]`, e.g. `foo[^\n]*`. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
@@ -101,7 +100,6 @@
* [ENHANCEMENT] Compactor: refresh deletion marks when updating the bucket index concurrently. This speeds up updating the bucket index by up to 16 times when there is a lot of blocks churn (thousands of blocks churning every cleanup cycle). #9881
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826 #9976
- * [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
* [ENHANCEMENT] Ruler: Support OAuth2 and proxies in Alertmanager client #9945 #10030
* [ENHANCEMENT] Ingester: Add `-blocks-storage.tsdb.bigger-out-of-order-blocks-for-old-samples` to build 24h blocks for out-of-order data belonging to the previous days instead of building smaller 2h blocks. This reduces pressure on compactors and ingesters when the out-of-order samples span multiple days in the past. #9844 #10033 #10035
cmd/mimir/config-descriptor.json (16 changes: 3 additions & 13 deletions)
@@ -6748,22 +6748,12 @@
},
{
"kind": "field",
- "name": "startup_fetch_concurrency",
+ "name": "fetch_concurrency_max",
"required": false,
- "desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
+ "desc": "The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
- "fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
- "fieldType": "int"
- },
- {
- "kind": "field",
- "name": "ongoing_fetch_concurrency",
- "required": false,
- "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.",
- "fieldValue": null,
- "fieldDefaultValue": 0,
- "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
+ "fieldFlag": "ingest-storage.kafka.fetch-concurrency-max",
"fieldType": "int"
},
{
cmd/mimir/help-all.txt.tmpl (6 changes: 2 additions & 4 deletions)
@@ -1511,6 +1511,8 @@ Usage of ./cmd/mimir/mimir:
How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
-ingest-storage.kafka.dial-timeout duration
The maximum time allowed to open a connection to a Kafka broker. (default 2s)
+ -ingest-storage.kafka.fetch-concurrency-max int
+ The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.
-ingest-storage.kafka.ingestion-concurrency-batch-size int
The number of timeseries to batch together before ingesting to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0. (default 150)
-ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample int
@@ -1529,8 +1531,6 @@ Usage of ./cmd/mimir/mimir:
The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit. (default 100000000)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
- -ingest-storage.kafka.ongoing-fetch-concurrency int
- The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
@@ -1539,8 +1539,6 @@ Usage of ./cmd/mimir/mimir:
The password used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
-ingest-storage.kafka.sasl-username string
The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
- -ingest-storage.kafka.startup-fetch-concurrency int
- The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
cmd/mimir/help.txt.tmpl (6 changes: 2 additions & 4 deletions)
@@ -413,6 +413,8 @@ Usage of ./cmd/mimir/mimir:
How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
-ingest-storage.kafka.dial-timeout duration
The maximum time allowed to open a connection to a Kafka broker. (default 2s)
+ -ingest-storage.kafka.fetch-concurrency-max int
+ The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.
-ingest-storage.kafka.ingestion-concurrency-batch-size int
The number of timeseries to batch together before ingesting to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0. (default 150)
-ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample int
@@ -431,8 +433,6 @@ Usage of ./cmd/mimir/mimir:
The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit. (default 100000000)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
- -ingest-storage.kafka.ongoing-fetch-concurrency int
- The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
@@ -441,8 +441,6 @@ Usage of ./cmd/mimir/mimir:
The password used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
-ingest-storage.kafka.sasl-username string
The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
- -ingest-storage.kafka.startup-fetch-concurrency int
- The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
development/mimir-ingest-storage/docker-compose.jsonnet (3 changes: 1 addition & 2 deletions)
@@ -54,8 +54,7 @@ std.manifestYamlDoc({
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=end',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
- '-ingest-storage.kafka.startup-fetch-concurrency=15',
- '-ingest-storage.kafka.ongoing-fetch-concurrency=2',
+ '-ingest-storage.kafka.fetch-concurrency-max=2',
'-ingest-storage.kafka.ingestion-concurrency-max=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
],
development/mimir-ingest-storage/docker-compose.yml (2 changes: 1 addition & 1 deletion)
@@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.fetch-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150"
"depends_on":
"kafka_1":
"condition": "service_healthy"
docs/sources/mimir/configure/configuration-parameters/index.md (16 changes: 5 additions & 11 deletions)
@@ -3939,17 +3939,11 @@ kafka:
# CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
[wait_strong_read_consistency_timeout: <duration> | default = 20s]
- # The number of concurrent fetch requests that the ingester makes when reading
- # data from Kafka during startup. 0 to disable.
- # CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
- [startup_fetch_concurrency: <int> | default = 0]
- # The number of concurrent fetch requests that the ingester makes when reading
- # data continuously from Kafka after startup. Is disabled unless
- # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to
- # disable.
- # CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
- [ongoing_fetch_concurrency: <int> | default = 0]
+ # The maximum number of concurrent fetch requests that the ingester makes when
+ # reading data from Kafka during startup. Concurrent fetch requests are issued
+ # only when there is sufficient backlog of records to consume. 0 to disable.
+ # CLI flag: -ingest-storage.kafka.fetch-concurrency-max
+ [fetch_concurrency_max: <int> | default = 0]
# When enabled, the fetch request MaxBytes field is computed using the
# compressed size of previous records. When disabled, MaxBytes is computed
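A usage sketch to complement the reference above (values are illustrative, not recommendations, and the nesting assumption is the same as in the earlier sketch): concurrent fetching is enabled by setting the new option to a value greater than 0, and the memory held by buffered records can be bounded with the existing `max_buffered_bytes` option.

```yaml
ingest_storage:
  kafka:
    # Maximum number of in-flight fetch requests; 0 disables concurrent fetching.
    fetch_concurrency_max: 2
    # Existing limit on buffered, not-yet-processed record bytes (default shown).
    max_buffered_bytes: 100000000
```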
integration/configs.go (3 changes: 1 addition & 2 deletions)
@@ -251,8 +251,7 @@ blocks_storage:
"-ingester.partition-ring.min-partition-owners-duration": "0s",

// Enable ingestion and fetch concurrency
- "-ingest-storage.kafka.ongoing-fetch-concurrency": "12",
- "-ingest-storage.kafka.startup-fetch-concurrency": "12",
+ "-ingest-storage.kafka.fetch-concurrency-max": "12",
"-ingest-storage.kafka.ingestion-concurrency-max": "8",
"-ingest-storage.kafka.auto-create-topic-default-partitions": "10",
}
pkg/storage/ingest/config.go (14 changes: 4 additions & 10 deletions)
@@ -110,8 +110,7 @@ type KafkaConfig struct {
// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`

- StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
- OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
+ FetchConcurrencyMax int `yaml:"fetch_concurrency_max"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`

@@ -177,8 +176,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")

- f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
- f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. 0 to disable.")
+ f.IntVar(&cfg.FetchConcurrencyMax, prefix+".fetch-concurrency-max", 0, "The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
f.IntVar(&cfg.MaxBufferedBytes, prefix+".max-buffered-bytes", 100_000_000, "The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit.")

@@ -222,12 +220,8 @@ func (cfg *KafkaConfig) Validate() error {
return ErrInvalidMaxConsumerLagAtStartup
}

- if cfg.StartupFetchConcurrency < 0 {
- return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
- }
-
- if cfg.OngoingFetchConcurrency > 0 && cfg.StartupFetchConcurrency <= 0 {
- return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater than 0 when ingest-storage.kafka.ongoing-fetch-concurrency is greater than 0")
+ if cfg.FetchConcurrencyMax < 0 {
+ return fmt.Errorf("ingest-storage.kafka.fetch-concurrency-max must be greater than or equal to 0")
}

if cfg.MaxBufferedBytes >= math.MaxInt32 {
pkg/storage/ingest/fetcher.go (12 changes: 0 additions & 12 deletions)
@@ -53,9 +53,6 @@ type fetcher interface {
// record and use the returned context when doing something that is common to all records.
PollFetches(context.Context) (kgo.Fetches, context.Context)

- // Update updates the fetcher with the given concurrency.
- Update(ctx context.Context, concurrency int)
-
// Stop stops the fetcher.
Stop()

@@ -400,15 +397,6 @@ func (r *concurrentFetchers) Stop() {
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset)
}

- // Update implements fetcher
- func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) {
- r.Stop()
- r.done = make(chan struct{})
-
- r.wg.Add(1)
- go r.start(ctx, r.lastReturnedOffset+1, concurrency)
- }
-
// PollFetches implements fetcher
func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
waitStartTime := time.Now()