Skip to content

Commit

Permalink
feat(kuma-cp) read only cache manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubdyszkiewicz committed Mar 17, 2020
1 parent 079dbbd commit e330872
Show file tree
Hide file tree
Showing 27 changed files with 345 additions and 139 deletions.
12 changes: 4 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,37 @@ require (
github.com/Masterminds/sprig v2.20.0+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/emicklei/go-restful v2.9.6+incompatible
github.com/emicklei/go-restful-openapi v1.2.0
github.com/envoyproxy/go-control-plane v0.9.1-0.20191108215040-b0f2cec0e187
github.com/envoyproxy/protoc-gen-validate v0.3.0-java.0.20200311152155-ab56c3dd1cf9
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-logr/glogr v0.1.0 // indirect
github.com/go-logr/logr v0.1.0
github.com/go-logr/zapr v0.1.0
github.com/golang-migrate/migrate v3.5.4+incompatible // indirect
github.com/golang-migrate/migrate/v4 v4.8.0
github.com/golang/protobuf v1.3.5
github.com/google/uuid v1.1.1
github.com/google/uuid v1.1.1 // indirect
github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
github.com/huandu/xstrings v1.2.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/lib/pq v1.1.1
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.0
github.com/onsi/gomega v1.9.0
github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/common v0.4.1
github.com/prometheus/prometheus v0.0.0-00010101000000-000000000000
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/spf13/cobra v0.0.5
github.com/spiffe/go-spiffe v0.0.0-20190820222348-6adcf1eecbcc
github.com/spiffe/spire v0.0.0-20190905203639-e85640baca1d
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect
golang.org/x/sys v0.0.0-20200316230553-a7d97aace0b0 // indirect
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/grpc v1.23.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
Expand Down
120 changes: 26 additions & 94 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/api-server/config_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ var _ = Describe("Config WS", func() {
},
"user": "kuma"
},
"cache": {
"enabled": true,
"expirationTime": "1s"
},
"type": "memory"
},
"xdsServer": {
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ store:
# Path to the root certificate. Used in verify-ca and verify-full modes.
caPath: # ENV: KUMA_STORE_POSTGRES_TLS_ROOT_CERT_PATH

# Cache for read only operations. This cache is local to the instance of the control plane.
cache:
# If true then cache is enabled
enabled: true
# Expiration time for elements in cache.
expirationTime: 1s

# Configuration of Bootstrap Server, which provides bootstrap config to Dataplanes
bootstrapServer:
# Port of Server that provides bootstrap configuration for dataplanes
Expand Down
29 changes: 29 additions & 0 deletions pkg/config/core/resources/store/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

import (
"time"

"github.com/pkg/errors"

"github.com/Kong/kuma/pkg/config"
Expand All @@ -26,13 +28,16 @@ type StoreConfig struct {
Postgres *postgres.PostgresStoreConfig `yaml:"postgres"`
// Kubernetes Store configuration
Kubernetes *k8s.KubernetesStoreConfig `yaml:"kubernetes"`
// Cache configuration
Cache CacheStoreConfig `yaml:"cache"`
}

func DefaultStoreConfig() *StoreConfig {
return &StoreConfig{
Type: MemoryStore,
Postgres: postgres.DefaultPostgresStoreConfig(),
Kubernetes: k8s.DefaultKubernetesStoreConfig(),
Cache: DefaultCacheStoreConfig(),
}
}

Expand All @@ -57,5 +62,29 @@ func (s *StoreConfig) Validate() error {
default:
return errors.Errorf("Type should be either %s, %s or %s", PostgresStore, KubernetesStore, MemoryStore)
}
if err := s.Cache.Validate(); err != nil {
return errors.Wrap(err, "Cache validation failed")
}
return nil
}

var _ config.Config = &CacheStoreConfig{}

type CacheStoreConfig struct {
Enabled bool `yaml:"enabled" envconfig:"kuma_store_cache_enabled"`
ExpirationTime time.Duration `yaml:"expirationTime" envconfig:"kuma_store_cache_expiration_time"`
}

func (c CacheStoreConfig) Sanitize() {
}

func (c CacheStoreConfig) Validate() error {
return nil
}

func DefaultCacheStoreConfig() CacheStoreConfig {
return CacheStoreConfig{
Enabled: true,
ExpirationTime: time.Second,
}
}
8 changes: 8 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Store.Postgres.ConnectionTimeout).To(Equal(10))
Expect(cfg.Store.Postgres.MaxOpenConnections).To(Equal(300))

Expect(cfg.Store.Cache.Enabled).To(BeFalse())
Expect(cfg.Store.Cache.ExpirationTime).To(Equal(3 * time.Second))

Expect(cfg.Store.Postgres.TLS.Mode).To(Equal(postgres.VerifyFull))
Expect(cfg.Store.Postgres.TLS.CertPath).To(Equal("/path/to/cert"))
Expect(cfg.Store.Postgres.TLS.KeyPath).To(Equal("/path/to/key"))
Expand Down Expand Up @@ -142,6 +145,9 @@ store:
certPath: /path/to/cert
keyPath: /path/to/key
caPath: /path/to/rootCert
cache:
enabled: false
expirationTime: 3s
xdsServer:
grpcPort: 5000
diagnosticsPort: 5003
Expand Down Expand Up @@ -220,6 +226,8 @@ guiServer:
"KUMA_STORE_POSTGRES_TLS_CERT_PATH": "/path/to/cert",
"KUMA_STORE_POSTGRES_TLS_KEY_PATH": "/path/to/key",
"KUMA_STORE_POSTGRES_TLS_CA_PATH": "/path/to/rootCert",
"KUMA_STORE_CACHE_ENABLED": "false",
"KUMA_STORE_CACHE_EXPIRATION_TIME": "3s",
"KUMA_API_SERVER_READ_ONLY": "true",
"KUMA_API_SERVER_PORT": "9090",
"KUMA_DATAPLANE_TOKEN_SERVER_ENABLED": "true",
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ func initializeResourceManager(builder *core_runtime.Builder) {
meshManager := mesh_managers.NewMeshManager(builder.ResourceStore(), builder.BuiltinCaManager(), builder.ProvidedCaManager(), customizableManager, builder.SecretManager(), registry.Global())
customManagers[mesh.MeshType] = meshManager
builder.WithResourceManager(customizableManager)

if builder.Config().Store.Cache.Enabled {
builder.WithReadOnlyResourceManager(core_manager.NewCachedManager(customizableManager, builder.Config().Store.Cache.ExpirationTime))
} else {
builder.WithReadOnlyResourceManager(customizableManager)
}
}

func customizeRuntime(rt core_runtime.Runtime) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/logs/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var logger = core.Log.WithName("logs")
// If we define rule kong->backend, it is also applied for kong-admin because there is no way to differentiate
// traffic from services that are using one dataplane.
type TrafficLogsMatcher struct {
ResourceManager manager.ResourceManager
ResourceManager manager.ReadOnlyResourceManager
}

func (m *TrafficLogsMatcher) Match(ctx context.Context, dataplane *mesh_core.DataplaneResource) (core_xds.LogMap, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/permissions/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type TrafficPermissionsMatcher struct {
ResourceManager manager.ResourceManager
ResourceManager manager.ReadOnlyResourceManager
}

type MatchedPermissions map[mesh_proto.InboundInterface]*mesh_core.TrafficPermissionResourceList
Expand Down
73 changes: 73 additions & 0 deletions pkg/core/resources/manager/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package manager

import (
"context"
"fmt"
"time"

"github.com/patrickmn/go-cache"

"github.com/Kong/kuma/pkg/core/resources/model"
"github.com/Kong/kuma/pkg/core/resources/store"
)

// Cached version of the ReadOnlyResourceManager designed to be used only for use cases of eventual consistency.
//
// This cache is NOT consistent across instances of the control plane.
// This cache is mutex free for performance with consideration that: values can be overridden by other goroutine.
// * if cache expires and multiple goroutines tries to fetch the resource, they may fetch underlying manager multiple times.
// * if cache expires and multiple goroutines tries to fetch the resource, they fetch underlying manager multiple times
// and the value returned is different, the older value may be persisted. This is ok, since this cache is designed
// to have low expiration time (like 1s) and having old value just extends propagation of new config for 1 more second.
type cachedManager struct {
delegate ReadOnlyResourceManager
cache *cache.Cache
}

var _ ReadOnlyResourceManager = &cachedManager{}

func NewCachedManager(delegate ReadOnlyResourceManager, expirationTime time.Duration) ReadOnlyResourceManager {
return &cachedManager{
delegate: delegate,
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
}
}

func (c cachedManager) Get(ctx context.Context, res model.Resource, fs ...store.GetOptionsFunc) error {
opts := store.NewGetOptions(fs...)
cacheKey := fmt.Sprintf("GET:%s:%s", res.GetType(), opts.HashCode())
obj, found := c.cache.Get(cacheKey)
if !found {
if err := c.delegate.Get(ctx, res, fs...); err != nil {
return err
}
c.cache.SetDefault(cacheKey, res)
} else {
cached := obj.(model.Resource)
if err := res.SetSpec(cached.GetSpec()); err != nil {
return err
}
res.SetMeta(cached.GetMeta())
}
return nil
}

func (c cachedManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
opts := store.NewListOptions(fs...)
cacheKey := fmt.Sprintf("LIST:%s:%s", list.GetItemType(), opts.HashCode())
obj, found := c.cache.Get(cacheKey)
if !found {
if err := c.delegate.List(ctx, list, fs...); err != nil {
return err
}
c.cache.SetDefault(cacheKey, list.GetItems())
} else {
resources := obj.([]model.Resource)
for _, res := range resources {
if err := list.AddItem(res); err != nil {
return err
}
}
}
return nil
}
121 changes: 121 additions & 0 deletions pkg/core/resources/manager/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package manager_test

import (
"context"
"time"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
core_mesh "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/Kong/kuma/pkg/core/resources/manager"
core_model "github.com/Kong/kuma/pkg/core/resources/model"
core_store "github.com/Kong/kuma/pkg/core/resources/store"
"github.com/Kong/kuma/pkg/plugins/resources/memory"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

type countingResourcesManager struct {
store core_store.ResourceStore
getQueries int
listQueries int
}

func (c *countingResourcesManager) Get(ctx context.Context, res core_model.Resource, fn ...core_store.GetOptionsFunc) error {
c.getQueries++
return c.store.Get(ctx, res, fn...)
}

func (c *countingResourcesManager) List(ctx context.Context, list core_model.ResourceList, fn ...core_store.ListOptionsFunc) error {
c.listQueries++
return c.store.List(ctx, list, fn...)
}

var _ core_manager.ReadOnlyResourceManager = &countingResourcesManager{}

var _ = Describe("Cached Resource Manager", func() {

var store core_store.ResourceStore
var cachedManager core_manager.ReadOnlyResourceManager
var countingManager *countingResourcesManager
var res *core_mesh.DataplaneResource
expiration := 100 * time.Millisecond

BeforeEach(func() {
// given
store = memory.NewStore()
countingManager = &countingResourcesManager{
store: store,
}
cachedManager = core_manager.NewCachedManager(countingManager, expiration)

// and created resources
res = &core_mesh.DataplaneResource{
Spec: mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "127.0.0.1",
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 80,
ServicePort: 8080,
},
},
},
},
}
err := store.Create(context.Background(), res, core_store.CreateByKey("dp-1", "default"))
Expect(err).ToNot(HaveOccurred())
})

It("should cache Get() queries", func() {
// when fetched resources multiple times
fetch := func() core_mesh.DataplaneResource {
fetched := core_mesh.DataplaneResource{}
err := cachedManager.Get(context.Background(), &fetched, core_store.GetByKey("dp-1", "default"))
Expect(err).ToNot(HaveOccurred())
return fetched
}

for i := 0; i < 100; i++ {
fetch()
}

// then real manager should be called only once
Expect(fetch().Spec).To(Equal(res.Spec))
Expect(countingManager.getQueries).To(Equal(1))

// when
time.Sleep(expiration)

// then
Expect(fetch().Spec).To(Equal(res.Spec))
Expect(countingManager.getQueries).To(Equal(2))
})

It("should cache List() queries", func() {
// when fetched resources multiple times
fetch := func() core_mesh.DataplaneResourceList {
fetched := core_mesh.DataplaneResourceList{}
err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
Expect(err).ToNot(HaveOccurred())
return fetched
}

for i := 0; i < 100; i++ {
fetch()
}

// then real manager should be called only once
Expect(fetch().Items).To(HaveLen(1))
Expect(fetch().Items[0].GetSpec()).To(Equal(&res.Spec))
Expect(countingManager.listQueries).To(Equal(1))

// when
time.Sleep(expiration)

// then
Expect(fetch().Items).To(HaveLen(1))
Expect(fetch().Items[0].GetSpec()).To(Equal(&res.Spec))
Expect(countingManager.listQueries).To(Equal(2))
})
})
Loading

0 comments on commit e330872

Please sign in to comment.