Commit
Move everything common between cla and mesh cache in a common cache implementation
Charly Molter committed Apr 28, 2021
1 parent 18baef9 commit ec2794b
Showing 6 changed files with 266 additions and 81 deletions.
49 changes: 13 additions & 36 deletions pkg/xds/cache/cla/cache.go
@@ -14,8 +14,6 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/metrics"

"github.com/patrickmn/go-cache"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/dns/lookup"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
@@ -35,12 +33,11 @@ var (
// which reconcile Dataplane's state. In scope of one mesh ClusterLoadAssignment
// will be the same for each service so no need to reconcile for each dataplane.
type Cache struct {
cache *cache.Cache
cache *once.Cache
rm manager.ReadOnlyResourceManager
dsl datasource.Loader
ipFunc lookup.LookupIPFunc
zone string
onceMap *once.Map
metrics *prometheus.GaugeVec
}

@@ -51,36 +48,22 @@ func NewCache(
ipFunc lookup.LookupIPFunc,
metrics metrics.Metrics,
) (*Cache, error) {
metric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cla_cache",
Help: "Summary of CLA Cache",
}, []string{"operation", "result"})
if err := metrics.Register(metric); err != nil {
c, err := once.New(expirationTime, "cla_cache", metrics)
if err != nil {
return nil, err
}
return &Cache{
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
rm: rm,
dsl: dsl,
zone: zone,
ipFunc: ipFunc,
onceMap: once.NewMap(),
metrics: metric,
cache: c,
rm: rm,
dsl: dsl,
zone: zone,
ipFunc: ipFunc,
}, nil
}

func (c *Cache) GetCLA(ctx context.Context, meshName, meshHash, service string, apiVersion envoy_common.APIVersion) (proto.Message, error) {
key := fmt.Sprintf("%s:%s:%s:%s", apiVersion, meshName, service, meshHash)
value, found := c.cache.Get(key)
if found {
c.metrics.WithLabelValues("get", "hit").Inc()
return value.(proto.Message), nil
}
o := c.onceMap.Get(key)
c.metrics.WithLabelValues("get", "hit-wait").Inc()
o.Do(func() (interface{}, error) {
c.metrics.WithLabelValues("get", "hit-wait").Dec()
c.metrics.WithLabelValues("get", "miss").Inc()
elt, err := c.cache.Get(ctx, key, func(ctx context.Context, key string) (interface{}, error) {
dataplanes, err := topology.GetDataplanes(claCacheLog, ctx, c.rm, c.ipFunc, meshName)
if err != nil {
return nil, err
@@ -94,16 +77,10 @@ func (c *Cache) GetCLA(ctx context.Context, meshName, meshHash, service string,
return nil, err
}
endpointMap := topology.BuildEndpointMap(mesh, c.zone, dataplanes.Items, externalServices.Items, c.dsl)
cla, err := envoy_endpoints.CreateClusterLoadAssignment(service, endpointMap[service], apiVersion)
if err != nil {
return nil, err
}
c.cache.SetDefault(key, cla)
c.onceMap.Delete(key)
return cla, nil
return envoy_endpoints.CreateClusterLoadAssignment(service, endpointMap[service], apiVersion)
})
if o.Err != nil {
return nil, o.Err
if err != nil {
return nil, err
}
return o.Value.(proto.Message), nil
return elt.(proto.Message), nil
}
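
For orientation, a minimal sketch of a caller of the refactored GetCLA; only the method signature and the key layout come from the diff above, while the wrapping function and argument values are illustrative assumptions:

// Sketch only: claCache is a *cla.Cache produced by cla.NewCache (construction elided).
// Entries are keyed "apiVersion:meshName:service:meshHash", so a changed mesh hash yields a fresh entry.
func clusterLoadAssignmentFor(ctx context.Context, claCache *cla.Cache, meshHash string, apiVersion envoy_common.APIVersion) (proto.Message, error) {
return claCache.GetCLA(ctx, "default", meshHash, "backend", apiVersion)
}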
48 changes: 13 additions & 35 deletions pkg/xds/cache/mesh/cache.go
@@ -10,8 +10,6 @@ import (

"github.com/kumahq/kuma/pkg/xds/cache/once"

"github.com/patrickmn/go-cache"

"github.com/kumahq/kuma/pkg/core/dns/lookup"
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
@@ -21,11 +19,10 @@ import (
// reconcile Dataplane's state. Calculating hash is a heavy operation
// that requires fetching all the resources belonging to the Mesh.
type Cache struct {
cache *cache.Cache
cache *once.Cache
rm manager.ReadOnlyResourceManager
types []core_model.ResourceType
ipFunc lookup.LookupIPFunc
onceMap *once.Map
metrics *prometheus.GaugeVec
}

@@ -36,47 +33,28 @@ func NewCache(
ipFunc lookup.LookupIPFunc,
metrics metrics.Metrics,
) (*Cache, error) {
metric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "mesh_cache",
Help: "Summary of Mesh Cache",
}, []string{"operation", "result"})
if err := metrics.Register(metric); err != nil {
c, err := once.New(expirationTime, "mesh_cache", metrics)
if err != nil {
return nil, err
}
return &Cache{
rm: rm,
types: types,
ipFunc: ipFunc,
onceMap: once.NewMap(),
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
metrics: metric,
rm: rm,
types: types,
ipFunc: ipFunc,
cache: c,
}, nil
}

func (c *Cache) GetHash(ctx context.Context, mesh string) (string, error) {
hash, found := c.cache.Get(mesh)
if found {
c.metrics.WithLabelValues("get", "hit").Inc()
return hash.(string), nil
}
o := c.onceMap.Get(mesh)
c.metrics.WithLabelValues("get", "hit-wait").Inc()
o.Do(func() (interface{}, error) {
c.metrics.WithLabelValues("get", "hit-wait").Dec()
c.metrics.WithLabelValues("get", "miss").Inc()
snapshot, err := GetMeshSnapshot(ctx, mesh, c.rm, c.types, c.ipFunc)
elt, err := c.cache.Get(ctx, mesh, func(ctx context.Context, key string) (interface{}, error) {
snapshot, err := GetMeshSnapshot(ctx, key, c.rm, c.types, c.ipFunc)
if err != nil {
// Don't cache errors
c.onceMap.Delete(mesh)
return nil, err
}
hash = snapshot.hash()
c.cache.SetDefault(mesh, hash)
c.onceMap.Delete(mesh)
return hash, nil
return snapshot.hash(), nil
})
if o.Err != nil {
return "", o.Err
if err != nil {
return "", err
}
return o.Value.(string), nil
return elt.(string), nil
}
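
In the same spirit, a hedged sketch of how GetHash is typically consumed; the meshCache parameter and the skip-reconcile decision are assumptions, only the GetHash signature is from the diff:

// Sketch only: meshCache is a *mesh.Cache built via mesh.NewCache (construction elided).
// Identical hashes mean the mesh snapshot has not changed since the last reconciliation.
func meshChanged(ctx context.Context, meshCache *mesh.Cache, previousHash string) (bool, string, error) {
hash, err := meshCache.GetHash(ctx, "default")
if err != nil {
return false, previousHash, err
}
return hash != previousHash, hash, nil
}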
57 changes: 57 additions & 0 deletions pkg/xds/cache/once/cache.go
@@ -0,0 +1,57 @@
package once

import (
"context"
"time"

"github.com/kumahq/kuma/pkg/metrics"
"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
)

type Cache struct {
cache *cache.Cache
onceMap *omap
metrics *prometheus.GaugeVec
}

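// New builds a Cache whose entries expire after expirationTime and registers a gauge
// named `name` (labels "operation" and "result") with the provided metrics registry.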
func New(expirationTime time.Duration, name string, metrics metrics.Metrics) (*Cache, error) {
metric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: "Summary of " + name,
}, []string{"operation", "result"})
if err := metrics.Register(metric); err != nil {
return nil, err
}
return &Cache{
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
onceMap: newMap(),
metrics: metric,
}, nil
}

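// Get returns the cached value for key, computing it with fn on a miss. Concurrent callers
// that miss on the same key share a single invocation of fn; errors are not cached, so
// failed lookups are retried on the next call.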
func (c *Cache) Get(ctx context.Context, key string, fn func(context.Context, string) (interface{}, error)) (interface{}, error) {
v, found := c.cache.Get(key)
if found {
c.metrics.WithLabelValues("get", "hit").Inc()
return v, nil
}
o := c.onceMap.Get(key)
c.metrics.WithLabelValues("get", "hit-wait").Inc()
o.Do(func() (interface{}, error) {
defer c.onceMap.Delete(key)
c.metrics.WithLabelValues("get", "hit-wait").Dec()
c.metrics.WithLabelValues("get", "miss").Inc()
res, err := fn(ctx, key)
if err != nil {
c.metrics.WithLabelValues("get", "error").Inc()
return nil, err
}
c.cache.SetDefault(key, res)
return res, nil
})
if o.Err != nil {
return nil, o.Err
}
return o.Value, nil
}
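
A short usage example of the extracted once.Cache, mirroring the CLA and mesh callers above; the cache name, key, expiration and the expensiveLookup helper are illustrative assumptions:

// Illustrative sketch: one computation per key per expiration window; concurrent
// misses on the same key share a single invocation of the supplied function.
func lookupThroughCache(ctx context.Context, metrics core_metrics.Metrics) (string, error) {
c, err := once.New(5*time.Second, "example_cache", metrics)
if err != nil {
return "", err
}
v, err := c.Get(ctx, "some-key", func(ctx context.Context, key string) (interface{}, error) {
return expensiveLookup(ctx, key) // hypothetical helper; its result is cached under key
})
if err != nil {
return "", err
}
return v.(string), nil // callers type-assert to the type the function returned
}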
160 changes: 160 additions & 0 deletions pkg/xds/cache/once/cache_test.go
@@ -0,0 +1,160 @@
package once_test

import (
"context"
"errors"
"github.com/kumahq/kuma/pkg/xds/cache/once"
"sync"
"sync/atomic"
"time"

core_metrics "github.com/kumahq/kuma/pkg/metrics"
test_metrics "github.com/kumahq/kuma/pkg/test/metrics"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Once Cache", func() {
var metrics core_metrics.Metrics
var cache *once.Cache

expiration := time.Millisecond * 200

BeforeEach(func() {
var err error
metrics, err = core_metrics.NewMetrics("Standalone")
Expect(err).ToNot(HaveOccurred())

cache, err = once.New(expiration, "cache", metrics)
Expect(err).ToNot(HaveOccurred())
})

It("should cache Get() queries", func() {
var count int32 = 0
var val int32 = 1
fn := func(ctx context.Context, s string) (interface{}, error) {
atomic.AddInt32(&count, 1)
v := atomic.LoadInt32(&val)
return v, nil
}
By("getting item for the first time")
out, err := cache.Get(context.Background(), "k1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(out.(int32)).To(Equal(int32(1)))
Expect(atomic.LoadInt32(&count)).To(Equal(int32(1)))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(1.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit")).To(BeNil())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error")).To(BeNil())

By("getting cached item")
out, err = cache.Get(context.Background(), "k1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(out.(int32)).To(Equal(int32(1)))
Expect(atomic.LoadInt32(&count)).To(Equal(int32(1)))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(1.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit").Gauge.GetValue()).To(Equal(1.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error")).To(BeNil())

By("updating Dataplane in store")
atomic.StoreInt32(&val, 2)

By("cached value hasn't changed")
out, err = cache.Get(context.Background(), "k1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(atomic.LoadInt32(&count)).To(Equal(int32(1)))
Expect(out.(int32)).To(Equal(int32(1)))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(1.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit").Gauge.GetValue()).To(Equal(2.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error")).To(BeNil())

By("wait for invalidation")
time.Sleep(expiration)

By("get new value")
out, err = cache.Get(context.Background(), "k1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(out.(int32)).To(Equal(int32(2)))
Expect(atomic.LoadInt32(&count)).To(Equal(int32(2)))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(2.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit").Gauge.GetValue()).To(Equal(2.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error")).To(BeNil())
})

It("should cache concurrent Get() requests", func() {
var count int32 = 0
var val int32 = 1
fn := func(ctx context.Context, s string) (interface{}, error) {
atomic.AddInt32(&count, 1)
v := atomic.LoadInt32(&val)
return v, nil
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
out, err := cache.Get(context.Background(), "key", fn)
Expect(err).ToNot(HaveOccurred())

Expect(out.(int32)).To(Equal(atomic.LoadInt32(&val)))
wg.Done()
}()
}
wg.Wait()

Expect(atomic.LoadInt32(&count)).To(Equal(int32(1)))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(1.0))
hitWaits := 0.0
if hw := test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit-wait"); hw != nil {
hitWaits = hw.Gauge.GetValue()
}
hits := 0.0
if h := test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit"); h != nil {
hits = h.Gauge.GetValue()
}
Expect(hitWaits + hits + 1).To(Equal(100.0))
})

It("should retry previously failed Get() requests", func() {
var count int32 = 0
var hasError int32 = 1
fn := func(ctx context.Context, s string) (interface{}, error) {
atomic.AddInt32(&count, 1)
if atomic.LoadInt32(&hasError) != 0 {
return "", errors.New("It's an error!")
}
return "hello", nil
}
By("getting Hash for the first time")
out, err := cache.Get(context.Background(), "key-1", fn)
Expect(err).To(HaveOccurred())
Expect(out).To(BeEmpty())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(1.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit")).To(BeNil())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error").Gauge.GetValue()).To(Equal(1.0))

By("getting Hash calls again")
out, err = cache.Get(context.Background(), "key-1", fn)
Expect(err).To(HaveOccurred())
Expect(out).To(BeEmpty())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(2.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit")).To(BeNil())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "error").Gauge.GetValue()).To(Equal(2.0))

By("Getting the hash once manager is fixed")
atomic.StoreInt32(&hasError, 0)
out, err = cache.Get(context.Background(), "key-1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(out).ToNot(BeEmpty())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(3.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit")).To(BeNil())

By("Now it should cache the hash once manager is fixed")
err = nil
out, err = cache.Get(context.Background(), "key-1", fn)
Expect(err).ToNot(HaveOccurred())
Expect(out).ToNot(BeNil())
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "miss").Gauge.GetValue()).To(Equal(3.0))
Expect(test_metrics.FindMetric(metrics, "cache", "operation", "get", "result", "hit").Gauge.GetValue()).To(Equal(1.0))
})
})