Adding cortex_ingester_max_inflight_query_requests metric #5798

Merged (4 commits, Mar 5, 2024)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* [ENHANCEMENT] Querier: Added `querier.store-gateway-query-stats-enabled` to enable or disable store gateway query stats log. #5749
* [ENHANCEMENT] Upgrade to go 1.21.6. #5765
* [ENHANCEMENT] AlertManager: Retrying AlertManager Delete Silence on error #5794
* [ENHANCEMENT] Ingester: Add new ingester metric `cortex_ingester_max_inflight_query_requests`. #5798
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
52 changes: 50 additions & 2 deletions pkg/ingester/ingester.go
@@ -83,6 +83,9 @@ const (

// Period at which to attempt purging metadata from memory.
metadataPurgePeriod = 5 * time.Minute

// Period at which we should reset the max inflight query requests counter.
maxInflightRequestResetPeriod = 1 * time.Minute
)

var (
@@ -200,6 +203,9 @@ type Ingester struct {
// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker
}

// Shipper interface is used to have an easy way to mock it in tests.
@@ -627,7 +633,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
}
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer,
false,
cfg.ActiveSeriesMetricsEnabled,
i.getInstanceLimits,
i.ingestionRate,
&i.inflightPushRequests,
&i.maxInflightQueryRequests)

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
@@ -692,7 +704,14 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
i.metrics = newIngesterMetrics(registerer,
false,
false,
i.getInstanceLimits,
nil,
&i.inflightPushRequests,
&i.maxInflightQueryRequests,
)

i.TSDBState.shipperIngesterID = "flusher"

@@ -815,6 +834,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
defer metadataPurgeTicker.Stop()

maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
defer maxInflightRequestResetTicker.Stop()

for {
select {
case <-metadataPurgeTicker.C:
@@ -831,6 +853,8 @@

case <-activeSeriesTickerChan:
i.updateActiveSeries()
case <-maxInflightRequestResetTicker.C:
i.maxInflightQueryRequests.Tick()
case <-userTSDBConfigTicker.C:
i.updateUserTSDBConfigs()
case <-ctx.Done():
@@ -1250,6 +1274,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
@@ -1322,6 +1349,9 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
@@ -1370,13 +1400,17 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelValuesStream returns all label values that are associated with a given label name.
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
defer cleanup()

@@ -1451,13 +1485,17 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelNamesStream return all the label names.
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
defer cleanup()

@@ -1741,6 +1779,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

c := i.trackInflightQueryRequest()
defer c()

spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanlog.Finish()

@@ -1786,6 +1827,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

func (i *Ingester) trackInflightQueryRequest() func() {
i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
return func() {
i.inflightQueryRequests.Dec()
}
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
q, err := db.ChunkQuerier(from, through)
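
A note on the pattern above (not part of the diff): every read path (Query, QueryExemplars, QueryStream, LabelValues, LabelNames, and their streaming variants) opens with `c := i.trackInflightQueryRequest()` and defers `c()`, so the counter is decremented on every return path, errors included. Below is a minimal standalone sketch of that increment-track-decrement closure; the `inflightGauge` type is a hypothetical stand-in for the ingester's atomic counter plus util_math.MaxTracker.

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// inflightGauge pairs a live inflight counter with the high-water mark
// observed so far (illustrative stand-in, not Cortex code).
type inflightGauge struct {
	inflight atomic.Int64
	max      atomic.Int64
}

// track mirrors trackInflightQueryRequest: bump the counter, record a new
// max if we exceeded it, and hand back a closure that undoes the bump.
func (g *inflightGauge) track() func() {
	n := g.inflight.Inc()
	if l := g.max.Load(); l < n {
		g.max.CompareAndSwap(l, n)
	}
	return func() { g.inflight.Dec() }
}

func main() {
	var g inflightGauge
	serveQuery := func() {
		c := g.track()
		defer c() // runs on every return path, errors included
		// ... evaluate the query ...
	}
	serveQuery()
	serveQuery()
	fmt.Println(g.inflight.Load(), g.max.Load()) // 0 1
}
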
24 changes: 21 additions & 3 deletions pkg/ingester/metrics.go
@@ -46,9 +46,17 @@ type ingesterMetrics struct {
ingestionRate prometheus.GaugeFunc
maxInflightPushRequests prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc
}

func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
func newIngesterMetrics(r prometheus.Registerer,
createMetricsConflictingWithTSDB bool,
activeSeriesEnabled bool,
instanceLimitsFn func() *InstanceLimits,
ingestionRate *util_math.EwmaRate,
inflightPushRequests *atomic.Int64,
maxInflightQueryRequests *util_math.MaxTracker,
) *ingesterMetrics {
const (
instanceLimits = "cortex_ingester_instance_limits"
instanceLimitsHelp = "Instance limits used by this ingester." // Must be same for all registrations.
@@ -187,8 +195,18 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
Name: "cortex_ingester_inflight_push_requests",
Help: "Current number of inflight push requests in ingester.",
}, func() float64 {
if inflightRequests != nil {
return float64(inflightRequests.Load())
if inflightPushRequests != nil {
return float64(inflightPushRequests.Load())
}
return 0
}),

inflightQueryRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_max_inflight_query_requests",
Help: "Max number of inflight query requests in ingester.",
}, func() float64 {
if maxInflightQueryRequests != nil {
return float64(maxInflightQueryRequests.Load())
}
return 0
}),
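
The registration above uses a GaugeFunc rather than a plain gauge: nothing pushes values into the metric; the closure is evaluated each time the registry is scraped and reads whatever the tracker holds at that moment. The following self-contained sketch shows that pull-style pattern; the metric name is illustrative and a plain atomic stands in for util_math.MaxTracker.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"go.uber.org/atomic"
)

func main() {
	reg := prometheus.NewRegistry()
	var peak atomic.Int64 // stand-in for the MaxTracker

	// The closure runs at scrape time, so the metric always reflects the
	// tracker's current value without any update goroutine.
	g := promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_max_inflight_query_requests",
		Help: "Max number of inflight query requests (illustrative).",
	}, func() float64 {
		return float64(peak.Load())
	})

	peak.Store(42)
	fmt.Println(testutil.ToFloat64(g)) // 42
}
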
141 changes: 141 additions & 0 deletions pkg/ingester/metrics_test.go
@@ -8,8 +8,149 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

util_math "github.com/cortexproject/cortex/pkg/util/math"
)

func TestIngesterMetrics(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
inflightPushRequests := &atomic.Int64{}
maxInflightQueryRequests := util_math.MaxTracker{}
maxInflightQueryRequests.Track(98)
inflightPushRequests.Store(14)

m := newIngesterMetrics(mainReg,
false,
true,
func() *InstanceLimits {
return &InstanceLimits{
MaxIngestionRate: 12,
MaxInMemoryTenants: 1,
MaxInMemorySeries: 11,
MaxInflightPushRequests: 6,
}
},
ingestionRate,
inflightPushRequests,
&maxInflightQueryRequests)

require.NotNil(t, m)

err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
# TYPE cortex_ingester_inflight_push_requests gauge
cortex_ingester_inflight_push_requests 14
# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
# TYPE cortex_ingester_max_inflight_query_requests gauge
cortex_ingester_max_inflight_query_requests 98
# HELP cortex_ingester_ingested_exemplars_failures_total The total number of exemplars that errored on ingestion.
# TYPE cortex_ingester_ingested_exemplars_failures_total counter
cortex_ingester_ingested_exemplars_failures_total 0
# HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested.
# TYPE cortex_ingester_ingested_exemplars_total counter
cortex_ingester_ingested_exemplars_total 0
# HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion.
# TYPE cortex_ingester_ingested_metadata_failures_total counter
cortex_ingester_ingested_metadata_failures_total 0
# HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested.
# TYPE cortex_ingester_ingested_metadata_total counter
cortex_ingester_ingested_metadata_total 0
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 0
# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
cortex_ingester_ingestion_rate_samples_per_second 0
# HELP cortex_ingester_instance_limits Instance limits used by this ingester.
# TYPE cortex_ingester_instance_limits gauge
cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 6
cortex_ingester_instance_limits{limit="max_ingestion_rate"} 12
cortex_ingester_instance_limits{limit="max_series"} 11
cortex_ingester_instance_limits{limit="max_tenants"} 1
# HELP cortex_ingester_memory_metadata The current number of metadata in memory.
# TYPE cortex_ingester_memory_metadata gauge
cortex_ingester_memory_metadata 0
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 0
# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
# TYPE cortex_ingester_queried_chunks histogram
cortex_ingester_queried_chunks_bucket{le="10"} 0
cortex_ingester_queried_chunks_bucket{le="80"} 0
cortex_ingester_queried_chunks_bucket{le="640"} 0
cortex_ingester_queried_chunks_bucket{le="5120"} 0
cortex_ingester_queried_chunks_bucket{le="40960"} 0
cortex_ingester_queried_chunks_bucket{le="327680"} 0
cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 0
cortex_ingester_queried_chunks_bucket{le="+Inf"} 0
cortex_ingester_queried_chunks_sum 0
cortex_ingester_queried_chunks_count 0
# HELP cortex_ingester_queried_exemplars The total number of exemplars returned from queries.
# TYPE cortex_ingester_queried_exemplars histogram
cortex_ingester_queried_exemplars_bucket{le="10"} 0
cortex_ingester_queried_exemplars_bucket{le="50"} 0
cortex_ingester_queried_exemplars_bucket{le="250"} 0
cortex_ingester_queried_exemplars_bucket{le="1250"} 0
cortex_ingester_queried_exemplars_bucket{le="6250"} 0
cortex_ingester_queried_exemplars_bucket{le="+Inf"} 0
cortex_ingester_queried_exemplars_sum 0
cortex_ingester_queried_exemplars_count 0
# HELP cortex_ingester_queried_samples The total number of samples returned from queries.
# TYPE cortex_ingester_queried_samples histogram
cortex_ingester_queried_samples_bucket{le="10"} 0
cortex_ingester_queried_samples_bucket{le="80"} 0
cortex_ingester_queried_samples_bucket{le="640"} 0
cortex_ingester_queried_samples_bucket{le="5120"} 0
cortex_ingester_queried_samples_bucket{le="40960"} 0
cortex_ingester_queried_samples_bucket{le="327680"} 0
cortex_ingester_queried_samples_bucket{le="2.62144e+06"} 0
cortex_ingester_queried_samples_bucket{le="2.097152e+07"} 0
cortex_ingester_queried_samples_bucket{le="+Inf"} 0
cortex_ingester_queried_samples_sum 0
cortex_ingester_queried_samples_count 0
# HELP cortex_ingester_queried_series The total number of series returned from queries.
# TYPE cortex_ingester_queried_series histogram
cortex_ingester_queried_series_bucket{le="10"} 0
cortex_ingester_queried_series_bucket{le="80"} 0
cortex_ingester_queried_series_bucket{le="640"} 0
cortex_ingester_queried_series_bucket{le="5120"} 0
cortex_ingester_queried_series_bucket{le="40960"} 0
cortex_ingester_queried_series_bucket{le="327680"} 0
cortex_ingester_queried_series_bucket{le="+Inf"} 0
cortex_ingester_queried_series_sum 0
cortex_ingester_queried_series_count 0
# HELP cortex_ingester_queries_total The total number of queries the ingester has handled.
# TYPE cortex_ingester_queries_total counter
cortex_ingester_queries_total 0
`))
require.NoError(t, err)

require.Equal(t, int64(98), maxInflightQueryRequests.Load())
maxInflightQueryRequests.Tick()

err = testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
# TYPE cortex_ingester_max_inflight_query_requests gauge
cortex_ingester_max_inflight_query_requests 98
`), "cortex_ingester_max_inflight_query_requests")
require.NoError(t, err)
maxInflightQueryRequests.Tick()
err = testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
# HELP cortex_ingester_max_inflight_query_requests Max number of inflight query requests in ingester.
# TYPE cortex_ingester_max_inflight_query_requests gauge
cortex_ingester_max_inflight_query_requests 0
`), "cortex_ingester_max_inflight_query_requests")
require.NoError(t, err)
}

func TestTSDBMetrics(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()

30 changes: 30 additions & 0 deletions pkg/util/math/max_tracker.go
@@ -0,0 +1,30 @@
package math

import "go.uber.org/atomic"

type MaxTracker struct {
current atomic.Int64
old atomic.Int64
}

func (m *MaxTracker) Track(max int64) {
if l := m.current.Load(); l < max {
m.current.CompareAndSwap(l, max)
}
}

func (m *MaxTracker) Tick() {
m.old.Store(m.current.Load())
m.current.Store(0)
}

func (m *MaxTracker) Load() int64 {
c := m.current.Load()
o := m.old.Load()

if c > o {
return c
}

return o
}
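
MaxTracker keeps two one-period windows: `current` collects the maximum seen since the last Tick, and `old` holds the previous window's maximum. Load reports the larger of the two, so a peak stays visible for at least one full reset period (one minute, per maxInflightRequestResetPeriod) after the window rolls over, and a scrape interval at or below that period should therefore observe every peak at least once. A usage sketch with a hypothetical main, importing the package added above:

package main

import (
	"fmt"

	util_math "github.com/cortexproject/cortex/pkg/util/math"
)

func main() {
	var mt util_math.MaxTracker

	// A burst of queries peaks at 7 concurrent requests.
	for _, inflight := range []int64{3, 7, 2} {
		mt.Track(inflight)
	}
	fmt.Println(mt.Load()) // 7: max of the current window

	// The updateLoop ticker fires: the window rolls over, but the peak
	// is still reported from the "old" window.
	mt.Tick()
	fmt.Println(mt.Load()) // 7

	// Another tick with no new activity finally clears it.
	mt.Tick()
	fmt.Println(mt.Load()) // 0
}
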
17 changes: 17 additions & 0 deletions pkg/util/math/max_tracker_test.go
@@ -0,0 +1,17 @@
package math

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMaxTracker(t *testing.T) {
mt := MaxTracker{}
mt.Track(50)
require.Equal(t, int64(50), mt.Load())
mt.Tick()
require.Equal(t, int64(50), mt.Load())
mt.Tick()
require.Equal(t, int64(0), mt.Load())
}