
Add streaming series limit at block series client #6972

Merged 5 commits on Dec 12, 2023

Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram.
- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs.
- [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule.
- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Added `--store.streaming-series-limit` to apply the series limit while streaming series, counting only the series actually matched.

### Changed

5 changes: 5 additions & 0 deletions cmd/thanos/store.go
@@ -89,6 +89,7 @@ type storeConfig struct {
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
streamingSeriesLimit uint64
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -186,6 +187,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("store.streaming-series-limit", "The maximum number of series allowed to match when streaming series. The Series/LabelNames/LabelValues call fails if this limit is exceeded. 0 means no limit.").
Contributor:
I wonder if this should be a limiter we apply when lazy postings are enabled, instead of through a separate flag. To me it seems like the store-gw should be able to choose the limiter itself and not rely on a config value.

Contributor Author (@yeya24, Dec 11, 2023):

Yeah I think that makes sense to me. We can apply the limit in a streaming way when lazy postings or vertical sharding is enabled.

The only concern to me is that it changes the existing behavior. If a user doesn't want the streaming series limit at all but is using vertical sharding, they have no way to restore the previous behavior.

Contributor:

Yeah vertical sharding is hard to solve because it's distributed. But query shards could also hit different store-gw instances, so I am not sure if a global series limit is meaningful.

Contributor Author (@yeya24, Dec 11, 2023):

Ah, I didn't mean a global series limiter. I'm just saying that in a single request to a single instance, customers might still want the existing series limiter rather than the streaming one, because the existing limiter counts how many series are touched. That way they don't have to download any series before rejecting a request; expanding postings is enough.

Contributor:

I think for these cases, maybe we should make the limiter configurable in the constructor? In Thanos we can use the streaming limiter for lazy postings, and the original one for eager postings. And Cortex can decide to configure store-gw as it sees fit.

Contributor Author:

That would be quite complex tbh. I want to keep it simple, too.
I am thinking about two options:

  1. As you said, we can use the streaming limiter for lazy postings and the original one in all other cases. Users don't need to configure anything; the Store Gateway checks whether lazy postings are in use.
  2. Change everything to just use the streaming limiter. Honestly I think this is fine, as it is what we use for other Stores like sidecar, receiver, etc.

I think I will probably go with option 1 in this pr. WDYT?

Contributor:

Yes, this sounds good to me 👍 Let's start with this solution and we can still change it in the future.

Contributor Author:

Fixed. PTAL

Hidden().Default("0").Uint64Var(&sc.streamingSeriesLimit)
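The "option 1" agreed on in the thread above — streaming limiter only when lazy expanded postings are in use, original limiter otherwise — can be sketched as a small selection helper. All names here are hypothetical illustration, not the code this PR actually adds:

```go
package main

import "fmt"

// SeriesLimiter matches the Reserve-based interface used in the diff.
type SeriesLimiter interface {
	Reserve(num uint64) error
}

// noopLimiter is a stand-in limiter, tagged with a name for demonstration.
type noopLimiter struct{ name string }

func (l noopLimiter) Reserve(num uint64) error { return nil }

// pickSeriesLimiter is a hypothetical sketch of "option 1": the streaming
// limiter applies only when lazy expanded postings are enabled, and the
// original (touched-series) limiter is kept for the eager path.
func pickSeriesLimiter(lazyPostingsEnabled bool, streaming, original SeriesLimiter) SeriesLimiter {
	if lazyPostingsEnabled {
		return streaming
	}
	return original
}

func main() {
	streaming := noopLimiter{name: "streaming"}
	original := noopLimiter{name: "original"}
	fmt.Println(pickSeriesLimiter(true, streaming, original).(noopLimiter).name)  // streaming
	fmt.Println(pickSeriesLimiter(false, streaming, original).(noopLimiter).name) // original
}
```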

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
@@ -388,6 +392,7 @@ func runStore(
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithStreamingSeriesLimiterFactory(store.NewSeriesLimiterFactory(conf.streamingSeriesLimit)),
}

if conf.debugLogging {
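The runStore wiring above passes `conf.streamingSeriesLimit` into `store.NewSeriesLimiterFactory`, so the flag value is captured once and a fresh limiter is produced per request. A rough sketch of that pattern, with simplified, illustrative names (the factory in the diff is additionally invoked with a dropped-queries counter, omitted here):

```go
package main

import "fmt"

// SeriesLimiter mirrors the Reserve-based interface seen in the diff.
type SeriesLimiter interface {
	Reserve(num uint64) error
}

// SeriesLimiterFactory builds a fresh limiter per request.
type SeriesLimiterFactory func() SeriesLimiter

// staticLimiter tracks a running count against a fixed cap.
type staticLimiter struct {
	limit, reserved uint64
}

func (l *staticLimiter) Reserve(num uint64) error {
	l.reserved += num
	if l.limit > 0 && l.reserved > l.limit {
		return fmt.Errorf("limit %d violated (got %d)", l.limit, l.reserved)
	}
	return nil
}

// newSeriesLimiterFactory closes over the flag value so every request
// starts from a zeroed running count. limit == 0 disables the limit.
func newSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
	return func() SeriesLimiter { return &staticLimiter{limit: limit} }
}

func main() {
	streamingSeriesLimit := uint64(2) // stand-in for --store.streaming-series-limit
	factory := newSeriesLimiterFactory(streamingSeriesLimit)
	l := factory()
	fmt.Println(l.Reserve(2)) // within the limit
	fmt.Println(l.Reserve(1)) // running total now exceeds the limit
}
```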
41 changes: 35 additions & 6 deletions pkg/store/bucket.go
@@ -390,6 +390,9 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
// streamingSeriesLimiterFactory creates a series limiter that applies the series limit to actually
// matched series, rather than touched series, when streaming series. Useful for vertical sharding or lazy postings scenarios.
streamingSeriesLimiterFactory SeriesLimiterFactory

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
@@ -531,6 +534,14 @@ func WithDontResort(true bool) BucketStoreOption {
}
}

func WithStreamingSeriesLimiterFactory(factory SeriesLimiterFactory) BucketStoreOption {
	return func(s *BucketStore) {
		s.streamingSeriesLimiterFactory = factory
	}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
@@ -952,8 +963,10 @@ type blockSeriesClient struct {
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
streamingSeriesLimiter SeriesLimiter
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
lazyExpandedPostingsCount prometheus.Counter
@@ -986,7 +999,8 @@ func newBlockSeriesClient(
logger log.Logger,
b *bucketBlock,
req *storepb.SeriesRequest,
streamingSeriesLimiter SeriesLimiter,
chunksLimiter ChunksLimiter,
bytesLimiter BytesLimiter,
blockMatchers []*labels.Matcher,
shardMatcher *storepb.ShardMatcher,
@@ -1022,7 +1036,8 @@
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
streamingSeriesLimiter: streamingSeriesLimiter,
chunksLimiter: chunksLimiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
seriesFetchDurationSum: seriesFetchDurationSum,
@@ -1169,6 +1184,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
return errors.Wrap(err, "preload series")
}

seriesMatched := 0
b.entries = b.entries[:0]
OUTER:
for i := 0; i < len(postingsBatch); i++ {
@@ -1209,6 +1225,7 @@
continue
}

seriesMatched++
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
@@ -1238,6 +1255,11 @@
b.entries = append(b.entries, s)
}

// Apply the series limit before fetching chunks, based on the series actually matched.
if err := b.streamingSeriesLimiter.Reserve(uint64(seriesMatched)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}

if !b.skipChunks {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil {
return errors.Wrap(err, "load chunks")
@@ -1405,8 +1427,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

queryStatsEnabled = false
)
@@ -1464,6 +1488,7 @@
s.logger,
blk,
req,
streamingSeriesLimiter,
chunksLimiter,
bytesLimiter,
sortedBlockMatchers,
@@ -1701,6 +1726,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))

for _, b := range s.blocks {
@@ -1764,6 +1790,7 @@
s.logger,
b,
seriesReq,
streamingSeriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
@@ -1901,6 +1928,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))

for _, b := range s.blocks {
@@ -1967,6 +1995,7 @@
s.logger,
b,
seriesReq,
streamingSeriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
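In the `nextBatch` change above, `Reserve` is called once per batch with the count of series matched in that batch, so the limit accrues across the stream instead of being checked once against touched postings. A minimal, concurrency-safe sketch of such a limiter (hypothetical type; the Thanos limiter additionally increments a dropped-queries metric on failure):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// batchSeriesLimiter accumulates per-batch reservations and fails once
// the running total of matched series exceeds the limit (0 = unlimited).
type batchSeriesLimiter struct {
	limit    uint64
	reserved atomic.Uint64
}

func (l *batchSeriesLimiter) Reserve(num uint64) error {
	if l.limit == 0 {
		return nil
	}
	if total := l.reserved.Add(num); total > l.limit {
		return fmt.Errorf("series limit %d exceeded: %d series matched so far", l.limit, total)
	}
	return nil
}

func main() {
	l := &batchSeriesLimiter{limit: 100}
	// Simulate three streaming batches; the third pushes past the limit,
	// aborting before any chunks for that batch would be fetched.
	for _, seriesMatched := range []uint64{40, 40, 40} {
		if err := l.Reserve(seriesMatched); err != nil {
			fmt.Println("abort before loading chunks:", err)
			return
		}
		fmt.Println("batch ok, matched", seriesMatched)
	}
}
```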
1 change: 1 addition & 0 deletions pkg/store/bucket_test.go
@@ -2777,6 +2777,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
nil,
blk,
req,
seriesLimiter,
chunksLimiter,
NewBytesLimiterFactory(0)(nil),
matchers,
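`WithStreamingSeriesLimiterFactory` follows the functional-option pattern `BucketStore` already uses (`WithLazyExpandedPostings`, `WithDontResort`). A stripped-down illustration of that pattern, with hypothetical names standing in for the real types:

```go
package main

import "fmt"

// Store stands in for BucketStore; only the field relevant here is kept.
type Store struct {
	streamingSeriesLimit uint64
}

// StoreOption mutates a Store during construction, mirroring BucketStoreOption.
type StoreOption func(*Store)

// WithStreamingSeriesLimit returns an option setting the limit field.
func WithStreamingSeriesLimit(limit uint64) StoreOption {
	return func(s *Store) { s.streamingSeriesLimit = limit }
}

// NewStore applies each option in order over the zero-value defaults.
func NewStore(opts ...StoreOption) *Store {
	s := &Store{} // default: limit 0, i.e. unlimited
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(NewStore().streamingSeriesLimit)                              // default
	fmt.Println(NewStore(WithStreamingSeriesLimit(500)).streamingSeriesLimit) // configured
}
```

The pattern keeps the constructor signature stable as knobs like this one are added, which is why new settings in this PR arrive as options rather than extra parameters.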