From a58508de9e0b019d81083b0ece9fa7d70d6b5a8d Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 26 Apr 2024 15:45:12 +0200 Subject: [PATCH] Receivers|Store: cache matchers for series calls We have tried caching matchers before with a time-based expiration cache; this time we are trying with an LRU cache. We saw some of our receivers busy with compiling regexes and with high CPU usage, similar to the profile of the benchmark I added here: * Adding matcher cache for method `MatchersToPromMatchers` and a new version which uses the cache. * The main change is in `matchesExternalLabels` function which now receives a cache instance. adding matcher cache and refactor matchers Co-authored-by: Andre Branchizio Signed-off-by: Pedro Tanaka Using the cache in proxy and tsdb stores (only receiver) Signed-off-by: Pedro Tanaka fixing problem with deep equality Signed-off-by: Pedro Tanaka adding some docs Signed-off-by: Pedro Tanaka Adding benchmark Signed-off-by: Pedro Tanaka undo unnecessary changes Signed-off-by: Pedro Tanaka Adjusting metric names Signed-off-by: Pedro Tanaka --- pkg/store/local.go | 2 +- pkg/store/prometheus.go | 18 +++-- pkg/store/proxy.go | 13 +++- pkg/store/proxy_test.go | 40 +++++++++- pkg/store/storepb/custom.go | 50 +++++++++---- pkg/store/storepb/matcher_cache.go | 97 +++++++++++++++++++++++++ pkg/store/storepb/matcher_cache_test.go | 85 ++++++++++++++++++++++ pkg/store/tsdb.go | 40 ++++++++-- 8 files changed, 313 insertions(+), 32 deletions(-) create mode 100644 pkg/store/storepb/matcher_cache.go create mode 100644 pkg/store/storepb/matcher_cache_test.go diff --git a/pkg/store/local.go b/pkg/store/local.go index 4e88c0a7e3f..01c6ee336ad 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -151,7 +151,7 @@ func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. 
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d52fcb07d92..46405559dbc 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -524,8 +524,16 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) { - tms, err := storepb.MatchersToPromMatchers(ms...) +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) { + var ( + tms []*labels.Matcher + err error + ) + if cache != nil { + tms, err = storepb.MatchersToPromMatchersCached(cache, ms...) + } else { + tms, err = storepb.MatchersToPromMatchers(ms...) 
+ } if err != nil { return false, nil, err } @@ -573,7 +581,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -636,7 +644,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 6dbe5df7a31..f4a185011be 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -86,6 +86,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector + matcherCache *storepb.MatchersCache } type proxyStoreMetrics struct { @@ -109,7 +110,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. +// ProxyStoreOption are functions that configure the ProxyStore. type ProxyStoreOption func(s *ProxyStore) // WithProxyStoreDebugLogging toggles debug logging. @@ -126,6 +127,13 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption { } } +// WithMatcherCache sets the matcher cache instance for the proxy. +func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption { + return func(s *ProxyStore) { + s.matcherCache = cache + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. 
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( @@ -156,6 +164,7 @@ func NewProxyStore( metrics: metrics, retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, + matcherCache: storepb.NewMatchersCache(storepb.WithPromRegistry(reg)), } for _, option := range options { @@ -292,7 +301,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index c4571de9762..56fff63fd59 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,11 +6,11 @@ package store import ( "context" "fmt" - "math" "math/rand" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -1974,6 +1974,43 @@ func BenchmarkProxySeries(b *testing.B) { }) } +func BenchmarkProxySeriesRegex(b *testing.B) { + tb := testutil.NewTB(b) + + q := NewProxyStore(nil, + nil, + func() []Client { return nil }, + component.Query, + labels.EmptyLabels(), 0*time.Second, EagerRetrieval, + ) + + words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} + bigRegex := strings.Builder{} + for i := 0; i < 200; i++ { + bigRegex.WriteString(words[rand.Intn(len(words))]) + bigRegex.WriteString("|") + } + + matchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "foo", Value: ".*"}, + {Type: storepb.LabelMatcher_RE, Name: "bar", Value: bigRegex.String()}, + } + + // Create a regex that matches all series. 
+ req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: matchers, + } + s := newStoreSeriesServer(context.Background()) + + tb.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + testutil.Ok(b, q.Series(req, s)) + } +} + func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { tmpDir := t.TempDir() @@ -2293,5 +2330,4 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { tcase.testFn(tcase.responses, h) }) } - } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index faed79bc7b1..b3754bafa95 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -379,29 +379,49 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) for _, m := range ms { - var t labels.MatchType - - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + pm, err := MatcherToPromMatcher(m) + if err != nil { + return nil, err } - m, err := labels.NewMatcher(t, m.Name, m.Value) + res = append(res, pm) + } + return res, nil +} + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It allocates memory. 
+func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for _, m := range ms { + pm, err := cache.GetOrSet(m, MatcherToPromMatcher) if err != nil { return nil, err } - res = append(res, m) + res = append(res, pm) } return res, nil } +// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + return labels.NewMatcher(t, m.Name, m.Value) +} + // MatchersToString converts label matchers to string format. // String should be parsable as a valid PromQL query metric selector. func MatchersToString(ms ...LabelMatcher) string { diff --git a/pkg/store/storepb/matcher_cache.go b/pkg/store/storepb/matcher_cache.go new file mode 100644 index 00000000000..678925cb0e6 --- /dev/null +++ b/pkg/store/storepb/matcher_cache.go @@ -0,0 +1,97 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package storepb + +import ( + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" +) + +const DefaultCacheSize = 200 + +type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error) + +type MatchersCache struct { + reg prometheus.Registerer + cache *lru.TwoQueueCache[LabelMatcher, *labels.Matcher] + metrics *matcherCacheMetrics + size int +} + +type MatcherCacheOption func(*MatchersCache) + +func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { + return func(c *MatchersCache) { + c.reg = reg + } +} + +func WithSize(size int) MatcherCacheOption { + return func(c *MatchersCache) { + c.size = size + } +} + +func NewMatchersCache(opts ...MatcherCacheOption) *MatchersCache { + cache := &MatchersCache{ + reg: prometheus.NewRegistry(), + size: DefaultCacheSize, + } + + for _, opt := range opts { + opt(cache) + } + cache.metrics = newMatcherCacheMetrics(cache.reg) + + return cache +} + +func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + if c.cache == nil { + lruCache, err := lru.New2Q[LabelMatcher, *labels.Matcher](c.size) + if err != nil { + return nil, err + } + c.cache = lruCache + } + c.metrics.requestsTotal.Inc() + if item, ok := c.cache.Get(key); ok { + c.metrics.hitsTotal.Inc() + return item, nil + } + + item, err := newItem(key) + if err != nil { + return nil, err + } + c.cache.Add(key, item) + c.metrics.numItems.Set(float64(c.cache.Len())) + + return item, nil +} + +type matcherCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge +} + +func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { + return &matcherCacheMetrics{ + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_requests_total", + Help: "Total number 
of cache requests for series matchers", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_hits_total", + Help: "Total number of cache hits for series matchers", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_items", + Help: "Total number of cached items", + }), + } +} diff --git a/pkg/store/storepb/matcher_cache_test.go b/pkg/store/storepb/matcher_cache_test.go new file mode 100644 index 00000000000..f4bd5c35e3d --- /dev/null +++ b/pkg/store/storepb/matcher_cache_test.go @@ -0,0 +1,85 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storepb_test + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func TestMatchersCache(t *testing.T) { + cache := storepb.NewMatchersCache(storepb.WithSize(2)) + + matcher := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key", + Value: "val", + } + + matcher2 := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_RE, + Name: "key2", + Value: "val2|val3", + } + + matcher3 := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key3", + Value: "val3", + } + + var cacheHit bool + newItem := func(matcher storepb.LabelMatcher) (*labels.Matcher, error) { + cacheHit = false + return storepb.MatcherToPromMatcher(matcher) + } + expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") + expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") + expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") + + item, err := cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, 
expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher3, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected3, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) +} diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 6985c716fac..608eb512a47 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -43,6 +44,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int + matcherCache *storepb.MatchersCache extLset labels.Labels mtx sync.RWMutex @@ -60,12 +62,35 @@ type ReadWriteTSDBStore struct { storepb.WriteableStoreServer } +type tsdbStoreOpts struct { + cacheSize int + registry prometheus.Registerer +} + +var defaultTsdbStoreOpts = tsdbStoreOpts{ + cacheSize: 200, +} + +type TSDBStoreOption func(*tsdbStoreOpts) + +func WithCacheSize(size int) TSDBStoreOption { + return func(o *tsdbStoreOpts) { + o.cacheSize = size + } +} + // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. 
-func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, opts ...TSDBStoreOption) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } + + opt := defaultTsdbStoreOpts + for _, o := range opts { + o(&opt) + } + return &TSDBStore{ logger: logger, db: db, @@ -76,6 +101,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI b := make([]byte, 0, initialBufSize) return &b }}, + matcherCache: storepb.NewMatchersCache(storepb.WithPromRegistry(opt.registry), storepb.WithSize(opt.cacheSize)), } } @@ -130,13 +156,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } -func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { - labels := p.LabelSet() +func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := s.LabelSet() if len(labels) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := p.TimeRange() + mint, maxt := s.TimeRange() return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ @@ -171,7 +197,7 @@ type CloseDelegator interface { func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { srv := newFlushableServer(seriesSrv, sortingStrategyStore) - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -288,7 +314,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), nil) if err != nil { return nil, 
status.Error(codes.InvalidArgument, err.Error()) } @@ -347,7 +373,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) }