store: add ability to limit max num of samples / concurrent queries #798

Merged · merged 57 commits · Mar 23, 2019

Changes shown below are from 5 of the 57 commits.

Commits:
1bc1f59
store: add ability to limit max samples / conc. queries
Feb 1, 2019
e87f763
store/bucket: account for the RawChunk case
Feb 5, 2019
1ab1dc6
store/bucket_e2e_test: adjust sample limit size
Feb 5, 2019
d7c3ade
store/bucket: add metric thanos_bucket_store_queries_limited_total
Feb 5, 2019
12db24a
store/bucket: register queriesLimited metric
Feb 5, 2019
9d0b8a7
store: make changes according to the review comments
Feb 8, 2019
9727072
docs/store: update
Feb 8, 2019
d9c733a
store: gating naming changes, add span/extra metric
Feb 8, 2019
c4ce735
store: improve error messages
Feb 8, 2019
30eef19
store/limiter: improve error messages
Feb 8, 2019
194394d
store/gate: time -> seconds
Feb 8, 2019
2e51c2e
store/bucket_e2e_test: narrow down the first query
Feb 8, 2019
58a14fa
store/bucket: check for negative maxConcurrent
Feb 8, 2019
4d1b7ed
cmd/store: clarify help message
Feb 8, 2019
3ae3733
Merge remote-tracking branch 'origin/master' into smpl
Feb 8, 2019
b149f74
pkg/store: hook thanos_bucket_store_queries_limited into Limiter
Feb 8, 2019
e79c56d
store/bucket_test: fix NewBucketStore call
Feb 8, 2019
1d07515
docs: update again
Feb 8, 2019
38a093b
store/gate: spelling fix
Feb 9, 2019
7b13f7e
store/gate: spelling fix #2
Feb 9, 2019
4540394
Merge remote-tracking branch 'origin/master' into feature/store_sampl…
GiedriusS Feb 15, 2019
3e532fe
store/bucket: remove pointless newline
GiedriusS Feb 15, 2019
cff979c
store/gate: generalize gate timing
Mar 14, 2019
e7ea64b
store/gate: convert the g.gateTiming metric into a histogram
Mar 14, 2019
23c7368
store/bucket: change comment wording
Mar 14, 2019
da575e4
store/bucket: remove type from maxSamplesPerChunk
Mar 14, 2019
e390846
store/bucket: rename metric into thanos_bucket_store_queries_dropped
Mar 14, 2019
24e8e1f
thanos/store: clarify help message
Mar 14, 2019
4012eca
store/gate: rename metric to thanos_bucket_store_queries_in_flight
Mar 14, 2019
e7be55d
store/gate: fix MustRegister() call
Mar 14, 2019
3e8150d
docs: update
Mar 14, 2019
5ec5ce9
store/bucket: clarify the name of the span
Mar 14, 2019
810a131
store/bucket: inline calculation into the function call
Mar 14, 2019
541f180
Merge remote-tracking branch 'origin' into fork_store_sample_limit
Mar 14, 2019
ae8e425
CHANGELOG: add item about this
Mar 14, 2019
de8a234
store/gate: reduce number of buckets
Mar 15, 2019
07b4658
store/bucket: rename metric to thanos_bucket_store_queries_dropped_total
Mar 15, 2019
61d6ecd
store/bucket: move defer out of code block
Mar 15, 2019
70b115d
store/gate: generalize gate for different kinds of subsystems
Mar 15, 2019
36f1153
store/limiter: remove non-nil check
Mar 15, 2019
9b74bbe
CHANGELOG: fixes
Mar 15, 2019
82bdb3c
store/limiter: convert failedCounter to non-ptr
Mar 15, 2019
4d8420f
store/limiter: remove invalid comment
Mar 15, 2019
590b9a6
*: update according to review comments
Mar 18, 2019
3f40bac
CHANGELOG: update
Mar 18, 2019
f4734e5
*: fix according to review
Mar 18, 2019
d6c1534
*: fix according to review
Mar 18, 2019
1147acd
*: make docs
Mar 18, 2019
1d0fad3
CHANGELOG: clean up
Mar 18, 2019
ef4a51e
CHANGELOG: update
GiedriusS Mar 18, 2019
48141fd
Merge remote-tracking branch 'origin' into feature/store_sample_limit
Mar 20, 2019
c9a7d83
*: queries_in_flight_total -> queries_in_flight
GiedriusS Mar 21, 2019
d71f1d8
Merge branch 'master' into smpl_limit
Mar 22, 2019
280a8ca
store/bucket: do not wraper samplesLimiter error
Mar 22, 2019
11c4b18
store/bucket: err -> errors.Wrap
Mar 22, 2019
31a8346
store: make store.grpc.series-max-concurrency 20 by default
GiedriusS Mar 23, 2019
6e98dfd
CHANGELOG: add warning about new limit
GiedriusS Mar 23, 2019
3 changes: 3 additions & 0 deletions Gopkg.lock

(Generated file; diff not rendered.)

11 changes: 11 additions & 0 deletions cmd/thanos/store.go
@@ -36,6 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit.").
Default("50000000").Uint()

maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()
Contributor:
As above, grpc-concurrent-limit > query.grpc.max-concurrency? See query.max-concurrency: https://github.com/prometheus/prometheus/blob/master/cmd/prometheus/main.go#L233

Member (author):
Will rename to store.grpc.max-concurrency as we are talking about Thanos Store here :P

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
@@ -63,6 +68,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
peer,
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
@@ -87,6 +94,8 @@ func runStore(
peer cluster.Peer,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
@@ -117,6 +126,8 @@ func runStore(
dataDir,
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
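The review thread above led to a rename: commit 31a8346 later makes store.grpc.series-max-concurrency default to 20. A minimal, self-contained kingpin sketch of the renamed flags — the store.grpc.series-sample-limit name and its 0 default are assumptions here, not shown in this diff:

package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("store", "sketch of the renamed Series limit flags")

	// Renamed per the review thread; name and default per commit 31a8346.
	maxConcurrent := app.Flag("store.grpc.series-max-concurrency",
		"Maximum number of concurrent Series calls.").Default("20").Int()

	// Assumed final name/default; this diff still calls it grpc-sample-limit.
	maxSampleCount := app.Flag("store.grpc.series-sample-limit",
		"Maximum amount of samples returned via a single Series call. 0 means no limit.").
		Default("0").Uint()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Printf("max concurrency: %d, sample limit: %d\n", *maxConcurrent, *maxSampleCount)
}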
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -104,6 +104,12 @@ Flags:
--index-cache-size=250MB Maximum size of items held in the index cache.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
for chunks.
--grpc-sample-limit=50000000
Maximum amount of samples returned via a single
Series call. 0 means no limit.
--grpc-concurrent-limit=20
Maximum number of concurrent Series calls. 0
means no limit.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample_test.go
@@ -1,13 +1,14 @@
package downsample

import (
"github.com/prometheus/tsdb"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"

"github.com/prometheus/tsdb"

"github.com/improbable-eng/thanos/pkg/block/metadata"

"github.com/prometheus/prometheus/pkg/value"
73 changes: 63 additions & 10 deletions pkg/store/bucket.go
@@ -56,6 +56,7 @@ type bucketStoreMetrics struct {
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesLimited prometheus.Counter
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -131,6 +132,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
},
})

m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_limited_total",
Help: "Total number of queries that were dropped due to the sample limit.",
})

if reg != nil {
reg.MustRegister(
m.blockLoads,
@@ -147,6 +153,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.seriesMergeDuration,
m.resultSeriesCount,
m.chunkSizeBytes,
m.queriesLimited,
)
}
return &m
@@ -171,6 +178,13 @@ type BucketStore struct {
debugLogging bool
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

// The maximum number of samples Thanos Store can return in one Series() call.
// Set to 0 to remove this limit (not recommended).
maxSampleCount uint64

// Query gate which limits the maximum amount of concurrent queries.
queryGate *Gate
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -182,6 +196,8 @@ func NewBucketStore(
dir string,
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
@@ -204,8 +220,10 @@ func NewBucketStore(
chunkPool: chunkPool,
blocks: map[ulid.ULID]*bucketBlock{},
blockSets: map[uint64]*bucketBlockSet{},
maxSampleCount: maxSampleCount,
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, reg),
}
s.metrics = newBucketStoreMetrics(reg)

@@ -463,14 +481,16 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (s *BucketStore) blockSeries(
func (bs *BucketStore) blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samples *uint64,
samplesLock *sync.Mutex,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
@@ -559,7 +579,7 @@ func (s *BucketStore) blockSeries(
if err != nil {
return nil, nil, errors.Wrap(err, "get chunk")
}
if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil {
if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil {
return nil, nil, errors.Wrap(err, "populate chunk")
}
}
@@ -568,8 +588,30 @@ func (s *BucketStore) blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesLock *sync.Mutex) error {
samplesLock.Lock()
*samples += gotSamples
if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount {
samplesLock.Unlock()
bs.metrics.queriesLimited.Inc()
return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount)
}
samplesLock.Unlock()
return nil
}

func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr,
samples *uint64, samplesLock *sync.Mutex) error {

if in.Encoding() == chunkenc.EncXOR {
ch, err := chunkenc.FromData(in.Encoding(), in.Bytes())
if err != nil {
return errors.Errorf("failed to create a chunk")
}
err = bs.checkSamples(uint64(ch.NumSamples()), samples, samplesLock)
if err != nil {
return errors.Wrapf(err, "check samples")
}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
return nil
}
@@ -578,6 +620,10 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
}

ac := downsample.AggrChunk(in.Bytes())
err := bs.checkSamples(uint64(ac.NumSamples()), samples, samplesLock)
if err != nil {
return errors.Wrapf(err, "check samples")
}

for _, at := range aggrs {
switch at {
@@ -652,19 +698,24 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
// 1. Either count chunk sizes and error out too big query.
// 2. Stream posting -> series -> chunk all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
err := s.queryGate.Start(srv.Context())
if err != nil {
return errors.Wrapf(err, "gate Start failed")
}
defer s.queryGate.Done()

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
var (
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
samples uint64
samplesLock sync.Mutex
)
s.mtx.RLock()

@@ -701,6 +752,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
chunkr,
blockMatchers,
req,
&samples,
&samplesLock,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
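The heart of the limit above is checkSamples: every blockSeries goroutine adds the samples of each chunk it populates to one shared counter, under a mutex, and the first addition that pushes the total past maxSampleCount aborts the whole Series call. A standalone sketch of that accounting pattern (identifiers here are illustrative, not the PR's):

package main

import (
	"fmt"
	"sync"

	"github.com/pkg/errors"
)

type sampleLimiter struct {
	mtx   sync.Mutex
	total uint64
	limit uint64 // 0 disables the limit
}

// add accumulates n samples and errors out once the running total exceeds the limit.
func (l *sampleLimiter) add(n uint64) error {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.total += n
	if l.limit > 0 && l.total > l.limit {
		return errors.Errorf("sample limit violated (got %v, limit %v)", l.total, l.limit)
	}
	return nil
}

func main() {
	l := &sampleLimiter{limit: 30}
	fmt.Println(l.add(20)) // <nil>
	fmt.Println(l.add(20)) // sample limit violated (got 40, limit 30)
}

Because samples are counted a chunk at a time after decoding, a query can overshoot the limit by up to one chunk's worth of samples before it is aborted.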
31 changes: 28 additions & 3 deletions pkg/store/bucket_e2e_test.go
@@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, maxSampleCount uint64) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
@@ -126,7 +126,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt)
s := prepareStoreWithTestBlocks(t, dir, bkt, 0)
defer s.Close()

mint, maxt := s.store.TimeRange()
@@ -215,6 +215,31 @@ func TestBucketStore_e2e(t *testing.T) {
MaxTime: maxt,
}, srv))
testutil.Equals(t, 0, len(srv.SeriesSet))

// Test the samples limit.
testutil.Ok(t, os.RemoveAll(dir))
s = prepareStoreWithTestBlocks(t, dir, bkt, 30)
mint, maxt = s.store.TimeRange()
defer s.Close()

srv = newStoreSeriesServer(ctx)

testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
}, srv))

testutil.NotOk(t, s.store.Series(&storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
},
MinTime: mint,
MaxTime: maxt,
}, srv))

})

}
43 changes: 43 additions & 0 deletions pkg/store/gate.go
@@ -0,0 +1,43 @@
package store

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/gate"
)

// Gate wraps the Prometheus gate with extra metrics.
type Gate struct {
g *gate.Gate
currentQueries prometheus.Gauge
}

// NewGate returns a new gate.
func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate {
g := &Gate{
g: gate.New(maxConcurrent),
}
g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_total",
Help: "Total number of currently executing queries.",
})

if reg != nil {
reg.MustRegister(g.currentQueries)
}

return g
}

// Start initiates a new query.
func (g *Gate) Start(ctx context.Context) error {
g.currentQueries.Inc()
return g.g.Start(ctx)
}

// Done finishes a query.
func (g *Gate) Done() {
g.currentQueries.Dec()
g.g.Done()
}