Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new query stats metrics to track prometheus querystats #6228

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_processed_total` and `cortex_query_peak_samples` to track totalQueryableSamples and peakSample per user. #6228
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
nil,
querier.StatsRenderer,
false,
nil,
false,
Expand Down
42 changes: 32 additions & 10 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
querySamples *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryFetchedSamples *prometheus.CounterVec
queryTotalQueryableSamples *prometheus.CounterVec
queryPeakSamples *prometheus.HistogramVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
Expand All @@ -117,11 +119,25 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.querySamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
h.queryFetchedSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_total",
Help: "Number of samples fetched to execute a query.",
}, []string{"user"})

// Tracks, per user, the TotalSamples value from Prometheus query stats; see https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go#L237.
h.queryTotalQueryableSamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_samples_processed_total",
Help: "Number of samples processed to execute a query.",
}, []string{"user"})

h.queryPeakSamples = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_query_peak_samples",
Help: "Highest count of samples considered to execute a query.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"user"})

h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
Expand All @@ -143,7 +159,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.querySamples.DeleteLabelValues(user)
h.queryFetchedSamples.DeleteLabelValues(user)
h.queryTotalQueryableSamples.DeleteLabelValues(user)
h.queryPeakSamples.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
Expand Down Expand Up @@ -301,6 +319,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
numProcessedSamples := stats.LoadProcessedSamples()
numPeakSamples := stats.LoadPeakSamples()
numChunkBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()
numStoreGatewayTouchedPostings := stats.LoadStoreGatewayTouchedPostings()
Expand All @@ -312,7 +332,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.querySamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryTotalQueryableSamples.WithLabelValues(userID).Add(float64(numProcessedSamples))
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
Expand Down
26 changes: 14 additions & 12 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with stats enabled",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripper,
expectedStatusCode: http.StatusOK,
},
Expand All @@ -202,7 +202,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonResponseTooLarge",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusRequestEntityTooLarge,
Expand All @@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManyRequests",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusTooManyRequests,
Expand All @@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooManySamples",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonTooLongRange",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunkBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonDataBytesFetched",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonSeriesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonChunksLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand All @@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
{
name: "test handler with reasonBytesLimitStoreGateway",
cfg: HandlerConfig{QueryStatsEnabled: true},
expectedMetrics: 4,
expectedMetrics: 6,
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Expand Down Expand Up @@ -395,6 +395,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
"cortex_query_fetched_series_total",
"cortex_query_samples_total",
"cortex_query_fetched_chunks_bytes_total",
"cortex_query_samples_processed_total",
"cortex_query_peak_samples",
)

assert.NoError(t, err)
Expand Down
42 changes: 42 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,46 @@ func (s *QueryStats) LoadStoreGatewayTouchedPostingBytes() uint64 {
return atomic.LoadUint64(&s.StoreGatewayTouchedPostingBytes)
}

// AddProcessedSamples atomically increases the ProcessedSamples counter by
// count. Calling it on a nil receiver is a no-op.
func (s *QueryStats) AddProcessedSamples(count uint64) {
	if s != nil {
		atomic.AddUint64(&s.ProcessedSamples, count)
	}
}

// LoadProcessedSamples atomically reads the ProcessedSamples counter.
// A nil receiver yields 0.
func (s *QueryStats) LoadProcessedSamples() uint64 {
	var processed uint64
	if s != nil {
		processed = atomic.LoadUint64(&s.ProcessedSamples)
	}
	return processed
}

// AddPeakSamples atomically increases the PeakSamples value by count.
// Calling it on a nil receiver is a no-op.
func (s *QueryStats) AddPeakSamples(count uint64) {
	if s != nil {
		atomic.AddUint64(&s.PeakSamples, count)
	}
}

// SetPeakSamples atomically overwrites the PeakSamples value with count.
// Calling it on a nil receiver is a no-op.
func (s *QueryStats) SetPeakSamples(count uint64) {
	if s != nil {
		atomic.StoreUint64(&s.PeakSamples, count)
	}
}

// LoadPeakSamples atomically reads the PeakSamples value.
// A nil receiver yields 0.
func (s *QueryStats) LoadPeakSamples() uint64 {
	var peak uint64
	if s != nil {
		peak = atomic.LoadUint64(&s.PeakSamples)
	}
	return peak
}

// Merge the provided Stats into this one.
func (s *QueryStats) Merge(other *QueryStats) {
if s == nil || other == nil {
Expand All @@ -317,6 +357,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
s.AddFetchedChunks(other.LoadFetchedChunks())
s.AddStoreGatewayTouchedPostings(other.LoadStoreGatewayTouchedPostings())
s.AddStoreGatewayTouchedPostingBytes(other.LoadStoreGatewayTouchedPostingBytes())
s.AddProcessedSamples(other.LoadProcessedSamples())
s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples()))
s.AddExtraFields(other.LoadExtraFields()...)
}

Expand Down
Loading
Loading