Skip to content

Commit

Permalink
feature(carbonserver): introduce a new cache for expanded globs and u…
Browse files Browse the repository at this point in the history
…se it in both find and render http handlers; also switch grpc find/render from findCache to this new cache and restore grpc find response caching into findCache

Couple points about the new cache:
* it's initialised without memory limit, same as find response and render response caches. Might be worth to review this in future for all 3 caches.
* there is no toggle to disable this cache, it can be added later if needed.
  • Loading branch information
Anton Timofieiev committed Feb 5, 2023
1 parent 8192946 commit b61bb69
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 41 deletions.
18 changes: 10 additions & 8 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ type CarbonserverListener struct {
queryCache queryCache
findCacheEnabled bool
findCache queryCache
expandedGlobsCache queryCache // TODO: rename queryCache type to be more generic
trigramIndex bool
trieIndex bool
concurrentIndex bool
Expand Down Expand Up @@ -473,14 +474,15 @@ type fileIndex struct {
func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener {
return &CarbonserverListener{
// Config variables
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: queryCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: queryCache{ec: expirecache.New(0)},
expandedGlobsCache: queryCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
prometheus: prometheus{
request: func(string, int) {},
duration: func(time.Duration) {},
Expand Down
49 changes: 35 additions & 14 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func getProtoV2FindResponse(expandedGlob globs, query string) *protov2.GlobRespo
func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, error) {
var result findResponse
metricsCount := uint64(0)
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, t0, names)
expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, names)
if expandedGlobs == nil {
return nil, err
}
Expand Down Expand Up @@ -412,6 +412,22 @@ GATHER:
return expandedGlobs, nil
}

func (listener *CarbonserverListener) getExpandedGlobsWithCache(ctx context.Context, logger *zap.Logger, queries []string) ([]globs, bool, error) {
key := strings.Join(queries, "&")
size := uint64(100 * 1024 * 1024)
expandedGlobs, isCacheHit, err := getWithCache(logger, listener.expandedGlobsCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})

var expandedGlobsCasted []globs
if err == nil {
expandedGlobsCasted = expandedGlobs.([]globs)
}

return expandedGlobsCasted, isCacheHit, err
}

func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.GlobRequest) (*protov2.GlobResponse, error) {
t0 := time.Now()
span := trace.SpanFromContext(ctx)
Expand Down Expand Up @@ -456,13 +472,23 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
fromCache := false
var finalRes *protov2.GlobResponse
var lookups uint32
expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, []string{query})
if err != nil {
return nil, err
}

if listener.findCacheEnabled {
key := query
key := query + "&" + format + "grpc"
size := uint64(100 * 1024 * 1024)
var result interface{}
result, fromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, t0, []string{query})
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
return nil, errorNotFound{}
}
return finalRes, nil
})
if err == nil {
listener.prometheus.cacheRequest("find", fromCache)
Expand All @@ -471,21 +497,16 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
} else {
atomic.AddUint64(&listener.metrics.FindCacheMiss, 1)
}
expandedGlobs := result.([]globs)
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
finalRes = result.(*protov2.GlobResponse)
if len(finalRes.Matches) == 0 {
err = errorNotFound{}
}
}
} else {
var expandedGlobs []globs
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, t0, []string{query})
if err == nil {
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
finalRes, err = nil, errorNotFound{}
}
} else if err == nil {
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
finalRes, err = nil, errorNotFound{}
}
}

Expand Down
23 changes: 4 additions & 19 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isExpandCacheHit
if expandedGlobs == nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
}
Expand Down Expand Up @@ -711,28 +712,12 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
responseChan := make(chan response, 1000)

fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
var err error
var findFromCache bool
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
var expandedGlobs []globs
if listener.findCacheEnabled {
key := strings.Join(metricNames, "&")
size := uint64(100 * 1024 * 1024)
var result interface{}
result, findFromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
})
if err == nil {
expandedGlobs = result.([]globs)
tle.FindFromCache = findFromCache
}
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
}
expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isExpandCacheHit
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
Expand Down

0 comments on commit b61bb69

Please sign in to comment.