Skip to content

Commit

Permalink
feat(konnect): react to gateway clients updates in NodeAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Mar 14, 2023
1 parent 0fa886b commit b973d15
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 85 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ Adding a new version? You'll need three changes:

> Release date: TBD
### Added

- Konnect Runtime Group's nodes are reactively updated on each discovered Gateway clients
change.
[#3727](https://github.com/Kong/kubernetes-ingress-controller/pull/3727)

### Fixed

- Fixed the issue where the status of an ingress is not updated when `secretName` is
Expand Down
67 changes: 48 additions & 19 deletions internal/konnect/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ const (
// GatewayInstance is a controlled kong gateway instance.
// its hostname and version will be used to update status of nodes corresponding to the instance in konnect.
type GatewayInstance struct {
hostname string
version string
Hostname string
Version string
}

// GatewayInstanceGetter is the interface to get currently running gateway instances in the kubernetes cluster.
type GatewayInstanceGetter interface {
GetGatewayInstances() ([]GatewayInstance, error)
}

type GatewayClientsChangesNotifier interface {
SubscribeToGatewayClientsChanges() (<-chan struct{}, bool)
}

// NodeAgent gets the running status of KIC node and controlled kong gateway nodes,
// and update their statuses to konnect.
type NodeAgent struct {
Expand All @@ -45,7 +49,8 @@ type NodeAgent struct {
configStatus atomic.Uint32
configStatusSubscriber dataplane.ConfigStatusSubscriber

gatewayInstanceGetter GatewayInstanceGetter
gatewayInstanceGetter GatewayInstanceGetter
gatewayClientsChangesNotifier GatewayClientsChangesNotifier
}

// NewNodeAgent creates a new node agent.
Expand All @@ -58,6 +63,7 @@ func NewNodeAgent(
client *NodeAPIClient,
configStatusSubscriber dataplane.ConfigStatusSubscriber,
gatewayGetter GatewayInstanceGetter,
gatewayClientsChangesNotifier GatewayClientsChangesNotifier,
) *NodeAgent {
if refreshPeriod < MinRefreshNodePeriod {
refreshPeriod = MinRefreshNodePeriod
Expand All @@ -67,10 +73,11 @@ func NewNodeAgent(
Version: version,
Logger: logger.
WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID),
konnectClient: client,
refreshPeriod: refreshPeriod,
configStatusSubscriber: configStatusSubscriber,
gatewayInstanceGetter: gatewayGetter,
konnectClient: client,
refreshPeriod: refreshPeriod,
configStatusSubscriber: configStatusSubscriber,
gatewayInstanceGetter: gatewayGetter,
gatewayClientsChangesNotifier: gatewayClientsChangesNotifier,
}
a.configStatus.Store(uint32(dataplane.ConfigStatusOK))
return a
Expand All @@ -88,6 +95,8 @@ func (a *NodeAgent) Start(ctx context.Context) error {
// Run the goroutines only in case we succeeded to run initial update of nodes.
go a.updateNodeLoop(ctx)
go a.subscribeConfigStatus(ctx)
go a.subscribeToGatewayClientsChanges(ctx)

}

// We're waiting here as that's the manager.Runnable interface requirement to block until the context is done.
Expand Down Expand Up @@ -125,6 +134,26 @@ func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) {
}
}

func (a *NodeAgent) subscribeToGatewayClientsChanges(ctx context.Context) {
gatewayClientsChangedCh, changesAreExpected := a.gatewayClientsChangesNotifier.SubscribeToGatewayClientsChanges()
if !changesAreExpected {
// There are no changes of gateway clients going to happen, we don't have to watch them.
return
}

for {
select {
case <-ctx.Done():
a.Logger.Info("subscribe gateway clients changes loop stopped", "message", ctx.Err().Error())
return
case <-gatewayClientsChangedCh:
if err := a.updateNodes(ctx); err != nil {
a.Logger.Error(err, "failed to update nodes after gateway clients changed")
}
}
}
}

// updateKICNode updates status of KIC node in konnect.
func (a *NodeAgent) updateKICNode(ctx context.Context, existingNodes []*NodeItem) error {
nodesWithSameName := []*NodeItem{}
Expand Down Expand Up @@ -227,42 +256,42 @@ func (a *NodeAgent) updateGatewayNodes(ctx context.Context, existingNodes []*Nod
}

for _, gateway := range gatewayInstances {
gatewayInstanceMap[gateway.hostname] = struct{}{}
nodes, ok := existingNodeMap[gateway.hostname]
gatewayInstanceMap[gateway.Hostname] = struct{}{}
nodes, ok := existingNodeMap[gateway.Hostname]

// hostname in existing nodes, should create a new node.
if !ok || len(nodes) == 0 {
createNodeReq := &CreateNodeRequest{
Hostname: gateway.hostname,
Version: gateway.version,
Hostname: gateway.Hostname,
Version: gateway.Version,
Type: nodeType,
LastPing: time.Now().Unix(),
}
newNode, err := a.konnectClient.CreateNode(ctx, createNodeReq)
if err != nil {
a.Logger.Error(err, "failed to create kong gateway node", "hostname", gateway.hostname)
a.Logger.Error(err, "failed to create kong gateway node", "hostname", gateway.Hostname)
} else {
a.Logger.Info("created kong gateway node", "hostname", gateway.hostname, "node_id", newNode.Item.ID)
a.Logger.Info("created kong gateway node", "hostname", gateway.Hostname, "node_id", newNode.Item.ID)
}
continue
}

// sort the nodes by last ping, and only reserve the latest node.
sortNodesByLastPing(nodes)
updateNodeReq := &UpdateNodeRequest{
Hostname: gateway.hostname,
Version: gateway.version,
Hostname: gateway.Hostname,
Version: gateway.Version,
Type: nodeType,
LastPing: time.Now().Unix(),
}
// update the latest node.
latestNode := nodes[0]
_, err := a.konnectClient.UpdateNode(ctx, latestNode.ID, updateNodeReq)
if err != nil {
a.Logger.Error(err, "failed to update kong gateway node", "hostname", gateway.hostname, "node_id", latestNode.ID)
a.Logger.Error(err, "failed to update kong gateway node", "hostname", gateway.Hostname, "node_id", latestNode.ID)
continue
}
a.Logger.V(util.DebugLevel).Info("updated kong gateway node", "hostname", gateway.hostname, "node_id", latestNode.ID)
a.Logger.V(util.DebugLevel).Info("updated kong gateway node", "hostname", gateway.Hostname, "node_id", latestNode.ID)
// succeeded to update node, remove the other outdated nodes.
for i := 1; i < len(nodes); i++ {
node := nodes[i]
Expand Down Expand Up @@ -379,8 +408,8 @@ func (p *GatewayClientGetter) GetGatewayInstances() ([]GatewayInstance, error) {
hostname = "gateway" + "_" + u.Host
}
gatewayInstances = append(gatewayInstances, GatewayInstance{
hostname: hostname,
version: kongVersion,
Hostname: hostname,
Version: kongVersion,
})
}

Expand Down
Loading

0 comments on commit b973d15

Please sign in to comment.