diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index f2f7cdf5dcb..657a1b3ab95 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -200,6 +200,9 @@ type Ingester struct {
 	// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
 	ingestionRate        *util_math.EwmaRate
 	inflightPushRequests atomic.Int64
+
+	inflightQueryRequests    atomic.Int64
+	maxInflightQueryRequests atomic.Int64
 }
 
 // Shipper interface is used to have an easy way to mock it in tests.
@@ -627,7 +630,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
 		logger:               logger,
 		ingestionRate:        util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
 	}
-	i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
+	i.metrics = newIngesterMetrics(registerer,
+		false,
+		cfg.ActiveSeriesMetricsEnabled,
+		i.getInstanceLimits,
+		i.ingestionRate,
+		&i.inflightPushRequests,
+		&i.maxInflightQueryRequests)
 
 	// Replace specific metrics which we can't directly track but we need to read
 	// them from the underlying system (ie. TSDB).
@@ -692,7 +701,14 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
 		TSDBState:  newTSDBState(bucketClient, registerer),
 		logger:     logger,
 	}
-	i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
+	i.metrics = newIngesterMetrics(registerer,
+		false,
+		false,
+		i.getInstanceLimits,
+		nil,
+		&i.inflightPushRequests,
+		&i.maxInflightQueryRequests,
+	)
 
 	i.TSDBState.shipperIngesterID = "flusher"
 
@@ -1250,6 +1266,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
 		return nil, err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return nil, err
@@ -1322,6 +1341,9 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 		return nil, err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return nil, err
@@ -1370,6 +1392,8 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
 
 // LabelValues returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelsValuesCommon(ctx, req)
 	defer cleanup()
 	return resp, err
@@ -1377,6 +1401,8 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
 
 // LabelValuesStream returns all label values that are associated with a given label name.
 func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
 	defer cleanup()
 
@@ -1451,6 +1477,8 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
 
 // LabelNames return all the label names.
 func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelNamesCommon(ctx, req)
 	defer cleanup()
 	return resp, err
@@ -1458,6 +1486,8 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
 
 // LabelNamesStream return all the label names.
 func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
+	c := i.trackInflightQueryRequest()
+	defer c()
 	resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
 	defer cleanup()
 
@@ -1741,6 +1771,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 		return err
 	}
 
+	c := i.trackInflightQueryRequest()
+	defer c()
+
 	spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
 	defer spanlog.Finish()
 
@@ -1786,6 +1819,16 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 	return nil
 }
 
+func (i *Ingester) trackInflightQueryRequest() func() {
+	max := i.inflightQueryRequests.Inc()
+	if m := i.maxInflightQueryRequests.Load(); max > m {
+		i.maxInflightQueryRequests.CompareAndSwap(m, max)
+	}
+	return func() {
+		i.inflightQueryRequests.Dec()
+	}
+}
+
 // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
 func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
 	q, err := db.ChunkQuerier(from, through)
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index fe469e7d767..386d1453eda 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -46,9 +46,17 @@ type ingesterMetrics struct {
 	ingestionRate           prometheus.GaugeFunc
 	maxInflightPushRequests prometheus.GaugeFunc
 	inflightRequests        prometheus.GaugeFunc
+	inflightQueryRequests   prometheus.GaugeFunc
 }
 
-func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
+func newIngesterMetrics(r prometheus.Registerer,
+	createMetricsConflictingWithTSDB bool,
+	activeSeriesEnabled bool,
+	instanceLimitsFn func() *InstanceLimits,
+	ingestionRate *util_math.EwmaRate,
+	inflightRequests *atomic.Int64,
+	maxInflightQueryRequests *atomic.Int64,
+) *ingesterMetrics {
 	const (
 		instanceLimits     = "cortex_ingester_instance_limits"
 		instanceLimitsHelp = "Instance limits used by this ingester." // Must be same for all registrations.
@@ -193,6 +201,18 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
 			return 0
 		}),
 
+		inflightQueryRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
+			Name: "cortex_ingester_inflight_query_requests",
+			Help: "Max number of inflight query requests in ingester.",
+		}, func() float64 {
+			if maxInflightQueryRequests != nil {
+				r := maxInflightQueryRequests.Load()
+				maxInflightQueryRequests.Store(0)
+				return float64(r)
+			}
+			return 0
+		}),
+
 		// Not registered automatically, but only if activeSeriesEnabled is true.
 		activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_ingester_active_series",
diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go
index cd688219f42..4f262c8d9ed 100644
--- a/pkg/ingester/metrics_test.go
+++ b/pkg/ingester/metrics_test.go
@@ -8,8 +8,134 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+
+	util_math "github.com/cortexproject/cortex/pkg/util/math"
 )
 
+func TestIngesterMetrics(t *testing.T) {
+	mainReg := prometheus.NewPedanticRegistry()
+	ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
+	inflightPushRequests := &atomic.Int64{}
+	maxInflightQueryRequests := &atomic.Int64{}
+	maxInflightQueryRequests.Store(98)
+	inflightPushRequests.Store(14)
+
+	m := newIngesterMetrics(mainReg,
+		false,
+		true,
+		func() *InstanceLimits {
+			return &InstanceLimits{
+				MaxIngestionRate:        12,
+				MaxInMemoryTenants:      1,
+				MaxInMemorySeries:       11,
+				MaxInflightPushRequests: 6,
+			}
+		},
+		ingestionRate,
+		inflightPushRequests,
+		maxInflightQueryRequests)
+
+	require.NotNil(t, m)
+
+	err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
+		# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
+		# TYPE cortex_ingester_inflight_push_requests gauge
+		cortex_ingester_inflight_push_requests 14
+		# HELP cortex_ingester_inflight_query_requests Max number of inflight query requests in ingester.
+		# TYPE cortex_ingester_inflight_query_requests gauge
+		cortex_ingester_inflight_query_requests 98
+		# HELP cortex_ingester_ingested_exemplars_failures_total The total number of exemplars that errored on ingestion.
+		# TYPE cortex_ingester_ingested_exemplars_failures_total counter
+		cortex_ingester_ingested_exemplars_failures_total 0
+		# HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested.
+		# TYPE cortex_ingester_ingested_exemplars_total counter
+		cortex_ingester_ingested_exemplars_total 0
+		# HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion.
+		# TYPE cortex_ingester_ingested_metadata_failures_total counter
+		cortex_ingester_ingested_metadata_failures_total 0
+		# HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested.
+		# TYPE cortex_ingester_ingested_metadata_total counter
+		cortex_ingester_ingested_metadata_total 0
+		# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
+		# TYPE cortex_ingester_ingested_samples_failures_total counter
+		cortex_ingester_ingested_samples_failures_total 0
+		# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
+		# TYPE cortex_ingester_ingested_samples_total counter
+		cortex_ingester_ingested_samples_total 0
+		# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
+		# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
+		cortex_ingester_ingestion_rate_samples_per_second 0
+		# HELP cortex_ingester_instance_limits Instance limits used by this ingester.
+		# TYPE cortex_ingester_instance_limits gauge
+		cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 6
+		cortex_ingester_instance_limits{limit="max_ingestion_rate"} 12
+		cortex_ingester_instance_limits{limit="max_series"} 11
+		cortex_ingester_instance_limits{limit="max_tenants"} 1
+		# HELP cortex_ingester_memory_metadata The current number of metadata in memory.
+		# TYPE cortex_ingester_memory_metadata gauge
+		cortex_ingester_memory_metadata 0
+		# HELP cortex_ingester_memory_series The current number of series in memory.
+		# TYPE cortex_ingester_memory_series gauge
+		cortex_ingester_memory_series 0
+		# HELP cortex_ingester_memory_users The current number of users in memory.
+		# TYPE cortex_ingester_memory_users gauge
+		cortex_ingester_memory_users 0
+		# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
+		# TYPE cortex_ingester_queried_chunks histogram
+		cortex_ingester_queried_chunks_bucket{le="10"} 0
+		cortex_ingester_queried_chunks_bucket{le="80"} 0
+		cortex_ingester_queried_chunks_bucket{le="640"} 0
+		cortex_ingester_queried_chunks_bucket{le="5120"} 0
+		cortex_ingester_queried_chunks_bucket{le="40960"} 0
+		cortex_ingester_queried_chunks_bucket{le="327680"} 0
+		cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 0
+		cortex_ingester_queried_chunks_bucket{le="+Inf"} 0
+		cortex_ingester_queried_chunks_sum 0
+		cortex_ingester_queried_chunks_count 0
+		# HELP cortex_ingester_queried_exemplars The total number of exemplars returned from queries.
+		# TYPE cortex_ingester_queried_exemplars histogram
+		cortex_ingester_queried_exemplars_bucket{le="10"} 0
+		cortex_ingester_queried_exemplars_bucket{le="50"} 0
+		cortex_ingester_queried_exemplars_bucket{le="250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="1250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="6250"} 0
+		cortex_ingester_queried_exemplars_bucket{le="+Inf"} 0
+		cortex_ingester_queried_exemplars_sum 0
+		cortex_ingester_queried_exemplars_count 0
+		# HELP cortex_ingester_queried_samples The total number of samples returned from queries.
+		# TYPE cortex_ingester_queried_samples histogram
+		cortex_ingester_queried_samples_bucket{le="10"} 0
+		cortex_ingester_queried_samples_bucket{le="80"} 0
+		cortex_ingester_queried_samples_bucket{le="640"} 0
+		cortex_ingester_queried_samples_bucket{le="5120"} 0
+		cortex_ingester_queried_samples_bucket{le="40960"} 0
+		cortex_ingester_queried_samples_bucket{le="327680"} 0
+		cortex_ingester_queried_samples_bucket{le="2.62144e+06"} 0
+		cortex_ingester_queried_samples_bucket{le="2.097152e+07"} 0
+		cortex_ingester_queried_samples_bucket{le="+Inf"} 0
+		cortex_ingester_queried_samples_sum 0
+		cortex_ingester_queried_samples_count 0
+		# HELP cortex_ingester_queried_series The total number of series returned from queries.
+		# TYPE cortex_ingester_queried_series histogram
+		cortex_ingester_queried_series_bucket{le="10"} 0
+		cortex_ingester_queried_series_bucket{le="80"} 0
+		cortex_ingester_queried_series_bucket{le="640"} 0
+		cortex_ingester_queried_series_bucket{le="5120"} 0
+		cortex_ingester_queried_series_bucket{le="40960"} 0
+		cortex_ingester_queried_series_bucket{le="327680"} 0
+		cortex_ingester_queried_series_bucket{le="+Inf"} 0
+		cortex_ingester_queried_series_sum 0
+		cortex_ingester_queried_series_count 0
+		# HELP cortex_ingester_queries_total The total number of queries the ingester has handled.
+		# TYPE cortex_ingester_queries_total counter
+		cortex_ingester_queries_total 0
+	`))
+	require.NoError(t, err)
+
+	require.Equal(t, int64(0), maxInflightQueryRequests.Load())
+}
+
 func TestTSDBMetrics(t *testing.T) {
 	mainReg := prometheus.NewPedanticRegistry()
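Note (not part of the patch): the new tracking has two halves. trackInflightQueryRequest increments the current inflight counter and publishes a new high-water mark via compare-and-swap, while the cortex_ingester_inflight_query_requests GaugeFunc reads that maximum and resets it to zero on every scrape, so the metric reports the peak seen between scrapes. Below is a minimal standalone sketch of the same pattern; the type and function names are illustrative only, and the scrape step uses Swap for the read-and-reset where the patch does a Load followed by Store.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// inflightTracker mirrors the pattern added in the patch: a current-inflight
// counter plus the peak value observed since the last metrics scrape.
type inflightTracker struct {
	current atomic.Int64
	max     atomic.Int64
}

// track registers one inflight request and returns a release func to defer.
func (t *inflightTracker) track() func() {
	cur := t.current.Inc()
	// Publish a new high-water mark. A lost CAS means another goroutine
	// already stored an equal-or-higher value, so no retry is needed.
	if m := t.max.Load(); cur > m {
		t.max.CompareAndSwap(m, cur)
	}
	return func() { t.current.Dec() }
}

// scrape returns the peak since the previous scrape and resets it to zero,
// like the GaugeFunc behind cortex_ingester_inflight_query_requests.
func (t *inflightTracker) scrape() int64 {
	return t.max.Swap(0)
}

func main() {
	var t inflightTracker
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			done := t.track()
			defer done()
		}()
	}
	wg.Wait()
	fmt.Println("peak inflight since last scrape:", t.scrape()) // up to 10, scheduling-dependent
	fmt.Println("after reset:", t.scrape())                     // 0
}

Because only the peak is kept and cleared on read, the gauge stays meaningful even when individual query requests are much shorter than the scrape interval.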