Skip to content

Commit

Permalink
store: add query concurrency limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Giedrius Statkevičius committed Feb 4, 2019
1 parent ac7cdde commit eef3a65
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call.").
maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit.").
Default("50000000").Uint()

maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Expand Down Expand Up @@ -67,6 +69,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
Expand All @@ -92,6 +95,7 @@ func runStore(
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
Expand Down Expand Up @@ -123,6 +127,7 @@ func runStore(
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
Expand Down
5 changes: 4 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ Flags:
for chunks.
--grpc-sample-limit=50000000
Maximum amount of samples returned via a single
Series call.
Series call. 0 means no limit.
--grpc-concurrent-limit=20
Maximum number of concurrent Series calls. 0
means no limit.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Expand Down
15 changes: 12 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
Expand Down Expand Up @@ -175,6 +176,9 @@ type BucketStore struct {
// The maximum of samples Thanos Store could return in one Series() call.
// Set to 0 to remove this limit (not recommended).
maxSampleCount uint64

// Query gate which limits the maximum amount of concurrent queries.
queryGate *gate.Gate
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -187,6 +191,7 @@ func NewBucketStore(
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
Expand All @@ -212,6 +217,7 @@ func NewBucketStore(
maxSampleCount: maxSampleCount,
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: gate.New(maxConcurrent),
}
s.metrics = newBucketStoreMetrics(reg)

Expand Down Expand Up @@ -658,10 +664,13 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
// 1. Either count chunk sizes and error out too big query.
// 2. Stream posting -> series -> chunk all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
err := s.queryGate.Start(srv.Context())
if err != nil {
return errors.Wrapf(err, "gate Start failed")
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, 0, 20, false, 20)
testutil.Ok(t, err)

s.store = store
Expand Down

0 comments on commit eef3a65

Please sign in to comment.