Skip to content

Commit

Permalink
add context cancellation checks on GetSeries
Browse files Browse the repository at this point in the history
Signed-off-by: Erlan Zholdubai uulu <erlanz@amazon.com>
  • Loading branch information
erlan-z committed Mar 22, 2024
1 parent 4b75a5c commit f8d0d61
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
7 changes: 6 additions & 1 deletion pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(sortSeries, ms)
seriesSet, err := series.MetricsToSeriesSet(ctx, sortSeries, ms)
if err != nil {
return storage.ErrSeriesSet(err)
}

return seriesSet
}

return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
Expand Down
18 changes: 13 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,12 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
// we have all the sets from different sources (chunk from store, chunks from ingesters,
// time series from store and time series from ingesters).
// mergeSeriesSets will return sorted set.
return q.mergeSeriesSets(result)
mergedSeriesSet, err := q.mergeSeriesSets(ctx, result)
if err != nil {
return storage.ErrSeriesSet(err)
} else {
return mergedSeriesSet
}
}

// LabelValues implements storage.Querier.
Expand Down Expand Up @@ -553,7 +558,7 @@ func (querier) Close() error {
return nil
}

func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
func (q querier) mergeSeriesSets(ctx context.Context, sets []storage.SeriesSet) (storage.SeriesSet, error) {
// Here we deal with sets that are based on chunks and build single set from them.
// Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet

Expand All @@ -565,6 +570,9 @@ func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {

// SeriesSet may have some series backed up by chunks, and some not.
for set.Next() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
s := set.At()

if sc, ok := s.(SeriesWithChunks); ok {
Expand All @@ -582,18 +590,18 @@ func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
}

if len(chunks) == 0 {
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge), nil
}

// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn)

if len(otherSets) == 0 {
return chunksSet
return chunksSet, nil
}

otherSets = append(otherSets, chunksSet)
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge), nil
}

type sliceSeriesSet struct {
Expand Down
8 changes: 6 additions & 2 deletions pkg/querier/series/series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package series

import (
"context"
"sort"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -167,15 +168,18 @@ func MatrixToSeriesSet(sortSeries bool, m model.Matrix) storage.SeriesSet {
}

// MetricsToSeriesSet creates a storage.SeriesSet from a []metric.Metric
func MetricsToSeriesSet(sortSeries bool, ms []metric.Metric) storage.SeriesSet {
func MetricsToSeriesSet(ctx context.Context, sortSeries bool, ms []metric.Metric) (storage.SeriesSet, error) {
series := make([]storage.Series, 0, len(ms))
for _, m := range ms {
if ctx.Err() != nil {
return nil, ctx.Err()
}
series = append(series, &ConcreteSeries{
labels: metricToLabels(m.Metric),
samples: nil,
})
}
return NewConcreteSeriesSet(sortSeries, series)
return NewConcreteSeriesSet(sortSeries, series), nil
}

func metricToLabels(m model.Metric) labels.Labels {
Expand Down

0 comments on commit f8d0d61

Please sign in to comment.