diff --git a/app/kuma-cp/cmd/run.go b/app/kuma-cp/cmd/run.go index 2d55a6393c7c..9a2f3cfac1d8 100644 --- a/app/kuma-cp/cmd/run.go +++ b/app/kuma-cp/cmd/run.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + ui_server "github.com/Kong/kuma/app/kuma-ui/pkg/server" admin_server "github.com/Kong/kuma/pkg/admin-server" api_server "github.com/Kong/kuma/pkg/api-server" @@ -9,6 +10,7 @@ import ( kuma_cp "github.com/Kong/kuma/pkg/config/app/kuma-cp" "github.com/Kong/kuma/pkg/core" "github.com/Kong/kuma/pkg/core/bootstrap" + mads_server "github.com/Kong/kuma/pkg/mads/server" sds_server "github.com/Kong/kuma/pkg/sds/server" xds_server "github.com/Kong/kuma/pkg/xds/server" "github.com/spf13/cobra" @@ -67,6 +69,10 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command { runLog.Error(err, "unable to set up xDS server") return err } + if err := mads_server.SetupServer(rt); err != nil { + runLog.Error(err, "unable to set up Monitoring Assignment server") + return err + } if err := api_server.SetupServer(rt); err != nil { runLog.Error(err, "unable to set up API server") return err diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index 5ccf21aa531e..abf0a902ded1 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -8,6 +8,7 @@ import ( "github.com/Kong/kuma/pkg/config/core" "github.com/Kong/kuma/pkg/config/core/resources/store" gui_server "github.com/Kong/kuma/pkg/config/gui-server" + "github.com/Kong/kuma/pkg/config/mads" "github.com/Kong/kuma/pkg/config/plugins/runtime" "github.com/Kong/kuma/pkg/config/sds" token_server "github.com/Kong/kuma/pkg/config/token-server" @@ -68,6 +69,8 @@ type Config struct { XdsServer *xds.XdsServerConfig `yaml:"xdsServer"` // Envoy SDS server configuration SdsServer *sds.SdsServerConfig `yaml:"sdsServer"` + // Monitoring Assignment Discovery Service (MADS) server configuration + MonitoringAssignmentServer *mads.MonitoringAssignmentServerConfig `yaml:"monitoringAssignmentServer"` // Dataplane Token server configuration (DEPRECATED: use adminServer) DataplaneTokenServer *token_server.DataplaneTokenServerConfig `yaml:"dataplaneTokenServer"` // Admin server configuration @@ -90,6 +93,7 @@ func (c *Config) Sanitize() { c.BootstrapServer.Sanitize() c.XdsServer.Sanitize() c.SdsServer.Sanitize() + c.MonitoringAssignmentServer.Sanitize() c.DataplaneTokenServer.Sanitize() c.AdminServer.Sanitize() c.ApiServer.Sanitize() @@ -100,15 +104,16 @@ func (c *Config) Sanitize() { func DefaultConfig() Config { return Config{ - Environment: core.UniversalEnvironment, - Store: store.DefaultStoreConfig(), - XdsServer: xds.DefaultXdsServerConfig(), - SdsServer: sds.DefaultSdsServerConfig(), - DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(), - AdminServer: admin_server.DefaultAdminServerConfig(), - ApiServer: api_server.DefaultApiServerConfig(), - BootstrapServer: bootstrap.DefaultBootstrapServerConfig(), - Runtime: runtime.DefaultRuntimeConfig(), + Environment: core.UniversalEnvironment, + Store: store.DefaultStoreConfig(), + XdsServer: xds.DefaultXdsServerConfig(), + SdsServer: sds.DefaultSdsServerConfig(), + MonitoringAssignmentServer: mads.DefaultMonitoringAssignmentServerConfig(), + DataplaneTokenServer: token_server.DefaultDataplaneTokenServerConfig(), + AdminServer: admin_server.DefaultAdminServerConfig(), + ApiServer: api_server.DefaultApiServerConfig(), + BootstrapServer: bootstrap.DefaultBootstrapServerConfig(), + Runtime: runtime.DefaultRuntimeConfig(), Defaults: &Defaults{ Mesh: `type: Mesh name: default @@ -135,6 +140,9 @@ func (c *Config) Validate() error { if err := c.SdsServer.Validate(); err != nil { return errors.Wrap(err, "SDS Server validation failed") } + if err := c.MonitoringAssignmentServer.Validate(); err != nil { + return errors.Wrap(err, "Monitoring Assignment Server validation failed") + } if err := c.DataplaneTokenServer.Validate(); err != nil { return errors.Wrap(err, "Dataplane Token Server validation failed") } diff --git a/pkg/config/mads/config.go b/pkg/config/mads/config.go new file mode 100644 index 000000000000..39997d21cf94 --- /dev/null +++ b/pkg/config/mads/config.go @@ -0,0 +1,32 @@ +package mads + +import ( + "github.com/pkg/errors" + "go.uber.org/multierr" + + "github.com/Kong/kuma/pkg/config" +) + +func DefaultMonitoringAssignmentServerConfig() *MonitoringAssignmentServerConfig { + return &MonitoringAssignmentServerConfig{ + GrpcPort: 5676, + } +} + +// Monitoring Assignment Discovery Service (MADS) server configuration. +type MonitoringAssignmentServerConfig struct { + // Port of a gRPC server that serves Monitoring Assignment Discovery Service (MADS). + GrpcPort int `yaml:"grpcPort" envconfig:"kuma_observability_monitoring_assignment_server_grpc_port"` +} + +var _ config.Config = &MonitoringAssignmentServerConfig{} + +func (c *MonitoringAssignmentServerConfig) Sanitize() { +} + +func (c *MonitoringAssignmentServerConfig) Validate() (errs error) { + if 65535 < c.GrpcPort { + errs = multierr.Append(errs, errors.Errorf(".GrpcPort must be in the range [0, 65535]")) + } + return +} diff --git a/pkg/mads/generator/interfaces.go b/pkg/mads/generator/interfaces.go new file mode 100644 index 000000000000..7fcb016297cb --- /dev/null +++ b/pkg/mads/generator/interfaces.go @@ -0,0 +1,15 @@ +package generator + +import ( + mesh_core "github.com/Kong/kuma/pkg/core/resources/apis/mesh" + "github.com/Kong/kuma/pkg/core/resources/model" +) + +type Context struct { + Meshes []*mesh_core.MeshResource + Dataplanes []*mesh_core.DataplaneResource +} + +type ResourceGenerator interface { + Generate(Context) ([]*model.Resource, error) +} diff --git a/pkg/mads/reconcile/interfaces.go b/pkg/mads/reconcile/interfaces.go new file mode 100644 index 000000000000..5460f44e2e97 --- /dev/null +++ b/pkg/mads/reconcile/interfaces.go @@ -0,0 +1,17 @@ +package reconcile + +import ( + envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + + util_xds "github.com/Kong/kuma/pkg/util/xds" +) + +// Reconciler re-computes configuration for a given node. +type Reconciler interface { + Reconcile(*envoy_core.Node) error +} + +// Generates a snapshot of xDS resources for a given node. +type Snapshotter interface { + Snapshot(*envoy_core.Node) (util_xds.Snapshot, error) +} diff --git a/pkg/mads/reconcile/reconciler.go b/pkg/mads/reconcile/reconciler.go new file mode 100644 index 000000000000..50e76b65e179 --- /dev/null +++ b/pkg/mads/reconcile/reconciler.go @@ -0,0 +1,36 @@ +package reconcile + +import ( + envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache" + + util_xds "github.com/Kong/kuma/pkg/util/xds" +) + +func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds.SnapshotCache, + snapshotter Snapshotter, versioner util_xds.SnapshotVersioner) Reconciler { + return &reconciler{ + hasher: hasher, + cache: cache, + snapshotter: snapshotter, + versioner: versioner, + } +} + +type reconciler struct { + hasher envoy_cache.NodeHash + cache util_xds.SnapshotCache + snapshotter Snapshotter + versioner util_xds.SnapshotVersioner +} + +func (r *reconciler) Reconcile(node *envoy_core.Node) error { + new, err := r.snapshotter.Snapshot(node) + if err != nil { + return err + } + id := r.hasher.ID(node) + old, _ := r.cache.GetSnapshot(id) + r.versioner.Version(new, old) + return r.cache.SetSnapshot(id, new) +} diff --git a/pkg/mads/reconcile/snapshotter.go b/pkg/mads/reconcile/snapshotter.go new file mode 100644 index 000000000000..bc6cd488910c --- /dev/null +++ b/pkg/mads/reconcile/snapshotter.go @@ -0,0 +1,21 @@ +package reconcile + +import ( + envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache" + + mads_cache "github.com/Kong/kuma/pkg/mads/cache" + util_xds "github.com/Kong/kuma/pkg/util/xds" +) + +func NewSnapshotter() Snapshotter { + return &snapshotter{} +} + +type snapshotter struct { +} + +func (_ snapshotter) Snapshot(*envoy_core.Node) (util_xds.Snapshot, error) { + snapshot := mads_cache.NewSnapshot("", []envoy_cache.Resource{}) + return &snapshot, nil +} diff --git a/pkg/mads/server/components.go b/pkg/mads/server/components.go new file mode 100644 index 000000000000..ac0485dcc6ac --- /dev/null +++ b/pkg/mads/server/components.go @@ -0,0 +1,61 @@ +package server + +import ( + "time" + + "github.com/go-logr/logr" + + "github.com/Kong/kuma/pkg/core" + core_runtime "github.com/Kong/kuma/pkg/core/runtime" + mads_reconcile "github.com/Kong/kuma/pkg/mads/reconcile" + util_watchdog "github.com/Kong/kuma/pkg/util/watchdog" + util_xds "github.com/Kong/kuma/pkg/util/xds" + + envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache" + envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server" +) + +func NewSnapshotter() mads_reconcile.Snapshotter { + return mads_reconcile.NewSnapshotter() +} + +func NewVersioner() util_xds.SnapshotVersioner { + return util_xds.SnapshotAutoVersioner{UUID: core.NewUUID} +} + +func NewReconciler(hasher envoy_cache.NodeHash, cache util_xds.SnapshotCache, + snapshotter mads_reconcile.Snapshotter, versioner util_xds.SnapshotVersioner) mads_reconcile.Reconciler { + return mads_reconcile.NewReconciler(hasher, cache, snapshotter, versioner) +} + +func NewSyncTracker(rt core_runtime.Runtime, reconciler mads_reconcile.Reconciler) envoy_xds.Callbacks { + return util_xds.NewWatchdogCallbacks(func(node *envoy_core.Node, _ int64) (util_watchdog.Watchdog, error) { + return &util_watchdog.SimpleWatchdog{ + NewTicker: func() *time.Ticker { + return time.NewTicker(1 * time.Second) + }, + OnTick: func() error { + madsServerLog.V(1).Info("on tick") + return reconciler.Reconcile(node) + }, + OnError: func(err error) { + madsServerLog.Error(err, "OnTick() failed") + }, + }, nil + }) +} + +func NewXdsContext(log logr.Logger) (envoy_cache.NodeHash, util_xds.SnapshotCache) { + hasher := hasher{} + logger := util_xds.NewLogger(log) + return hasher, util_xds.NewSnapshotCache(false, hasher, logger) +} + +type hasher struct { +} + +func (_ hasher) ID(node *envoy_core.Node) string { + // in the very first implementation, we don't differentiate clients + return "" +} diff --git a/pkg/mads/server/grpc.go b/pkg/mads/server/grpc.go new file mode 100644 index 000000000000..b70c52f5f067 --- /dev/null +++ b/pkg/mads/server/grpc.go @@ -0,0 +1,65 @@ +package server + +import ( + "fmt" + "net" + + "google.golang.org/grpc" + + kuma_observability "github.com/Kong/kuma/api/observability/v1alpha1" + + mads_config "github.com/Kong/kuma/pkg/config/mads" + "github.com/Kong/kuma/pkg/core" + core_runtime "github.com/Kong/kuma/pkg/core/runtime" +) + +const grpcMaxConcurrentStreams = 1000000 + +var ( + grpcServerLog = core.Log.WithName("mads-server").WithName("grpc") +) + +type grpcServer struct { + server Server + config mads_config.MonitoringAssignmentServerConfig +} + +// Make sure that grpcServer implements all relevant interfaces +var ( + _ core_runtime.Component = &grpcServer{} +) + +func (s *grpcServer) Start(stop <-chan struct{}) error { + var grpcOptions []grpc.ServerOption + grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) + grpcServer := grpc.NewServer(grpcOptions...) + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.config.GrpcPort)) + if err != nil { + return err + } + + // register services + kuma_observability.RegisterMonitoringAssignmentDiscoveryServiceServer(grpcServer, s.server) + + errChan := make(chan error) + go func() { + defer close(errChan) + if err = grpcServer.Serve(lis); err != nil { + grpcServerLog.Error(err, "terminated with an error") + errChan <- err + } else { + grpcServerLog.Info("terminated normally") + } + }() + grpcServerLog.Info("starting", "port", s.config.GrpcPort) + + select { + case <-stop: + grpcServerLog.Info("stopping gracefully") + grpcServer.GracefulStop() + return nil + case err := <-errChan: + return err + } +} diff --git a/pkg/mads/server/mads.go b/pkg/mads/server/mads.go new file mode 100644 index 000000000000..ad899ae888bb --- /dev/null +++ b/pkg/mads/server/mads.go @@ -0,0 +1,257 @@ +package server + +import ( + "context" + "strconv" + "sync/atomic" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + + "github.com/golang/protobuf/proto" + any "github.com/golang/protobuf/ptypes/any" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + kuma_observability "github.com/Kong/kuma/api/observability/v1alpha1" + "github.com/Kong/kuma/pkg/mads" + + envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache" + envoy_server "github.com/envoyproxy/go-control-plane/pkg/server" +) + +type Server interface { + kuma_observability.MonitoringAssignmentDiscoveryServiceServer +} + +func NewServer(config envoy_cache.Cache, callbacks envoy_server.Callbacks, log logr.Logger) Server { + return &server{cache: config, callbacks: callbacks, log: log} +} + +type server struct { + cache envoy_cache.Cache + callbacks envoy_server.Callbacks + + // streamCount for counting bi-di streams + streamCount int64 + + log logr.Logger +} + +type stream interface { + grpc.ServerStream + + Send(*envoy.DiscoveryResponse) error + Recv() (*envoy.DiscoveryRequest, error) +} + +// watches for all xDS resource types +type watches struct { + assignments chan envoy_cache.Response + + assignmentsCancel func() + + assignmentsNonce string +} + +// Cancel all watches +func (values watches) Cancel() { + if values.assignmentsCancel != nil { + values.assignmentsCancel() + } +} + +func createResponse(resp *envoy_cache.Response, typeURL string) (*envoy.DiscoveryResponse, error) { + if resp == nil { + return nil, errors.New("missing response") + } + resources := make([]*any.Any, len(resp.Resources)) + for i := 0; i < len(resp.Resources); i++ { + // Envoy relies on serialized protobuf bytes for detecting changes to the resources. + // This requires deterministic serialization. + b := proto.NewBuffer(nil) + b.SetDeterministic(true) + err := b.Marshal(resp.Resources[i]) + if err != nil { + return nil, err + } + resources[i] = &any.Any{ + TypeUrl: typeURL, + Value: b.Bytes(), + } + } + out := &envoy.DiscoveryResponse{ + VersionInfo: resp.Version, + Resources: resources, + TypeUrl: typeURL, + } + return out, nil +} + +// process handles a bi-di stream request +func (s *server) process(stream stream, reqCh <-chan *envoy.DiscoveryRequest, defaultTypeURL string) (err error) { + // increment stream count + streamID := atomic.AddInt64(&s.streamCount, 1) + + log := s.log.WithValues("streamID", streamID) + defer func() { + if err != nil { + log.Error(err, "xDS stream terminated with an error") + } + }() + + // unique nonce generator for req-resp pairs per xDS stream; the server + // ignores stale nonces. nonce is only modified within send() function. + var streamNonce int64 + + // a collection of watches per request type + var values watches + defer func() { + values.Cancel() + if s.callbacks != nil { + s.callbacks.OnStreamClosed(streamID) + } + }() + + // sends a response by serializing to protobuf Any + send := func(resp envoy_cache.Response, typeURL string) (string, error) { + out, err := createResponse(&resp, typeURL) + if err != nil { + return "", err + } + + // increment nonce + streamNonce++ + out.Nonce = strconv.FormatInt(streamNonce, 10) + if s.callbacks != nil { + s.callbacks.OnStreamResponse(streamID, &resp.Request, out) + } + + return out.Nonce, stream.Send(out) + } + + if s.callbacks != nil { + if err := s.callbacks.OnStreamOpen(stream.Context(), streamID, defaultTypeURL); err != nil { + return err + } + } + + // node may only be set on the first discovery request + var node = &envoy_core.Node{} + + for { + select { + case resp, more := <-values.assignments: + if !more { + return status.Errorf(codes.Unavailable, "MonitoringAssignment watch failed") + } + nonce, err := send(resp, mads.MonitoringAssignmentType) + if err != nil { + return err + } + values.assignmentsNonce = nonce + + case req, more := <-reqCh: + // input stream ended or errored out + if !more { + return nil + } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() + + // type URL is required for ADS but is implicit for xDS + if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL + } + + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } + } + + // cancel existing watches to (re-)request a newer version + switch { + case req.TypeUrl == mads.MonitoringAssignmentType && (values.assignmentsNonce == "" || values.assignmentsNonce == nonce): + if values.assignmentsCancel != nil { + values.assignmentsCancel() + } + values.assignments, values.assignmentsCancel = s.cache.CreateWatch(*req) + } + } + } +} + +// handler converts a blocking read call to channels and initiates stream processing +func (s *server) handler(stream stream, typeURL string) error { + // a channel for receiving incoming requests + reqCh := make(chan *envoy.DiscoveryRequest) + reqStop := int32(0) + go func() { + for { + req, err := stream.Recv() + if atomic.LoadInt32(&reqStop) != 0 { + return + } + if err != nil { + close(reqCh) + return + } + reqCh <- req + } + }() + + err := s.process(stream, reqCh, typeURL) + + atomic.StoreInt32(&reqStop, 1) + + return err +} + +func (s *server) StreamMonitoringAssignments(stream kuma_observability.MonitoringAssignmentDiscoveryService_StreamMonitoringAssignmentsServer) error { + return s.handler(stream, mads.MonitoringAssignmentType) +} + +func (s *server) DeltaMonitoringAssignments(_ kuma_observability.MonitoringAssignmentDiscoveryService_DeltaMonitoringAssignmentsServer) error { + return errors.New("not implemented") +} + +func (s *server) FetchMonitoringAssignments(ctx context.Context, req *envoy.DiscoveryRequest) (*envoy.DiscoveryResponse, error) { + if req == nil { + return nil, status.Errorf(codes.Unavailable, "empty request") + } + req.TypeUrl = mads.MonitoringAssignmentType + return s.Fetch(ctx, req) +} + +// Fetch is the universal fetch method. +func (s *server) Fetch(ctx context.Context, req *envoy.DiscoveryRequest) (*envoy.DiscoveryResponse, error) { + if s.callbacks != nil { + if err := s.callbacks.OnFetchRequest(ctx, req); err != nil { + return nil, err + } + } + resp, err := s.cache.Fetch(ctx, *req) + if err != nil { + return nil, err + } + out, err := createResponse(resp, req.TypeUrl) + if s.callbacks != nil { + s.callbacks.OnFetchResponse(req, out) + } + return out, err +} diff --git a/pkg/mads/server/server.go b/pkg/mads/server/server.go new file mode 100644 index 000000000000..156155333c2f --- /dev/null +++ b/pkg/mads/server/server.go @@ -0,0 +1,29 @@ +package server + +import ( + "github.com/Kong/kuma/pkg/core" + + core_runtime "github.com/Kong/kuma/pkg/core/runtime" + util_xds "github.com/Kong/kuma/pkg/util/xds" +) + +var ( + madsServerLog = core.Log.WithName("mads-server") +) + +func SetupServer(rt core_runtime.Runtime) error { + hasher, cache := NewXdsContext(madsServerLog) + snapshotter := NewSnapshotter() + versioner := NewVersioner() + reconciler := NewReconciler(hasher, cache, snapshotter, versioner) + syncTracker := NewSyncTracker(rt, reconciler) + callbacks := util_xds.CallbacksChain{ + util_xds.LoggingCallbacks{Log: madsServerLog}, + syncTracker, + } + srv := NewServer(cache, callbacks, madsServerLog) + return core_runtime.Add( + rt, + &grpcServer{srv, *rt.Config().MonitoringAssignmentServer}, + ) +}