Skip to content

Commit

Permalink
Adding cortex_ingester_inflight_query_requests metric to track the nu…
Browse files Browse the repository at this point in the history
…mber of query being executed on ingesters
  • Loading branch information
alanprot committed Mar 5, 2024
1 parent eafc37a commit 6cd1bef
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 3 deletions.
47 changes: 45 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ type Ingester struct {
// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

inflightQueryRequests atomic.Int64
maxInflightQueryRequests atomic.Int64
}

// Shipper interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -627,7 +630,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
}
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer,
false,
cfg.ActiveSeriesMetricsEnabled,
i.getInstanceLimits,
i.ingestionRate,
&i.inflightPushRequests,
&i.maxInflightQueryRequests)

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
Expand Down Expand Up @@ -692,7 +701,14 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer,
false,
false,
i.getInstanceLimits,
nil,
&i.inflightPushRequests,
&i.maxInflightQueryRequests,
)

i.TSDBState.shipperIngesterID = "flusher"

Expand Down Expand Up @@ -1250,6 +1266,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1322,6 +1341,9 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1370,13 +1392,17 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelValuesStream returns all label values that are associated with a given label name.
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1451,13 +1477,17 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelNamesStream return all the label names.
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1741,6 +1771,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

c := i.trackInflightQueryRequest()
defer c()

spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanlog.Finish()

Expand Down Expand Up @@ -1786,6 +1819,16 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

func (i *Ingester) trackInflightQueryRequest() func() {
max := i.inflightQueryRequests.Inc()
if m := i.maxInflightQueryRequests.Load(); max > m {
i.maxInflightQueryRequests.CompareAndSwap(m, max)
}
return func() {
i.inflightQueryRequests.Dec()
}
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
q, err := db.ChunkQuerier(from, through)
Expand Down
22 changes: 21 additions & 1 deletion pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,17 @@ type ingesterMetrics struct {
ingestionRate prometheus.GaugeFunc
maxInflightPushRequests prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc
}

func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
func newIngesterMetrics(r prometheus.Registerer,
createMetricsConflictingWithTSDB bool,
activeSeriesEnabled bool,
instanceLimitsFn func() *InstanceLimits,
ingestionRate *util_math.EwmaRate,
inflightRequests *atomic.Int64,
maxInflightQueryRequests *atomic.Int64,
) *ingesterMetrics {
const (
instanceLimits = "cortex_ingester_instance_limits"
instanceLimitsHelp = "Instance limits used by this ingester." // Must be same for all registrations.
Expand Down Expand Up @@ -193,6 +201,18 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
return 0
}),

inflightQueryRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_inflight_query_requests",
Help: "Max number of inflight query requests in ingester.",
}, func() float64 {
if maxInflightQueryRequests != nil {
r := maxInflightQueryRequests.Load()
maxInflightQueryRequests.Store(0)
return float64(r)
}
return 0
}),

// Not registered automatically, but only if activeSeriesEnabled is true.
activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_active_series",
Expand Down
126 changes: 126 additions & 0 deletions pkg/ingester/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,134 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

util_math "github.com/cortexproject/cortex/pkg/util/math"
)

func TestIngesterMetrics(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
inflightPushRequests := &atomic.Int64{}
maxInflightQueryRequests := &atomic.Int64{}
maxInflightQueryRequests.Store(98)
inflightPushRequests.Store(14)

m := newIngesterMetrics(mainReg,
false,
true,
func() *InstanceLimits {
return &InstanceLimits{
MaxIngestionRate: 12,
MaxInMemoryTenants: 1,
MaxInMemorySeries: 11,
MaxInflightPushRequests: 6,
}
},
ingestionRate,
inflightPushRequests,
maxInflightQueryRequests)

require.NotNil(t, m)

err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
# TYPE cortex_ingester_inflight_push_requests gauge
cortex_ingester_inflight_push_requests 14
# HELP cortex_ingester_inflight_query_requests Max number of inflight query requests in ingester.
# TYPE cortex_ingester_inflight_query_requests gauge
cortex_ingester_inflight_query_requests 98
# HELP cortex_ingester_ingested_exemplars_failures_total The total number of exemplars that errored on ingestion.
# TYPE cortex_ingester_ingested_exemplars_failures_total counter
cortex_ingester_ingested_exemplars_failures_total 0
# HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested.
# TYPE cortex_ingester_ingested_exemplars_total counter
cortex_ingester_ingested_exemplars_total 0
# HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion.
# TYPE cortex_ingester_ingested_metadata_failures_total counter
cortex_ingester_ingested_metadata_failures_total 0
# HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested.
# TYPE cortex_ingester_ingested_metadata_total counter
cortex_ingester_ingested_metadata_total 0
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 0
# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
cortex_ingester_ingestion_rate_samples_per_second 0
# HELP cortex_ingester_instance_limits Instance limits used by this ingester.
# TYPE cortex_ingester_instance_limits gauge
cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 6
cortex_ingester_instance_limits{limit="max_ingestion_rate"} 12
cortex_ingester_instance_limits{limit="max_series"} 11
cortex_ingester_instance_limits{limit="max_tenants"} 1
# HELP cortex_ingester_memory_metadata The current number of metadata in memory.
# TYPE cortex_ingester_memory_metadata gauge
cortex_ingester_memory_metadata 0
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 0
# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
# TYPE cortex_ingester_queried_chunks histogram
cortex_ingester_queried_chunks_bucket{le="10"} 0
cortex_ingester_queried_chunks_bucket{le="80"} 0
cortex_ingester_queried_chunks_bucket{le="640"} 0
cortex_ingester_queried_chunks_bucket{le="5120"} 0
cortex_ingester_queried_chunks_bucket{le="40960"} 0
cortex_ingester_queried_chunks_bucket{le="327680"} 0
cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 0
cortex_ingester_queried_chunks_bucket{le="+Inf"} 0
cortex_ingester_queried_chunks_sum 0
cortex_ingester_queried_chunks_count 0
# HELP cortex_ingester_queried_exemplars The total number of exemplars returned from queries.
# TYPE cortex_ingester_queried_exemplars histogram
cortex_ingester_queried_exemplars_bucket{le="10"} 0
cortex_ingester_queried_exemplars_bucket{le="50"} 0
cortex_ingester_queried_exemplars_bucket{le="250"} 0
cortex_ingester_queried_exemplars_bucket{le="1250"} 0
cortex_ingester_queried_exemplars_bucket{le="6250"} 0
cortex_ingester_queried_exemplars_bucket{le="+Inf"} 0
cortex_ingester_queried_exemplars_sum 0
cortex_ingester_queried_exemplars_count 0
# HELP cortex_ingester_queried_samples The total number of samples returned from queries.
# TYPE cortex_ingester_queried_samples histogram
cortex_ingester_queried_samples_bucket{le="10"} 0
cortex_ingester_queried_samples_bucket{le="80"} 0
cortex_ingester_queried_samples_bucket{le="640"} 0
cortex_ingester_queried_samples_bucket{le="5120"} 0
cortex_ingester_queried_samples_bucket{le="40960"} 0
cortex_ingester_queried_samples_bucket{le="327680"} 0
cortex_ingester_queried_samples_bucket{le="2.62144e+06"} 0
cortex_ingester_queried_samples_bucket{le="2.097152e+07"} 0
cortex_ingester_queried_samples_bucket{le="+Inf"} 0
cortex_ingester_queried_samples_sum 0
cortex_ingester_queried_samples_count 0
# HELP cortex_ingester_queried_series The total number of series returned from queries.
# TYPE cortex_ingester_queried_series histogram
cortex_ingester_queried_series_bucket{le="10"} 0
cortex_ingester_queried_series_bucket{le="80"} 0
cortex_ingester_queried_series_bucket{le="640"} 0
cortex_ingester_queried_series_bucket{le="5120"} 0
cortex_ingester_queried_series_bucket{le="40960"} 0
cortex_ingester_queried_series_bucket{le="327680"} 0
cortex_ingester_queried_series_bucket{le="+Inf"} 0
cortex_ingester_queried_series_sum 0
cortex_ingester_queried_series_count 0
# HELP cortex_ingester_queries_total The total number of queries the ingester has handled.
# TYPE cortex_ingester_queries_total counter
cortex_ingester_queries_total 0
`))
require.NoError(t, err)

require.Equal(t, int64(0), maxInflightQueryRequests.Load())
}

func TestTSDBMetrics(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()

Expand Down

0 comments on commit 6cd1bef

Please sign in to comment.