Skip to content

Commit

Permalink
Receivers|Store: cache matchers for series calls
Browse files Browse the repository at this point in the history
We have tried caching matchers before with a time-based expiration cache; this time we are trying an LRU cache.

We saw some of our receivers busy with compiling regexes and with high CPU usage, similar to the profile of the benchmark I added here:

* Adding a matcher cache for `MatchersToPromMatchers`, plus a new variant (`MatchersToPromMatchersCached`) that uses the cache.
* The main change is in `matchesExternalLabels` function which now receives a cache instance.

adding matcher cache and refactor matchers

Co-authored-by: Andre Branchizio <andre.branchizio@shopify.com>

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Using the cache in proxy and tsdb stores (only receiver)

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

fixing problem with deep equality

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

adding some docs

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Adding benchmark

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

undo unnecessary changes

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Adjusting metric names

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed May 13, 2024
1 parent 2d738f0 commit a58508d
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels)
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -524,8 +524,16 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) {
tms, err := storepb.MatchersToPromMatchers(ms...)
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) {
var (
tms []*labels.Matcher
err error
)
if cache != nil {
tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
} else {
tms, err = storepb.MatchersToPromMatchers(ms...)
}
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -573,7 +581,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -636,7 +644,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
matcherCache *storepb.MatchersCache
}

type proxyStoreMetrics struct {
Expand All @@ -109,7 +110,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
}
}

// BucketStoreOption are functions that configure BucketStore.
// ProxyStoreOption are functions that configure the ProxyStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging toggles debug logging.
Expand All @@ -126,6 +127,13 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
}
}

// WithMatcherCache returns an option that stores the given matcher cache on the
// ProxyStore, replacing whatever cache the store held before the option ran.
func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption {
	return func(store *ProxyStore) { store.matcherCache = cache }
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down Expand Up @@ -156,6 +164,7 @@ func NewProxyStore(
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewMatchersCache(storepb.WithPromRegistry(reg)),
}

for _, option := range options {
Expand Down Expand Up @@ -292,7 +301,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
reqLogger = log.With(reqLogger, "request", originalRequest.String())
}

match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
40 changes: 38 additions & 2 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ package store
import (
"context"
"fmt"

"math"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1974,6 +1974,43 @@ func BenchmarkProxySeries(b *testing.B) {
})
}

// BenchmarkProxySeriesRegex measures ProxyStore.Series for a request that carries
// regex matchers, one of them a large alternation, on a proxy with no clients.
func BenchmarkProxySeriesRegex(b *testing.B) {
	tb := testutil.NewTB(b)

	proxy := NewProxyStore(nil,
		nil,
		func() []Client { return nil },
		component.Query,
		labels.EmptyLabels(), 0*time.Second, EagerRetrieval,
	)

	// Build a 200-branch alternation out of randomly picked words; every branch
	// is followed by "|", so the pattern ends with a trailing separator.
	words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"}
	var bigRegex strings.Builder
	for n := 0; n < 200; n++ {
		bigRegex.WriteString(words[rand.Intn(len(words))])
		bigRegex.WriteString("|")
	}

	// The request spans the whole time range and carries a match-all regex on
	// "foo" next to the big alternation on "bar".
	req := &storepb.SeriesRequest{
		MinTime: 0,
		MaxTime: math.MaxInt64,
		Matchers: []storepb.LabelMatcher{
			{Type: storepb.LabelMatcher_RE, Name: "foo", Value: ".*"},
			{Type: storepb.LabelMatcher_RE, Name: "bar", Value: bigRegex.String()},
		},
	}
	srv := newStoreSeriesServer(context.Background())

	tb.ResetTimer()
	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		testutil.Ok(b, proxy.Series(req, srv))
	}
}

func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
tmpDir := t.TempDir()

Expand Down Expand Up @@ -2293,5 +2330,4 @@ func TestDedupRespHeap_Deduplication(t *testing.T) {
tcase.testFn(tcase.responses, h)
})
}

}
50 changes: 35 additions & 15 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,29 +379,49 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
var t labels.MatchType

switch m.Type {
case LabelMatcher_EQ:
t = labels.MatchEqual
case LabelMatcher_NEQ:
t = labels.MatchNotEqual
case LabelMatcher_RE:
t = labels.MatchRegexp
case LabelMatcher_NRE:
t = labels.MatchNotRegexp
default:
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
pm, err := MatcherToPromMatcher(m)
if err != nil {
return nil, err
}
m, err := labels.NewMatcher(t, m.Name, m.Value)
res = append(res, pm)
}
return res, nil
}

// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers.
// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions.
// NOTE: It allocates memory.
func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, MatcherToPromMatcher)
if err != nil {
return nil, err
}
res = append(res, m)
res = append(res, pm)
}
return res, nil
}

// MatcherToPromMatcher builds the Prometheus matcher that corresponds to a single
// Thanos proto matcher. It returns an error when the proto matcher type is not one
// of EQ, NEQ, RE or NRE, or when labels.NewMatcher itself fails.
func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
	switch m.Type {
	case LabelMatcher_EQ:
		return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value)
	case LabelMatcher_NEQ:
		return labels.NewMatcher(labels.MatchNotEqual, m.Name, m.Value)
	case LabelMatcher_RE:
		return labels.NewMatcher(labels.MatchRegexp, m.Name, m.Value)
	case LabelMatcher_NRE:
		return labels.NewMatcher(labels.MatchNotRegexp, m.Name, m.Value)
	default:
		return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
	}
}

// MatchersToString converts label matchers to string format.
// String should be parsable as a valid PromQL query metric selector.
func MatchersToString(ms ...LabelMatcher) string {
Expand Down
97 changes: 97 additions & 0 deletions pkg/store/storepb/matcher_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storepb

import (
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
)

// DefaultCacheSize is the cache size NewMatchersCache uses when no WithSize
// option overrides it.
const DefaultCacheSize = 200

// NewItemFunc builds the Prometheus matcher for a proto matcher. GetOrSet calls
// it on a cache miss to produce the value to store.
type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error)

// MatchersCache caches Prometheus matchers keyed by the proto LabelMatcher they
// were built from, and exposes request/hit/size metrics.
type MatchersCache struct {
// reg is the registerer the cache metrics are created with.
reg prometheus.Registerer
// cache is the underlying 2Q LRU holding the converted matchers.
cache *lru.TwoQueueCache[LabelMatcher, *labels.Matcher]
// metrics holds the counters and gauge updated by GetOrSet.
metrics *matcherCacheMetrics
// size is the size passed to the LRU constructor.
size int
}

// MatcherCacheOption is a function that configures a MatchersCache; options are
// applied by NewMatchersCache.
type MatcherCacheOption func(*MatchersCache)

// WithPromRegistry sets the Prometheus registerer that the cache metrics are
// created with.
func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption {
	return func(mc *MatchersCache) { mc.reg = reg }
}

// WithSize sets the size that is passed to the underlying LRU constructor.
func WithSize(size int) MatcherCacheOption {
	return func(mc *MatchersCache) { mc.size = size }
}

// NewMatchersCache builds a MatchersCache. Without options it uses
// DefaultCacheSize and creates its metrics on a fresh registry; WithSize and
// WithPromRegistry override either default.
//
// The underlying LRU is created here, after all options have been applied, so
// that concurrent GetOrSet calls on the returned cache do not race on
// initializing it. If the LRU constructor rejects the configured size, the LRU
// is left unset and the error surfaces from GetOrSet, because this constructor
// has no error return.
func NewMatchersCache(opts ...MatcherCacheOption) *MatchersCache {
	cache := &MatchersCache{
		reg:  prometheus.NewRegistry(),
		size: DefaultCacheSize,
	}

	for _, opt := range opts {
		opt(cache)
	}
	cache.metrics = newMatcherCacheMetrics(cache.reg)

	// Eager initialization: only assign on success so a typed nil is never
	// stored and GetOrSet can still report the construction error.
	if lruCache, err := lru.New2Q[LabelMatcher, *labels.Matcher](cache.size); err == nil {
		cache.cache = lruCache
	}

	return cache
}

// GetOrSet returns the matcher cached under key. On a miss it builds the matcher
// with newItem, stores it under key and returns it. An error from newItem, or
// from creating the underlying LRU, is returned unchanged and nothing is cached.
// Every call that reaches the lookup increments the request counter; hits also
// increment the hit counter, and the item gauge is refreshed after each insert.
func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
	// Create the underlying LRU if it does not exist yet.
	// NOTE(review): this check-and-assign takes no lock — confirm GetOrSet is
	// never called concurrently while the LRU is still unset.
	if c.cache == nil {
		created, err := lru.New2Q[LabelMatcher, *labels.Matcher](c.size)
		if err != nil {
			return nil, err
		}
		c.cache = created
	}

	c.metrics.requestsTotal.Inc()

	cached, found := c.cache.Get(key)
	if found {
		c.metrics.hitsTotal.Inc()
		return cached, nil
	}

	built, err := newItem(key)
	if err != nil {
		return nil, err
	}

	c.cache.Add(key, built)
	c.metrics.numItems.Set(float64(c.cache.Len()))
	return built, nil
}

// matcherCacheMetrics groups the Prometheus collectors updated by MatchersCache.
type matcherCacheMetrics struct {
// requestsTotal counts cache lookups.
requestsTotal prometheus.Counter
// hitsTotal counts lookups that found a cached item.
hitsTotal prometheus.Counter
// numItems is set to the cache length after each insert.
numItems prometheus.Gauge
}

// newMatcherCacheMetrics creates the request counter, hit counter and item gauge
// of the matcher cache through promauto using the given registerer.
func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
	factory := promauto.With(reg)

	requests := factory.NewCounter(prometheus.CounterOpts{
		Name: "thanos_matchers_cache_requests_total",
		Help: "Total number of cache requests for series matchers",
	})
	hits := factory.NewCounter(prometheus.CounterOpts{
		Name: "thanos_matchers_cache_hits_total",
		Help: "Total number of cache hits for series matchers",
	})
	items := factory.NewGauge(prometheus.GaugeOpts{
		Name: "thanos_matchers_cache_items",
		Help: "Total number of cached items",
	})

	return &matcherCacheMetrics{
		requestsTotal: requests,
		hitsTotal:     hits,
		numItems:      items,
	}
}
Loading

0 comments on commit a58508d

Please sign in to comment.