Skip to content

Commit

Permalink
add storage wall time to querier stats
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Mar 6, 2024
1 parent 2cd304b commit 4fbf05d
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 46 deletions.
6 changes: 6 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query

func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
wallTime := stats.LoadWallTime()
storageWallTime := stats.LoadStorageWallTime()
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
Expand Down Expand Up @@ -356,6 +357,11 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
if priority, ok := stats.LoadPriority(); ok {
logMessage = append(logMessage, "priority", priority)
}
if sws := storageWallTime.Seconds(); sws > 0 {
// Only include the storage wall time field if it is set. This value can be 0
// for query APIs that don't call the `Querier` interface.
logMessage = append(logMessage, "storage_wall_time_seconds", sws)
}

if error != nil {
s, ok := status.FromError(error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
queryStats: &querier_stats.QueryStats{
Stats: querier_stats.Stats{
WallTime: 3 * time.Second,
StorageWallTime: 100 * time.Minute,
FetchedSeriesCount: 100,
FetchedChunksCount: 200,
FetchedSamplesCount: 300,
Expand All @@ -335,7 +336,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
SplitQueries: 10,
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 storage_wall_time_seconds=6000`,
},
"should include user agent": {
header: http.Header{"User-Agent": []string{"Grafana"}},
Expand Down
33 changes: 24 additions & 9 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/cortexproject/cortex/pkg/querier/lazyquery"
seriesset "github.com/cortexproject/cortex/pkg/querier/series"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -284,10 +285,11 @@ type querier struct {
limiterHolder *limiterHolder
}

func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int64, int64, storage.Querier, []storage.Querier, error) {
func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_stats.QueryStats, string, int64, int64, storage.Querier, []storage.Querier, error) {
stats := querier_stats.FromContext(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return ctx, userID, 0, 0, nil, nil, err
return ctx, stats, userID, 0, 0, nil, nil, err
}

q.limiterHolder.limiterInitializer.Do(func() {
Expand All @@ -298,12 +300,12 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int

mint, maxt, err := validateQueryTimeRange(ctx, userID, q.mint, q.maxt, q.limits, q.maxQueryIntoFuture)
if err != nil {
return ctx, userID, 0, 0, nil, nil, err
return ctx, stats, userID, 0, 0, nil, nil, err
}

dqr, err := q.distributor.Querier(mint, maxt)
if err != nil {
return ctx, userID, 0, 0, nil, nil, err
return ctx, stats, userID, 0, 0, nil, nil, err
}
metadataQuerier := dqr

Expand All @@ -319,23 +321,27 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int

cqr, err := s.Querier(mint, maxt)
if err != nil {
return ctx, userID, 0, 0, nil, nil, err
return ctx, stats, userID, 0, 0, nil, nil, err
}

queriers = append(queriers, cqr)
}
return ctx, userID, mint, maxt, metadataQuerier, queriers, nil
return ctx, stats, userID, mint, maxt, metadataQuerier, queriers, nil
}

// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
ctx, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return storage.EmptySeriesSet()
} else if err != nil {
return storage.ErrSeriesSet(err)
}
startT := time.Now()
defer func() {
stats.AddStorageWallTime(time.Since(startT))
}()

log, ctx := spanlogger.New(ctx, "querier.Select")
defer log.Span.Finish()
Expand Down Expand Up @@ -426,12 +432,17 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select

// LabelValues implements storage.Querier.
func (q querier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
ctx, _, _, _, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
ctx, stats, _, _, _, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return nil, nil, nil
} else if err != nil {
return nil, nil, err
}
startT := time.Now()
defer func() {
stats.AddStorageWallTime(time.Since(startT))
}()

if !q.queryStoreForLabels {
return metadataQuerier.LabelValues(ctx, name, matchers...)
}
Expand Down Expand Up @@ -475,12 +486,16 @@ func (q querier) LabelValues(ctx context.Context, name string, matchers ...*labe
}

func (q querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
ctx, _, _, _, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
ctx, stats, _, _, _, metadataQuerier, queriers, err := q.setupFromCtx(ctx)
if err == errEmptyTimeRange {
return nil, nil, nil
} else if err != nil {
return nil, nil, err
}
startT := time.Now()
defer func() {
stats.AddStorageWallTime(time.Since(startT))
}()

if !q.queryStoreForLabels {
return metadataQuerier.LabelNames(ctx, matchers...)
Expand Down
19 changes: 19 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ func (s *QueryStats) LoadFetchedChunks() uint64 {
return atomic.LoadUint64(&s.FetchedChunksCount)
}

// AddStorageWallTime atomically increments the accumulated storage wall time
// by t. Calling it on a nil receiver is a no-op.
func (s *QueryStats) AddStorageWallTime(t time.Duration) {
	if s == nil {
		return
	}

	// StorageWallTime is a time.Duration; view it as its underlying int64 so
	// the update can go through the atomic package.
	counter := (*int64)(&s.StorageWallTime)
	atomic.AddInt64(counter, int64(t))
}

// LoadStorageWallTime atomically reads the accumulated storage wall time.
// A nil receiver yields a zero duration.
func (s *QueryStats) LoadStorageWallTime() time.Duration {
	if s == nil {
		return 0
	}

	// Read the Duration through its underlying int64 representation so the
	// load pairs with the atomic add performed by AddStorageWallTime.
	raw := atomic.LoadInt64((*int64)(&s.StorageWallTime))
	return time.Duration(raw)
}

func (s *QueryStats) AddSplitQueries(count uint64) {
if s == nil {
return
Expand Down Expand Up @@ -259,6 +277,7 @@ func (s *QueryStats) Merge(other *QueryStats) {
}

s.AddWallTime(other.LoadWallTime())
s.AddStorageWallTime(other.LoadStorageWallTime())
s.AddFetchedSeries(other.LoadFetchedSeries())
s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes())
s.AddFetchedDataBytes(other.LoadFetchedDataBytes())
Expand Down
131 changes: 95 additions & 36 deletions pkg/querier/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/querier/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ message Stats {
// The total number of split queries sent after going through all the middlewares.
// It includes the number of requests that might be discarded by the queue.
uint64 split_queries = 9;
// The sum of wall time spent in the querier to fetch and merge data from storage.
google.protobuf.Duration storage_wall_time = 10 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false];
}
Loading

0 comments on commit 4fbf05d

Please sign in to comment.