From 676cb0e1ff187644a560028a11c45119489b6d6c Mon Sep 17 00:00:00 2001 From: Anton Timofieiev Date: Mon, 6 Feb 2023 11:19:46 +0100 Subject: [PATCH] optimisation(carbonserver): separate grpc expandedGlobsCache from findCache into a separate one, and restore response caching in findCache; and use expandedGlobsCache in http find/render This is expected to speed up http renders. A few points about the new cache: * it's initialised without memory limit, same as find response and render response caches. Might be worth to review this in future for all 3 caches. * there is no toggle to disable this cache, it can be added later if needed. * 4 graphite metrics are introduced for visibility into cache performance: find_expanded_globs_cache_hit find_expanded_globs_cache_miss render_expanded_globs_cache_hit render_expanded_globs_cache_miss --- carbonserver/carbonserver.go | 110 +++++++++++++++++++---------------- carbonserver/find.go | 64 +++++++++++++++----- carbonserver/render.go | 23 ++------ 3 files changed, 115 insertions(+), 82 deletions(-) diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index 1e4368bcd..cbf14afb0 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -66,47 +66,51 @@ import ( ) type metricStruct struct { - RenderRequests uint64 - RenderErrors uint64 - NotFound uint64 - FindRequests uint64 - FindErrors uint64 - FindZero uint64 - InfoRequests uint64 - InfoErrors uint64 - ListRequests uint64 - ListErrors uint64 - ListQueryRequests uint64 - ListQueryErrors uint64 - DetailsRequests uint64 - DetailsErrors uint64 - CacheHit uint64 - CacheMiss uint64 - CacheRequestsTotal uint64 - CacheWorkTimeNS uint64 - CacheWaitTimeFetchNS uint64 - DiskWaitTimeNS uint64 - DiskRequests uint64 - PointsReturned uint64 - MetricsReturned uint64 - MetricsKnown uint64 - FileScanTimeNS uint64 - IndexBuildTimeNS uint64 - MetricsFetched uint64 - MetricsFound uint64 - ThrottledCreates uint64 - MaxCreatesPerSecond uint64 - FetchSize uint64 - QueryCacheHit uint64 - QueryCacheMiss uint64 - FindCacheHit uint64 - FindCacheMiss uint64 - TrieNodes uint64 - TrieFiles uint64 - TrieDirs uint64 - TrieCountNodesTimeNs uint64 - QuotaApplyTimeNs uint64 - UsageRefreshTimeNs uint64 + RenderRequests uint64 + RenderErrors uint64 + NotFound uint64 + FindRequests uint64 + FindErrors uint64 + FindZero uint64 + InfoRequests uint64 + InfoErrors uint64 + ListRequests uint64 + ListErrors uint64 + ListQueryRequests uint64 + ListQueryErrors uint64 + DetailsRequests uint64 + DetailsErrors uint64 + CacheHit uint64 + CacheMiss uint64 + CacheRequestsTotal uint64 + CacheWorkTimeNS uint64 + CacheWaitTimeFetchNS uint64 + DiskWaitTimeNS uint64 + DiskRequests uint64 + PointsReturned uint64 + MetricsReturned uint64 + MetricsKnown uint64 + FileScanTimeNS uint64 + IndexBuildTimeNS uint64 + MetricsFetched uint64 + MetricsFound uint64 + ThrottledCreates uint64 + MaxCreatesPerSecond uint64 + FetchSize uint64 + QueryCacheHit uint64 + QueryCacheMiss uint64 + FindCacheHit uint64 + FindCacheMiss uint64 + findExpandedGlobsCachedHit uint64 + findExpandedGlobsCacheMiss uint64 + renderExpandedGlobsCacheHit uint64 + renderExpandedGlobsCacheMiss uint64 + TrieNodes uint64 + TrieFiles uint64 + TrieDirs uint64 + TrieCountNodesTimeNs uint64 + QuotaApplyTimeNs uint64 + UsageRefreshTimeNs uint64 InflightRequests uint64 RejectedTooManyRequests uint64 @@ -255,6 +259,7 @@ type CarbonserverListener struct { queryCache queryCache findCacheEnabled bool findCache queryCache + expandedGlobsCache queryCache // TODO: rename queryCache type to be more generic trigramIndex bool trieIndex bool concurrentIndex bool @@ -473,14 +478,15 @@ type fileIndex struct { func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener { return &CarbonserverListener{ // Config variables - metrics: &metricStruct{}, - metricsAsCounters: false, - cacheGet: cacheGetFunc, - logger: zapwriter.Logger("carbonserver"), - accessLogger: zapwriter.Logger("access"), - findCache: queryCache{ec: expirecache.New(0)}, - trigramIndex: true, - percentiles: []int{100, 99, 98, 95, 75, 50}, + metrics: &metricStruct{}, + metricsAsCounters: false, + cacheGet: cacheGetFunc, + logger: zapwriter.Logger("carbonserver"), + accessLogger: zapwriter.Logger("access"), + findCache: queryCache{ec: expirecache.New(0)}, + expandedGlobsCache: queryCache{ec: expirecache.New(0)}, + trigramIndex: true, + percentiles: []int{100, 99, 98, 95, 75, 50}, prometheus: prometheus{ request: func(string, int) {}, duration: func(time.Duration) {}, @@ -1523,6 +1529,12 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) { sender("find_cache_hit", &listener.metrics.FindCacheHit, send) sender("find_cache_miss", &listener.metrics.FindCacheMiss, send) + sender("find_expanded_globs_cache_hit", &listener.metrics.findExpandedGlobsCachedHit, send) + sender("find_expanded_globs_cache_miss", &listener.metrics.findExpandedGlobsCacheMiss, send) + + sender("render_expanded_globs_cache_hit", &listener.metrics.renderExpandedGlobsCacheHit, send) + sender("render_expanded_globs_cache_miss", &listener.metrics.renderExpandedGlobsCacheMiss, send) + sender("inflight_requests_count", &listener.metrics.InflightRequests, send) senderRaw("inflight_requests_limit", &listener.MaxInflightRequests, send) sender("rejected_too_many_requests", &listener.metrics.RejectedTooManyRequests, send) diff --git a/carbonserver/find.go b/carbonserver/find.go index 0ea2ae860..6e2a642e2 100644 --- a/carbonserver/find.go +++ b/carbonserver/find.go @@ -227,7 +227,7 @@ func getProtoV2FindResponse(expandedGlob globs, query string) *protov2.GlobRespo func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, error) { var result findResponse metricsCount := uint64(0) - expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, t0, names) + expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, "find", names) if expandedGlobs == nil { return nil, err } @@ -412,6 +412,37 @@ GATHER: return expandedGlobs, nil } +func (listener *CarbonserverListener) getExpandedGlobsWithCache(ctx context.Context, logger *zap.Logger, handler string, queries []string) ([]globs, bool, error) { + key := strings.Join(queries, "&") + size := uint64(100 * 1024 * 1024) + expandedGlobs, isCacheHit, err := getWithCache(logger, listener.expandedGlobsCache, key, size, 300, + func() (interface{}, error) { + return listener.getExpandedGlobs(ctx, logger, time.Now(), queries) + }) + + var expandedGlobsCasted []globs + if err == nil { + expandedGlobsCasted = expandedGlobs.([]globs) + } + + switch handler { + case "find": + if isCacheHit { + atomic.AddUint64(&listener.metrics.findExpandedGlobsCachedHit, 1) + } else { + atomic.AddUint64(&listener.metrics.findExpandedGlobsCacheMiss, 1) + } + case "render": + if isCacheHit { + atomic.AddUint64(&listener.metrics.renderExpandedGlobsCacheHit, 1) + } else { + atomic.AddUint64(&listener.metrics.renderExpandedGlobsCacheMiss, 1) + } + } + + return expandedGlobsCasted, isCacheHit, err +} + func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.GlobRequest) (*protov2.GlobResponse, error) { t0 := time.Now() span := trace.SpanFromContext(ctx) @@ -456,13 +487,23 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo fromCache := false var finalRes *protov2.GlobResponse var lookups uint32 + expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, "find", []string{query}) + if err != nil { + return nil, err + } + if listener.findCacheEnabled { - key := query + key := query + "&" + format + "grpc" size := uint64(100 * 1024 * 1024) var result interface{} result, fromCache, err = getWithCache(logger, listener.findCache, key, size, 300, func() (interface{}, error) { - return listener.getExpandedGlobs(ctx, logger, t0, []string{query}) + finalRes = getProtoV2FindResponse(expandedGlobs[0], query) + lookups = expandedGlobs[0].Lookups + if len(finalRes.Matches) == 0 { + return nil, errorNotFound{} + } + return finalRes, nil }) if err == nil { listener.prometheus.cacheRequest("find", fromCache) @@ -471,21 +512,16 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo } else { atomic.AddUint64(&listener.metrics.FindCacheMiss, 1) } - expandedGlobs := result.([]globs) - finalRes = getProtoV2FindResponse(expandedGlobs[0], query) + finalRes = result.(*protov2.GlobResponse) if len(finalRes.Matches) == 0 { err = errorNotFound{} } } - } else { - var expandedGlobs []globs - expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, t0, []string{query}) - if err == nil { - finalRes = getProtoV2FindResponse(expandedGlobs[0], query) - lookups = expandedGlobs[0].Lookups - if len(finalRes.Matches) == 0 { - finalRes, err = nil, errorNotFound{} - } + } else if err == nil { + finalRes = getProtoV2FindResponse(expandedGlobs[0], query) + lookups = expandedGlobs[0].Lookups + if len(finalRes.Matches) == 0 { + finalRes, err = nil, errorNotFound{} } } diff --git a/carbonserver/render.go b/carbonserver/render.go index 64e8967ce..ab101491e 100644 --- a/carbonserver/render.go +++ b/carbonserver/render.go @@ -478,8 +478,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg metricNames := getUniqueMetricNames(targets) // TODO: pipeline? expansionT0 := time.Now() - expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames) + expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, "render", metricNames) tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second) + tle.FindFromCache = isExpandCacheHit if expandedGlobs == nil { return fetchResponse{nil, contentType, 0, 0, 0, nil}, err } @@ -711,28 +712,12 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str responseChan := make(chan response, 1000) fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) { - var err error - var findFromCache bool metricNames := getUniqueMetricNames(targets) // TODO: pipeline? expansionT0 := time.Now() - var expandedGlobs []globs - if listener.findCacheEnabled { - key := strings.Join(metricNames, "&") - size := uint64(100 * 1024 * 1024) - var result interface{} - result, findFromCache, err = getWithCache(logger, listener.findCache, key, size, 300, - func() (interface{}, error) { - return listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames) - }) - if err == nil { - expandedGlobs = result.([]globs) - tle.FindFromCache = findFromCache - } - } else { - expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames) - } + expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, "render", metricNames) tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second) + tle.FindFromCache = isExpandCacheHit if expandedGlobs == nil { if err != nil { return nil, status.New(codes.InvalidArgument, err.Error()).Err()