Skip to content

Commit

Permalink
Add new query stats metrics to track prometheus querystats
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
  • Loading branch information
SungJin1212 committed Sep 23, 2024
1 parent d829d65 commit d4ddae7
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_total_queryable_samples_total` and `cortex_query_peak_samples` to track totalQueryableSamples and peakSample per user. #6228
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
nil,
querier.StatsRenderer,
false,
nil,
false,
Expand Down
32 changes: 25 additions & 7 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
querySamples *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
querySamples *prometheus.CounterVec
queryTotalQueryableSamples *prometheus.CounterVec
queryPeakSamples *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
Expand All @@ -122,6 +124,16 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Number of samples fetched to execute a query.",
}, []string{"user"})

h.queryTotalQueryableSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_total_queryable_samples_total",
Help: "Number of total queryable samples to execute a query.",
}, []string{"user"})

h.queryPeakSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_peak_samples_total",
Help: "Highest count of samples considered to execute a query.",
}, []string{"user"})

h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
Expand All @@ -144,6 +156,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.querySamples.DeleteLabelValues(user)
h.queryTotalQueryableSamples.DeleteLabelValues(user)
h.queryPeakSamples.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
Expand Down Expand Up @@ -301,6 +315,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
numTotalQueryableSamples := stats.LoadTotalQueryableSamples()
numPeakSamples := stats.LoadPeakSamples()
numChunkBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()
numStoreGatewayTouchedPostings := stats.LoadStoreGatewayTouchedPostings()
Expand All @@ -313,6 +329,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.querySamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryTotalQueryableSamples.WithLabelValues(userID).Add(float64(numTotalQueryableSamples))
f.queryPeakSamples.WithLabelValues(userID).Add(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
Expand Down
26 changes: 14 additions & 12 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with stats enabled",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripper,
expectedStatusCode: http.StatusOK,
},
Expand All @@ -202,7 +202,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonResponseTooLarge",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusRequestEntityTooLarge,
Expand All @@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManyRequests",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusTooManyRequests,
Expand All @@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManySamples",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooLongRange",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunkBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonDataBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonBytesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand Down Expand Up @@ -395,6 +395,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
"cortex_query_fetched_series_total",
"cortex_query_samples_total",
"cortex_query_fetched_chunks_bytes_total",
"cortex_query_total_queryable_samples_total",
"cortex_query_peak_samples_total",
)

assert.NoError(t, err)
Expand Down
42 changes: 42 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,46 @@ func (s *QueryStats) LoadStoreGatewayTouchedPostingBytes() uint64 {
return atomic.LoadUint64(&s.StoreGatewayTouchedPostingBytes)
}

func (s *QueryStats) AddTotalQueryableSamples(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.TotalQueryableSamples, count)
}

func (s *QueryStats) LoadTotalQueryableSamples() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.TotalQueryableSamples)
}

func (s *QueryStats) AddPeakSamples(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.PeakSamples, count)
}

func (s *QueryStats) SetPeakSamples(count uint64) {
if s == nil {
return
}

atomic.StoreUint64(&s.PeakSamples, count)
}

func (s *QueryStats) LoadPeakSamples() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.PeakSamples)
}

// Merge the provided Stats into this one.
func (s *QueryStats) Merge(other *QueryStats) {
if s == nil || other == nil {
Expand All @@ -317,6 +357,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
s.AddFetchedChunks(other.LoadFetchedChunks())
s.AddStoreGatewayTouchedPostings(other.LoadStoreGatewayTouchedPostings())
s.AddStoreGatewayTouchedPostingBytes(other.LoadStoreGatewayTouchedPostingBytes())
s.AddTotalQueryableSamples(other.LoadTotalQueryableSamples())
s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples()))
s.AddExtraFields(other.LoadExtraFields()...)
}

Expand Down
Loading

0 comments on commit d4ddae7

Please sign in to comment.