Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: divide subscribeToConfigPod method to manage GRPC server connection separately inside the modules #69

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.2-dev
1.5.2
168 changes: 76 additions & 92 deletions proto/client/gClient.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// SPDX-FileCopyrightText: 2021 Open Networking Foundation <info@opennetworking.org>
//
// SPDX-FileCopyrightText: 2024 Canonical Ltd.
// SPDX-License-Identifier: Apache-2.0

package client

import (
"context"
"fmt"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -50,52 +51,46 @@ type ConfigClient struct {
}

type ConfClient interface {
// channel is created on which subscription is done.
// PublishOnConfigChange creates a channel to perform the subscription using it.
// On Receiving Configuration from ConfigServer, this api publishes
// on created channel and returns the channel
PublishOnConfigChange(bool) chan *protos.NetworkSliceResponse
PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse
gatici marked this conversation as resolved.
Show resolved Hide resolved

// returns grpc connection object
GetConfigClientConn() *grpc.ClientConn
// getConfigClientConn returns grpc connection object
getConfigClientConn() *grpc.ClientConn

// Client Subscribing channel to ConfigPod to receive configuration
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse)
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient)

// CheckGrpcConnectivity checks the connectivity status and
// subscribes to a stream of NetworkSlice if connectivity is ready
CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error)
}

// This API is added to control metadata from NF Clients
func ConnectToConfigServer(host string) ConfClient {
confClient := CreateChannel(host, 10000)
// ConnectToConfigServer this API is added to control metadata from NF clients
// Connects to the ConfigServer using host address
func ConnectToConfigServer(host string) (ConfClient, error) {
confClient := CreateConfClient(host)
if confClient == nil {
logger.GrpcLog.Errorln("create grpc channel to config pod failed")
return nil
return nil, fmt.Errorf("create grpc channel to config pod failed")
}
return confClient
return confClient, nil
}

func (confClient *ConfigClient) PublishOnConfigChange(mdataFlag bool) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = mdataFlag
// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel
// then NFs gets the messages
func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = metadataFlag
commChan := make(chan *protos.NetworkSliceResponse)
confClient.Channel = commChan
go confClient.subscribeToConfigPod(commChan)
return commChan
}

// pass structr which has configChangeUpdate interface
func ConfigWatcher(webuiUri string) chan *protos.NetworkSliceResponse {
// var confClient *gClient.ConfigClient
// TODO: use port from configmap.
confClient := CreateChannel(webuiUri, 10000)
if confClient == nil {
logger.GrpcLog.Errorf("create grpc channel to config pod failed")
return nil
}
commChan := make(chan *protos.NetworkSliceResponse)
go confClient.subscribeToConfigPod(commChan)
logger.GrpcLog.Debugln("a communication channel is created for ConfigServer")
go confClient.subscribeToConfigPod(commChan, stream)
return commChan
}

func CreateChannel(host string, timeout uint32) ConfClient {
logger.GrpcLog.Infoln("create config client")
// CreateConfClient creates a GRPC client by connecting to GRPC server (host).
func CreateConfClient(host string) ConfClient {
logger.GrpcLog.Debugln("create config client")
// Second, check to see if we can reuse the gRPC connection for a new P4RT client
conn, err := newClientConnection(host)
if err != nil {
Expand Down Expand Up @@ -130,9 +125,9 @@ var retryPolicy = `{
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}}]}`

// newClientConnection opens a GRPC connection to the host
func newClientConnection(host string) (conn *grpc.ClientConn, err error) {
/* get connection */
logger.GrpcLog.Infoln("dial grpc connection:", host)
logger.GrpcLog.Debugln("dial grpc connection:", host)

bd := 1 * time.Second
mltpr := 1.0
Expand All @@ -144,76 +139,65 @@ func newClientConnection(host string) (conn *grpc.ClientConn, err error) {
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)}
conn, err = grpc.NewClient(host, dialOptions...)
if err != nil {
logger.GrpcLog.Errorln("grpc newclient err:", err)
return nil, err
return nil, fmt.Errorf("grpc newclient creation failed: %v", err)
}
conn.Connect()
// defer conn.Close()
return conn, err
return conn, nil
}

func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn {
// getConfigClientConn exposes the GRPC client connection
func (confClient *ConfigClient) getConfigClientConn() *grpc.ClientConn {
return confClient.Conn
}

func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) {
logger.GrpcLog.Infoln("subscribeToConfigPod")
// CheckGrpcConnectivity checks the connectivity status and subscribes to a stream of NetworkSlice
// if connectivity is Ready. It returns a stream if connection is successful else returns nil.
func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) {
logger.GrpcLog.Debugln("connectToGrpcServer")
myid := os.Getenv("HOSTNAME")
var stream protos.ConfigService_NetworkSliceSubscribeClient
for {
if stream == nil {
status := confClient.Conn.GetState()
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("failed to subscribe: %v", err)
time.Sleep(time.Second * 5)
// Retry on failure
continue
}
} else if status == connectivity.Idle {
logger.GrpcLog.Errorln("connectivity status idle, trying to connect again")
time.Sleep(time.Second * 5)
continue
} else {
logger.GrpcLog.Errorln("connectivity status not ready")
time.Sleep(time.Second * 5)
continue
}
}
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message: %v", err)
// Clearing the stream will force the client to resubscribe on next iteration
stream = nil
time.Sleep(time.Second * 5)
// Retry on failure
continue
status := confClient.Conn.GetState()
if status == connectivity.Ready {
logger.GrpcLog.Debugln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
return stream, fmt.Errorf("failed to subscribe: %v", err)
}
return stream, nil
} else if status == connectivity.Idle {
return nil, fmt.Errorf("connectivity status idle")
} else {
return nil, fmt.Errorf("connectivity status not ready")
}
}

logger.GrpcLog.Infoln("stream msg received")
logger.GrpcLog.Debugf("network slices %d, RC of configpod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete , all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
// config received after config pod restart
configPodRestartCounter = rsp.RestartCounter
// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration
// using stream and communication channel as inputs
func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) {
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message from stream: %v", err)
return
}

logger.GrpcLog.Infoln("stream message received")
logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
gatici marked this conversation as resolved.
Show resolved Hide resolved
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete, all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
}