kafka replay speed: add bytes limit for inflight fetch requests #9892

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -61,6 +61,7 @@
* [ENHANCEMENT] Compactor: refresh deletion marks when updating the bucket index concurrently. This speeds up updating the bucket index by up to 16 times when there is a lot of blocks churn (thousands of blocks churning every cleanup cycle). #9881
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
* [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6753,6 +6753,16 @@
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "max_buffered_bytes",
"required": false,
"desc": "The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit.",
"fieldValue": null,
"fieldDefaultValue": 100000000,
"fieldFlag": "ingest-storage.kafka.max-buffered-bytes",
"fieldType": "int"
},
{
"kind": "field",
"name": "ingestion_concurrency_max",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1523,6 +1523,8 @@ Usage of ./cmd/mimir/mimir:
How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s)
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-buffered-bytes int
The maximum number of buffered bytes of fetched records ready to be processed. This limit applies to the sum of all inflight fetch requests. Set to 0 to disable the limit. (default 100000000)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -427,6 +427,8 @@ Usage of ./cmd/mimir/mimir:
How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s)
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-buffered-bytes int
The maximum number of buffered bytes of fetched records ready to be processed. This limit applies to the sum of all inflight fetch requests. Set to 0 to disable the limit. (default 100000000)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
5 changes: 5 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3924,6 +3924,11 @@ kafka:
# CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
[use_compressed_bytes_as_fetch_max_bytes: <boolean> | default = true]

# The maximum number of buffered bytes of fetched records ready to be processed.
# This limit applies to the sum of all inflight fetch requests. Set to 0 to
# disable the limit.
# CLI flag: -ingest-storage.kafka.max-buffered-bytes
[max_buffered_bytes: <int> | default = 100000000]

# The maximum number of concurrent ingestion streams to the TSDB head. Every
# tenant has their own set of streams. 0 to disable.
# CLI flag: -ingest-storage.kafka.ingestion-concurrency-max
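The new flag plugs into the existing `KafkaConfig` registration and validation shown in the next file. As a rough sketch of how the setting could be exercised programmatically (not part of this PR; the module path is inferred from the file paths in the diff, and the address/topic flags and the 50 MB value are only illustrative):

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("ingest-storage.kafka", fs)

	// Illustrative values: cap buffered fetch bytes at 50 MB instead of the 100 MB default.
	err := fs.Parse([]string{
		"-ingest-storage.kafka.address=localhost:9092",
		"-ingest-storage.kafka.topic=ingest",
		"-ingest-storage.kafka.max-buffered-bytes=50000000",
	})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}

	// Validate() rejects values >= math.MaxInt32 (see pkg/storage/ingest/config.go below).
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("max buffered bytes:", cfg.MaxBufferedBytes)
}
```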
7 changes: 7 additions & 0 deletions pkg/storage/ingest/config.go
@@ -6,6 +6,7 @@ import (
"errors"
"flag"
"fmt"
"math"
"slices"
"strconv"
"strings"
@@ -106,6 +107,7 @@ type KafkaConfig struct {
OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`

IngestionConcurrencyMax int `yaml:"ingestion_concurrency_max"`
IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"`
@@ -168,6 +170,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. 0 to disable.")
f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
f.IntVar(&cfg.MaxBufferedBytes, prefix+".max-buffered-bytes", 100_000_000, "The maximum number of buffered bytes of fetched records ready to be processed. This limit applies to the sum of all inflight fetch requests. Set to 0 to disable the limit.")

f.IntVar(&cfg.IngestionConcurrencyMax, prefix+".ingestion-concurrency-max", 0, "The maximum number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable.")
f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 150, "The number of timeseries to batch together before ingesting to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
@@ -221,6 +224,10 @@ func (cfg *KafkaConfig) Validate() error {
return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
}

if cfg.MaxBufferedBytes >= math.MaxInt32 {
return fmt.Errorf("ingest-storage.kafka.max-buffered-bytes must be less than %d", math.MaxInt32)
}

if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
return ErrInconsistentSASLCredentials
}
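For context on the new `math.MaxInt32` bound in `Validate()`: the fetcher stores the limit as an `int32` and treats 0 as "no limit" by substituting `math.MaxInt32` (see `newConcurrentFetchers` further down). A standalone sketch of that mapping, using an assumed helper name that is not part of the PR:

```go
package main

import (
	"fmt"
	"math"
)

// effectiveBufferedBytesLimit is a hypothetical helper mirroring the clamp in
// newConcurrentFetchers: a non-positive configured value disables the limit by
// substituting math.MaxInt32, which is why Validate() rejects anything >= MaxInt32.
func effectiveBufferedBytesLimit(configured int) int32 {
	if configured <= 0 {
		return math.MaxInt32
	}
	return int32(configured)
}

func main() {
	fmt.Println(effectiveBufferedBytesLimit(0))           // 2147483647 (limit disabled)
	fmt.Println(effectiveBufferedBytesLimit(100_000_000)) // 100000000 (the default)
}
```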
134 changes: 93 additions & 41 deletions pkg/storage/ingest/fetcher.go
@@ -51,6 +51,12 @@ type fetcher interface {

// BufferedRecords returns the number of records that have been fetched but not yet consumed.
BufferedRecords() int64

// BufferedBytes returns the number of bytes that have been fetched but not yet consumed.
BufferedBytes() int64

// BytesPerRecord returns the current estimate of the size of each record, in bytes.
BytesPerRecord() int64
}

// fetchWant represents a range of offsets to fetch.
@@ -232,9 +238,12 @@ type concurrentFetchers struct {
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
trackCompressedBytes bool
trackCompressedBytes bool
maxBufferedBytesLimit int32

bufferedFetchedRecords *atomic.Int64
bufferedFetchedRecords *atomic.Int64
bufferedFetchedBytes *atomic.Int64
estimatedBytesPerRecord *atomic.Int64
}

// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
@@ -247,6 +256,7 @@ func newConcurrentFetchers(
startOffset int64,
concurrency int,
recordsPerFetch int,
maxBufferedBytesLimit int32,
trackCompressedBytes bool,
minBytesWaitTime time.Duration,
offsetReader *partitionOffsetClient,
@@ -267,20 +277,27 @@ func newConcurrentFetchers(
if err != nil {
return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
}

if maxBufferedBytesLimit <= 0 {
maxBufferedBytesLimit = math.MaxInt32
}
f := &concurrentFetchers{
bufferedFetchedRecords: atomic.NewInt64(0),
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
bufferedFetchedRecords: atomic.NewInt64(0),
bufferedFetchedBytes: atomic.NewInt64(0),
estimatedBytesPerRecord: atomic.NewInt64(0),
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
maxBufferedBytesLimit: maxBufferedBytesLimit,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
@@ -306,6 +323,15 @@ func (r *concurrentFetchers) BufferedRecords() int64 {
return r.bufferedFetchedRecords.Load()
}

// BufferedBytes implements fetcher.
func (r *concurrentFetchers) BufferedBytes() int64 {
return r.bufferedFetchedBytes.Load()
}

func (r *concurrentFetchers) BytesPerRecord() int64 {
return r.estimatedBytesPerRecord.Load()
}

// Stop implements fetcher.
func (r *concurrentFetchers) Stop() {
// Ensure it's not already stopped.
@@ -323,6 +349,7 @@ func (r *concurrentFetchers) Stop() {
// When the fetcher is stopped, buffered records are intentionally dropped. For this reason,
// we do reset the counter of buffered records here.
r.bufferedFetchedRecords.Store(0)
r.bufferedFetchedBytes.Store(0)

level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
}
@@ -610,11 +637,48 @@ func casHWM(highWwatermark *atomic.Int64, newHWM int64) {
}
}

type inflightFetchWants struct {
// wants is the list of all fetchWants that are currently inflight. The list is ordered in the
// same order in which their results should be returned to PollFetches(), so the first element
// is the next one whose result should be returned.
wants list.List

// bytes is the sum of the MaxBytes of all fetchWants that are currently inflight.
bytes *atomic.Int64
}

// peekNextResult returns the channel on which we expect a worker to write the result of the next
// fetch operation. That result is the next one to be returned to PollFetches(), which guarantees
// record ordering. The returned channel may be closed; in that case the caller is expected to call removeNextResult.
func (w *inflightFetchWants) peekNextResult() chan fetchResult {
if w.wants.Len() == 0 {
return nil
}
return w.wants.Front().Value.(fetchWant).result
}

func (w *inflightFetchWants) count() int {
return w.wants.Len()
}

func (w *inflightFetchWants) append(nextFetch fetchWant) {
w.bytes.Add(int64(nextFetch.MaxBytes()))
w.wants.PushBack(nextFetch)
}

func (w *inflightFetchWants) removeNextResult() {
head := w.wants.Front()
// The MaxBytes of the fetchWant might have changed as it was being fetched (e.g. UpdateBytesPerRecord).
// But we don't care about that here because we're only interested in the MaxBytes when the fetchWant was added to the inflight fetchWants.
w.bytes.Sub(int64(head.Value.(fetchWant).MaxBytes()))
w.wants.Remove(head)
}
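To make the accounting concrete: `append` charges the fetchWant's `MaxBytes()` against the shared counter, and `removeNextResult` refunds the value captured at append time (the copy stored in the list is not mutated afterwards). A hedged sketch, written as if it sat in package ingest next to this code; the function name is made up, and the 10_000 starting estimate mirrors the value used later in this file:

```go
package ingest

import (
	"fmt"

	"go.uber.org/atomic"
)

// exampleInflightAccounting is an illustrative (hypothetical) walkthrough of the
// byte accounting done by inflightFetchWants.
func exampleInflightAccounting() {
	inflight := inflightFetchWants{bytes: atomic.NewInt64(0)}

	w := fetchWantFrom(0, 30) // want the next 30 records starting at offset 0
	w.bytesPerRecord = 10_000 // same initial estimate the fetch loop starts with

	inflight.append(w)                      // bytes += w.MaxBytes()
	fmt.Println(inflight.bytes.Load() > 0)  // true while the fetch is inflight

	inflight.removeNextResult()             // bytes -= the MaxBytes captured at append time
	fmt.Println(inflight.bytes.Load() == 0) // true: the counter is balanced again
}
```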

func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) {
level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "recordsPerFetch", recordsPerFetch)

// HWM is updated by the fetchers. A value of 0 is the same as there not being any produced records.
// A value of 0 doesn't prevent progress because we ensure there is at least one dispatched fetchWant.
// A value of 0 doesn't prevent progress because we ensure there is at least one inflight fetchWant.
highWatermark := atomic.NewInt64(0)

wants := make(chan fetchWant)
@@ -633,17 +697,8 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
// It contains the offset range to fetch and a channel where the result should be written to.
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)

// nextResult is the channel where we expect a worker will write the result of the next fetch
// operation. This result is the next result that will be returned to PollFetches(), guaranteeing
// records ordering.
nextResult chan fetchResult

// pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results
// are ordered in the same order these results should be returned to PollFetches(), so the first one
// in the list is the next one that should be returned, unless nextResult is valued (if nextResult
// is valued, then nextResult is the next and the first item in the pendingResults list is the
// 2nd next one).
pendingResults = list.New()
// inflight is the list of all fetchWants that are currently in flight.
inflight = inflightFetchWants{bytes: r.bufferedFetchedBytes}

// bufferedResult is the next fetch that should be polled by PollFetches().
bufferedResult fetchResult
@@ -654,23 +709,30 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
// are some ordered buffered records ready.
//
// It is guaranteed that this channel is non-nil when bufferedResult is non-empty.
//
// The idea is that we don't want to block the main loop when we have records ready to be consumed.
readyBufferedResults chan fetchResult
)
nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume

for {
refillBufferedResult := nextResult
// refillBufferedResult is the channel of the next fetch result. This variable is valued (non-nil) only when
// we're ready to actually read the result, so that we don't try to read the next result if we're not ready.
refillBufferedResult := inflight.peekNextResult()
if readyBufferedResults != nil {
// We have a single result that's still not consumed.
// So we don't try to get new results from the fetchers.
refillBufferedResult = nil
}
dispatchNextWant := chan fetchWant(nil)
if nextResult == nil || nextFetch.startOffset <= highWatermark.Load() {
wouldExceedInflightBytesLimit := inflight.bytes.Load()+int64(nextFetch.MaxBytes()) > int64(r.maxBufferedBytesLimit)
if inflight.count() == 0 || (!wouldExceedInflightBytesLimit && nextFetch.startOffset <= highWatermark.Load()) {
// In Warpstream fetching past the end induced more delays than MinBytesWaitTime.
// So we dispatch a fetch only if it's fetching an existing offset.
// This shouldn't noticeably affect performance with Apache Kafka, after all franz-go only has a concurrency of 1 per partition.
//
// We also don't want to have too many fetches in flight, so we only dispatch a fetch if it wouldn't exceed the memory limit.
//
// At the same time we don't want to reach a deadlock where the HWM is not updated and there are no fetches in flight.
// When there isn't a fetch in flight the HWM will never be updated, we will dispatch the next fetchWant even if that means it's above the HWM.
dispatchNextWant = wants
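The dispatch condition above packs three concerns into one expression: never stall with zero inflight fetches (otherwise the high watermark would never advance), don't exceed the buffered-bytes budget, and don't fetch past the high watermark. Restated as a hypothetical standalone predicate (not part of the PR):

```go
// canDispatch is an illustrative restatement of the dispatch gate used above.
func canDispatch(inflightCount int, inflightBytes, nextMaxBytes, limit, nextOffset, highWatermark int64) bool {
	if inflightCount == 0 {
		// Always keep at least one fetch inflight; the HWM only advances via fetch responses.
		return true
	}
	wouldExceedLimit := inflightBytes+nextMaxBytes > limit
	return !wouldExceedLimit && nextOffset <= highWatermark
}
```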
@@ -682,22 +744,12 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
return

case dispatchNextWant <- nextFetch:
pendingResults.PushBack(nextFetch.result)
if nextResult == nil {
// In case we previously exhausted pendingResults, we just created
nextResult = pendingResults.Front().Value.(chan fetchResult)
pendingResults.Remove(pendingResults.Front())
}
inflight.append(nextFetch)
nextFetch = nextFetch.Next(recordsPerFetch)

case result, moreLeft := <-refillBufferedResult:
if !moreLeft {
if pendingResults.Len() > 0 {
nextResult = pendingResults.Front().Value.(chan fetchResult)
pendingResults.Remove(pendingResults.Front())
} else {
nextResult = nil
}
inflight.removeNextResult()
continue
}
nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))

Review thread on the removed pendingResults check:

Collaborator: We've lost this if check (removeNextResult assumes the list is non-empty). Is it a problem? To my understanding it's not a problem because, with the new logic, we have the guarantee that if refillBufferedResult is valued then there's at least 1 item in the list (the fetch we're currently reading from, which is set to refillBufferedResult itself). Is my understanding correct?

Contributor Author: That's right. Now instead of keeping nextResult as state, we compute it on every iteration. The invariants from before still hold.