Skip to content

Commit

Permalink
refactor: give single responsibility to each method (#79)
Browse files Browse the repository at this point in the history
* refactor: give single responsibility to each method

client subsribes to config server using a new random client id
SubsribeToConfigPod method actually send messages to channel, so name is changed.

Signed-off-by: gatici <gulsum.atici@canonical.com>

* remove random client id generation

Signed-off-by: gatici <gulsum.atici@canonical.com>

* adding license header

Signed-off-by: gatici <gulsum.atici@canonical.com>

---------

Signed-off-by: gatici <gulsum.atici@canonical.com>
  • Loading branch information
gatici authored Oct 25, 2024
1 parent 3ba8dfc commit 7bc456f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 45 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.4-dev
1.5.4
109 changes: 65 additions & 44 deletions proto/client/gClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ type ConfClient interface {
// GetConfigClientConn returns grpc connection object
GetConfigClientConn() *grpc.ClientConn

// Client Subscribing channel to ConfigPod to receive configuration
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient)
// sendMessagesToChannel receives the configuration changes using stream
// send messages to communication channel
sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient)

// CheckGrpcConnectivity checks the connectivity status and
// subscribes to a stream of NetworkSlice if connectivity is ready
CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error)
// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity
CheckGrpcConnectivity() (state string)

// SubscribeToConfigServer Subscribes to a stream of NetworkSlice
// It returns a stream if subscription is successful else returns nil.
SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error)
}

// ConnectToConfigServer this API is added to control metadata from NF clients
Expand All @@ -84,7 +88,7 @@ func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream
commChan := make(chan *protos.NetworkSliceResponse)
confClient.Channel = commChan
logger.GrpcLog.Debugln("a communication channel is created for ConfigServer")
go confClient.subscribeToConfigPod(commChan, stream)
go confClient.sendMessagesToChannel(commChan, stream)
return commChan
}

Expand Down Expand Up @@ -150,54 +154,71 @@ func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn {
return confClient.Conn
}

// CheckGrpcConnectivity checks the connectivity status and subscribes to a stream of NetworkSlice
// if connectivity is Ready. It returns a stream if connection is successful else returns nil.
func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) {
logger.GrpcLog.Debugln("connectToGrpcServer")
myid := os.Getenv("HOSTNAME")
// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity
func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) {
logger.GrpcLog.Debugln("checking GRPC connectivity status")
status := confClient.Conn.GetState()
if status == connectivity.Ready {
logger.GrpcLog.Debugln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
return stream, fmt.Errorf("failed to subscribe: %v", err)
}
return stream, nil
return "ready"
} else if status == connectivity.Idle {
return nil, fmt.Errorf("connectivity status idle")
return "idle"
} else {
return nil, fmt.Errorf("connectivity status not ready")
return "unconnected"
}
}

// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration
// using stream and communication channel as inputs
func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) {
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message from stream: %v", err)
return
// SubscribeToConfigServer Subscribes to a stream of NetworkSlice
// It returns a stream if subscription is successful else returns nil.
func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) {
logger.GrpcLog.Debugln("SubscribeToConfigServer")
clientId := os.Getenv("HOSTNAME")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: clientId, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
return stream, fmt.Errorf("failed to subscribe: %v", err)
}
logger.GrpcLog.Debugln("subscribed to config server successfully")
return stream, nil
}

logger.GrpcLog.Infoln("stream message received")
logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete, all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
// sendMessagesToChannel receives the configuration changes using stream
// and send messages to communication channel
func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) {
for {
if stream == nil {
time.Sleep(time.Second * 30)
continue
}
rsp, err := stream.Recv()
if err != nil {
stream = nil
logger.GrpcLog.Errorf("failed to receive message: %v", err)
time.Sleep(time.Second * 5)
continue
}

if rsp != nil {
logger.GrpcLog.Infoln("stream message received")
logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
}
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete, all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
time.Sleep(time.Second * 5)
}
}

0 comments on commit 7bc456f

Please sign in to comment.