Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp) Read only cache manager #634

Merged
merged 2 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,37 @@ require (
github.com/Masterminds/sprig v2.20.0+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/emicklei/go-restful v2.9.6+incompatible
github.com/emicklei/go-restful-openapi v1.2.0
github.com/envoyproxy/go-control-plane v0.9.1-0.20191108215040-b0f2cec0e187
github.com/envoyproxy/protoc-gen-validate v0.3.0-java.0.20200311152155-ab56c3dd1cf9
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-logr/glogr v0.1.0 // indirect
github.com/go-logr/logr v0.1.0
github.com/go-logr/zapr v0.1.0
github.com/golang-migrate/migrate v3.5.4+incompatible // indirect
github.com/golang-migrate/migrate/v4 v4.8.0
github.com/golang/protobuf v1.3.5
github.com/google/uuid v1.1.1
github.com/google/uuid v1.1.1 // indirect
github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69
github.com/huandu/xstrings v1.2.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/lib/pq v1.1.1
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.0
github.com/onsi/gomega v1.9.0
github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/common v0.4.1
github.com/prometheus/prometheus v0.0.0-00010101000000-000000000000
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/spf13/cobra v0.0.5
github.com/spiffe/go-spiffe v0.0.0-20190820222348-6adcf1eecbcc
github.com/spiffe/spire v0.0.0-20190905203639-e85640baca1d
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect
golang.org/x/sys v0.0.0-20200316230553-a7d97aace0b0 // indirect
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/grpc v1.23.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
Expand Down
120 changes: 26 additions & 94 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/api-server/config_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ var _ = Describe("Config WS", func() {
},
"user": "kuma"
},
"cache": {
"enabled": true,
"expirationTime": "1s"
},
"type": "memory"
},
"xdsServer": {
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ store:
# Path to the root certificate. Used in verify-ca and verify-full modes.
caPath: # ENV: KUMA_STORE_POSTGRES_TLS_ROOT_CERT_PATH

# Cache for read-only operations. This cache is local to the instance of the control plane.
cache:
  # If true, the cache is enabled.
  enabled: true
  # Expiration time for elements in the cache.
  expirationTime: 1s

# Configuration of Bootstrap Server, which provides bootstrap config to Dataplanes
bootstrapServer:
# Port of Server that provides bootstrap configuration for dataplanes
Expand Down
29 changes: 29 additions & 0 deletions pkg/config/core/resources/store/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

import (
"time"

"github.com/pkg/errors"

"github.com/Kong/kuma/pkg/config"
Expand All @@ -26,13 +28,16 @@ type StoreConfig struct {
Postgres *postgres.PostgresStoreConfig `yaml:"postgres"`
// Kubernetes Store configuration
Kubernetes *k8s.KubernetesStoreConfig `yaml:"kubernetes"`
// Cache configuration
Cache CacheStoreConfig `yaml:"cache"`
}

// DefaultStoreConfig returns the store configuration used when nothing is
// provided explicitly: an in-memory store, with the default Postgres,
// Kubernetes and cache sub-configurations filled in.
func DefaultStoreConfig() *StoreConfig {
	cfg := StoreConfig{
		Type:       MemoryStore,
		Postgres:   postgres.DefaultPostgresStoreConfig(),
		Kubernetes: k8s.DefaultKubernetesStoreConfig(),
		Cache:      DefaultCacheStoreConfig(),
	}
	return &cfg
}

Expand All @@ -57,5 +62,29 @@ func (s *StoreConfig) Validate() error {
default:
return errors.Errorf("Type should be either %s, %s or %s", PostgresStore, KubernetesStore, MemoryStore)
}
if err := s.Cache.Validate(); err != nil {
return errors.Wrap(err, "Cache validation failed")
}
return nil
}

// Compile-time check that CacheStoreConfig satisfies config.Config.
var _ config.Config = &CacheStoreConfig{}

// CacheStoreConfig configures the read-only resource cache.
// The cache is local to a single control-plane instance (see kuma-cp.defaults.yaml).
type CacheStoreConfig struct {
	// Enabled turns the read-only cache on or off.
	Enabled bool `yaml:"enabled" envconfig:"kuma_store_cache_enabled"`
	// ExpirationTime is how long an element stays in the cache before
	// it has to be fetched from the underlying store again.
	ExpirationTime time.Duration `yaml:"expirationTime" envconfig:"kuma_store_cache_expiration_time"`
}

// Sanitize strips sensitive data before the config is printed.
// CacheStoreConfig contains no secrets, so there is nothing to do.
func (c CacheStoreConfig) Sanitize() {
}

// Validate checks that the cache configuration is usable.
// Previously this always returned nil; an enabled cache with a zero or
// negative expiration time is now rejected, since such a cache could never
// retain an entry and would silently defeat its purpose.
func (c CacheStoreConfig) Validate() error {
	if c.Enabled && c.ExpirationTime <= 0 {
		// errors.New has the same signature in the stdlib and in
		// github.com/pkg/errors, which this file imports.
		return errors.New("ExpirationTime must be positive when cache is enabled")
	}
	return nil
}

// DefaultCacheStoreConfig returns the cache settings used when none are
// provided: caching enabled, with a one-second expiration time.
func DefaultCacheStoreConfig() CacheStoreConfig {
	cfg := CacheStoreConfig{
		Enabled:        true,
		ExpirationTime: time.Second,
	}
	return cfg
}
8 changes: 8 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Store.Postgres.ConnectionTimeout).To(Equal(10))
Expect(cfg.Store.Postgres.MaxOpenConnections).To(Equal(300))

Expect(cfg.Store.Cache.Enabled).To(BeFalse())
Expect(cfg.Store.Cache.ExpirationTime).To(Equal(3 * time.Second))

Expect(cfg.Store.Postgres.TLS.Mode).To(Equal(postgres.VerifyFull))
Expect(cfg.Store.Postgres.TLS.CertPath).To(Equal("/path/to/cert"))
Expect(cfg.Store.Postgres.TLS.KeyPath).To(Equal("/path/to/key"))
Expand Down Expand Up @@ -142,6 +145,9 @@ store:
certPath: /path/to/cert
keyPath: /path/to/key
caPath: /path/to/rootCert
cache:
enabled: false
expirationTime: 3s
xdsServer:
grpcPort: 5000
diagnosticsPort: 5003
Expand Down Expand Up @@ -220,6 +226,8 @@ guiServer:
"KUMA_STORE_POSTGRES_TLS_CERT_PATH": "/path/to/cert",
"KUMA_STORE_POSTGRES_TLS_KEY_PATH": "/path/to/key",
"KUMA_STORE_POSTGRES_TLS_CA_PATH": "/path/to/rootCert",
"KUMA_STORE_CACHE_ENABLED": "false",
"KUMA_STORE_CACHE_EXPIRATION_TIME": "3s",
"KUMA_API_SERVER_READ_ONLY": "true",
"KUMA_API_SERVER_PORT": "9090",
"KUMA_DATAPLANE_TOKEN_SERVER_ENABLED": "true",
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ func initializeResourceManager(builder *core_runtime.Builder) {
meshManager := mesh_managers.NewMeshManager(builder.ResourceStore(), builder.BuiltinCaManager(), builder.ProvidedCaManager(), customizableManager, builder.SecretManager(), registry.Global())
customManagers[mesh.MeshType] = meshManager
builder.WithResourceManager(customizableManager)

if builder.Config().Store.Cache.Enabled {
builder.WithReadOnlyResourceManager(core_manager.NewCachedManager(customizableManager, builder.Config().Store.Cache.ExpirationTime))
} else {
builder.WithReadOnlyResourceManager(customizableManager)
}
}

func customizeRuntime(rt core_runtime.Runtime) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/logs/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var logger = core.Log.WithName("logs")
// If we define rule kong->backend, it is also applied for kong-admin because there is no way to differentiate
// traffic from services that are using one dataplane.
type TrafficLogsMatcher struct {
	// Matching only reads resources, so a ReadOnlyResourceManager is
	// sufficient — this allows the cached read-only manager to be used here.
	ResourceManager manager.ReadOnlyResourceManager
}

func (m *TrafficLogsMatcher) Match(ctx context.Context, dataplane *mesh_core.DataplaneResource) (core_xds.LogMap, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/permissions/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// TrafficPermissionsMatcher matches TrafficPermission policies for a dataplane.
type TrafficPermissionsMatcher struct {
	// Matching only reads resources, so a ReadOnlyResourceManager is
	// sufficient — this allows the cached read-only manager to be used here.
	ResourceManager manager.ReadOnlyResourceManager
}

// MatchedPermissions maps an inbound interface of a dataplane to the
// traffic permissions that apply to it.
type MatchedPermissions map[mesh_proto.InboundInterface]*mesh_core.TrafficPermissionResourceList
Expand Down
73 changes: 73 additions & 0 deletions pkg/core/resources/manager/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package manager

import (
"context"
"fmt"
"time"

"github.com/patrickmn/go-cache"

"github.com/Kong/kuma/pkg/core/resources/model"
"github.com/Kong/kuma/pkg/core/resources/store"
)

// cachedManager wraps a ReadOnlyResourceManager with a local, time-based
// cache. It is designed only for use cases that tolerate eventual consistency.
//
// The cache is NOT consistent across control-plane instances, and it is
// mutex-free for performance. Consequences to be aware of:
//   - when an entry expires, multiple goroutines may each hit the underlying
//     manager for the same resource;
//   - if those concurrent fetches return different values, an older value may
//     be the one that ends up cached.
//
// Both are acceptable because the expiration time is expected to be short
// (around 1s), so a stale value merely delays propagation of new config by
// roughly one more expiration period.
type cachedManager struct {
	delegate ReadOnlyResourceManager
	cache    *cache.Cache
}

var _ ReadOnlyResourceManager = &cachedManager{}

// NewCachedManager returns a ReadOnlyResourceManager that caches results of
// the delegate for expirationTime. The cleanup interval is set slightly below
// the expiration time so that expired entries are evicted promptly.
func NewCachedManager(delegate ReadOnlyResourceManager, expirationTime time.Duration) ReadOnlyResourceManager {
	cleanupInterval := time.Duration(int64(float64(expirationTime) * 0.9))
	return &cachedManager{
		delegate: delegate,
		cache:    cache.New(expirationTime, cleanupInterval),
	}
}

// Get serves the resource from the cache when possible, falling back to the
// delegate (and populating the cache) on a miss.
//
// Note: the resource stored in the cache is the same object that was passed
// by the caller of the miss path; the cache key combines the resource type
// with the hash of the get options.
func (c cachedManager) Get(ctx context.Context, res model.Resource, fs ...store.GetOptionsFunc) error {
	opts := store.NewGetOptions(fs...)
	key := fmt.Sprintf("GET:%s:%s", res.GetType(), opts.HashCode())
	if obj, ok := c.cache.Get(key); ok {
		// cache hit: copy spec and meta from the cached resource into res
		hit := obj.(model.Resource)
		if err := res.SetSpec(hit.GetSpec()); err != nil {
			return err
		}
		res.SetMeta(hit.GetMeta())
		return nil
	}
	// cache miss: fetch from the delegate and remember the result
	if err := c.delegate.Get(ctx, res, fs...); err != nil {
		return err
	}
	c.cache.SetDefault(key, res)
	return nil
}

// List serves the resource list from the cache when possible, falling back to
// the delegate (and populating the cache with the fetched items) on a miss.
func (c cachedManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
	opts := store.NewListOptions(fs...)
	key := fmt.Sprintf("LIST:%s:%s", list.GetItemType(), opts.HashCode())
	if obj, ok := c.cache.Get(key); ok {
		// cache hit: rebuild the caller's list from the cached items
		for _, res := range obj.([]model.Resource) {
			if err := list.AddItem(res); err != nil {
				return err
			}
		}
		return nil
	}
	// cache miss: fetch from the delegate and remember the items
	if err := c.delegate.List(ctx, list, fs...); err != nil {
		return err
	}
	c.cache.SetDefault(key, list.GetItems())
	return nil
}
121 changes: 121 additions & 0 deletions pkg/core/resources/manager/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package manager_test

import (
"context"
"time"

mesh_proto "github.com/Kong/kuma/api/mesh/v1alpha1"
core_mesh "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/Kong/kuma/pkg/core/resources/manager"
core_model "github.com/Kong/kuma/pkg/core/resources/model"
core_store "github.com/Kong/kuma/pkg/core/resources/store"
"github.com/Kong/kuma/pkg/plugins/resources/memory"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

// countingResourcesManager is a test double that forwards every call to an
// underlying store while counting how many Get/List calls actually reach it,
// so tests can verify that the cache short-circuits repeated reads.
type countingResourcesManager struct {
	store       core_store.ResourceStore
	getQueries  int
	listQueries int
}

var _ core_manager.ReadOnlyResourceManager = &countingResourcesManager{}

// Get records the call and forwards it to the underlying store.
func (m *countingResourcesManager) Get(ctx context.Context, res core_model.Resource, fn ...core_store.GetOptionsFunc) error {
	m.getQueries++
	return m.store.Get(ctx, res, fn...)
}

// List records the call and forwards it to the underlying store.
func (m *countingResourcesManager) List(ctx context.Context, list core_model.ResourceList, fn ...core_store.ListOptionsFunc) error {
	m.listQueries++
	return m.store.List(ctx, list, fn...)
}

// Tests for the cached ReadOnlyResourceManager.
// NOTE(review): these tests are timing sensitive — they rely on a real
// 100ms wall-clock expiration and time.Sleep, so they may flake on a
// heavily loaded machine.
var _ = Describe("Cached Resource Manager", func() {

	var store core_store.ResourceStore
	var cachedManager core_manager.ReadOnlyResourceManager
	var countingManager *countingResourcesManager
	var res *core_mesh.DataplaneResource
	expiration := 100 * time.Millisecond

	BeforeEach(func() {
		// given: memory store -> counting manager -> cached manager,
		// so the counters record only calls that miss the cache
		store = memory.NewStore()
		countingManager = &countingResourcesManager{
			store: store,
		}
		cachedManager = core_manager.NewCachedManager(countingManager, expiration)

		// and created resources
		res = &core_mesh.DataplaneResource{
			Spec: mesh_proto.Dataplane{
				Networking: &mesh_proto.Dataplane_Networking{
					Address: "127.0.0.1",
					Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
						{
							Port:        80,
							ServicePort: 8080,
						},
					},
				},
			},
		}
		err := store.Create(context.Background(), res, core_store.CreateByKey("dp-1", "default"))
		Expect(err).ToNot(HaveOccurred())
	})

	It("should cache Get() queries", func() {
		// when fetched resources multiple times
		fetch := func() core_mesh.DataplaneResource {
			fetched := core_mesh.DataplaneResource{}
			err := cachedManager.Get(context.Background(), &fetched, core_store.GetByKey("dp-1", "default"))
			Expect(err).ToNot(HaveOccurred())
			return fetched
		}

		for i := 0; i < 100; i++ {
			fetch()
		}

		// then real manager should be called only once
		// (the extra fetch below is the 101st call, still served from cache)
		Expect(fetch().Spec).To(Equal(res.Spec))
		Expect(countingManager.getQueries).To(Equal(1))

		// when the cache entry expires
		time.Sleep(expiration)

		// then the next fetch reaches the underlying manager again
		Expect(fetch().Spec).To(Equal(res.Spec))
		Expect(countingManager.getQueries).To(Equal(2))
	})

	It("should cache List() queries", func() {
		// when fetched resources multiple times
		fetch := func() core_mesh.DataplaneResourceList {
			fetched := core_mesh.DataplaneResourceList{}
			err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
			Expect(err).ToNot(HaveOccurred())
			return fetched
		}

		for i := 0; i < 100; i++ {
			fetch()
		}

		// then real manager should be called only once
		Expect(fetch().Items).To(HaveLen(1))
		Expect(fetch().Items[0].GetSpec()).To(Equal(&res.Spec))
		Expect(countingManager.listQueries).To(Equal(1))

		// when the cache entry expires
		time.Sleep(expiration)

		// then the next list reaches the underlying manager again
		Expect(fetch().Items).To(HaveLen(1))
		Expect(fetch().Items[0].GetSpec()).To(Equal(&res.Spec))
		Expect(countingManager.listQueries).To(Equal(2))
	})
})
Loading