Skip to content

Commit

Permalink
remove uncompressed limit
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Jul 3, 2024
1 parent f3716cf commit 40c6a2e
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 124 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* `-ingest-storage.read-consistency`: configures the default read consistency.
* `-ingest-storage.migration.distributor-send-to-ingesters-enabled`: enabled tee-ing writes to classic ingesters and Kafka, used during a live migration to the new ingest storage architecture.
* `-ingester.partition-ring.*`: configures partitions ring backend.
* [ENHANCEMENT] Distributor: Add `max_otel_compressed_recv_msg_size` and `max_otel_uncompressed_recv_msg_size` to limit the otel compressed/uncompressed write request byte size. #8574
* [ENHANCEMENT] Distributor: Add `max_otel_decompressed_recv_msg_size` to limit the otel decompressed write request byte size. #8574
* [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371
* [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378
* [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425
Expand Down
17 changes: 3 additions & 14 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1199,23 +1199,12 @@
},
{
"kind": "field",
"name": "max_otel_compressed_recv_msg_size",
"name": "max_otel_decompressed_recv_msg_size",
"required": false,
"desc": "Max message size in bytes that the distributors will accept for incoming compressed write requests to the otel API. If exceeded, the request will be rejected.",
"fieldValue": null,
"fieldDefaultValue": 1048576,
"fieldFlag": "distributor.max-otel-compressed-recv-msg-size",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_otel_uncompressed_recv_msg_size",
"required": false,
"desc": "Max message size in bytes that the distributors will accept for incoming uncompressed write requests to the otel API. If exceeded, the request will be rejected.",
"desc": "Maximum message size in bytes that the distributors will accept for incoming decompressed write requests to the OTEL API. Requests exceeding this limit will be rejected.",
"fieldValue": null,
"fieldDefaultValue": 10485760,
"fieldFlag": "distributor.max-otel-uncompressed-recv-msg-size",
"fieldFlag": "distributor.max-otel-decompressed-recv-msg-size",
"fieldType": "int",
"fieldCategory": "experimental"
},
Expand Down
6 changes: 2 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1199,10 +1199,8 @@ Usage of ./cmd/mimir/mimir:
[deprecated] When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed. (default true)
-distributor.max-exemplars-per-series-per-request int
[experimental] Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.
-distributor.max-otel-compressed-recv-msg-size int
[experimental] Max message size in bytes that the distributors will accept for incoming compressed write requests to the otel API. If exceeded, the request will be rejected. (default 1048576)
-distributor.max-otel-uncompressed-recv-msg-size int
[experimental] Max message size in bytes that the distributors will accept for incoming uncompressed write requests to the otel API. If exceeded, the request will be rejected. (default 10485760)
-distributor.max-otel-decompressed-recv-msg-size int
[experimental] Maximum message size in bytes that the distributors will accept for incoming decompressed write requests to the OTEL API. Requests exceeding this limit will be rejected. (default 10485760)
-distributor.max-recv-msg-size int
Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
-distributor.max-request-pool-buffer-size int
Expand Down
3 changes: 1 addition & 2 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ The following features are currently experimental:
- Limit exemplars per series per request
- `-distributor.max-exemplars-per-series-per-request`
- Limit otel write request byte size
- `max_otel_compressed_recv_msg_size`
- `max_otel_uncompressed_recv_msg_size`
- `max_otel_decompressed_recv_msg_size`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
Expand Down
16 changes: 5 additions & 11 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -846,17 +846,11 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# (experimental) Max message size in bytes that the distributors will accept for
# incoming compressed write requests to the otel API. If exceeded, the request
# will be rejected.
# CLI flag: -distributor.max-otel-compressed-recv-msg-size
[max_otel_compressed_recv_msg_size: <int> | default = 1048576]
# (experimental) Max message size in bytes that the distributors will accept for
# incoming uncompressed write requests to the otel API. If exceeded, the request
# will be rejected.
# CLI flag: -distributor.max-otel-uncompressed-recv-msg-size
[max_otel_uncompressed_recv_msg_size: <int> | default = 10485760]
# (experimental) Maximum message size in bytes that the distributors will accept
# for incoming decompressed write requests to the OTEL API. Requests exceeding
# this limit will be rejected.
# CLI flag: -distributor.max-otel-decompressed-recv-msg-size
[max_otel_decompressed_recv_msg_size: <int> | default = 10485760]
# (experimental) Max size of the pooled buffers used for marshaling write
# requests. If 0, no max size is enforced.
Expand Down
14 changes: 14 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,20 @@ How to **fix** it:

- Increase the allowed limit by using the `-distributor.max-recv-msg-size` option.

### err-mimir-distributor-max-otel-decompressed-write-message-size

This error occurs when a distributor rejects an OTEL write request because its message size is larger than the allowed limit after decompression.

How it **works**:

- The distributor enforces an upper limit on the message size of incoming OTEL write requests after decompression, regardless of the compression type. For more information, see the OTLP collector [compression details](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md#client-configuration).
- To configure the limit, set the `-distributor.max-otel-decompressed-recv-msg-size` option.

How to **fix** it:

- Increase the allowed limit by using the `-distributor.max-otel-decompressed-recv-msg-size` option.
- If you use the batch processor in the OTLP collector, decrease the maximum batch size via the `send_batch_max_size` setting. For more information, see the [batch processor details](https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md).

### err-mimir-distributor-max-write-request-data-item-size

This error can only be returned when the experimental ingest storage is enabled and is caused by a write request containing a timeseries or metadata entry which is larger than the allowed limit.
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOtelCompressedRecvMsgSize, pushConfig.MaxOtelUncompressedRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxOtelDecompressedRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
6 changes: 2 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ type Config struct {
HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
MaxOtelCompressedRecvMsgSize int `yaml:"max_otel_compressed_recv_msg_size" category:"experimental"`
MaxOtelUncompressedRecvMsgSize int `yaml:"max_otel_uncompressed_recv_msg_size" category:"experimental"`
MaxOtelDecompressedRecvMsgSize int `yaml:"max_otel_decompressed_recv_msg_size" category:"experimental"`
MaxRequestPoolBufferSize int `yaml:"max_request_pool_buffer_size" category:"experimental"`
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`

Expand Down Expand Up @@ -239,8 +238,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.RetryConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOtelCompressedRecvMsgSize, "distributor.max-otel-compressed-recv-msg-size", 1<<20, "Max message size in bytes that the distributors will accept for incoming compressed write requests to the otel API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOtelUncompressedRecvMsgSize, "distributor.max-otel-uncompressed-recv-msg-size", 10<<20, "Max message size in bytes that the distributors will accept for incoming uncompressed write requests to the otel API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOtelDecompressedRecvMsgSize, "distributor.max-otel-decompressed-recv-msg-size", 10<<20, "Maximum message size in bytes that the distributors will accept for incoming decompressed write requests to the OTEL API. Requests exceeding this limit will be rejected.")
f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
Expand Down
47 changes: 19 additions & 28 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type OTLPHandlerLimits interface {

// OTLPHandler is an http.Handler accepting OTLP write requests.
func OTLPHandler(
maxOtelCompressedRecvMsgSize int,
maxOtelUncompressedRecvMsgSize int,
maxOtelDecompressedRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
enableOtelMetadataStorage bool,
Expand All @@ -69,32 +68,15 @@ func OTLPHandler(
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

return otlpHandler(maxOtelUncompressedRecvMsgSize, requestBufferPool, sourceIPs, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxOtelUncompressedRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
return otlpHandler(maxOtelDecompressedRecvMsgSize, requestBufferPool, sourceIPs, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxOtelDecompressedRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
contentType := r.Header.Get("Content-Type")
contentEncoding := r.Header.Get("Content-Encoding")
var compression util.CompressionType
switch contentEncoding {
case "gzip":
compression = util.Gzip
// If requet is compressed, check against compressed message size limit,
// and from now on, all limit checks would be against uncompressed limit.
if r.ContentLength > int64(maxOtelCompressedRecvMsgSize) {
return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: int(r.ContentLength),
limit: maxOtelCompressedRecvMsgSize,
id: globalerror.DistributorMaxOtelCompressedMessageSize,
}.Error())
}
case "":
compression = util.NoCompression
// If request is uncompressed, check against uncompressed message size limit.
if r.ContentLength > int64(maxOtelUncompressedRecvMsgSize) {
return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: int(r.ContentLength),
limit: maxOtelUncompressedRecvMsgSize,
id: globalerror.DistributorMaxOtelUncompressedMessageSize,
}.Error())
}
default:
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\" or no compression supported", contentEncoding)
}
Expand All @@ -107,13 +89,13 @@ func OTLPHandler(
unmarshaler := otlpProtoUnmarshaler{
request: &exportReq,
}
protoBodySize, err := util.ParseProtoReader(ctx, reader, int(r.ContentLength), maxOtelUncompressedRecvMsgSize, buffers, unmarshaler, compression)
protoBodySize, err := util.ParseProtoReader(ctx, reader, int(r.ContentLength), maxOtelDecompressedRecvMsgSize, buffers, unmarshaler, compression)
var tooLargeErr util.MsgSizeTooLargeErr
if errors.As(err, &tooLargeErr) {
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: tooLargeErr.Actual,
limit: tooLargeErr.Limit,
id: globalerror.DistributorMaxOtelUncompressedMessageSize,
id: globalerror.DistributorMaxOtelDecompressedWriteMessageSize,
}.Error())
}
return exportReq, protoBodySize, err
Expand All @@ -136,13 +118,13 @@ func OTLPHandler(
}
}

reader = http.MaxBytesReader(nil, reader, int64(maxOtelUncompressedRecvMsgSize))
reader = http.MaxBytesReader(nil, reader, int64(maxOtelDecompressedRecvMsgSize))
if _, err := buf.ReadFrom(reader); err != nil {
if util.IsRequestBodyTooLarge(err) {
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: -1,
limit: maxOtelUncompressedRecvMsgSize,
id: globalerror.DistributorMaxOtelUncompressedMessageSize,
limit: maxOtelDecompressedRecvMsgSize,
id: globalerror.DistributorMaxOtelDecompressedWriteMessageSize,
}.Error())
}

Expand All @@ -156,14 +138,23 @@ func OTLPHandler(
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
}

// Check the request size against the decompressed message size limit, regardless of whether the request is compressed.
// If the request is compressed and its compressed length already exceeds the size limit, there's no need to decompress it.
if r.ContentLength > int64(maxOtelDecompressedRecvMsgSize) {
return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: int(r.ContentLength),
limit: maxOtelDecompressedRecvMsgSize,
id: globalerror.DistributorMaxOtelDecompressedWriteMessageSize,
}.Error())
}

spanLogger, ctx := spanlogger.NewWithLogger(ctx, logger, "Distributor.OTLPHandler.decodeAndConvert")
defer spanLogger.Span.Finish()

spanLogger.SetTag("content_type", contentType)
spanLogger.SetTag("content_encoding", contentEncoding)
spanLogger.SetTag("content_length", r.ContentLength)

// decoderFunc returns an error when uncompressedBodySize exceeds the decompressed receive message size limit.
otlpReq, uncompressedBodySize, err := decoderFunc(r.Body)
if err != nil {
return err
Expand Down Expand Up @@ -224,7 +215,7 @@ func OTLPHandler(
}

func otlpHandler(
maxOtelUncompressedRecvMsgSize int,
maxOtelDecompressedRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
retryCfg RetryConfig,
Expand All @@ -244,7 +235,7 @@ func otlpHandler(
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
if err := parser(ctx, r, maxOtelUncompressedRecvMsgSize, rb, &req, logger); err != nil {
if err := parser(ctx, r, maxOtelDecompressedRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
Expand Down
Loading

0 comments on commit 40c6a2e

Please sign in to comment.