From eef3a65a37ab05ec2a9c96ca767d2a188a228499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 4 Feb 2019 16:39:05 +0200 Subject: [PATCH] store: add query concurrency limit --- cmd/thanos/store.go | 7 ++++++- docs/components/store.md | 5 ++++- pkg/store/bucket.go | 15 ++++++++++++--- pkg/store/bucket_e2e_test.go | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index bbdc0eaebe0..25f8512abf5 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -36,9 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks."). Default("2GB").Bytes() - maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call."). + maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit."). Default("50000000").Uint() + maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() + objStoreConfig := regCommonObjStoreFlags(cmd, "", true) syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). @@ -67,6 +69,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string uint64(*indexCacheSize), uint64(*chunkPoolSize), uint64(*maxSampleCount), + int(*maxConcurrent), name, debugLogging, *syncInterval, @@ -92,6 +95,7 @@ func runStore( indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, maxSampleCount uint64, + maxConcurrent int, component string, verbose bool, syncInterval time.Duration, @@ -123,6 +127,7 @@ func runStore( indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, + maxConcurrent, verbose, blockSyncConcurrency, ) diff --git a/docs/components/store.md b/docs/components/store.md index 8e603c9f101..866cfe2642b 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -106,7 +106,10 @@ Flags: for chunks. --grpc-sample-limit=50000000 Maximum amount of samples returned via a single - Series call. + Series call. 0 means no limit. + --grpc-concurrent-limit=20 + Maximum number of concurrent Series calls. 0 + means no limit. --objstore.config-file= Path to YAML file that contains object store configuration. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index cac5b4e75c9..a0569abed36 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -31,6 +31,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/fileutil" @@ -175,6 +176,9 @@ type BucketStore struct { // The maximum of samples Thanos Store could return in one Series() call. // Set to 0 to remove this limit (not recommended). maxSampleCount uint64 + + // Query gate which limits the maximum amount of concurrent queries. + queryGate *gate.Gate } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -187,6 +191,7 @@ func NewBucketStore( indexCacheSizeBytes uint64, maxChunkPoolBytes uint64, maxSampleCount uint64, + maxConcurrent int, debugLogging bool, blockSyncConcurrency int, ) (*BucketStore, error) { @@ -212,6 +217,7 @@ func NewBucketStore( maxSampleCount: maxSampleCount, debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.New(maxConcurrent), } s.metrics = newBucketStoreMetrics(reg) @@ -658,10 +664,13 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels } // Series implements the storepb.StoreServer interface. -// TODO(bwplotka): It buffers all chunks in memory and only then streams to client. -// 1. Either count chunk sizes and error out too big query. -// 2. Stream posting -> series -> chunk all together. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + err := s.queryGate.Start(srv.Context()) + if err != nil { + return errors.Wrapf(err, "gate Start failed") + } + defer s.queryGate.Done() + matchers, err := translateMatchers(req.Matchers) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 980ffcf1cf2..b84f31e5baf 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) * testutil.Ok(t, os.RemoveAll(dir2)) } - store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, 0, false, 20) + store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, 0, 20, false, 20) testutil.Ok(t, err) s.store = store