Skip to content

Commit

Permalink
feat: update running status of KIC node to Konnect (#3533)
Browse files Browse the repository at this point in the history
* feat: update status of kic to konnect

* resolve conflicts and add flag to set period of uploading node status

* add comments

* move config status pub/sub to interface

* fix notify status

* move subscribe config status interface to dataplane
  • Loading branch information
randmonkey committed Feb 14, 2023
1 parent c63fbca commit ea2ed68
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 47 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ Adding a new version? You'll need three changes:
[#3521](https://github.com/Kong/kubernetes-ingress-controller/pull/3521)
- Leader election is enabled by default then kong admin service discovery is enabled.
[#3529](https://github.com/Kong/kubernetes-ingress-controller/pull/3529)
- Added flag `--konnect-refresh-node-period` to set the period of uploading
status of KIC instance to Konnect runtime group.
[#3533](https://github.com/Kong/kubernetes-ingress-controller/pull/3533)

### Fixed

Expand Down
1 change: 1 addition & 0 deletions internal/adminapi/konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type KonnectConfig struct {
ConfigSynchronizationEnabled bool
RuntimeGroupID string
Address string
RefreshNodePeriod time.Duration
TLSClient TLSClientConfig
}

Expand Down
49 changes: 49 additions & 0 deletions internal/dataplane/config_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dataplane

type ConfigStatus int

const (
// ConfigStatusOK: no error happens in translation from k8s objects to kong configuration
// and succeeded to apply kong configuration to kong gateway.
ConfigStatusOK ConfigStatus = iota
// ConfigStatusTranslationErrorHappened: error happened in translation of k8s objects
// but succeeded to apply kong configuration for remaining objects.
ConfigStatusTranslationErrorHappened
// ConfigStatusApplyFailed: failed to apply kong configurations.
ConfigStatusApplyFailed
)

type ConfigStatusNotifier interface {
NotifyConfigStatus(ConfigStatus)
}

type ConfigStatusSubscriber interface {
SubscribeConfigStatus() chan ConfigStatus
}

type NoOpConfigStatusNotifier struct{}

var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{}

func (n NoOpConfigStatusNotifier) NotifyConfigStatus(status ConfigStatus) {
}

type ChannelConfigNotifier struct {
ch chan ConfigStatus
}

var _ ConfigStatusNotifier = &ChannelConfigNotifier{}

func (n *ChannelConfigNotifier) NotifyConfigStatus(status ConfigStatus) {
n.ch <- status
}

func (n *ChannelConfigNotifier) SubscribeConfigStatus() chan ConfigStatus {
return n.ch
}

func NewChannelConfigNotifier() *ChannelConfigNotifier {
return &ChannelConfigNotifier{
ch: make(chan ConfigStatus, 1),
}
}
48 changes: 35 additions & 13 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type KongClient struct {

// clientsProvider allows retrieving the most recent set of clients.
clientsProvider AdminAPIClientsProvider

// configStatusNotifier notifies status of cofiguring kong gateway.
configStatusNotifier ConfigStatusNotifier
}

// NewKongClient provides a new KongClient object after connecting to the
Expand All @@ -154,18 +157,19 @@ func NewKongClient(
// build the client object
cache := store.NewCacheStores()
c := &KongClient{
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
configStatusNotifier: NoOpConfigStatusNotifier{},
}

return c, nil
Expand Down Expand Up @@ -422,9 +426,19 @@ func (c *KongClient) Update(ctx context.Context) error {

shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig)
if err != nil {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusApplyFailed)
return err
}

// succeeded to apply configuration to Kong gateway.
// notify the receiver of config status that translation error happened when there are translation errors,
// otherwise notify that config status is OK.
if len(translationFailures) > 0 {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusTranslationErrorHappened)
} else {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusOK)
}

// report on configured Kubernetes objects if enabled
if c.AreKubernetesObjectReportsEnabled() {
// if the configuration SHAs that have just been pushed are different than
Expand Down Expand Up @@ -520,10 +534,18 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru
}
return "", err
}

return newSHA, nil
}

// SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results.
// Currently it is used for uploading the node status to konnect runtime group.
func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) {
c.lock.Lock()
defer c.lock.Unlock()

c.configStatusNotifier = n
}

// -----------------------------------------------------------------------------
// Dataplane Client - Kong - Private
// -----------------------------------------------------------------------------
Expand Down
116 changes: 91 additions & 25 deletions internal/konnect/node_agent.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package konnect

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

const defaultRefreshNodeInterval = 30 * time.Second
const (
MinRefreshNodePeriod = 30 * time.Second
DefaultRefreshNodePeriod = 60 * time.Second
NodeOutdateInterval = 5 * time.Minute
)

type NodeAgent struct {
NodeID string
Expand All @@ -18,24 +25,44 @@ type NodeAgent struct {

Logger logr.Logger

konnectClient *NodeAPIClient
refreshInterval time.Duration
konnectClient *NodeAPIClient
refreshPeriod time.Duration

configStatus atomic.Uint32
configStatusSubscriber dataplane.ConfigStatusSubscriber
}

func NewNodeAgent(hostname string, version string, logger logr.Logger, client *NodeAPIClient) *NodeAgent {
return &NodeAgent{
func NewNodeAgent(
hostname string,
version string,
refreshPeriod time.Duration,
logger logr.Logger,
client *NodeAPIClient,
configStatusSubscriber dataplane.ConfigStatusSubscriber,
) *NodeAgent {
if refreshPeriod < MinRefreshNodePeriod {
refreshPeriod = MinRefreshNodePeriod
}
a := &NodeAgent{
Hostname: hostname,
Version: version,
Logger: logger.
WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID),
konnectClient: client,
// TODO: set refresh interval by some flag
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
refreshInterval: defaultRefreshNodeInterval,
konnectClient: client,
refreshPeriod: refreshPeriod,
configStatusSubscriber: configStatusSubscriber,
}
a.configStatus.Store(uint32(dataplane.ConfigStatusOK))
return a
}

func (a *NodeAgent) createNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
}

createNodeReq := &CreateNodeRequest{
ID: a.NodeID,
Hostname: a.Hostname,
Expand All @@ -45,7 +72,7 @@ func (a *NodeAgent) createNode() error {
}
resp, err := a.konnectClient.CreateNode(createNodeReq)
if err != nil {
return fmt.Errorf("failed to update node, hostname %s: %w", a.Hostname, err)
return fmt.Errorf("failed to create node, hostname %s: %w", a.Hostname, err)
}

a.NodeID = resp.Item.ID
Expand All @@ -60,7 +87,16 @@ func (a *NodeAgent) clearOutdatedNodes() error {
}

for _, node := range nodes.Items {
if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname {
deleteNode := false
if node.Type == NodeTypeIngressController {
// nodes to remove:
// (1) since only one KIC node is allowed in a runtime group, all the nodes with other hostnames are considered outdated.
// (2) in some cases(kind/minikube restart), rebuilt pod uses the same name. So nodes updated for >5mins before should be deleted.
if node.Hostname != a.Hostname || time.Since(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval {
deleteNode = true
}
}
if deleteNode {
a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname)
err := a.konnectClient.DeleteNode(node.ID)
if err != nil {
Expand All @@ -71,16 +107,40 @@ func (a *NodeAgent) clearOutdatedNodes() error {
return nil
}

func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) {
ch := a.configStatusSubscriber.SubscribeConfigStatus()
chDone := ctx.Done()

for {
select {
case <-chDone:
a.Logger.Info("subscribe loop stopped", "message", ctx.Err().Error())
return
case configStatus := <-ch:
a.configStatus.Store(uint32(configStatus))
}
}
}

func (a *NodeAgent) updateNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
return err
}

// TODO: retrieve the real state of KIC
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
ingressControllerStatus := IngressControllerStateOperational
var ingressControllerStatus IngressControllerState
configStatus := int(a.configStatus.Load())
switch dataplane.ConfigStatus(configStatus) {
case dataplane.ConfigStatusOK:
ingressControllerStatus = IngressControllerStateOperational
case dataplane.ConfigStatusTranslationErrorHappened:
ingressControllerStatus = IngressControllerStatePartialConfigFail
case dataplane.ConfigStatusApplyFailed:
ingressControllerStatus = IngressControllerStateInoperable
default:
ingressControllerStatus = IngressControllerStateUnknown
}

updateNodeReq := &UpdateNodeRequest{
Hostname: a.Hostname,
Expand All @@ -99,24 +159,30 @@ func (a *NodeAgent) updateNode() error {
return nil
}

func (a *NodeAgent) updateNodeLoop() {
ticker := time.NewTicker(a.refreshInterval)
func (a *NodeAgent) updateNodeLoop(ctx context.Context) {
ticker := time.NewTicker(a.refreshPeriod)
defer ticker.Stop()
// TODO: add some mechanism to break the loop
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
for range ticker.C {
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
for {
select {
case <-ctx.Done():
err := ctx.Err()
a.Logger.Info("update node loop stopped", "message", err.Error())
return
case <-ticker.C:
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
}
}
}
}

func (a *NodeAgent) Run() {
func (a *NodeAgent) Run(ctx context.Context) {
err := a.createNode()
if err != nil {
a.Logger.Error(err, "failed to create node, agent abort")
return
}
go a.updateNodeLoop()
go a.updateNodeLoop(ctx)
go a.subscribeConfigStatus(ctx)
}
2 changes: 2 additions & 0 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway"
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/konnect"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates"
)

Expand Down Expand Up @@ -238,6 +239,7 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.Konnect.TLSClient.CertFile, "konnect-tls-client-cert-file", "", "Konnect TLS client certificate file path.")
flagSet.StringVar(&c.Konnect.TLSClient.Key, "konnect-tls-client-key", "", "Konnect TLS client key.")
flagSet.StringVar(&c.Konnect.TLSClient.KeyFile, "konnect-tls-client-key-file", "", "Konnect TLS client key file path.")
flagSet.DurationVar(&c.Konnect.RefreshNodePeriod, "konnect-refresh-node-period", konnect.DefaultRefreshNodePeriod, "Period of uploading status of KIC and controlled kong gateway instances")

// Deprecated flags
_ = flagSet.Float32("sync-rate-limit", dataplane.DefaultSyncSeconds, "Use --proxy-sync-seconds instead")
Expand Down
29 changes: 20 additions & 9 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,34 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
// In case of failures when building Konnect related objects, we're not returning errors as Konnect is not
// considered critical feature, and it should not break the basic functionality of the controller.

konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation")
} else {
setupLog.Info("Initialized Konnect Admin API client")
clientsManager.SetKonnectClient(konnectAdminAPIClient)
}

setupLog.Info("Start Konnect client to register runtime instances to Konnect")
konnectNodeAPIClient, err := konnect.NewNodeAPIClient(c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating konnect client, skipping running NodeAgent")
} else {
hostname, _ := os.Hostname()
version := metadata.Release
agent := konnect.NewNodeAgent(hostname, version, setupLog, konnectNodeAPIClient)
agent.Run()
}
// set channel to send config status.
configStatusNotifier := dataplane.NewChannelConfigNotifier()
dataplaneClient.SetConfigStatusNotifier(configStatusNotifier)

konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation")
} else {
setupLog.Info("Initialized Konnect Admin API client")
clientsManager.SetKonnectClient(konnectAdminAPIClient)
agent := konnect.NewNodeAgent(
hostname,
version,
c.Konnect.RefreshNodePeriod,
setupLog,
konnectNodeAPIClient,
configStatusNotifier,
)
agent.Run(ctx)
}
}

Expand Down

0 comments on commit ea2ed68

Please sign in to comment.