Skip to content

Commit

Permalink
kuma-cp: add a Monitoring Assignment Discovery Service (MADS) server
Browse files Browse the repository at this point in the history
  • Loading branch information
yskopets committed Jan 8, 2020
1 parent b58f1b1 commit 3ad7015
Show file tree
Hide file tree
Showing 11 changed files with 556 additions and 9 deletions.
6 changes: 6 additions & 0 deletions app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cmd

import (
"fmt"

ui_server "github.com/Kong/kuma/app/kuma-ui/pkg/server"
admin_server "github.com/Kong/kuma/pkg/admin-server"
api_server "github.com/Kong/kuma/pkg/api-server"
"github.com/Kong/kuma/pkg/config"
kuma_cp "github.com/Kong/kuma/pkg/config/app/kuma-cp"
"github.com/Kong/kuma/pkg/core"
"github.com/Kong/kuma/pkg/core/bootstrap"
mads_server "github.com/Kong/kuma/pkg/mads/server"
sds_server "github.com/Kong/kuma/pkg/sds/server"
xds_server "github.com/Kong/kuma/pkg/xds/server"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -67,6 +69,10 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command {
runLog.Error(err, "unable to set up xDS server")
return err
}
if err := mads_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up Monitoring Assignment server")
return err
}
if err := api_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up API server")
return err
Expand Down
26 changes: 17 additions & 9 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Kong/kuma/pkg/config/core"
"github.com/Kong/kuma/pkg/config/core/resources/store"
gui_server "github.com/Kong/kuma/pkg/config/gui-server"
"github.com/Kong/kuma/pkg/config/mads"
"github.com/Kong/kuma/pkg/config/plugins/runtime"
"github.com/Kong/kuma/pkg/config/sds"
token_server "github.com/Kong/kuma/pkg/config/token-server"
Expand Down Expand Up @@ -68,6 +69,8 @@ type Config struct {
XdsServer *xds.XdsServerConfig `yaml:"xdsServer"`
// Envoy SDS server configuration
SdsServer *sds.SdsServerConfig `yaml:"sdsServer"`
// Monitoring Assignment Discovery Service (MADS) server configuration
MonitoringAssignmentServer *mads.MonitoringAssignmentServerConfig `yaml:"monitoringAssignmentServer"`
// Dataplane Token server configuration (DEPRECATED: use adminServer)
DataplaneTokenServer *token_server.DataplaneTokenServerConfig `yaml:"dataplaneTokenServer"`
// Admin server configuration
Expand All @@ -90,6 +93,7 @@ func (c *Config) Sanitize() {
c.BootstrapServer.Sanitize()
c.XdsServer.Sanitize()
c.SdsServer.Sanitize()
c.MonitoringAssignmentServer.Sanitize()
c.DataplaneTokenServer.Sanitize()
c.AdminServer.Sanitize()
c.ApiServer.Sanitize()
Expand All @@ -100,15 +104,16 @@ func (c *Config) Sanitize() {

func DefaultConfig() Config {
return Config{
Environment: core.UniversalEnvironment,
Store: store.DefaultStoreConfig(),
XdsServer: xds.DefaultXdsServerConfig(),
SdsServer: sds.DefaultSdsServerConfig(),
DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(),
AdminServer: admin_server.DefaultAdminServerConfig(),
ApiServer: api_server.DefaultApiServerConfig(),
BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
Runtime: runtime.DefaultRuntimeConfig(),
Environment: core.UniversalEnvironment,
Store: store.DefaultStoreConfig(),
XdsServer: xds.DefaultXdsServerConfig(),
SdsServer: sds.DefaultSdsServerConfig(),
MonitoringAssignmentServer: mads.DefaultMonitoringAssignmentServerConfig(),
DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(),
AdminServer: admin_server.DefaultAdminServerConfig(),
ApiServer: api_server.DefaultApiServerConfig(),
BootstrapServer: bootstrap.DefaultBootstrapServerConfig(),
Runtime: runtime.DefaultRuntimeConfig(),
Defaults: &Defaults{
Mesh: `type: Mesh
name: default
Expand All @@ -135,6 +140,9 @@ func (c *Config) Validate() error {
if err := c.SdsServer.Validate(); err != nil {
return errors.Wrap(err, "SDS Server validation failed")
}
if err := c.MonitoringAssignmentServer.Validate(); err != nil {
return errors.Wrap(err, "Monitoring Assignment Server validation failed")
}
if err := c.DataplaneTokenServer.Validate(); err != nil {
return errors.Wrap(err, "Dataplane Token Server validation failed")
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/config/mads/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package mads

import (
"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/Kong/kuma/pkg/config"
)

func DefaultMonitoringAssignmentServerConfig() *MonitoringAssignmentServerConfig {
return &MonitoringAssignmentServerConfig{
GrpcPort: 5676,
}
}

// Monitoring Assignment Discovery Service (MADS) server configuration.
type MonitoringAssignmentServerConfig struct {
// Port of a gRPC server that serves Monitoring Assignment Discovery Service (MADS).
GrpcPort int `yaml:"grpcPort" envconfig:"kuma_observability_monitoring_assignment_server_grpc_port"`
}

var _ config.Config = &MonitoringAssignmentServerConfig{}

func (c *MonitoringAssignmentServerConfig) Sanitize() {
}

func (c *MonitoringAssignmentServerConfig) Validate() (errs error) {
if 65535 < c.GrpcPort {
errs = multierr.Append(errs, errors.Errorf(".GrpcPort must be in the range [0, 65535]"))
}
return
}
15 changes: 15 additions & 0 deletions pkg/mads/generator/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package generator

import (
mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh"
"github.com/Kong/kuma/pkg/core/resources/model"
)

type Context struct {
Meshes []*mesh_core.MeshResource
Dataplanes []*mesh_core.DataplaneResource
}

type ResourceGenerator interface {
Generate(Context) ([]*model.Resource, error)
}
17 changes: 17 additions & 0 deletions pkg/mads/reconcile/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package reconcile

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"

util_xds "github.com/Kong/kuma/pkg/util/xds"
)

// Reconciler re-computes configuration for a given node.
type Reconciler interface {
Reconcile(*envoy_core.Node) error
}

// Generates a snapshot of xDS resources for a given node.
type Snapshotter interface {
Snapshot(*envoy_core.Node) (util_xds.Snapshot, error)
}
36 changes: 36 additions & 0 deletions pkg/mads/reconcile/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package reconcile

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache"

util_xds "github.com/Kong/kuma/pkg/util/xds"
)

func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds.SnapshotCache,
snapshotter Snapshotter, versioner util_xds.SnapshotVersioner) Reconciler {
return &reconciler{
hasher: hasher,
cache: cache,
snapshotter: snapshotter,
versioner: versioner,
}
}

type reconciler struct {
hasher envoy_cache.NodeHash
cache util_xds.SnapshotCache
snapshotter Snapshotter
versioner util_xds.SnapshotVersioner
}

func (r *reconciler) Reconcile(node *envoy_core.Node) error {
new, err := r.snapshotter.Snapshot(node)
if err != nil {
return err
}
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
r.versioner.Version(new, old)
return r.cache.SetSnapshot(id, new)
}
21 changes: 21 additions & 0 deletions pkg/mads/reconcile/snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package reconcile

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache"

mads_cache "github.com/Kong/kuma/pkg/mads/cache"
util_xds "github.com/Kong/kuma/pkg/util/xds"
)

func NewSnapshotter() Snapshotter {
return &snapshotter{}
}

type snapshotter struct {
}

func (_ snapshotter) Snapshot(*envoy_core.Node) (util_xds.Snapshot, error) {
snapshot := mads_cache.NewSnapshot("", []envoy_cache.Resource{})
return &snapshot, nil
}
61 changes: 61 additions & 0 deletions pkg/mads/server/components.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package server

import (
"time"

"github.com/go-logr/logr"

"github.com/Kong/kuma/pkg/core"
core_runtime "github.com/Kong/kuma/pkg/core/runtime"
mads_reconcile "github.com/Kong/kuma/pkg/mads/reconcile"
util_watchdog "github.com/Kong/kuma/pkg/util/watchdog"
util_xds "github.com/Kong/kuma/pkg/util/xds"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server"
)

func NewSnapshotter() mads_reconcile.Snapshotter {
return mads_reconcile.NewSnapshotter()
}

func NewVersioner() util_xds.SnapshotVersioner {
return util_xds.SnapshotAutoVersioner{UUID: core.NewUUID}
}

func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds.SnapshotCache,
snapshotter mads_reconcile.Snapshotter, versioner util_xds.SnapshotVersioner) mads_reconcile.Reconciler {
return mads_reconcile.NewReconciler(hasher, cache, snapshotter, versioner)
}

func NewSyncTracker(rt core_runtime.Runtime, reconciler mads_reconcile.Reconciler) envoy_xds.Callbacks {
return util_xds.NewWatchdogCallbacks(func(node *envoy_core.Node, _ int64) (util_watchdog.Watchdog, error) {
return &util_watchdog.SimpleWatchdog{
NewTicker: func() *time.Ticker {
return time.NewTicker(1 * time.Second)
},
OnTick: func() error {
madsServerLog.V(1).Info("on tick")
return reconciler.Reconcile(node)
},
OnError: func(err error) {
madsServerLog.Error(err, "OnTick() failed")
},
}, nil
})
}

func NewXdsContext(log logr.Logger) (envoy_cache.NodeHash, util_xds.SnapshotCache) {
hasher := hasher{}
logger := util_xds.NewLogger(log)
return hasher, util_xds.NewSnapshotCache(false, hasher, logger)
}

type hasher struct {
}

func (_ hasher) ID(node *envoy_core.Node) string {
// in the very first implementation, we don't differentiate clients
return ""
}
65 changes: 65 additions & 0 deletions pkg/mads/server/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package server

import (
"fmt"
"net"

"google.golang.org/grpc"

kuma_observability "github.com/Kong/kuma/api/observability/v1alpha1"

mads_config "github.com/Kong/kuma/pkg/config/mads"
"github.com/Kong/kuma/pkg/core"
core_runtime "github.com/Kong/kuma/pkg/core/runtime"
)

const grpcMaxConcurrentStreams = 1000000

var (
grpcServerLog = core.Log.WithName("mads-server").WithName("grpc")
)

type grpcServer struct {
server Server
config mads_config.MonitoringAssignmentServerConfig
}

// Make sure that grpcServer implements all relevant interfaces
var (
_ core_runtime.Component = &grpcServer{}
)

func (s *grpcServer) Start(stop <-chan struct{}) error {
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcServer := grpc.NewServer(grpcOptions...)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.config.GrpcPort))
if err != nil {
return err
}

// register services
kuma_observability.RegisterMonitoringAssignmentDiscoveryServiceServer(grpcServer, s.server)

errChan := make(chan error)
go func() {
defer close(errChan)
if err = grpcServer.Serve(lis); err != nil {
grpcServerLog.Error(err, "terminated with an error")
errChan <- err
} else {
grpcServerLog.Info("terminated normally")
}
}()
grpcServerLog.Info("starting", "port", s.config.GrpcPort)

select {
case <-stop:
grpcServerLog.Info("stopping gracefully")
grpcServer.GracefulStop()
return nil
case err := <-errChan:
return err
}
}
Loading

0 comments on commit 3ad7015

Please sign in to comment.