Distributor: Add limits for OTel write request byte size (#8574)
* Distributor: Add limits for OTel write request byte size

* remove uncompressed limit

* Address comments

* Address comments on docs

* Update CHANGELOG.md

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Update pkg/distributor/push.go

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* update doc and set new default

* Update CHANGELOG.md

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Update docs/sources/mimir/configure/about-versioning.md

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Update docs/sources/mimir/manage/mimir-runbooks/_index.md

Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>

* Update docs/sources/mimir/manage/mimir-runbooks/_index.md

Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>

* fix unittest

---------

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
3 people authored Jul 4, 2024
1 parent 827833a commit e7ab906
Showing 12 changed files with 76 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@
* `-ingest-storage.read-consistency`: configures the default read consistency.
* `-ingest-storage.migration.distributor-send-to-ingesters-enabled`: enabled tee-ing writes to classic ingesters and Kafka, used during a live migration to the new ingest storage architecture.
* `-ingester.partition-ring.*`: configures partitions ring backend.
* [CHANGE] Distributor: Incoming OTLP requests were previously limited in size by the `-distributor.max-recv-msg-size` option. The new `-distributor.max-otlp-request-size` option now limits OTLP requests instead, with a default value of 100 MiB. #8574
* [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371
* [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378
* [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1197,6 +1197,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_otlp_request_size",
"required": false,
"desc": "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.",
"fieldValue": null,
"fieldDefaultValue": 104857600,
"fieldFlag": "distributor.max-otlp-request-size",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_request_pool_buffer_size",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1199,6 +1199,8 @@ Usage of ./cmd/mimir/mimir:
[deprecated] When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed. (default true)
-distributor.max-exemplars-per-series-per-request int
[experimental] Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.
-distributor.max-otlp-request-size int
[experimental] Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected. (default 104857600)
-distributor.max-recv-msg-size int
Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
-distributor.max-request-pool-buffer-size int
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -74,6 +74,8 @@ The following features are currently experimental:
- `-distributor.retry-after-header.max-backoff-exponent`
- Limit exemplars per series per request
- `-distributor.max-exemplars-per-series-per-request`
- Limit OTLP write request byte size
- `-distributor.max-otlp-request-size`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
@@ -846,6 +846,11 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# (experimental) Maximum OTLP request size in bytes that the distributors
# accept. Requests exceeding this limit are rejected.
# CLI flag: -distributor.max-otlp-request-size
[max_otlp_request_size: <int> | default = 104857600]
# (experimental) Max size of the pooled buffers used for marshaling write
# requests. If 0, no max size is enforced.
# CLI flag: -distributor.max-request-pool-buffer-size
14 changes: 14 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
@@ -2238,6 +2238,20 @@ How to **fix** it:

- Increase the allowed limit by using the `-distributor.max-recv-msg-size` option.

### err-mimir-distributor-max-otlp-request-size

This error occurs when a distributor rejects an OTel write request because its message size is larger than the allowed limit before or after decompression.

How it **works**:

- The distributor implements an upper limit on the message size of incoming OTel write requests, both before and after decompression, regardless of the compression type. Refer to [OTLP collector compression details](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp) for more information.
- Configure this limit in the `-distributor.max-otlp-request-size` setting.

How to **fix** it:

- If you use the batch processor in the OpenTelemetry Collector, decrease the maximum batch size in the `send_batch_max_size` setting. Refer to [Batch Processor](https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md) for details.
- Increase the allowed limit in the `-distributor.max-otlp-request-size` setting.

### err-mimir-distributor-max-write-request-data-item-size

This error can only be returned when the experimental ingest storage is enabled. It is caused by a write request containing a timeseries or metadata entry that is larger than the allowed limit.
2 changes: 1 addition & 1 deletion pkg/api/api.go
@@ -265,7 +265,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
@@ -81,6 +81,8 @@ const (
// metaLabelTenantID is the name of the metric_relabel_configs label with tenant ID.
metaLabelTenantID = model.MetaLabelPrefix + "tenant_id"

maxOTLPRequestSizeFlag = "distributor.max-otlp-request-size"

instanceIngestionRateTickInterval = time.Second

// Size of "slab" when using pooled buffers for marshaling write requests. When handling single Push request
@@ -186,6 +188,7 @@ type Config struct {
HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
MaxOTLPRequestSize int `yaml:"max_otlp_request_size" category:"experimental"`
MaxRequestPoolBufferSize int `yaml:"max_request_pool_buffer_size" category:"experimental"`
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`

@@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.RetryConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.")
f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
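The flag registered in the diff above uses `100<<20` as its default, which equals the `104857600` bytes shown in the generated help text. A minimal standalone sketch of the same registration pattern:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("distributor", flag.PanicOnError)
	var maxOTLPRequestSize int
	// 100<<20 bytes == 100 MiB == 104857600, matching the documented default.
	fs.IntVar(&maxOTLPRequestSize, "distributor.max-otlp-request-size", 100<<20,
		"Maximum OTLP request size in bytes that the distributors accept.")
	if err := fs.Parse(nil); err != nil {
		panic(err)
	}
	fmt.Println(maxOTLPRequestSize) // 104857600
}
```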
8 changes: 5 additions & 3 deletions pkg/distributor/otel.go
@@ -91,7 +91,7 @@ func OTLPHandler(
protoBodySize, err := util.ParseProtoReader(ctx, reader, int(r.ContentLength), maxRecvMsgSize, buffers, unmarshaler, compression)
var tooLargeErr util.MsgSizeTooLargeErr
if errors.As(err, &tooLargeErr) {
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{
actual: tooLargeErr.Actual,
limit: tooLargeErr.Limit,
}.Error())
@@ -119,7 +119,7 @@
reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize))
if _, err := buf.ReadFrom(reader); err != nil {
if util.IsRequestBodyTooLarge(err) {
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{
actual: -1,
limit: maxRecvMsgSize,
}.Error())
Expand All @@ -135,8 +135,10 @@ func OTLPHandler(
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
}

// Check the request size against the message size limit, regardless of whether the request is compressed.
// If the request is compressed and its compressed length already exceeds the size limit, there's no need to decompress it.
if r.ContentLength > int64(maxRecvMsgSize) {
return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{
actual: int(r.ContentLength),
limit: maxRecvMsgSize,
}.Error())
22 changes: 18 additions & 4 deletions pkg/distributor/otel_test.go
@@ -286,7 +286,7 @@ func TestHandlerOTLPPush(t *testing.T) {
enableOtelMetadataStorage: true,
},
{
name: "Write samples. Request too big",
name: "Write samples. No compression, request too big",
compression: false,
maxMsgSize: 30,
series: sampleSeries,
Expand All @@ -296,8 +296,8 @@ func TestHandlerOTLPPush(t *testing.T) {
return err
},
responseCode: http.StatusRequestEntityTooLarge,
errMessage: "the incoming push request has been rejected because its message size of 63 bytes is larger",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = the incoming push request has been rejected because its message size of 63 bytes is larger than the allowed limit of 30 bytes (err-mimir-distributor-max-write-message-size). To adjust the related limit, configure -distributor.max-recv-msg-size, or contact your service administrator." insight=true`},
errMessage: "the incoming OTLP request has been rejected because its message size of 63 bytes is larger",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = the incoming OTLP request has been rejected because its message size of 63 bytes is larger than the allowed limit of 30 bytes (err-mimir-distributor-max-otlp-request-size). To adjust the related limit, configure -distributor.max-otlp-request-size, or contact your service administrator." insight=true`},
},
{
name: "Write samples. Unsupported compression",
Expand All @@ -313,6 +313,20 @@ func TestHandlerOTLPPush(t *testing.T) {
errMessage: "Only \"gzip\" or no compression supported",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=415 err="rpc error: code = Code(415) desc = unsupported compression: snappy. Only \"gzip\" or no compression supported" insight=true`},
},
{
name: "Write samples. With compression, request too big",
compression: true,
maxMsgSize: 30,
series: sampleSeries,
metadata: sampleMetadata,
verifyFunc: func(_ *testing.T, pushReq *Request) error {
_, err := pushReq.WriteRequest()
return err
},
responseCode: http.StatusRequestEntityTooLarge,
errMessage: "the incoming OTLP request has been rejected because its message size of 78 bytes is larger",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = the incoming OTLP request has been rejected because its message size of 78 bytes is larger than the allowed limit of 30 bytes (err-mimir-distributor-max-otlp-request-size). To adjust the related limit, configure -distributor.max-otlp-request-size, or contact your service administrator." insight=true`},
},
{
name: "Rate limited request",
maxMsgSize: 100000,
@@ -571,7 +585,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {
respStatus := &status.Status{}
err = proto.Unmarshal(body, respStatus)
assert.NoError(t, err)
assert.Contains(t, respStatus.GetMessage(), "the incoming push request has been rejected because its message size is larger than the allowed limit of 140 bytes (err-mimir-distributor-max-write-message-size). To adjust the related limit, configure -distributor.max-recv-msg-size, or contact your service administrator.")
assert.Contains(t, respStatus.GetMessage(), "the incoming OTLP request has been rejected because its message size is larger than the allowed limit of 140 bytes (err-mimir-distributor-max-otlp-request-size). To adjust the related limit, configure -distributor.max-otlp-request-size, or contact your service administrator.")
}

func TestHandler_toOtlpGRPCHTTPStatus(t *testing.T) {
12 changes: 12 additions & 0 deletions pkg/distributor/push.go
@@ -110,6 +110,18 @@ func (e distributorMaxWriteMessageSizeErr) Error() string {
return globalerror.DistributorMaxWriteMessageSize.MessageWithPerInstanceLimitConfig(fmt.Sprintf("the incoming push request has been rejected because its message size%s is larger than the allowed limit of %d bytes", msgSizeDesc, e.limit), "distributor.max-recv-msg-size")
}

type distributorMaxOTLPRequestSizeErr struct {
actual, limit int
}

func (e distributorMaxOTLPRequestSizeErr) Error() string {
msgSizeDesc := fmt.Sprintf(" of %d bytes", e.actual)
if e.actual < 0 {
msgSizeDesc = ""
}
return globalerror.DistributorMaxOTLPRequestSize.MessageWithPerInstanceLimitConfig(fmt.Sprintf("the incoming OTLP request has been rejected because its message size%s is larger than the allowed limit of %d bytes", msgSizeDesc, e.limit), maxOTLPRequestSizeFlag)
}

func handler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
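The `Error()` implementation above omits the " of N bytes" fragment when the actual size is unknown (`actual < 0`), which happens when the request is rejected mid-read rather than from its Content-Length. A simplified, standalone version of the same pattern, without Mimir's `globalerror` wrapping:

```go
package main

import "fmt"

// maxOTLPRequestSizeErr is a simplified standalone sketch of Mimir's
// distributorMaxOTLPRequestSizeErr: a negative actual size means the
// size is unknown, so the " of N bytes" fragment is dropped.
type maxOTLPRequestSizeErr struct {
	actual, limit int
}

func (e maxOTLPRequestSizeErr) Error() string {
	msgSizeDesc := fmt.Sprintf(" of %d bytes", e.actual)
	if e.actual < 0 {
		msgSizeDesc = ""
	}
	return fmt.Sprintf("the incoming OTLP request has been rejected because its message size%s is larger than the allowed limit of %d bytes", msgSizeDesc, e.limit)
}

func main() {
	fmt.Println(maxOTLPRequestSizeErr{actual: 63, limit: 30}.Error())
	fmt.Println(maxOTLPRequestSizeErr{actual: -1, limit: 30}.Error())
}
```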
1 change: 1 addition & 0 deletions pkg/util/globalerror/user.go
@@ -75,6 +75,7 @@ const (
BucketIndexTooOld ID = "bucket-index-too-old"

DistributorMaxWriteMessageSize ID = "distributor-max-write-message-size"
DistributorMaxOTLPRequestSize ID = "distributor-max-otlp-request-size"
DistributorMaxWriteRequestDataItemSize ID = "distributor-max-write-request-data-item-size"

// Map Prometheus TSDB native histogram validation errors to Mimir errors.
