Skip to content

Commit

Permalink
Add new query stats metrics to track prometheus querystats
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
  • Loading branch information
SungJin1212 committed Oct 3, 2024
1 parent 53556fe commit 797b6ee
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_total_queryable_samples_total` and `cortex_query_peak_samples` to track totalQueryableSamples and peakSample per user. #6228
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
nil,
querier.StatsRenderer,
false,
nil,
false,
Expand Down
42 changes: 32 additions & 10 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
querySamples *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryFetchedSamples *prometheus.CounterVec
queryTotalQueryableSamples *prometheus.CounterVec
queryPeakSamples *prometheus.HistogramVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
Expand All @@ -117,11 +119,25 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.querySamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_total",
Help: "Number of samples fetched to execute a query.",
}, []string{"user"})

// It tracks TotalSamples in https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237 for each user.
h.queryTotalQueryableSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_processed_total",
Help: "Number of samples processed to execute a query.",
}, []string{"user"})

h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_peak_samples",
Help: "Highest count of samples considered to execute a query.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"user"})

h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
Expand All @@ -143,7 +159,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.querySamples.DeleteLabelValues(user)
h.queryFetchedSamples.DeleteLabelValues(user)
h.queryTotalQueryableSamples.DeleteLabelValues(user)
h.queryPeakSamples.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
Expand Down Expand Up @@ -301,6 +319,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
numProcessedSamples := stats.LoadProcessedSamples()
numPeakSamples := stats.LoadPeakSamples()
numChunkBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()
numStoreGatewayTouchedPostings := stats.LoadStoreGatewayTouchedPostings()
Expand All @@ -312,7 +332,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.querySamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryTotalQueryableSamples.WithLabelValues(userID).Add(float64(numProcessedSamples))
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
Expand Down
26 changes: 14 additions & 12 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with stats enabled",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripper,
expectedStatusCode: http.StatusOK,
},
Expand All @@ -202,7 +202,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonResponseTooLarge",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusRequestEntityTooLarge,
Expand All @@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManyRequests",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusTooManyRequests,
Expand All @@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManySamples",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooLongRange",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunkBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonDataBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonBytesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand Down Expand Up @@ -395,6 +395,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
"cortex_query_fetched_series_total",
"cortex_query_samples_total",
"cortex_query_fetched_chunks_bytes_total",
"cortex_query_samples_processed_total",
"cortex_query_peak_samples",
)

assert.NoError(t, err)
Expand Down
42 changes: 42 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,46 @@ func (s *QueryStats) LoadStoreGatewayTouchedPostingBytes() uint64 {
return atomic.LoadUint64(&s.StoreGatewayTouchedPostingBytes)
}

func (s *QueryStats) AddProcessedSamples(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.ProcessedSamples, count)
}

func (s *QueryStats) LoadProcessedSamples() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.ProcessedSamples)
}

func (s *QueryStats) AddPeakSamples(count uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.PeakSamples, count)
}

func (s *QueryStats) SetPeakSamples(count uint64) {
if s == nil {
return
}

atomic.StoreUint64(&s.PeakSamples, count)
}

func (s *QueryStats) LoadPeakSamples() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.PeakSamples)
}

// Merge the provided Stats into this one.
func (s *QueryStats) Merge(other *QueryStats) {
if s == nil || other == nil {
Expand All @@ -317,6 +357,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
s.AddFetchedChunks(other.LoadFetchedChunks())
s.AddStoreGatewayTouchedPostings(other.LoadStoreGatewayTouchedPostings())
s.AddStoreGatewayTouchedPostingBytes(other.LoadStoreGatewayTouchedPostingBytes())
s.AddProcessedSamples(other.LoadProcessedSamples())
s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples()))
s.AddExtraFields(other.LoadExtraFields()...)
}

Expand Down
Loading

0 comments on commit 797b6ee

Please sign in to comment.