Skip to content

Commit

Permalink
kuma-cp: add a Monitoring Assignment Discovery Service (MADS) server (#…
Browse files Browse the repository at this point in the history
…531)

* kuma-cp: add a Monitoring Assignment Discovery Service (MADS) server
* code review: address feedback from code review
  • Loading branch information
yskopets authored Jan 10, 2020
1 parent ac1c073 commit e2529c9
Show file tree
Hide file tree
Showing 15 changed files with 642 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

Changes:

* feature: add a Monitoring Assignment Discovery Service (MADS) server
[#531](https://github.com/Kong/kuma/pull/531)
* feature: add a generic watchdog for xDS streams
[#530](https://github.com/Kong/kuma/pull/530)
* feature: add a generic versioner for xDS Snapshots
Expand Down
6 changes: 6 additions & 0 deletions app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cmd

import (
"fmt"

ui_server "github.com/Kong/kuma/app/kuma-ui/pkg/server"
admin_server "github.com/Kong/kuma/pkg/admin-server"
api_server "github.com/Kong/kuma/pkg/api-server"
"github.com/Kong/kuma/pkg/config"
kuma_cp "github.com/Kong/kuma/pkg/config/app/kuma-cp"
"github.com/Kong/kuma/pkg/core"
"github.com/Kong/kuma/pkg/core/bootstrap"
mads_server "github.com/Kong/kuma/pkg/mads/server"
sds_server "github.com/Kong/kuma/pkg/sds/server"
xds_server "github.com/Kong/kuma/pkg/xds/server"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -67,6 +69,10 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command {
runLog.Error(err, "unable to set up xDS server")
return err
}
if err := mads_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up Monitoring Assignment server")
return err
}
if err := api_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up API server")
return err
Expand Down
11 changes: 8 additions & 3 deletions pkg/api-server/config_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package api_server_test

import (
"fmt"
"io/ioutil"
"net/http"
"strings"

api_server_config "github.com/Kong/kuma/pkg/config/api-server"
"github.com/Kong/kuma/pkg/plugins/resources/memory"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
"net/http"
"strings"
)

var _ = Describe("Config WS", func() {
Expand Down Expand Up @@ -107,6 +108,10 @@ var _ = Describe("Config WS", func() {
"port": 5683,
"apiServerUrl": ""
},
"monitoringAssignmentServer": {
"assignmentRefreshInterval": "1s",
"grpcPort": 5676
},
"reports": {
"enabled": true
},
Expand Down
26 changes: 17 additions & 9 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Kong/kuma/pkg/config/core"
"github.com/Kong/kuma/pkg/config/core/resources/store"
gui_server "github.com/Kong/kuma/pkg/config/gui-server"
"github.com/Kong/kuma/pkg/config/mads"
"github.com/Kong/kuma/pkg/config/plugins/runtime"
"github.com/Kong/kuma/pkg/config/sds"
token_server "github.com/Kong/kuma/pkg/config/token-server"
Expand Down Expand Up @@ -70,6 +71,8 @@ type Config struct {
SdsServer *sds.SdsServerConfig `yaml:"sdsServer"`
// Dataplane Token server configuration (DEPRECATED: use adminServer)
DataplaneTokenServer *token_server.DataplaneTokenServerConfig `yaml:"dataplaneTokenServer"`
// Monitoring Assignment Discovery Service (MADS) server configuration
MonitoringAssignmentServer *mads.MonitoringAssignmentServerConfig `yaml:"monitoringAssignmentServer"`
// Admin server configuration
AdminServer *admin_server.AdminServerConfig `yaml:"adminServer"`
// API Server configuration
Expand All @@ -91,6 +94,7 @@ func (c *Config) Sanitize() {
c.XdsServer.Sanitize()
c.SdsServer.Sanitize()
c.DataplaneTokenServer.Sanitize()
c.MonitoringAssignmentServer.Sanitize()
c.AdminServer.Sanitize()
c.ApiServer.Sanitize()
c.Runtime.Sanitize()
Expand All @@ -100,15 +104,16 @@ func (c *Config) Sanitize() {

func DefaultConfig() Config {
return Config{
Environment: core.UniversalEnvironment,
Store: store.DefaultStoreConfig(),
XdsServer: xds.DefaultXdsServerConfig(),
SdsServer: sds.DefaultSdsServerConfig(),
DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(),
AdminServer: admin_server.DefaultAdminServerConfig(),
ApiServer: api_server.DefaultApiServerConfig(),
BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
Runtime: runtime.DefaultRuntimeConfig(),
Environment: core.UniversalEnvironment,
Store: store.DefaultStoreConfig(),
XdsServer: xds.DefaultXdsServerConfig(),
SdsServer: sds.DefaultSdsServerConfig(),
DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(),
MonitoringAssignmentServer: mads.DefaultMonitoringAssignmentServerConfig(),
AdminServer: admin_server.DefaultAdminServerConfig(),
ApiServer: api_server.DefaultApiServerConfig(),
BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
Runtime: runtime.DefaultRuntimeConfig(),
Defaults: &Defaults{
Mesh: `type: Mesh
name: default
Expand Down Expand Up @@ -138,6 +143,9 @@ func (c *Config) Validate() error {
if err := c.DataplaneTokenServer.Validate(); err != nil {
return errors.Wrap(err, "Dataplane Token Server validation failed")
}
if err := c.MonitoringAssignmentServer.Validate(); err != nil {
return errors.Wrap(err, "Monitoring Assignment Server validation failed")
}
if err := c.AdminServer.Validate(); err != nil {
return errors.Wrap(err, "Admin Server validation failed")
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ dataplaneTokenServer:
# Directory of authorized client certificates
clientCertsDir: # ENV: KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_CLIENT_CERTS_DIR"`

# Monitoring Assignment Discovery Service (MADS) server configuration
monitoringAssignmentServer:
# Port of a gRPC server that serves Monitoring Assignment Discovery Service (MADS).
grpcPort: 5676 # ENV: KUMA_MONITORING_ASSIGNMENT_SERVER_GRPC_PORT
# Interval for re-generating monitoring assignments for clients connected to the Control Plane.
assignmentRefreshInterval: 1s # ENV: KUMA_MONITORING_ASSIGNMENT_SERVER_ASSIGNMENT_REFRESH_INTERVAL

# Admin server configuration
adminServer:
# Local configuration of server that is available only on localhost
Expand Down
100 changes: 55 additions & 45 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package config_test

import (
"github.com/Kong/kuma/pkg/config/plugins/resources/postgres"
"io/ioutil"
"os"
"time"

"github.com/Kong/kuma/pkg/config/plugins/resources/postgres"

"github.com/Kong/kuma/pkg/config"
kuma_cp "github.com/Kong/kuma/pkg/config/app/kuma-cp"
Expand Down Expand Up @@ -97,6 +99,9 @@ var _ = Describe("Config loader", func() {
Expect(cfg.DataplaneTokenServer.Public.TlsCertFile).To(Equal("/tmp/cert"))
Expect(cfg.DataplaneTokenServer.Public.ClientCertsDir).To(Equal("/tmp/certs"))

Expect(cfg.MonitoringAssignmentServer.GrpcPort).To(Equal(uint32(3333)))
Expect(cfg.MonitoringAssignmentServer.AssignmentRefreshInterval).To(Equal(12 * time.Second))

Expect(cfg.AdminServer.Apis.DataplaneToken.Enabled).To(BeTrue())
Expect(cfg.AdminServer.Local.Port).To(Equal(uint32(1111)))
Expect(cfg.AdminServer.Public.Enabled).To(BeTrue())
Expand Down Expand Up @@ -161,6 +166,9 @@ dataplaneTokenServer:
tlsCertFile: /tmp/cert
tlsKeyFile: /tmp/key
clientCertsDir: /tmp/certs
monitoringAssignmentServer:
grpcPort: 3333
assignmentRefreshInterval: 12s
adminServer:
local:
port: 1111
Expand Down Expand Up @@ -191,50 +199,52 @@ guiServer:
}),
Entry("from env variables", testCase{
envVars: map[string]string{
"KUMA_XDS_SERVER_GRPC_PORT": "5000",
"KUMA_XDS_SERVER_DIAGNOSTICS_PORT": "5003",
"KUMA_BOOTSTRAP_SERVER_PORT": "5004",
"KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_PORT": "1234",
"KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_HOST": "kuma-control-plane",
"KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_PORT": "4321",
"KUMA_ENVIRONMENT": "kubernetes",
"KUMA_STORE_TYPE": "postgres",
"KUMA_STORE_POSTGRES_HOST": "postgres.host",
"KUMA_STORE_POSTGRES_PORT": "5432",
"KUMA_STORE_POSTGRES_USER": "kuma",
"KUMA_STORE_POSTGRES_PASSWORD": "kuma",
"KUMA_STORE_POSTGRES_DB_NAME": "kuma",
"KUMA_STORE_POSTGRES_CONNECTION_TIMEOUT": "10",
"KUMA_STORE_POSTGRES_TLS_MODE": "verifyFull",
"KUMA_STORE_POSTGRES_TLS_CERT_PATH": "/path/to/cert",
"KUMA_STORE_POSTGRES_TLS_KEY_PATH": "/path/to/key",
"KUMA_STORE_POSTGRES_TLS_CA_PATH": "/path/to/rootCert",
"KUMA_API_SERVER_READ_ONLY": "true",
"KUMA_API_SERVER_PORT": "9090",
"KUMA_DATAPLANE_TOKEN_SERVER_ENABLED": "true",
"KUMA_DATAPLANE_TOKEN_SERVER_LOCAL_PORT": "1111",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_ENABLED": "true",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_INTERFACE": "192.168.0.1",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_PORT": "2222",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_TLS_KEY_FILE": "/tmp/key",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_TLS_CERT_FILE": "/tmp/cert",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_CLIENT_CERTS_DIR": "/tmp/certs",
"KUMA_ADMIN_SERVER_APIS_DATAPLANE_TOKEN_ENABLED": "true",
"KUMA_ADMIN_SERVER_LOCAL_PORT": "1111",
"KUMA_ADMIN_SERVER_PUBLIC_ENABLED": "true",
"KUMA_ADMIN_SERVER_PUBLIC_INTERFACE": "192.168.0.1",
"KUMA_ADMIN_SERVER_PUBLIC_PORT": "2222",
"KUMA_ADMIN_SERVER_PUBLIC_TLS_KEY_FILE": "/tmp/key",
"KUMA_ADMIN_SERVER_PUBLIC_TLS_CERT_FILE": "/tmp/cert",
"KUMA_ADMIN_SERVER_PUBLIC_CLIENT_CERTS_DIR": "/tmp/certs",
"KUMA_REPORTS_ENABLED": "false",
"KUMA_KUBERNETES_ADMISSION_SERVER_ADDRESS": "127.0.0.2",
"KUMA_KUBERNETES_ADMISSION_SERVER_PORT": "9443",
"KUMA_KUBERNETES_ADMISSION_SERVER_CERT_DIR": "/var/run/secrets/kuma.io/kuma-admission-server/tls-cert",
"KUMA_GENERAL_ADVERTISED_HOSTNAME": "kuma.internal",
"KUMA_API_SERVER_CORS_ALLOWED_DOMAINS": "https://kuma,https://someapi",
"KUMA_GUI_SERVER_PORT": "8888",
"KUMA_GUI_SERVER_API_SERVER_URL": "http://localhost:1234",
"KUMA_XDS_SERVER_GRPC_PORT": "5000",
"KUMA_XDS_SERVER_DIAGNOSTICS_PORT": "5003",
"KUMA_BOOTSTRAP_SERVER_PORT": "5004",
"KUMA_BOOTSTRAP_SERVER_PARAMS_ADMIN_PORT": "1234",
"KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_HOST": "kuma-control-plane",
"KUMA_BOOTSTRAP_SERVER_PARAMS_XDS_PORT": "4321",
"KUMA_ENVIRONMENT": "kubernetes",
"KUMA_STORE_TYPE": "postgres",
"KUMA_STORE_POSTGRES_HOST": "postgres.host",
"KUMA_STORE_POSTGRES_PORT": "5432",
"KUMA_STORE_POSTGRES_USER": "kuma",
"KUMA_STORE_POSTGRES_PASSWORD": "kuma",
"KUMA_STORE_POSTGRES_DB_NAME": "kuma",
"KUMA_STORE_POSTGRES_CONNECTION_TIMEOUT": "10",
"KUMA_STORE_POSTGRES_TLS_MODE": "verifyFull",
"KUMA_STORE_POSTGRES_TLS_CERT_PATH": "/path/to/cert",
"KUMA_STORE_POSTGRES_TLS_KEY_PATH": "/path/to/key",
"KUMA_STORE_POSTGRES_TLS_CA_PATH": "/path/to/rootCert",
"KUMA_API_SERVER_READ_ONLY": "true",
"KUMA_API_SERVER_PORT": "9090",
"KUMA_DATAPLANE_TOKEN_SERVER_ENABLED": "true",
"KUMA_DATAPLANE_TOKEN_SERVER_LOCAL_PORT": "1111",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_ENABLED": "true",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_INTERFACE": "192.168.0.1",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_PORT": "2222",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_TLS_KEY_FILE": "/tmp/key",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_TLS_CERT_FILE": "/tmp/cert",
"KUMA_DATAPLANE_TOKEN_SERVER_PUBLIC_CLIENT_CERTS_DIR": "/tmp/certs",
"KUMA_MONITORING_ASSIGNMENT_SERVER_GRPC_PORT": "3333",
"KUMA_MONITORING_ASSIGNMENT_SERVER_ASSIGNMENT_REFRESH_INTERVAL": "12s",
"KUMA_ADMIN_SERVER_APIS_DATAPLANE_TOKEN_ENABLED": "true",
"KUMA_ADMIN_SERVER_LOCAL_PORT": "1111",
"KUMA_ADMIN_SERVER_PUBLIC_ENABLED": "true",
"KUMA_ADMIN_SERVER_PUBLIC_INTERFACE": "192.168.0.1",
"KUMA_ADMIN_SERVER_PUBLIC_PORT": "2222",
"KUMA_ADMIN_SERVER_PUBLIC_TLS_KEY_FILE": "/tmp/key",
"KUMA_ADMIN_SERVER_PUBLIC_TLS_CERT_FILE": "/tmp/cert",
"KUMA_ADMIN_SERVER_PUBLIC_CLIENT_CERTS_DIR": "/tmp/certs",
"KUMA_REPORTS_ENABLED": "false",
"KUMA_KUBERNETES_ADMISSION_SERVER_ADDRESS": "127.0.0.2",
"KUMA_KUBERNETES_ADMISSION_SERVER_PORT": "9443",
"KUMA_KUBERNETES_ADMISSION_SERVER_CERT_DIR": "/var/run/secrets/kuma.io/kuma-admission-server/tls-cert",
"KUMA_GENERAL_ADVERTISED_HOSTNAME": "kuma.internal",
"KUMA_API_SERVER_CORS_ALLOWED_DOMAINS": "https://kuma,https://someapi",
"KUMA_GUI_SERVER_PORT": "8888",
"KUMA_GUI_SERVER_API_SERVER_URL": "http://localhost:1234",
},
yamlFileConfig: "",
}),
Expand Down
41 changes: 41 additions & 0 deletions pkg/config/mads/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package mads

import (
"time"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/Kong/kuma/pkg/config"
)

func DefaultMonitoringAssignmentServerConfig() *MonitoringAssignmentServerConfig {
return &MonitoringAssignmentServerConfig{
GrpcPort: 5676,
AssignmentRefreshInterval: 1 * time.Second,
}
}

// Monitoring Assignment Discovery Service (MADS) server configuration.
type MonitoringAssignmentServerConfig struct {
// Port of a gRPC server that serves Monitoring Assignment Discovery Service (MADS).
GrpcPort uint32 `yaml:"grpcPort" envconfig:"kuma_monitoring_assignment_server_grpc_port"`

// Interval for re-generating monitoring assignments for clients connected to the Control Plane.
AssignmentRefreshInterval time.Duration `yaml:"assignmentRefreshInterval" envconfig:"kuma_monitoring_assignment_server_assignment_refresh_interval"`
}

var _ config.Config = &MonitoringAssignmentServerConfig{}

func (c *MonitoringAssignmentServerConfig) Sanitize() {
}

func (c *MonitoringAssignmentServerConfig) Validate() (errs error) {
if 65535 < c.GrpcPort {
errs = multierr.Append(errs, errors.Errorf(".GrpcPort must be in the range [0, 65535]"))
}
if c.AssignmentRefreshInterval <= 0 {
return errors.New(".AssignmentRefreshInterval must be positive")
}
return
}
15 changes: 15 additions & 0 deletions pkg/mads/generator/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package generator

import (
mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
"github.com/Kong/kuma/pkg/core/resources/model"
)

type Context struct {
Meshes []*mesh_core.MeshResource
Dataplanes []*mesh_core.DataplaneResource
}

type ResourceGenerator interface {
Generate(Context) ([]*model.Resource, error)
}
17 changes: 17 additions & 0 deletions pkg/mads/reconcile/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package reconcile

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"

util_xds "github.com/Kong/kuma/pkg/util/xds"
)

// Reconciler re-computes configuration for a given node.
type Reconciler interface {
Reconcile(*envoy_core.Node) error
}

// Generates a snapshot of xDS resources for a given node.
type SnapshotGenerator interface {
GenerateSnapshot(*envoy_core.Node) (util_xds.Snapshot, error)
}
39 changes: 39 additions & 0 deletions pkg/mads/reconcile/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package reconcile

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache"

util_xds "github.com/Kong/kuma/pkg/util/xds"
)

func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds.SnapshotCache,
generator SnapshotGenerator, versioner util_xds.SnapshotVersioner) Reconciler {
return &reconciler{
hasher: hasher,
cache: cache,
generator: generator,
versioner: versioner,
}
}

type reconciler struct {
hasher envoy_cache.NodeHash
cache util_xds.SnapshotCache
generator SnapshotGenerator
versioner util_xds.SnapshotVersioner
}

func (r *reconciler) Reconcile(node *envoy_core.Node) error {
new, err := r.generator.GenerateSnapshot(node)
if err != nil {
return err
}
if err := new.Consistent(); err != nil {
return err
}
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
new = r.versioner.Version(new, old)
return r.cache.SetSnapshot(id, new)
}
Loading

0 comments on commit e2529c9

Please sign in to comment.