From 581ac4d78a19bef7224744a2b67419e55a7dc444 Mon Sep 17 00:00:00 2001 From: Sviatoslav Boichuk Date: Wed, 2 Oct 2024 11:49:05 +0300 Subject: [PATCH] CGW refactoring: logging and error handling! --- src/cgw_app_args.rs | 91 ++++---- src/cgw_connection_processor.rs | 72 ++++--- src/cgw_connection_server.rs | 233 +++++++++++---------- src/cgw_db_accessor.rs | 42 ++-- src/cgw_devices_cache.rs | 11 +- src/cgw_errors.rs | 45 +++- src/cgw_metrics.rs | 16 +- src/cgw_nb_api_listener.rs | 22 +- src/cgw_remote_discovery.rs | 203 +++++++++++------- src/cgw_remote_server.rs | 8 +- src/cgw_runtime.rs | 6 +- src/cgw_tls.rs | 31 ++- src/cgw_ucentral_ap_parser.rs | 62 +++--- src/cgw_ucentral_messages_queue_manager.rs | 11 +- src/cgw_ucentral_parser.rs | 18 +- src/cgw_ucentral_switch_parser.rs | 12 +- src/cgw_ucentral_topology_map.rs | 15 +- src/main.rs | 31 +-- 18 files changed, 500 insertions(+), 429 deletions(-) diff --git a/src/cgw_app_args.rs b/src/cgw_app_args.rs index c5359bd..4f8004d 100644 --- a/src/cgw_app_args.rs +++ b/src/cgw_app_args.rs @@ -69,10 +69,9 @@ impl CGWWSSArgs { let wss_t_num: usize = match env::var("DEFAULT_WSS_THREAD_NUM") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse DEFAULT_WSS_THREAD_NUM! Invalid value: {}", - val + "Failed to parse DEFAULT_WSS_THREAD_NUM! Invalid value: {val}! Error: {e}" ))); } }, @@ -82,10 +81,9 @@ impl CGWWSSArgs { let wss_ip: Ipv4Addr = match env::var("CGW_WSS_IP") { Ok(val) => match Ipv4Addr::from_str(val.as_str()) { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_WSS_IP! Invalid value: {}", - val + "Failed to parse CGW_WSS_IP! Invalid value: {val}! Error: {e}" ))); } }, @@ -95,10 +93,9 @@ impl CGWWSSArgs { let wss_port: u16 = match env::var("CGW_WSS_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_WSS_PORT! Invalid value: {}", - val + "Failed to parse CGW_WSS_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -141,10 +138,9 @@ impl CGWGRPCArgs { let grpc_listening_ip: Ipv4Addr = match env::var("CGW_GRPC_LISTENING_IP") { Ok(val) => match Ipv4Addr::from_str(val.as_str()) { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GRPC_LISTENING_IP! Invalid value: {}", - val + "Failed to parse CGW_GRPC_LISTENING_IP! Invalid value: {val}! Error: {e}" ))); } }, @@ -154,10 +150,9 @@ impl CGWGRPCArgs { let grpc_listening_port: u16 = match env::var("CGW_GRPC_LISTENING_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GRPC_LISTENING_PORT! Invalid value: {}", - val + "Failed to parse CGW_GRPC_LISTENING_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -181,10 +176,9 @@ impl CGWGRPCArgs { let grpc_public_port: u16 = match env::var("CGW_GRPC_PUBLIC_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GRPC_PUBLIC_PORT! Invalid value: {}", - val + "Failed to parse CGW_GRPC_PUBLIC_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -232,10 +226,9 @@ impl CGWKafkaArgs { let kafka_port: u16 = match env::var("CGW_KAFKA_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_KAFKA_PORT! Invalid value: {}", - val + "Failed to parse CGW_KAFKA_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -290,10 +283,9 @@ impl CGWDBArgs { let db_port: u16 = match env::var("CGW_DB_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_DB_PORT! Invalid value: {}", - val + "Failed to parse CGW_DB_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -352,10 +344,9 @@ impl CGWRedisArgs { let redis_port: u16 = match env::var("CGW_REDIS_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_REDIS_PORT! Invalid value: {}", - val + "Failed to parse CGW_REDIS_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -408,10 +399,9 @@ impl CGWMetricsArgs { let metrics_port: u16 = match env::var("CGW_METRICS_PORT") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_METRICS_PORT! Invalid value: {}", - val + "Failed to parse CGW_METRICS_PORT! Invalid value: {val}! Error: {e}" ))); } }, @@ -447,8 +437,7 @@ impl CGWValidationSchemaArgs { Ok(path) => CGWValionSchemaRef::SchemaPath(path), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {}, err: {}", - uri, e + "Failed to parse CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {uri}! Error: {e}" ))); } } @@ -457,8 +446,7 @@ impl CGWValidationSchemaArgs { Ok(url) => CGWValionSchemaRef::SchemaUri(url), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {}, err: {}", - uri, e + "Failed to parse CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {uri}! Error: {e}" ))); } } @@ -470,8 +458,7 @@ impl CGWValidationSchemaArgs { Ok(uri) => CGWValionSchemaRef::SchemaUri(uri), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse default CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {}, err: {}", - CGW_DEFAULT_UCENTRAL_AP_DATAMODEL_URI, e + "Failed to parse default CGW_UCENTRAL_AP_DATAMODEL_URI! Invalid URI: {CGW_DEFAULT_UCENTRAL_AP_DATAMODEL_URI}! Error: {e}" ))); } }, @@ -487,8 +474,7 @@ impl CGWValidationSchemaArgs { Ok(path) => CGWValionSchemaRef::SchemaPath(path), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid URI: {}, err: {}", - uri, e + "Failed to parse CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid URI: {uri}! Error: {e}" ))); } } @@ -497,8 +483,7 @@ impl CGWValidationSchemaArgs { Ok(url) => CGWValionSchemaRef::SchemaUri(url), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid URI: {}, err: {}", - uri, e + "Failed to parse CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid URI: {uri}! Error: {e}" ))); } } @@ -509,8 +494,7 @@ impl CGWValidationSchemaArgs { Ok(url) => CGWValionSchemaRef::SchemaUri(url), Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse default CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid value: {}, err: {}", - CGW_DEFAULT_UCENTRAL_SWITCH_DATAMODEL_URI, e + "Failed to parse default CGW_UCENTRAL_SWITCH_DATAMODEL_URI! Invalid value: {CGW_DEFAULT_UCENTRAL_SWITCH_DATAMODEL_URI}! Error: {e}" ))); } }, @@ -567,8 +551,7 @@ impl AppArgs { Ok(v) => v, Err(_e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_LOG_LEVEL! Invalid value: {}", - val + "Failed to parse CGW_LOG_LEVEL! Invalid value: {val}! Error: (unknown)" ))); } }, @@ -578,10 +561,9 @@ impl AppArgs { let cgw_id: i32 = match env::var("CGW_ID") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_ID! Invalid value: {}", - val + "Failed to parse CGW_ID! Invalid value: {val}! Error: {e}" ))); } }, @@ -591,10 +573,9 @@ impl AppArgs { let cgw_groups_capacity: i32 = match env::var("CGW_GROUPS_CAPACITY") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GROUPS_CAPACITY! Invalid value: {}", - val + "Failed to parse CGW_GROUPS_CAPACITY! Invalid value: {val}! Error: {e}" ))); } }, @@ -604,10 +585,9 @@ impl AppArgs { let cgw_groups_threshold: i32 = match env::var("CGW_GROUPS_THRESHOLD") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GROUPS_CAPACITY! Invalid value: {}", - val + "Failed to parse CGW_GROUPS_CAPACITY! Invalid value: {val}! Error: {e}" ))); } }, @@ -617,10 +597,9 @@ impl AppArgs { let cgw_group_infras_capacity: i32 = match env::var("CGW_GROUP_INFRAS_CAPACITY") { Ok(val) => match val.parse() { Ok(v) => v, - Err(_e) => { + Err(e) => { return Err(Error::AppArgsParser(format!( - "Failed to parse CGW_GROUP_INFRAS_CAPACITY! Invalid value: {}", - val + "Failed to parse CGW_GROUP_INFRAS_CAPACITY! Invalid value: {val}! Error: {e}" ))); } }, diff --git a/src/cgw_connection_processor.rs b/src/cgw_connection_processor.rs index 70633ac..3f84669 100644 --- a/src/cgw_connection_processor.rs +++ b/src/cgw_connection_processor.rs @@ -64,6 +64,15 @@ enum CGWUCentralMessageProcessorState { ResultPending, } +impl std::fmt::Display for CGWUCentralMessageProcessorState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CGWUCentralMessageProcessorState::Idle => write!(f, "Idle"), + CGWUCentralMessageProcessorState::ResultPending => write!(f, "ResultPending"), + } + } +} + pub struct CGWConnectionProcessor { cgw_server: Arc, pub serial: MacAddress, @@ -109,15 +118,15 @@ impl CGWConnectionProcessor { match _val { Some(m) => m, None => { - error!("no connect message received from {}, closing connection", self.addr); + error!("No connect message received from: {}! Closing connection!", self.addr); return Err(Error::ConnectionProcessor("No connect message received")); } } } // TODO: configurable duration (upon server creation) _val = sleep(Duration::from_millis(30000)) => { - error!("no message received from {}, closing connection", self.addr); - return Err(Error::ConnectionProcessor("No message receive for too long")); + error!("No message received from: {}! Closing connection", self.addr); + return Err(Error::ConnectionProcessor("No message received for too long")); } }; @@ -127,7 +136,7 @@ impl CGWConnectionProcessor { Ok(m) => m, Err(e) => { error!( - "established connection with device, but failed to receive any messages\n{e}" + "Established connection with device, but failed to receive any messages! Error: {e}" ); return Err(Error::ConnectionProcessor( "Established connection with device, but failed to receive any messages", @@ -143,7 +152,7 @@ impl CGWConnectionProcessor { } Err(_e) => { error!( - "failed to recv connect message from {}, closing connection", + "Failed to receive connect message from: {}! Closing connection!", self.addr ); return Err(Error::ConnectionProcessor( @@ -164,14 +173,17 @@ impl CGWConnectionProcessor { )); } else { debug!( - "The client MAC address {} and clinet certificate CN {} chech passed!", + "The client MAC address {} and clinet certificate CN {} chech passed", evt.serial.to_hex_string(), client_cn.to_hex_string() ); } } - debug!("Done Parse Connect Event {}", evt.serial.to_hex_string()); + debug!( + "Parse Connect Event done! Device serial: {}", + evt.serial.to_hex_string() + ); let mut caps: CGWDeviceCapabilities = Default::default(); match evt.evt_type { @@ -184,21 +196,27 @@ impl CGWConnectionProcessor { caps.label_macaddr = c.capabilities.label_macaddr; } _ => warn!( - "Device {} is not abiding the protocol: first message - CONNECT - expected", + "Device {} is not abiding the protocol! First message expected to receive: CONNECT!", evt.serial ), } self.serial = evt.serial; - self.device_type = CGWDeviceType::from_str(caps.platform.as_str())?; + self.device_type = match CGWDeviceType::from_str(caps.platform.as_str()) { + Ok(dev_type) => dev_type, + Err(_) => { + warn!("Failed to parse device {} type!", self.serial); + return Err(Error::ConnectionProcessor("Failed to parse device type")); + } + }; // TODO: we accepted tls stream and split the WS into RX TX part, // now we have to ASK cgw_connection_server's permission whether // we can proceed on with this underlying connection. // cgw_connection_server has an authorative decision whether // we can proceed. - debug!("Sending ACK req for {}", self.serial); + debug!("Sending ACK request for device serial: {}", self.serial); let (mbox_tx, mut mbox_rx) = unbounded_channel::(); let msg = CGWConnectionServerReqMsg::AddNewConnection(evt.serial, caps, mbox_tx); self.cgw_server @@ -206,26 +224,26 @@ impl CGWConnectionProcessor { .await; let ack = mbox_rx.recv().await; - debug!("GOT ACK resp for {}", self.serial); + debug!("Got ACK response for device serial: {}", self.serial); if let Some(m) = ack { match m { CGWConnectionProcessorReqMsg::AddNewConnectionAck(gid) => { debug!( - "websocket connection established: {} {} gid {gid}", + "WebSocket connection established! Address: {}, serial: {} gid {gid}", self.addr, evt.serial ); self.group_id = gid; } _ => { return Err(Error::ConnectionProcessor( - "Unexpected response from server, expected ACK/NOT ACK)", + "Unexpected response from server! Expected: ACK/NOT ACK", )); } } } else { - info!("connection server declined connection, websocket connection {} {} cannot be established", + info!("Connection server declined connection! WebSocket connection for address: {}, serial: {} cannot be established!", self.addr, evt.serial); - return Err(Error::ConnectionProcessor("Websocker connection declined")); + return Err(Error::ConnectionProcessor("WebSocket connection declined")); } // Remove device from disconnected device list @@ -309,7 +327,7 @@ impl CGWConnectionProcessor { } else if let CGWUCentralEventType::Reply(content) = evt.evt_type { if *fsm_state != CGWUCentralMessageProcessorState::ResultPending { error!( - "Unexpected FSM {:?} state! Expected: ResultPending", + "Unexpected FSM state: {}! Expected: ResultPending", *fsm_state ); } @@ -322,7 +340,7 @@ impl CGWConnectionProcessor { } *fsm_state = CGWUCentralMessageProcessorState::Idle; - debug!("Got reply event for pending request id: {}", pending_req_id); + debug!("Got reply event for pending request id: {pending_req_id}"); } else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type { if self.feature_topomap_enabled { let topo_map = CGWUCentralTopologyMap::get_ref(); @@ -372,11 +390,11 @@ impl CGWConnectionProcessor { let processor_mac = self.serial; match msg { CGWConnectionProcessorReqMsg::AddNewConnectionShouldClose => { - debug!("MBOX_IN: AddNewConnectionShouldClose, processor (mac:{processor_mac}) (ACK OK)"); + debug!("process_sink_mbox_rx_msg: AddNewConnectionShouldClose, processor (mac:{processor_mac}) (ACK OK)"); return Ok(CGWConnectionState::IsForcedToClose); } CGWConnectionProcessorReqMsg::SinkRequestToDevice(pload) => { - debug!("MBOX_IN: SinkRequestToDevice, processor (mac:{processor_mac}) req for (mac:{}) payload:{}", + debug!("process_sink_mbox_rx_msg: SinkRequestToDevice, processor (mac: {processor_mac}) request for (mac: {}) payload: {}", pload.command.serial, pload.message.clone(), ); @@ -384,15 +402,15 @@ impl CGWConnectionProcessor { } CGWConnectionProcessorReqMsg::GroupIdChanged(new_group_id) => { debug!( - "Mac {} received gid {} -> {} change request", + "Received GroupID change message: mac {} - old gid {} : new gid {}", self.serial, self.group_id, new_group_id ); self.group_id = new_group_id; } _ => { - warn!("Received unknown mbox message {:?}", msg); + warn!("Received unknown mbox message: {:?}!", msg); return Err(Error::ConnectionProcessor( - "Sink MBOX: received unexpected message", + "Connection processor (Sink MBOX): received unexpected message", )); } } @@ -601,26 +619,26 @@ impl CGWConnectionProcessor { return; } else if let CGWConnectionState::ClosedGracefully = state { warn!( - "Remote client {} closed connection gracefully", + "Remote client {} closed connection gracefully!", self.serial.to_hex_string() ); return self.send_connection_close_event().await; } else if let CGWConnectionState::IsStale = state { warn!( - "Remote client {} closed due to inactivity", + "Remote client {} closed due to inactivity!", self.serial.to_hex_string() ); return self.send_connection_close_event().await; } else if let CGWConnectionState::IsDead = state { warn!( - "Remote client {} connection is dead", + "Remote client {} connection is dead!", self.serial.to_hex_string() ); return self.send_connection_close_event().await; } } Err(e) => { - warn!("{:?}", e); + warn!("Connection processor closed! Error: {e}"); return self.send_connection_close_event().await; } } @@ -633,7 +651,7 @@ impl CGWConnectionProcessor { .enqueue_mbox_message_to_cgw_server(msg) .await; debug!( - "MBOX_OUT: ConnectionClosed, processor (mac:{})", + "MBOX_OUT: ConnectionClosed, processor (mac: {})", self.serial.to_hex_string() ); } diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 44f0255..3ed744a 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -204,9 +204,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -223,9 +222,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -242,9 +240,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -261,9 +258,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -280,9 +276,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -299,9 +294,8 @@ impl CGWConnectionServer { }, Err(e) => { return Err(Error::ConnectionServer(format!( - "Failed to get runtime type {:?}, err: {}", - CGWRuntimeType::WssRxTx, - e + "Failed to get runtime type {:?}! Error: {e}", + CGWRuntimeType::WssRxTx ))); } }; @@ -316,12 +310,10 @@ impl CGWConnectionServer { Ok(c) => c, Err(e) => { error!( - "Can't create CGW Connection server: NB API client create failed: {:?}", - e + "Can't create CGW Connection server! NB API client create failed! Error: {e}" ); return Err(Error::ConnectionServer(format!( - "Can't create CGW Connection server: NB API client create failed: {:?}", - e + "Can't create CGW Connection server! NB API client create failed! Error: {e}" ))); } }; @@ -330,12 +322,10 @@ impl CGWConnectionServer { Ok(d) => d, Err(e) => { error!( - "Can't create CGW Connection server: Remote Discovery create failed: {:?}", - e + "Can't create CGW Connection server! Remote Discovery create failed! Error: {e}" ); return Err(Error::ConnectionServer(format!( - "Can't create CGW Connection server: Remote Discovery create failed: {:?}", - e, + "Can't create CGW Connection server! Remote Discovery create failed! Error: {e}" ))); } }; @@ -355,13 +345,9 @@ impl CGWConnectionServer { let config_validator = match get_config_validator_fut.await { Ok(res) => res, Err(e) => { - error!( - "Failed to retrieve json config validators: {}", - e.to_string() - ); + error!("Failed to retrieve json config validators! Error: {e}"); return Err(Error::ConnectionServer(format!( - "Failed to retrieve json config validators: {}", - e + "Failed to retrieve json config validators! Error: {e}" ))); } }; @@ -433,7 +419,9 @@ impl CGWConnectionServer { } pub async fn enqueue_mbox_message_to_cgw_server(&self, req: CGWConnectionServerReqMsg) { - let _ = self.mbox_internal_tx.send(req); + if let Err(e) = self.mbox_internal_tx.send(req) { + error!("Failed to send message to CGW server (internal)! Error: {e}"); + } } pub fn enqueue_mbox_message_from_device_to_nb_api_c( @@ -444,7 +432,7 @@ impl CGWConnectionServer { let nb_api_client_clone = self.nb_api_client.clone(); tokio::spawn(async move { let key = group_id.to_string(); - let _ = nb_api_client_clone + nb_api_client_clone .enqueue_mbox_message_from_cgw_server(key, req) .await; }); @@ -455,7 +443,7 @@ impl CGWConnectionServer { pub fn enqueue_mbox_message_from_cgw_to_nb_api(&self, gid: i32, req: String) { let nb_api_client_clone = self.nb_api_client.clone(); self.mbox_nb_api_tx_runtime_handle.spawn(async move { - let _ = nb_api_client_clone + nb_api_client_clone .enqueue_mbox_message_from_cgw_server(gid.to_string(), req) .await; }); @@ -467,7 +455,10 @@ impl CGWConnectionServer { req, CGWConnectionNBAPIReqMsgOrigin::FromRemoteCGW, ); - let _ = self.mbox_relayed_messages_handle.send(msg); + + if let Err(e) = self.mbox_relayed_messages_handle.send(msg) { + error!("Failed to handle relayed message to CGW server! Error: {e}"); + } } fn parse_nbapi_msg(&self, pload: &str) -> Option { @@ -616,8 +607,10 @@ impl CGWConnectionServer { for mac in mac_list.iter() { if let Some(c) = connmap_r_lock.get(mac) { - let _ = c.send(msg.clone()); - debug!("Notified {mac} about GID change (->{new_gid})"); + match c.send(msg.clone()) { + Ok(_) => debug!("Notified {mac} about GID change (->{new_gid})"), + Err(e) => warn!("Failed to send GID change notification! Error: {e}"), + } } } }); @@ -683,7 +676,9 @@ impl CGWConnectionServer { // This is done to ensure that we don't fallback for redis too much, // but still somewhat fully rely on it. // - let _ = self.cgw_remote_discovery.sync_gid_to_cgw_map().await; + if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await { + error!("process_internal_nb_api_mbox: failed to sync GID to CGW map Error: {e}"); + } // TODO: rework to avoid re-allocating these buffers on each loop iteration // (get mut slice of vec / clear when done?) @@ -703,7 +698,9 @@ impl CGWConnectionServer { let gid_numeric = match key.parse::() { Err(e) => { - warn!("Invalid KEY received from KAFKA bus message, ignoring\n{e}"); + warn!( + "Invalid KEY received from KAFKA bus message, ignoring it. Error: {e}" + ); continue; } Ok(v) => v, @@ -712,7 +709,7 @@ impl CGWConnectionServer { let parsed_msg = match self.parse_nbapi_msg(&payload) { Some(val) => val, None => { - warn!("Failed to parse recv msg with key {key}, discarded"); + warn!("Failed to parse recv msg with key {key}, discarded!"); continue; } }; @@ -752,25 +749,22 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_create message"); + error!("Failed to construct infra_group_create message!"); } } Err(e) => { - warn!( - "Create group gid {gid}, uuid {uuid} request failed, reason: {:?}", - e - ); + warn!("Create group gid {gid}, uuid {uuid} request failed! Error: {e}"); if let Ok(resp) = cgw_construct_infra_group_create_response( gid, String::default(), uuid, false, - Some(format!("Failed to create new group: {:?}", e)), + Some(format!("Failed to create new group! Error: {e}")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_create message"); + error!("Failed to construct infra_group_create message!"); } } } @@ -804,7 +798,7 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_create message"); + error!("Failed to construct infra_group_create message!"); } } Err(e) => { @@ -819,13 +813,12 @@ impl CGWConnectionServer { uuid, false, Some(format!( - "Failed to create new group to shard id {}: {:?}", - shard_id, e + "Failed to create new group to shard id {shard_id}! Error {e}" )), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_create message"); + error!("Failed to construct infra_group_create message!"); } } } @@ -876,27 +869,26 @@ impl CGWConnectionServer { { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_delete message"); + error!("Failed to construct infra_group_delete message!"); } } Err(e) => { warn!( - "Destroy group gid {gid}, uuid {uuid} request failed, reason: {:?}", - e + "Destroy group gid {gid}, uuid {uuid} request failed! Error: {e}" ); if let Ok(resp) = cgw_construct_infra_group_delete_response( gid, uuid, false, - Some(format!("Failed to delete group: {:?}", e)), + Some(format!("Failed to delete group! Error: {e}")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_delete message"); + error!("Failed to construct infra_group_delete message!"); } - warn!("Destroy group gid {gid} received, but it does not exist"); + warn!("Destroy group gid {gid} received, but it does not exist!"); } } // This type of msg is handled in place, not added to buf @@ -949,10 +941,10 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid_numeric, resp); } else { - error!("Failed to construct device_enqueue message"); + error!("Failed to construct device_enqueue message!"); } - warn!("Received msg for gid {gid_numeric}, while this group is unassigned to any of CGWs: rejecting"); + warn!("Received msg for gid {gid_numeric}, while this group is unassigned to any of CGWs, rejecting!"); } } } @@ -976,7 +968,7 @@ impl CGWConnectionServer { ), ) = msg; debug!( - "Received MSG for remote CGW k:{}, local id {} relaying msg to remote...", + "Received MSG for remote CGW key: {}, local id {}, relaying msg to remote...", key, self_clone.local_cgw_id ); if let Some(v) = remote_cgws_map.get_mut(&key) { @@ -1007,7 +999,7 @@ impl CGWConnectionServer { ) { self_clone.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp); } else { - error!("Failed to construct device_enqueue message"); + error!("Failed to construct device_enqueue message!"); } } }); @@ -1029,14 +1021,14 @@ impl CGWConnectionServer { let gid_numeric = match key.parse::() { Err(e) => { - warn!("Invalid KEY received from KAFKA bus message, ignoring\n{e}"); + warn!("Invalid KEY received from KAFKA bus message, ignoring! Error: {e}"); continue; } Ok(v) => v, }; debug!( - "Received message for local CGW k:{key}, local id {}", + "Received message for local CGW key: {key}, local id {}", self.local_cgw_id ); @@ -1062,10 +1054,10 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct infra_group_device_add message"); + error!("Failed to construct infra_group_device_add message!"); } - warn!("Unexpected: tried to add infra list to nonexisting group, gid {gid}, uuid {uuid}"); + warn!("Unexpected: tried to add infra list to nonexisting group, gid {gid}, uuid {uuid}!"); } let devices_cache_lock = self.devices_cache.clone(); @@ -1086,7 +1078,7 @@ impl CGWConnectionServer { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { error!( - "Failed to construct infra_group_device_add message" + "Failed to construct infra_group_device_add message!" ); } } @@ -1119,11 +1111,11 @@ impl CGWConnectionServer { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { error!( - "Failed to construct infra_group_device_add message" + "Failed to construct infra_group_device_add message!" ); } - warn!("Failed to create few MACs from infras list (partial create)"); + warn!("Failed to create few MACs from infras list (partial create)!"); continue; } } @@ -1145,16 +1137,16 @@ impl CGWConnectionServer { mac_list.clone(), uuid, false, - Some(format!("Failed to delete MACs from infra list, gid {gid}, uuid {uuid}: group does not exist.")), + Some(format!("Failed to delete MACs from infra list, gid {gid}, uuid {uuid}: group does not exist")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { error!( - "Failed to construct infra_group_device_del message" + "Failed to construct infra_group_device_del message!" ); } - warn!("Unexpected: tried to delete infra list from nonexisting group (gid {gid}, uuid {uuid}"); + warn!("Unexpected: tried to delete infra list from nonexisting group (gid {gid}, uuid {uuid}!"); } let lock = self.devices_cache.clone(); @@ -1177,7 +1169,7 @@ impl CGWConnectionServer { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { error!( - "Failed to construct infra_group_device_del message" + "Failed to construct infra_group_device_del message!" ); } } @@ -1210,11 +1202,11 @@ impl CGWConnectionServer { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { error!( - "Failed to construct infra_group_device_del message" + "Failed to construct infra_group_device_del message!" ); } - warn!("Failed to destroy few MACs from infras list (partial delete)"); + warn!("Failed to destroy few MACs from infras list (partial delete)!"); continue; } } @@ -1235,14 +1227,14 @@ impl CGWConnectionServer { if let Ok(resp) = cgw_construct_device_enqueue_response( uuid, false, - Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist.")), + Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct device_enqueue message"); + error!("Failed to construct device_enqueue message!"); } - warn!("Unexpected: tried to sink down msg to device of nonexisting group (gid {gid}, uuid {uuid}"); + warn!("Unexpected: tried to sink down msg to device of nonexisting group (gid {gid}, uuid {uuid}!"); } // 1. Parse message from NB @@ -1268,23 +1260,26 @@ impl CGWConnectionServer { { let queue_lock = CGW_MESSAGES_QUEUE.read().await; - let _ = queue_lock + if let Err(e) = queue_lock .push_device_message( device_mac, queue_msg, ) - .await; + .await + { + error!("Failed to get CGW message queue read lock! Error: {e}"); + } } } Err(e) => { - error!("Failed to validate config message! Invalid configure message for device: {device_mac}"); + error!("Failed to validate config message! Invalid configure message for device: {device_mac}!"); if let Ok(resp) = cgw_construct_device_enqueue_response( uuid, false, - Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\n{}", e)), + Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct device_enqueue message"); + error!("Failed to construct device_enqueue message!"); } continue; } @@ -1304,9 +1299,9 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct device_enqueue message"); + error!("Failed to construct device_enqueue message!"); } - error!("Failed to parse UCentral command"); + error!("Failed to parse UCentral command!"); } } CGWNBApiParsedMsg { @@ -1314,10 +1309,7 @@ impl CGWConnectionServer { gid, msg_type: CGWNBApiParsedMsgType::RebalanceGroups, } => { - debug!( - "Received Rebalance Groups request, gid {}, uuid {}", - uuid, gid - ); + debug!("Received Rebalance Groups request, gid {gid}, uuid {uuid}"); match self.cgw_remote_discovery.rebalance_all_groups().await { Ok(groups_res) => { if let Ok(resp) = cgw_construct_rebalance_group_response( @@ -1325,29 +1317,28 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct rebalance_group message"); + error!("Failed to construct rebalance_group message!"); } - debug!("Rebalancing groups completed successfully, # of rebalanced groups {groups_res}"); + debug!("Rebalancing groups completed successfully. Number of rebalanced groups {groups_res}"); } Err(e) => { warn!( - "Rebalance groups uuid {uuid} request failed, reason: {:?}", - e + "Rebalance groups uuid {uuid} request failed! Error: {e}" ); if let Ok(resp) = cgw_construct_rebalance_group_response( gid, uuid, false, - Some(format!("Failed to rebalance groups: {:?}", e)), + Some(format!("Failed to rebalance groups! Error: {e}")), ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); } else { - error!("Failed to construct rebalance_group message"); + error!("Failed to construct rebalance_group message!"); } - warn!("Rebalancing groups failed, error {:?}", e); + warn!("Rebalancing groups failed! Error {e}"); } } } @@ -1358,14 +1349,17 @@ impl CGWConnectionServer { } } } else { - error!("Failed to parse msg from NBAPI (malformed?)"); + error!("Failed to parse msg from NBAPI (malformed?)!"); continue; } } // Do not proceed parsing local / remote msgs untill previous relaying has been // finished - _ = tokio::join!(relay_task_hdl); + match tokio::join!(relay_task_hdl) { + (Ok(_),) => debug!("Successfully completed relay tasks!"), + (Err(e),) => warn!("Failed to complete relay tasks! Error: {e}"), + } buf.clear(); num_of_msg_read = 0; @@ -1431,10 +1425,12 @@ impl CGWConnectionServer { // processing it. if let Some(c) = connmap_w_lock.remove(&device_mac) { tokio::spawn(async move { - warn!("Duplicate connection (mac:{}) detected, closing OLD connection in favor of NEW", device_mac); + warn!("Duplicate connection (mac: {}) detected! Closing OLD connection in favor of NEW!", device_mac); let msg: CGWConnectionProcessorReqMsg = CGWConnectionProcessorReqMsg::AddNewConnectionShouldClose; - let _ = c.send(msg); + if let Err(e) = c.send(msg) { + warn!("Failed to send notification about duplicate connection! Error: {e}") + } }); } else { CGWMetrics::get_ref().change_counter( @@ -1448,7 +1444,7 @@ impl CGWConnectionServer { let conn_processor_mbox_tx_clone = conn_processor_mbox_tx.clone(); info!( - "connmap: connection with {} established, new num_of_connections:{}", + "Connection map: connection with {} established, new num_of_connections: {}", device_mac, connmap_w_lock.len() + 1 ); @@ -1486,7 +1482,7 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(group_id, resp); } else { - error!("Failed to construct foreign_infra_connection message"); + error!("Failed to construct foreign_infra_connection message!"); } debug!("Detected foreign infra {} connection. Group: {}, Group Shard Owner: {}", device_mac.to_hex_string(), group_id, group_owner_id); @@ -1498,11 +1494,11 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(group_id, resp); } else { - error!("Failed to construct unassigned_infra_connection message"); + error!("Failed to construct unassigned_infra_connection message!"); } debug!( - "Detected unassigned infra {} connection.", + "Detected unassigned infra {} connection", device_mac.to_hex_string() ); } @@ -1522,13 +1518,13 @@ impl CGWConnectionServer { ); } else { error!( - "Failed to construct device_capabilities_changed message" + "Failed to construct device_capabilities_changed message!" ); } } None => { debug!( - "Capabilities for device: {} was not changed!", + "Capabilities for device: {} was not changed", device_mac.to_hex_string() ) } @@ -1545,13 +1541,13 @@ impl CGWConnectionServer { self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp); } else { error!( - "Failed to construct device_capabilities_changed message" + "Failed to construct device_capabilities_changed message!" ); } } None => { debug!( - "Capabilities for device: {} was not changed!", + "Capabilities for device: {} was not changed", device_mac.to_hex_string() ) } @@ -1574,11 +1570,11 @@ impl CGWConnectionServer { ) { self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp); } else { - error!("Failed to construct unassigned_infra_connection message"); + error!("Failed to construct unassigned_infra_connection message!"); } debug!( - "Detected unassigned infra {} connection.", + "Detected unassigned infra {} connection", device_mac.to_hex_string() ); } @@ -1595,7 +1591,9 @@ impl CGWConnectionServer { tokio::spawn(async move { let msg: CGWConnectionProcessorReqMsg = CGWConnectionProcessorReqMsg::AddNewConnectionAck(device_group_id); - let _ = conn_processor_mbox_tx_clone.send(msg); + if let Err(e) = conn_processor_mbox_tx_clone.send(msg) { + error!("Failed to send NewConnection message! Error: {e}"); + } }); } else if let CGWConnectionServerReqMsg::ConnectionClosed(device_mac) = msg { let mut device_group_id: i32 = 0; @@ -1605,7 +1603,7 @@ impl CGWConnectionServer { queue_lock.device_disconnected(&device_mac).await; } info!( - "connmap: removed {} serial from connmap, new num_of_connections:{}", + "Connection map: removed {} serial from connmap, new num_of_connections: {}", device_mac, connmap_w_lock.len() - 1 ); @@ -1657,21 +1655,24 @@ impl CGWConnectionServer { Ok(stream) => match cgw_tls_get_cn_from_stream(&stream).await { Ok(cn) => (cn, stream), Err(e) => { - error!("Failed to read client CN. Error: {}", e.to_string()); + error!("Failed to read client CN! Error: {e}"); return; } }, Err(e) => { - error!("Failed to accept connection: Error {}", e); + error!("Failed to accept connection: Error {e}"); return; } }; let allow_mismatch = server_clone.allow_mismatch; let conn_processor = CGWConnectionProcessor::new(server_clone, conn_idx, addr); - let _ = conn_processor + if let Err(e) = conn_processor .start(tls_stream, client_cn, allow_mismatch) - .await; + .await + { + error!("Failed to start connection processor! Error: {e}"); + } }); } diff --git a/src/cgw_db_accessor.rs b/src/cgw_db_accessor.rs index 91ced45..7762246 100644 --- a/src/cgw_db_accessor.rs +++ b/src/cgw_db_accessor.rs @@ -67,7 +67,7 @@ impl CGWDBAccessor { } ); debug!( - "Trying to connect to remote db ({}:{})...\nConn args {}", + "Trying to connect to remote db ({}:{})...\nConnection args: {}", db_args.db_host, db_args.db_port, conn_str ); @@ -76,10 +76,7 @@ impl CGWDBAccessor { let tls = match cgw_tls_create_db_connect().await { Ok(tls_connect) => tls_connect, Err(e) => { - error!( - "Failed to build TLS connection with remote DB, reason: {}", - e.to_string() - ); + error!("Failed to build TLS connection with remote DB! Error: {e}"); return Err(Error::DbAccessor( "Failed to build TLS connection with remote DB", )); @@ -89,14 +86,14 @@ impl CGWDBAccessor { let (db_client, connection) = match tokio_postgres::connect(&conn_str, tls).await { Ok((cl, conn)) => (cl, conn), Err(e) => { - error!("Failed to establish connection with DB, reason: {:?}", e); + error!("Failed to establish connection with DB! Error: {e}"); return Err(Error::DbAccessor("Failed to establish connection with DB")); } }; tokio::spawn(async move { if let Err(e) = connection.await { - let err_msg = format!("Connection to DB broken: {}", e); + let err_msg = format!("Connection to DB broken! Error: {e}"); error!("{}", err_msg); CGWMetrics::get_ref() .change_component_health_status( @@ -112,14 +109,14 @@ impl CGWDBAccessor { let (db_client, connection) = match tokio_postgres::connect(&conn_str, NoTls).await { Ok((cl, conn)) => (cl, conn), Err(e) => { - error!("Failed to establish connection with DB, reason: {:?}", e); + error!("Failed to establish connection with DB! Error: {e}"); return Err(Error::DbAccessor("Failed to establish connection with DB")); } }; tokio::spawn(async move { if let Err(e) = connection.await { - let err_msg = format!("Connection to DB broken: {}", e); + let err_msg = format!("Connection to DB broken! Error: {e}"); error!("{}", err_msg); CGWMetrics::get_ref() .change_component_health_status( @@ -162,7 +159,7 @@ impl CGWDBAccessor { let q = match self.cl.prepare("INSERT INTO infrastructure_groups (id, reserved_size, actual_size) VALUES ($1, $2, $3)").await { Ok(c) => c, Err(e) => { - error!("Failed to prepare query (new infra group) for insertion, reason: {:?}", e); + error!("Failed to prepare query (new infra group) for insertion! Error: {e}"); return Err(Error::DbAccessor("Insert new infra group failed")); } }; @@ -174,11 +171,7 @@ impl CGWDBAccessor { match res { Ok(_n) => Ok(()), Err(e) => { - error!( - "Failed to insert a new infra group {}: {:?}", - g.id, - e.to_string() - ); + error!("Failed to insert a new infra group {}! Error: {}", g.id, e); Err(Error::DbAccessor("Insert new infra group failed")) } } @@ -193,10 +186,7 @@ impl CGWDBAccessor { { Ok(c) => c, Err(e) => { - error!( - "Failed to prepare query (del infra group) for removal, reason: {:?}", - e - ); + error!("Failed to prepare query (del infra group) for removal! Error: {e}"); return Err(Error::DbAccessor("Insert new infra group failed")); } }; @@ -213,7 +203,7 @@ impl CGWDBAccessor { } } Err(e) => { - error!("Failed to delete an infra group {gid}: {:?}", e.to_string()); + error!("Failed to delete an infra group {gid}! Error: {e}"); Err(Error::DbAccessor("Delete infra group failed")) } } @@ -274,7 +264,7 @@ impl CGWDBAccessor { { Ok(c) => c, Err(e) => { - error!("Failed to insert new infra, reason: {:?}", e); + error!("Failed to insert new infra! Error: {e}"); return Err(Error::DbAccessor("Failed to insert new infra")); } }; @@ -286,7 +276,7 @@ impl CGWDBAccessor { match res { Ok(_n) => Ok(()), Err(e) => { - error!("Failed to insert a new infra: {:?}", e.to_string()); + error!("Failed to insert new infra! Error: {e}"); Err(Error::DbAccessor("Insert new infra failed")) } } @@ -296,7 +286,7 @@ impl CGWDBAccessor { let q = match self.cl.prepare("DELETE FROM infras WHERE mac = $1").await { Ok(c) => c, Err(e) => { - error!("Failed to delete infra, reason: {:?}", e); + error!("Failed to delete infra! Error: {e}"); return Err(Error::DbAccessor("Failed to delete infra from DB")); } }; @@ -308,12 +298,12 @@ impl CGWDBAccessor { Ok(()) } else { Err(Error::DbAccessor( - "Failed to delete infra from DB: MAC does not exist", + "Failed to delete infra from DB: MAC does not exist!", )) } } Err(e) => { - error!("Failed to delete infra: {:?}", e.to_string()); + error!("Failed to delete infra! Error: {e}"); Err(Error::DbAccessor("Delete infra failed")) } } @@ -333,7 +323,7 @@ impl CGWDBAccessor { Some(list) } Err(e) => { - error!("Failed to retrieve infras from DB, reason: {:?}", e); + error!("Failed to retrieve infras from DB! Error: {e}"); None } } diff --git a/src/cgw_devices_cache.rs b/src/cgw_devices_cache.rs index dc18be9..dfcc0b3 100644 --- a/src/cgw_devices_cache.rs +++ b/src/cgw_devices_cache.rs @@ -32,10 +32,7 @@ impl CGWDevicesCache { pub fn add_device(&mut self, key: &MacAddress, value: &CGWDevice) -> bool { let status: bool = if self.check_device_exists(key) { - debug!( - "Failed to add device {}. Requested item already exist.", - key - ); + debug!("Failed to add device {}. Requested item already exist", key); false } else { self.cache.insert(*key, value.clone()); @@ -51,7 +48,7 @@ impl CGWDevicesCache { true } else { debug!( - "Failed to del device {}. Requested item does not exist.", + "Failed to del device {}. Requested item does not exist", key ); false @@ -97,7 +94,9 @@ impl CGWDevicesCache { debug!("Cache: {}", json_output); if let Ok(mut fd) = File::create("/var/devices_cache.json") { - let _ = fd.write_all(json_output.as_bytes()); + if let Err(e) = fd.write_all(json_output.as_bytes()) { + error!("Failed to dump CGW device chache data! Error: {e}"); + } } }; } diff --git a/src/cgw_errors.rs b/src/cgw_errors.rs index c8ee229..ee4fda4 100644 --- a/src/cgw_errors.rs +++ b/src/cgw_errors.rs @@ -1,4 +1,5 @@ use derive_more::From; +use eui48::MacAddressFormat; pub type Result = core::result::Result; @@ -45,9 +46,6 @@ pub enum Error { #[from] TokioSync(tokio::sync::TryLockError), - #[from] - Tokiofs(tokio::fs::ReadDir), - #[from] IpAddressParse(std::net::AddrParseError), @@ -80,16 +78,47 @@ pub enum Error { #[from] Tungstenite(tungstenite::Error), - - #[from] - Empty(()), } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::AppArgsParser(message) | Error::Tls(message) => write!(f, "{}", message), - _ => write!(f, "{:?}", self), + Error::AppArgsParser(message) + | Error::Tls(message) + | Error::ConnectionServer(message) + | Error::Runtime(message) + | Error::Redis(message) + | Error::UCentralValidator(message) => write!(f, "{}", message), + Error::ConnectionProcessor(message) + | Error::DbAccessor(message) + | Error::RemoteDiscovery(message) + | Error::UCentralParser(message) + | Error::UCentralMessagesQueue(message) + | Error::StaticStr(message) => write!(f, "{}", message), + Error::Io(io_error) => write!(f, "{}", io_error), + Error::ClientVerifierBuilder(verifier_error) => write!(f, "{}", verifier_error), + Error::TokioPostgres(psql_error) => write!(f, "{}", psql_error), + Error::TokioRustls(rustls_error) => write!(f, "{}", rustls_error), + Error::TokioSync(sync_error) => write!(f, "{}", sync_error), + Error::IpAddressParse(ip_parse_error) => write!(f, "{}", ip_parse_error), + Error::MacAddressParse(mac_parse_error) => write!(f, "{}", mac_parse_error), + Error::ParseInt(int_error) => write!(f, "{}", int_error), + Error::TryFromInt(try_from_int_error) => write!(f, "{}", try_from_int_error), + Error::Prometheus(prometheus_error) => write!(f, "{}", prometheus_error), + Error::SerdeJson(serde_error) => write!(f, "{}", serde_error), + Error::Kafka(kafka_error) => write!(f, "{}", kafka_error), + Error::InvalidUri(uri_error) => write!(f, "{}", uri_error), + Error::Tonic(tonic_error) => write!(f, "{}", tonic_error), + Error::Tungstenite(tungstenite_error) => write!(f, "{}", tungstenite_error), + Error::RemoteDiscoveryFailedInfras(vec) => { + let result = vec + .iter() + .map(|obj| obj.to_string(MacAddressFormat::HexString)) + .collect::>() + .join(", "); + + write!(f, "{}", result) + } } } } diff --git a/src/cgw_metrics.rs b/src/cgw_metrics.rs index af4b0d1..939f619 100644 --- a/src/cgw_metrics.rs +++ b/src/cgw_metrics.rs @@ -154,8 +154,8 @@ impl CGWMetrics { ); tokio::spawn(async move { - if let Err(err) = register_custom_metrics() { - warn!("Failed to register CGW Metrics: {:?}", err); + if let Err(e) = register_custom_metrics() { + warn!("Failed to register CGW Metrics! Error: {e}"); return; }; @@ -254,7 +254,7 @@ impl CGWMetrics { counter.set(1); lock.insert(group_id, counter); } else { - error!("Failed to register GroupInfrasAssignedNum metric for GID {group_id}"); + error!("Failed to register GroupInfrasAssignedNum metric for GID {group_id}!"); } } else { error!("Failed to create GroupInfrasAssignedNum metric for GID {group_id}"); @@ -266,7 +266,7 @@ impl CGWMetrics { let mut lock = GROUP_INFRAS_ASSIGNED_NUM.write().await; if let Some(counter) = lock.remove(&group_id) { if let Err(e) = REGISTRY.unregister(Box::new(counter)) { - error!("Failed to deregister GroupInfrasAssignedNum metric for GID {group_id}. Err: {e}"); + error!("Failed to deregister GroupInfrasAssignedNum metric for GID {group_id}! Error: {e}"); } } } @@ -322,12 +322,12 @@ async fn metrics_handler() -> std::result::Result { let mut buffer = Vec::new(); if let Err(e) = encoder.encode(®ISTRY.gather(), &mut buffer) { - error!("could not encode custom metrics: {}", e); + error!("Could not encode custom metrics! Error: {e}"); }; let mut res = match String::from_utf8(buffer.clone()) { Ok(v) => v, Err(e) => { - error!("custom metrics could not be from_utf8'd: {}", e); + error!("Custom metrics could not be from_utf8'd! Error: {e}"); String::default() } }; @@ -335,12 +335,12 @@ async fn metrics_handler() -> std::result::Result { let mut buffer = Vec::new(); if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) { - error!("could not encode prometheus metrics: {}", e); + error!("Could not encode prometheus metrics! Error: {e}"); }; let res_custom = match String::from_utf8(buffer.clone()) { Ok(v) => v, Err(e) => { - error!("prometheus metrics could not be from_utf8'd: {}", e); + error!("Prometheus metrics could not be from_utf8'd! Error: {e}"); String::default() } }; diff --git a/src/cgw_nb_api_listener.rs b/src/cgw_nb_api_listener.rs index 311de42..30d0343 100644 --- a/src/cgw_nb_api_listener.rs +++ b/src/cgw_nb_api_listener.rs @@ -457,7 +457,7 @@ impl CGWCNCConsumer { { Ok(c) => c, Err(e) => { - error!("Failed to create kafka consumer from config: {:?}", e); + error!("Failed to create kafka consumer from config! Error: {e}"); return Err(Error::Kafka(e)); } }; @@ -469,7 +469,7 @@ impl CGWCNCConsumer { if let Err(e) = consumer.subscribe(&CONSUMER_TOPICS) { error!( - "Kafka consumer was unable to subscribe to {:?}", + "Kafka consumer was unable to subscribe to {:?}! Error: {e}", CONSUMER_TOPICS ); return Err(Error::Kafka(e)); @@ -555,7 +555,7 @@ impl CGWNBApiClient { None => "", Some(Ok(s)) => s, Some(Err(e)) => { - warn!("Error while deserializing message payload: {:?}", e); + warn!("Error while deserializing message payload! Error: {e}"); "" } }; @@ -564,7 +564,7 @@ impl CGWNBApiClient { None => "", Some(Ok(s)) => s, Some(Err(e)) => { - warn!("Error while deserializing message payload: {:?}", e); + warn!("Deserializing message payload failed! Error: {e}"); "" } }; @@ -577,7 +577,10 @@ impl CGWNBApiClient { Ok(()) } }); - let _ = stream_processor.await; + + if let Err(e) = stream_processor.await { + error!("Failed to create NB API Client! Error: {e}"); + } } }); @@ -593,17 +596,20 @@ impl CGWNBApiClient { ); if let Err((e, _)) = produce_future.await { - error!("{:?}", e) + error!("{e}") } } async fn enqueue_mbox_message_to_cgw_server(&self, key: String, payload: String) { - debug!("MBOX_OUT: EnqueueNewMessageFromNBAPIListener, k:{key}"); + debug!("MBOX_OUT: EnqueueNewMessageFromNBAPIListener, key: {key}"); let msg = CGWConnectionNBAPIReqMsg::EnqueueNewMessageFromNBAPIListener( key, payload, CGWConnectionNBAPIReqMsgOrigin::FromNBAPI, ); - let _ = self.cgw_server_tx_mbox.send(msg); + + if let Err(e) = self.cgw_server_tx_mbox.send(msg) { + error!("Failed to send message to CGW server (remote)! Error: {e}"); + } } } diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index 5a911ee..8a4fedd 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -54,7 +54,7 @@ pub struct CGWREDISDBShard { impl From> for CGWREDISDBShard { fn from(values: Vec) -> Self { if values.len() < REDIS_KEY_SHARD_ID_FIELDS_NUM { - error!("Unexpected size of parsed vector: at least {REDIS_KEY_SHARD_ID_FIELDS_NUM} expected"); + error!("Unexpected size of parsed vector! At least {REDIS_KEY_SHARD_ID_FIELDS_NUM} expected!"); return CGWREDISDBShard::default(); } @@ -183,12 +183,16 @@ async fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result { match redis::Client::build_with_tls(redis_client_info, tls_certs) { Ok(client) => Ok(client), - Err(e) => Err(Error::Redis(format!("Failed to start Redis Client: {}", e))), + Err(e) => Err(Error::Redis(format!( + "Failed to start Redis client! Error: {e}" + ))), } } false => match redis::Client::open(redis_client_info) { Ok(client) => Ok(client), - Err(e) => Err(Error::Redis(format!("Failed to start Redis Client: {}", e))), + Err(e) => Err(Error::Redis(format!( + "Failed to start Redis client! Error: {e}" + ))), }, } } @@ -204,8 +208,7 @@ impl CGWRemoteDiscovery { Ok(c) => c, Err(e) => { error!( - "Can't create CGW Remote Discovery client: Redis client create failed ({:?})", - e + "Can't create CGW Remote Discovery client! Redis client create failed! Error: {e}" ); return Err(Error::RemoteDiscovery("Redis client create failed")); } @@ -221,8 +224,7 @@ impl CGWRemoteDiscovery { Ok(conn) => conn, Err(e) => { error!( - "Can't create CGW Remote Discovery client: Get Redis async connection failed ({})", - e + "Can't create CGW Remote Discovery client! Get Redis async connection failed! Error: {e}" ); return Err(Error::RemoteDiscovery("Redis client create failed")); } @@ -232,8 +234,7 @@ impl CGWRemoteDiscovery { Ok(c) => c, Err(e) => { error!( - "Can't create CGW Remote Discovery client: DB Accessor create failed ({:?})", - e + "Can't create CGW Remote Discovery client! DB Accessor create failed! Error: {e}" ); return Err(Error::RemoteDiscovery("DB Accessor create failed")); } @@ -248,13 +249,13 @@ impl CGWRemoteDiscovery { }; if let Err(e) = rc.sync_gid_to_cgw_map().await { - error!("Can't create CGW Remote Discovery client: Can't pull records data from REDIS (wrong redis host/port?) ({:?})", e); + error!("Can't create CGW Remote Discovery client! Failed to sync GID to CGW map! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to sync (sync_gid_to_cgw_map) gid to cgw map", )); } if let Err(e) = rc.sync_remote_cgw_map().await { - error!("Can't create CGW Remote Discovery client: Can't pull records data from REDIS (wrong redis host/port?) ({:?})", e); + error!("Can't create CGW Remote Discovery client! Failed to sync remote CGW map! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to sync (sync_remote_cgw_map) remote CGW info from REDIS", )); @@ -295,7 +296,7 @@ impl CGWRemoteDiscovery { .await; if res.is_err() { warn!( - "Failed to destroy record about shard in REDIS, first launch? ({})", + "Failed to destroy record about shard in REDIS! Error: {}", res.err().unwrap() ); } @@ -306,7 +307,7 @@ impl CGWRemoteDiscovery { .query_async(&mut con) .await; if res.is_err() { - error!("Can't create CGW Remote Discovery client: Failed to create record about shard in REDIS: {}", res.err().unwrap()); + error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {}", res.err().unwrap()); return Err(Error::RemoteDiscovery( "Failed to create record about shard in REDIS", )); @@ -315,8 +316,7 @@ impl CGWRemoteDiscovery { if let Err(e) = rc.sync_gid_to_cgw_map().await { error!( - "Can't create CGW Remote Discovery client: Can't pull records data from REDIS: {:?}", - e + "Can't create CGW Remote Discovery client! Failed to sync GID to CGW map! Error: {e}" ); return Err(Error::RemoteDiscovery( "Failed to sync (sync_gid_to_cgw_map) gid to cgw map", @@ -324,8 +324,7 @@ impl CGWRemoteDiscovery { } if let Err(e) = rc.sync_remote_cgw_map().await { error!( - "Can't create CGW Remote Discovery client: Can't pull records data from REDIS: {:?}", - e + "Can't create CGW Remote Discovery client! Failed to sync remope CGW map! Error: {e}" ); return Err(Error::RemoteDiscovery( "Failed to sync (sync_remote_cgw_map) remote CGW info from REDIS", @@ -374,7 +373,7 @@ impl CGWRemoteDiscovery { .await { Err(e) => { - error!("Failed to sync gid to cgw map:\n{}", e); + error!("Failed to sync gid to cgw map! Error: {e}"); return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS")); } Ok(keys) => keys, @@ -389,7 +388,7 @@ impl CGWRemoteDiscovery { { Ok(gid) => gid, Err(e) => { - warn!("Found proper key '{key}' entry, but failed to fetch GID from it:\n{e}"); + warn!("Found proper key '{key}' entry, but failed to fetch GID from it! Error: {e}"); continue; } }; @@ -402,7 +401,7 @@ impl CGWRemoteDiscovery { { Ok(shard_id) => shard_id, Err(e) => { - warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it:\n{e}"); + warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it! Error: {e}"); continue; } }; @@ -412,7 +411,7 @@ impl CGWRemoteDiscovery { match lock.insert(gid, shard_id) { None => continue, Some(_v) => warn!( - "Populated gid_to_cgw_map with previous value being alerady set, unexpected" + "Populated gid_to_cgw_map with previous value being alerady set, unexpected!" ), } } @@ -472,8 +471,7 @@ impl CGWRemoteDiscovery { Ok(keys) => keys, Err(e) => { error!( - "Can't sync remote CGW map: Failed to get shard record in REDIS: {}", - e + "Can't sync remote CGW map! Failed to get shard record in REDIS! Error: {e}" ); return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS")); } @@ -487,7 +485,7 @@ impl CGWRemoteDiscovery { Ok(res) => { let shrd: CGWREDISDBShard = CGWREDISDBShard::from(res); if shrd == CGWREDISDBShard::default() { - warn!("Failed to parse CGWREDISDBShard, {key}"); + warn!("Failed to parse CGWREDISDBShard, key: {key}!"); continue; } @@ -502,7 +500,7 @@ impl CGWRemoteDiscovery { lock.insert(cgw_iface.shard.id, cgw_iface); } Err(e) => { - warn!("Found proper key '{key}' entry, but failed to fetch Shard info from it:\n{e}"); + warn!("Found proper key '{key}' entry, but failed to fetch Shard info from it! Error: {e}"); continue; } } @@ -523,7 +521,9 @@ impl CGWRemoteDiscovery { } // then try to use redis - let _ = self.sync_gid_to_cgw_map().await; + if let Err(e) = self.sync_gid_to_cgw_map().await { + error!("Failed to sync GID to CGW map! Error: {e}"); + } if let Some(id) = self.gid_to_cgw_cache.read().await.get(&gid) { return Some(*id); @@ -544,7 +544,7 @@ impl CGWRemoteDiscovery { .await; if res.is_err() { error!( - "Failed to increment assigned groups number:\n{}", + "Failed to increment assigned groups number! Error: {}", res.err().unwrap() ); return Err(Error::RemoteDiscovery( @@ -573,7 +573,7 @@ impl CGWRemoteDiscovery { .await; if res.is_err() { error!( - "Failed to decrement assigned groups number:\n{}", + "Failed to decrement assigned groups number! Error: {}", res.err().unwrap() ); return Err(Error::RemoteDiscovery( @@ -591,19 +591,23 @@ impl CGWRemoteDiscovery { Ok(()) } - async fn increment_group_assigned_infras_num(&self, gid: i32) -> Result<()> { + async fn increment_group_assigned_infras_num( + &self, + gid: i32, + incremet_value: i32, + ) -> Result<()> { debug!("Incrementing assigned infras num group_id_{gid}"); let mut con = self.redis_client.clone(); let res: RedisResult<()> = redis::cmd("HINCRBY") .arg(format!("{}{gid}", REDIS_KEY_GID)) .arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED) - .arg("1") + .arg(&incremet_value.to_string()) .query_async(&mut con) .await; if res.is_err() { error!( - "Failed to increment assigned infras number:\n{}", + "Failed to increment assigned infras number! Error: {}", res.err().unwrap() ); return Err(Error::RemoteDiscovery( @@ -622,19 +626,23 @@ impl CGWRemoteDiscovery { Ok(()) } - async fn decrement_group_assigned_infras_num(&self, gid: i32) -> Result<()> { + async fn decrement_group_assigned_infras_num( + &self, + gid: i32, + decremet_value: i32, + ) -> Result<()> { debug!("Decrementing assigned infras num group_id_{gid}"); let mut con = self.redis_client.clone(); let res: RedisResult<()> = redis::cmd("HINCRBY") .arg(format!("{}{gid}", REDIS_KEY_GID)) .arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED) - .arg("-1") + .arg(&(-decremet_value).to_string()) .query_async(&mut con) .await; if res.is_err() { error!( - "Failed to decrement assigned infras number:\n{}", + "Failed to decrement assigned infras number! Error: {}", res.err().unwrap() ); return Err(Error::RemoteDiscovery( @@ -666,7 +674,7 @@ impl CGWRemoteDiscovery { for x in hash_vec { let max_capacity: i32 = x.1.shard.capacity + x.1.shard.threshold; if x.1.shard.assigned_groups_num < max_capacity { - debug!("Found CGW shard to assign group to (id {})", x.1.shard.id); + debug!("Found CGW shard to assign group to id {}", x.1.shard.id); return Ok(x.1.shard.id); } } @@ -683,7 +691,7 @@ impl CGWRemoteDiscovery { Some(instance) => { let max_capacity: i32 = instance.shard.capacity + instance.shard.threshold; if instance.shard.assigned_groups_num < max_capacity { - debug!("Found CGW shard to assign group to (id {})", shard_id); + debug!("Found CGW shard to assign group to id {}", shard_id); Ok(shard_id) } else { Err(Error::RemoteDiscovery( @@ -705,11 +713,13 @@ impl CGWRemoteDiscovery { infras_assigned: i32, ) -> Result { // Delete key (if exists), recreate with new owner - let _ = self.deassign_infra_group_to_cgw(gid).await; + if let Err(e) = self.deassign_infra_group_to_cgw(gid).await { + error!("destroy_infra_group: failed to deassign infra group to CGW! Error: {e}"); + } // Sync CGWs to get lates data if let Err(e) = self.sync_remote_cgw_map().await { - error!("Can't create CGW Remote Discovery client: Can't pull records data from REDIS (wrong redis host/port?) ({:?})", e); + error!("Can't create CGW Remote Discovery client! Failed to sync remote CGW map! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to sync remote CGW info from REDIS", )); @@ -739,7 +749,7 @@ impl CGWRemoteDiscovery { if res.is_err() { error!( - "Failed to assign infra group {} to cgw {}:\n{}", + "Failed to assign infra group {} to cgw {}! Error: {}", gid, dst_cgw_id, res.err().unwrap() @@ -765,7 +775,7 @@ impl CGWRemoteDiscovery { if res.is_err() { error!( - "Failed to deassign infra group {}:\n{}", + "Failed to deassign infra group {}! Error: {}", gid, res.err().unwrap() ); @@ -795,14 +805,15 @@ impl CGWRemoteDiscovery { { Ok(v) => v, Err(e) => { - error!("Assign group to CGW shard failed! Err: {}", e.to_string()); - let _ = self.db_accessor.delete_infra_group(g.id).await; + error!("Assign group to CGW shard failed! Error: {e}"); + if let Err(e) = self.db_accessor.delete_infra_group(g.id).await { + error!("Assign group to CGW shard failed! Failed to delete infra group ID {}! Error: {e}", g.id); + } return Err(e); } }; - let rc = self.increment_cgw_assigned_groups_num(shard_id).await; - rc?; + self.increment_cgw_assigned_groups_num(shard_id).await?; Ok(shard_id) } @@ -814,8 +825,12 @@ impl CGWRemoteDiscovery { ) -> Result<()> { let cgw_id: Option = self.get_infra_group_owner_id(gid).await; if let Some(id) = cgw_id { - let _ = self.deassign_infra_group_to_cgw(gid).await; - let _ = self.decrement_cgw_assigned_groups_num(id).await; + if let Err(e) = self.deassign_infra_group_to_cgw(gid).await { + error!("destroy_infra_group: failed to deassign infra group to CGW! Error: {e}"); + } + if let Err(e) = self.decrement_cgw_assigned_groups_num(id).await { + error!("destroy_infra_group: failed to decrement assigned groups num to CGW! Error: {e}"); + } } //TODO: transaction-based insert/assigned_group_num update (DB) @@ -856,7 +871,7 @@ impl CGWRemoteDiscovery { let infras_capacity = match self.get_group_infras_capacity(gid).await { Ok(capacity) => capacity, Err(e) => { - error!("Failed to create infreas list: {}", e.to_string()); + error!("Failed to create infras list! Error: {e}"); return Err(Error::RemoteDiscoveryFailedInfras(infras)); } }; @@ -864,13 +879,13 @@ impl CGWRemoteDiscovery { let infras_assigned = match self.get_group_infras_assigned_num(gid).await { Ok(assigned) => assigned, Err(e) => { - error!("Failed to create infreas list: {}", e.to_string()); + error!("Failed to create infras list! Error: {e}"); return Err(Error::RemoteDiscoveryFailedInfras(infras)); } }; if infras.len() as i32 + infras_assigned > infras_capacity { - error!("Failed to create infras list - GID {gid} has no enough capacity"); + error!("Failed to create infras list - GID {gid} has no enough capacity!"); return Err(Error::RemoteDiscoveryFailedInfras(infras)); } @@ -891,6 +906,7 @@ impl CGWRemoteDiscovery { })); } + let mut assigned_infras_num: i32 = 0; for (i, future) in futures.iter_mut().enumerate() { match future.await { Ok(res) => { @@ -915,9 +931,7 @@ impl CGWRemoteDiscovery { ), ); } - - // Update assigned infras num - let _ = self.increment_group_assigned_infras_num(gid).await; + assigned_infras_num += 1; } } Err(_) => { @@ -926,6 +940,14 @@ impl CGWRemoteDiscovery { } } + // Update assigned infras num + if let Err(e) = self + .increment_group_assigned_infras_num(gid, assigned_infras_num) + .await + { + error!("create_ifras_list: failed to decrement assigned infras num! Error: {e}"); + } + if !failed_infras.is_empty() { return Err(Error::RemoteDiscoveryFailedInfras(failed_infras)); } @@ -955,6 +977,7 @@ impl CGWRemoteDiscovery { })); } + let mut removed_infras: i32 = 0; for (i, future) in futures.iter_mut().enumerate() { match future.await { Ok(res) => { @@ -971,8 +994,7 @@ impl CGWRemoteDiscovery { devices_cache.del_device(&device_mac); } } - // Update assigned infras num - let _ = self.decrement_group_assigned_infras_num(gid).await; + removed_infras += 1; } } Err(_) => { @@ -981,6 +1003,14 @@ impl CGWRemoteDiscovery { } } + // Update assigned infras num + if let Err(e) = self + .decrement_group_assigned_infras_num(gid, removed_infras) + .await + { + error!("destroy_ifras_list: failed to decrement assigned infras num! Error: {e}"); + } + if !failed_infras.is_empty() { return Err(Error::RemoteDiscoveryFailedInfras(failed_infras)); } @@ -995,10 +1025,10 @@ impl CGWRemoteDiscovery { ) -> Result<()> { // try to use internal cache first if let Some(cl) = self.remote_cgws_map.read().await.get(&shard_id) { - if let Err(_e) = cl.client.relay_request_stream(stream).await { + if let Err(e) = cl.client.relay_request_stream(stream).await { error!( - "Failed to relay message. CGW{} seems to be unreachable at [{}:{}]", - shard_id, cl.shard.server_host, cl.shard.server_port + "Failed to relay message! CGW{} seems to be unreachable at [{}:{}]! Error: {}", + shard_id, cl.shard.server_host, cl.shard.server_port, e ); } @@ -1006,30 +1036,33 @@ impl CGWRemoteDiscovery { } // then try to use redis - let _ = self.sync_remote_cgw_map().await; + if let Err(e) = self.sync_remote_cgw_map().await { + error!("relay_request_stream_to_remote_cgw: failed to sync remote CGW map! Error: {e}"); + } + if let Some(cl) = self.remote_cgws_map.read().await.get(&shard_id) { - if let Err(_e) = cl.client.relay_request_stream(stream).await { + if let Err(e) = cl.client.relay_request_stream(stream).await { error!( - "Failed to relay message. CGW{} seems to be unreachable at [{}:{}]", - shard_id, cl.shard.server_host, cl.shard.server_port + "Failed to relay message! CGW{} seems to be unreachable at [{}:{}]. Error: {}", + shard_id, cl.shard.server_host, cl.shard.server_port, e ); } return Ok(()); } - error!("No suitable CGW instance #{shard_id} was discovered, cannot relay msg"); + error!("No suitable CGW instance #{shard_id} was discovered, cannot relay msg!"); Err(Error::RemoteDiscovery( "No suitable CGW instance was discovered, cannot relay msg", )) } pub async fn rebalance_all_groups(&self) -> Result { - warn!("Executing group rebalancing procedure"); + warn!("Executing group rebalancing procedure!"); let groups = match self.db_accessor.get_all_infra_groups().await { Some(list) => list, None => { - warn!("Tried to execute rebalancing when 0 groups created in DB"); + warn!("Tried to execute rebalancing when 0 groups created in DB!"); return Err(Error::RemoteDiscovery( "Cannot do rebalancing due to absence of any groups created in DB", )); @@ -1049,20 +1082,24 @@ impl CGWRemoteDiscovery { .await; if res.is_err() { warn!( - "Failed to reset CGW{cgw_id} assigned group num count, e:{}", + "Failed to reset CGW{cgw_id} assigned group num count! Error: {}", res.err().unwrap() ); } } for i in groups.iter() { - let _ = self.sync_remote_cgw_map().await; - let _ = self.sync_gid_to_cgw_map().await; + if let Err(e) = self.sync_remote_cgw_map().await { + error!("rebalance_all_groups: failed to sync remote CGW map! Error: {e}"); + } + if let Err(e) = self.sync_gid_to_cgw_map().await { + error!("rebalance_all_groups: failed to sync GID to CGW map! Error: {e}"); + } let infras_assigned: i32 = match self.get_group_infras_assigned_num(i.id).await { Ok(infras_num) => infras_num, Err(e) => { - warn!("Cannot execute rebalancing: {}", e.to_string()); + warn!("Failed to execute rebalancing! Error: {e}"); return Err(Error::RemoteDiscovery( "Cannot do rebalancing due to absence of any groups created in DB", )); @@ -1074,15 +1111,21 @@ impl CGWRemoteDiscovery { .await { Ok(shard_id) => { - debug!("Rebalancing: assigned {} to shard {}", i.id, shard_id); - let _ = self.increment_cgw_assigned_groups_num(shard_id).await; + debug!("Rebalancing: assigned gid {} to shard {}", i.id, shard_id); + if let Err(e) = self.increment_cgw_assigned_groups_num(shard_id).await { + error!("rebalance_all_groups: failed to increment assigned groups num! Error: {e}"); + } } Err(_e) => {} } } - let _ = self.sync_remote_cgw_map().await; - let _ = self.sync_gid_to_cgw_map().await; + if let Err(e) = self.sync_remote_cgw_map().await { + error!("rebalance_all_groups: failed to sync remote CGW map! Error: {e}"); + } + if let Err(e) = self.sync_gid_to_cgw_map().await { + error!("rebalance_all_groups: failed to sync GID to CGW! Error: {e}"); + } Ok(0u32) } @@ -1091,13 +1134,23 @@ impl CGWRemoteDiscovery { debug!("Remove from Redis shard id {}", self.local_shard_id); // We are on de-init stage - ignore any errors on Redis clean-up let mut con = self.redis_client.clone(); - let _res: RedisResult<()> = redis::cmd("DEL") + let res: RedisResult<()> = redis::cmd("DEL") .arg(format!( "{REDIS_KEY_SHARD_ID_PREFIX}{}", self.local_shard_id )) .query_async(&mut con) .await; + match res { + Ok(_) => info!( + "Successfully cleaned up Redis for shard id {}", + self.local_shard_id + ), + Err(e) => error!( + "Failed to cleanup Redis for shard id {}! Error: {}", + self.local_shard_id, e + ), + } } pub async fn get_group_infras_capacity(&self, gid: i32) -> Result { @@ -1111,7 +1164,7 @@ impl CGWRemoteDiscovery { { Ok(cap) => cap, Err(e) => { - warn!("Failed to get infras capacity for GID {gid}:\n{e}"); + warn!("Failed to get infras capacity for GID {gid}! Ereor: {e}"); return Err(Error::RemoteDiscovery("Failed to get infras capacity")); } }; @@ -1130,7 +1183,7 @@ impl CGWRemoteDiscovery { { Ok(cap) => cap, Err(e) => { - warn!("Failed to get infras assigned number for GID {gid}:\n{e}"); + warn!("Failed to get infras assigned number for GID {gid}! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to get group infras assigned number", )); diff --git a/src/cgw_remote_server.rs b/src/cgw_remote_server.rs index b064389..21c77e5 100644 --- a/src/cgw_remote_server.rs +++ b/src/cgw_remote_server.rs @@ -76,8 +76,10 @@ impl CGWRemoteServer { self.cfg.remote_id, self.cfg.server_ip, self.cfg.server_port ); - let res = grpc_srv.serve(self.cfg.to_socket_addr()).await; - error!("grpc server returned {:?}", res); - // end of GRPC server build / start declaration + if let Err(e) = grpc_srv.serve(self.cfg.to_socket_addr()).await { + error!("gRPC server failed! Error: {e}"); + }; + + // end of gRPC server build / start declaration } } diff --git a/src/cgw_runtime.rs b/src/cgw_runtime.rs index 8349fbc..5542923 100644 --- a/src/cgw_runtime.rs +++ b/src/cgw_runtime.rs @@ -84,8 +84,7 @@ pub fn cgw_initialize_runtimes(wss_t_num: usize) -> Result<()> { Ok(runtimes_lock) => runtimes_lock, Err(e) => { return Err(Error::Runtime(format!( - "Failed to get runtimes lock: {}", - e + "Failed to get runtimes lock! Error: {e}" ))); } }; @@ -105,8 +104,7 @@ pub fn cgw_get_runtime(runtime_type: CGWRuntimeType) -> Result runtimes_lock, Err(e) => { return Err(Error::Runtime(format!( - "Failed to get runtimes lock: {}", - e + "Failed to get runtimes lock! Error: {e}" ))); } }; diff --git a/src/cgw_tls.rs b/src/cgw_tls.rs index 7e011c3..87ed0c8 100644 --- a/src/cgw_tls.rs +++ b/src/cgw_tls.rs @@ -25,8 +25,7 @@ pub async fn cgw_tls_read_certs(cert_file: &str) -> Result f, Err(e) => { return Err(Error::Tls(format!( - "Failed to open TLS certificate file: {}. Error: {}", - cert_file, e + "Failed to open TLS certificate file: {cert_file}! Error: {e}" ))); } }; @@ -41,8 +40,7 @@ pub async fn cgw_tls_read_private_key(private_key_file: &str) -> Result f, Err(e) => { return Err(Error::Tls(format!( - "Failed to open TLS private key file: {}. Error: {}", - private_key_file, e + "Failed to open TLS private key file: {private_key_file}! Error: {e}" ))); } }; @@ -58,8 +56,7 @@ pub async fn cgw_tls_read_private_key(private_key_file: &str) -> Result Err(Error::Tls(format!( - "Failed to read private key from file: {}. Error: {}", - private_key_file, e + "Failed to read private key from file: {private_key_file}! Error: {e}" ))), } } @@ -91,8 +88,7 @@ pub async fn cgw_tls_get_cn_from_stream(stream: &TlsStream) -> Result Ok(mac) => return Ok(mac), Err(e) => { return Err(Error::Tls(format!( - "Failed to parse clien CN/MAC. Error: {}", - e + "Failed to parse clien CN/MAC! Error: {e}" ))) } }; @@ -101,13 +97,12 @@ pub async fn cgw_tls_get_cn_from_stream(stream: &TlsStream) -> Result } Err(e) => { return Err(Error::Tls(format!( - "Failed to read peer comman name. Error: {}", - e + "Failed to read peer common name (CN)! Error: {e}" ))); } } - Err(Error::Tls("Failed to read peer comman name!".to_string())) + Err(Error::Tls("Failed to read peer common name!".to_string())) } pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result { @@ -116,7 +111,7 @@ pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result cas_pem, Err(e) => { - error!("{}", e.to_string()); + error!("{e}"); return Err(e); } }; @@ -126,7 +121,7 @@ pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result cert_pem, Err(e) => { - error!("{}", e.to_string()); + error!("{e}"); return Err(e); } }; @@ -137,7 +132,7 @@ pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result pkey, Err(e) => { - error!("{}", e.to_string()); + error!("{e}"); return Err(e); } }; @@ -149,7 +144,7 @@ pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result verifier, Err(e) => { - error!("Failed to build client verifier: {}", e.to_string()); + error!("Failed to build client verifier! Error: {e}"); return Err(Error::Tls("Failed to build client verifier!".to_string())); } }; @@ -161,7 +156,7 @@ pub async fn cgw_tls_create_acceptor(wss_args: &CGWWSSArgs) -> Result server_config, Err(e) => { - error!("Failed to build server config: {}", e.to_string()); + error!("Failed to build server config! Error: {e}"); return Err(Error::Tls("Failed to build server config!".to_string())); } }; @@ -198,7 +193,9 @@ pub async fn cgw_get_root_certs_store() -> Result { let certs = rustls_pemfile::certs(buf); let mut root_cert_store = rustls::RootCertStore::empty(); for cert in certs.flatten() { - let _r = root_cert_store.add(cert); + if let Err(e) = root_cert_store.add(cert.clone()) { + error!("Failed do add cert {:?} to root store! Error: {e}", cert); + } } Ok(root_cert_store) diff --git a/src/cgw_ucentral_ap_parser.rs b/src/cgw_ucentral_ap_parser.rs index 4d64d8a..7cc55d8 100644 --- a/src/cgw_ucentral_ap_parser.rs +++ b/src/cgw_ucentral_ap_parser.rs @@ -122,7 +122,7 @@ fn parse_wireless_clients_data( if let Value::String(port) = &ssid["iface"] { port.clone() } else { - warn!("Failed to retrieve local_port for {:?}", ssid); + warn!("Failed to retrieve local_port for {:?}!", ssid); continue; } }; @@ -133,7 +133,7 @@ fn parse_wireless_clients_data( } if !ssid.contains_key("associations") { - warn!("Failed to retrieve associations for {local_port}"); + warn!("Failed to retrieve associations for local port {local_port}!"); continue; } @@ -157,7 +157,7 @@ fn parse_wireless_clients_data( if let Some(v) = ssids_map.get(&bssid_value) { (v.0.clone(), v.1.clone()) } else { - warn!("Failed to get ssid/band value for {bssid_value}"); + warn!("Failed to get ssid/band value for {bssid_value}!"); continue; } }; @@ -219,14 +219,14 @@ fn parse_wired_clients_data( Some(s) => s.to_string(), None => { warn!( - "Failed to get clients port string for {:?}, skipping", + "Failed to get clients port string for {:?}, skipping!", client ); continue; } } } else { - warn!("Failed to parse clients port for {:?}, skipping", client); + warn!("Failed to parse clients port for {:?}, skipping!", client); continue; } }; @@ -336,7 +336,7 @@ fn parse_state_event_data( let decoded_data = match BASE64_STANDARD.decode(compressed_data) { Ok(d) => d, Err(e) => { - warn!("Failed to decode base64+zip state evt {e}"); + warn!("Failed to decode base64+zip state evt! Error: {e}"); return Err(Error::UCentralParser( "Failed to decode base64+zip state evt", )); @@ -345,7 +345,7 @@ fn parse_state_event_data( let mut d = ZlibDecoder::new(&decoded_data[..]); let mut unzipped_data = String::new(); if let Err(e) = d.read_to_string(&mut unzipped_data) { - warn!("Failed to decompress decrypted state message {e}"); + warn!("Failed to decompress decrypted state message! Error: {e}"); return Err(Error::UCentralParser( "Failed to decompress decrypted state message", )); @@ -354,7 +354,7 @@ fn parse_state_event_data( let state_map: CGWUCentralJRPCMessage = match serde_json::from_str(&unzipped_data) { Ok(m) => m, Err(e) => { - error!("Failed to parse input state message {e}"); + error!("Failed to parse input state message! Error: {e}"); return Err(Error::UCentralParser("Failed to parse input state message")); } }; @@ -541,7 +541,7 @@ fn parse_realtime_event_data( }; if events.len() < 2 { - warn!("Received malformed event: number of event values < 2"); + warn!("Received malformed event: number of event values < 2!"); return Err(Error::UCentralParser( "Received malformed event: number of event values < 2", )); @@ -552,14 +552,14 @@ fn parse_realtime_event_data( match &events[0] { Value::Number(ts) => { if ts.as_i64().is_none() { - warn!("Received malformed event: missing timestamp"); + warn!("Received malformed event: missing timestamp!"); return Err(Error::UCentralParser( "Received malformed event: missing timestamp", )); } } _ => { - warn!("Received malformed event: missing timestamp"); + warn!("Received malformed event: missing timestamp!"); return Err(Error::UCentralParser( "Received malformed event: missing timestamp", )); @@ -569,7 +569,7 @@ fn parse_realtime_event_data( let event_data = match &events[1] { Value::Object(v) => v, _ => { - warn!("Received malformed event: missing timestamp"); + warn!("Received malformed event: missing timestamp!"); return Err(Error::UCentralParser( "Received malformed event: missing timestamp", )); @@ -577,7 +577,7 @@ fn parse_realtime_event_data( }; if !event_data.contains_key("type") { - warn!("Received malformed event: missing type"); + warn!("Received malformed event: missing type!"); return Err(Error::UCentralParser( "Received malformed event: missing type", )); @@ -586,7 +586,7 @@ fn parse_realtime_event_data( let evt_type = match &event_data["type"] { Value::String(t) => t, _ => { - warn!("Received malformed event: type is of wrongful underlying format/type"); + warn!("Received malformed event: type is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed event: type is of wrongful underlying format/type", )); @@ -596,7 +596,7 @@ fn parse_realtime_event_data( let evt_payload = match &event_data["payload"] { Value::Object(d) => d, _ => { - warn!("Received malformed event: payload is of wrongful underlying format/type"); + warn!("Received malformed event: payload is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed event: payload is of wrongful underlying format/type", )); @@ -611,7 +611,7 @@ fn parse_realtime_event_data( || !evt_payload.contains_key("rssi") || !evt_payload.contains_key("channel") { - warn!("Received malformed client.join event: band, rssi, ssid, channel and client are required"); + warn!("Received malformed client.join event: band, rssi, ssid, channel and client are required!"); return Err(Error::UCentralParser("Received malformed client.join event: band, rssi, ssid, channel and client are required")); } @@ -619,7 +619,7 @@ fn parse_realtime_event_data( match &evt_payload["band"] { Value::String(s) => s, _ => { - warn!("Received malformed client.join event: band is of wrongful underlying format/type"); + warn!("Received malformed client.join event: band is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.join event: band is of wrongful underlying format/type", )); @@ -631,7 +631,7 @@ fn parse_realtime_event_data( Value::String(s) => match MacAddress::from_str(s.as_str()) { Ok(v) => v, Err(_) => { - warn!("Received malformed client.join event: client is a malformed MAC address"); + warn!("Received malformed client.join event: client is a malformed MAC address!"); return Err(Error::UCentralParser( "Received malformed client.join event: client is a malformed MAC address", )); @@ -649,7 +649,7 @@ fn parse_realtime_event_data( match &evt_payload["ssid"] { Value::String(s) => s, _ => { - warn!("Received malformed client.join event: ssid is of wrongful underlying format/type"); + warn!("Received malformed client.join event: ssid is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.join event: ssid is of wrongful underlying format/type", )); @@ -668,7 +668,7 @@ fn parse_realtime_event_data( } }, _ => { - warn!("Received malformed client.join event: rssi is of wrongful underlying format/type"); + warn!("Received malformed client.join event: rssi is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.join event: rssi is of wrongful underlying format/type", )); @@ -687,7 +687,7 @@ fn parse_realtime_event_data( } }, _ => { - warn!("Received malformed client.join event: channel is of wrongful underlying format/type"); + warn!("Received malformed client.join event: channel is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.join event: channel is of wrongful underlying format/type", )); @@ -719,7 +719,7 @@ fn parse_realtime_event_data( || !evt_payload.contains_key("client") || !evt_payload.contains_key("connected_time") { - warn!("Received malformed client.leave event: client, band and connected_time is required"); + warn!("Received malformed client.leave event: client, band and connected_time is required!"); return Err(Error::UCentralParser("Received malformed client.leave event: client, band and connected_time is required")); } @@ -727,7 +727,7 @@ fn parse_realtime_event_data( match &evt_payload["band"] { Value::String(s) => s, _ => { - warn!("Received malformed client.leave event: band is of wrongful underlying format/type"); + warn!("Received malformed client.leave event: band is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.leave event: band is of wrongful underlying format/type", )); @@ -739,14 +739,14 @@ fn parse_realtime_event_data( Value::String(s) => match MacAddress::from_str(s.as_str()) { Ok(v) => v, Err(_) => { - warn!("Received malformed client.leave event: client is a malformed MAC address"); + warn!("Received malformed client.leave event: client is a malformed MAC address!"); return Err(Error::UCentralParser( "Received malformed client.leave event: client is a malformed MAC address", )); } }, _ => { - warn!("Received malformed client.leave event: client is of wrongful underlying format/type"); + warn!("Received malformed client.leave event: client is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.leave event: client is of wrongful underlying format/type", )); @@ -765,7 +765,7 @@ fn parse_realtime_event_data( } }, _ => { - warn!("Received malformed client.leave event: connected_time is of wrongful underlying format/type"); + warn!("Received malformed client.leave event: connected_time is of wrongful underlying format/type!"); return Err(Error::UCentralParser( "Received malformed client.leave event: connected_time is of wrongful underlying format/type", )); @@ -795,7 +795,7 @@ fn parse_realtime_event_data( }) } _ => { - warn!("Received unknown event: {evt_type}"); + warn!("Received unknown event: {evt_type}!"); Err(Error::UCentralParser("Received unknown event")) } } @@ -809,19 +809,19 @@ pub fn cgw_ucentral_ap_parse_message( let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { - error!("Failed to parse input json {e}"); + error!("Failed to parse input json! Error: {e}"); return Err(Error::UCentralParser("Failed to parse input json")); } }; if map.contains_key("method") { let method = map["method"].as_str().ok_or_else(|| { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); Error::UCentralParser("JSONRPC field is missing in message") })?; if method == "log" { let params = map.get("params").ok_or_else(|| { - warn!("Received JRPC without params."); + warn!("Received JRPC without params!"); Error::UCentralParser("Received JRPC without params") })?; let serial = MacAddress::from_str( @@ -882,7 +882,7 @@ pub fn cgw_ucentral_ap_parse_message( } } else if map.contains_key("result") { if !map.contains_key("id") { - warn!("Received JRPC without id."); + warn!("Received JRPC without id!"); return Err(Error::UCentralParser("Received JRPC without id")); } diff --git a/src/cgw_ucentral_messages_queue_manager.rs b/src/cgw_ucentral_messages_queue_manager.rs index 7a5b49c..6745428 100644 --- a/src/cgw_ucentral_messages_queue_manager.rs +++ b/src/cgw_ucentral_messages_queue_manager.rs @@ -119,7 +119,7 @@ lazy_static! { impl CGWUCentralMessagesQueueManager { pub async fn create_device_messages_queue(&self, device_mac: &MacAddress) { if !self.check_messages_queue_exists(device_mac).await { - debug!("Create queue message for device: {}", device_mac); + debug!("Create queue message for device: {device_mac}"); let new_queue: Arc> = Arc::new(RwLock::new(CGWUCentralMessagesQueue::new())); @@ -139,21 +139,18 @@ impl CGWUCentralMessagesQueueManager { pub async fn delete_device_messages_queue(&self, device_mac: &MacAddress) { let mut write_lock = self.queue.write().await; - debug!("Remove queue message for device: {}", device_mac); + debug!("Remove queue message for device: {device_mac}"); match write_lock.remove(device_mac) { Some(_) => {} None => { - error!( - "Trying to delete message queue for unexisting device: {}", - device_mac - ); + error!("Trying to delete message queue for unexisting device: {device_mac}!"); } } } pub async fn clear_device_message_queue(&self, device_mac: &MacAddress) { - debug!("Flush device {} queue due to timeout!", device_mac); + debug!("Flush device {device_mac} queue due to timeout"); let container_lock = self.queue.read().await; if let Some(device_msg_queue) = container_lock.get(device_mac) { diff --git a/src/cgw_ucentral_parser.rs b/src/cgw_ucentral_parser.rs index 8c631ac..04c5a3d 100644 --- a/src/cgw_ucentral_parser.rs +++ b/src/cgw_ucentral_parser.rs @@ -327,12 +327,12 @@ pub fn cgw_ucentral_parse_connect_event(message: Message) -> Result Result without params"); + warn!("Received JSONRPC without params!"); Error::UCentralParser("Received JSONRPC without params") })?; @@ -377,28 +377,28 @@ pub fn cgw_ucentral_parse_command_message(message: &str) -> Result m, Err(e) => { - error!("Failed to parse input json {e}"); + error!("Failed to parse input json! Error: {e}"); return Err(Error::UCentralParser("Failed to parse input json")); } }; if !map.contains_key("jsonrpc") { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); return Err(Error::UCentralParser("JSONRPC field is missing in message")); } if !map.contains_key("method") { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); return Err(Error::UCentralParser("method field is missing in message")); } if !map.contains_key("params") { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); return Err(Error::UCentralParser("params field is missing in message")); } if !map.contains_key("id") { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); return Err(Error::UCentralParser("id field is missing in message")); } @@ -499,7 +499,7 @@ fn cgw_load_json_validation_schemas(path: &Path) -> Result { match serde_json::from_reader(reader) { Ok(json_schema) => Ok(json_schema), Err(e) => Err(Error::UCentralValidator(format!( - "Failed to read JSON schema from file {}: {e}", + "Failed to read JSON schema from file {}! Error: {e}", path.display() ))), } diff --git a/src/cgw_ucentral_switch_parser.rs b/src/cgw_ucentral_switch_parser.rs index 7b63cf4..f49748f 100644 --- a/src/cgw_ucentral_switch_parser.rs +++ b/src/cgw_ucentral_switch_parser.rs @@ -106,8 +106,8 @@ fn parse_fdb_data( let vid = { match u16::from_str(k.as_str()) { Ok(v) => v, - Err(_e) => { - warn!("Failed to convert vid {k} to u16"); + Err(e) => { + warn!("Failed to convert vid {k} to u16! Error: {e}"); continue; } } @@ -130,7 +130,7 @@ fn parse_fdb_data( if let Some(ref mut existing_vec) = existing_vec { existing_vec.push(clients_data); } else { - warn!("Unexpected: tried to push clients_data {:?}:{}, while hashmap entry (key) for it does not exist", + warn!("Unexpected: tried to push clients_data [{}:{}], while hashmap entry (key) for it does not exist!", local_port, clients_data.remote_port); } } @@ -151,19 +151,19 @@ pub fn cgw_ucentral_switch_parse_message( let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { - error!("Failed to parse input json {e}"); + error!("Failed to parse input json! Error: {e}"); return Err(Error::UCentralParser("Failed to parse input json")); } }; if !map.contains_key("jsonrpc") { - warn!("Received malformed JSONRPC msg"); + warn!("Received malformed JSONRPC msg!"); return Err(Error::UCentralParser("JSONRPC field is missing in message")); } if map.contains_key("method") { let method = map["method"].as_str().ok_or_else(|| { - warn!("Received JRPC without params."); + warn!("Received JRPC without params!"); Error::UCentralParser("Received JRPC without params") })?; if method == "log" { diff --git a/src/cgw_ucentral_topology_map.rs b/src/cgw_ucentral_topology_map.rs index 9fd661a..7592942 100644 --- a/src/cgw_ucentral_topology_map.rs +++ b/src/cgw_ucentral_topology_map.rs @@ -210,7 +210,7 @@ impl CGWUCentralTopologyMap { } async fn process_queue() { - info!("TopoMap: queue processor started"); + info!("TopoMap: queue processor started."); let topo_map = CGWUCentralTopologyMap::get_ref(); let buf_capacity = 2000; @@ -320,8 +320,7 @@ impl CGWUCentralTopologyMap { // parse string once again. if CGWDeviceType::from_str(platform).is_err() { warn!( - "Tried to insert {} into tomo map, but failed to parse it's platform string", - topology_node_mac + "Tried to insert {topology_node_mac} into tomo map, but failed to parse it's platform string" ); return; } @@ -442,7 +441,7 @@ impl CGWUCentralTopologyMap { if let Ok(r) = msg { let _ = conn_server.enqueue_mbox_message_from_device_to_nb_api_c(gid, r); } else { - warn!("Failed to convert client.leave event to string!"); + warn!("Failed to convert client leave event to string!"); } } } @@ -472,7 +471,7 @@ impl CGWUCentralTopologyMap { if let Ok(r) = msg { let _ = conn_server.enqueue_mbox_message_from_device_to_nb_api_c(gid, r); } else { - warn!("Failed to convert client.leave event to string!"); + warn!("Failed to convert client leave event to string!"); } } } @@ -507,7 +506,7 @@ impl CGWUCentralTopologyMap { if let Ok(r) = msg { let _ = conn_server.enqueue_mbox_message_from_device_to_nb_api_c(gid, r); } else { - warn!("Failed to convert client.leave event to string!"); + warn!("Failed to convert client leave event to string!"); } } } @@ -548,7 +547,7 @@ impl CGWUCentralTopologyMap { if let Some((ref mut topology_map_data, _)) = lock.get_mut(&gid) { Self::clear_related_nodes(topology_map_data, topology_node_mac); } else { - error!("Unexpected: GID {gid} doesn't exists (should've been created prior to state processing)"); + error!("Unexpected: GID {gid} doesn't exists (should've been created prior to state processing)!"); return; } } @@ -941,7 +940,7 @@ impl CGWUCentralTopologyMap { } } } else { - error!("Unexpected: GID {gid} doesn't exists (should've been created prior to state processing)"); + error!("Unexpected: GID {gid} doesn't exists (should've been created prior to state processing)!"); } } } diff --git a/src/main.rs b/src/main.rs index 87920dd..308dd1b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,7 +104,7 @@ impl AppCore { let cgw_server = match CGWConnectionServer::new(&app_args).await { Ok(s) => s, Err(e) => { - error!("Failed to create CGW server: {:?}", e); + error!("Failed to create CGW server! Error: {e}"); return Err(e); } }; @@ -174,13 +174,9 @@ async fn server_loop(app_core: Arc) -> Result<()> { let listener: Arc = match TcpListener::bind(sockaddress).await { Ok(listener) => Arc::new(listener), Err(e) => { - error!( - "Failed to bind socket address: {}. Error: {}", - sockaddress, e - ); + error!("Failed to bind socket address {sockaddress}! Error: {e}"); return Err(Error::ConnectionServer(format!( - "Failed to bind socket address: {}. Error: {}", - sockaddress, e + "Failed to bind socket address {sockaddress}! Error: {e}" ))); } }; @@ -188,7 +184,7 @@ async fn server_loop(app_core: Arc) -> Result<()> { let tls_acceptor = match cgw_tls_create_acceptor(&app_core.args.wss_args).await { Ok(acceptor) => acceptor, Err(e) => { - error!("Failed to create TLS acceptor. Error: {}", e.to_string()); + error!("Failed to create TLS acceptor! Error: {e}"); return Err(e); } }; @@ -196,7 +192,7 @@ async fn server_loop(app_core: Arc) -> Result<()> { // Spawn explicitly in main thread: created task accepts connection, // but handling is spawned inside another threadpool runtime let app_core_clone = app_core.clone(); - let _ = app_core + let result = app_core .main_runtime_handle .spawn(async move { let mut conn_idx: i64 = 0; @@ -209,12 +205,12 @@ async fn server_loop(app_core: Arc) -> Result<()> { let (socket, remote_addr) = match listener.accept().await { Ok((sock, addr)) => (sock, addr), Err(e) => { - error!("Failed to Accept conn {e}\n"); + error!("Failed to accept connection! Error: {e}"); continue; } }; - info!("ACK conn: {}", conn_idx); + info!("Accept (ACK) connection: {conn_idx}, remote address: {remote_addr}"); app_core_clone.conn_ack_runtime_handle.spawn(async move { cgw_server_clone @@ -227,6 +223,13 @@ async fn server_loop(app_core: Arc) -> Result<()> { }) .await; + match result { + Ok(_) => info!("Apllication finished succesfully!"), + Err(e) => { + error!("Application failed! Error: {e}"); + } + } + Ok(()) } @@ -266,7 +269,7 @@ async fn main() -> Result<()> { Ok(app_args) => app_args, Err(e) => { setup_logger(AppCoreLogLevel::Info); - error!("Failed to parse app args: {}", e.to_string()); + error!("Failed to parse application args! Error: {e}"); return Err(e); } }; @@ -276,7 +279,7 @@ async fn main() -> Result<()> { // Initialize runtimes if let Err(e) = cgw_initialize_runtimes(args.wss_args.wss_t_num) { - error!("Failed to initialize CGW runtimes: {}", e.to_string()); + error!("Failed to initialize CGW runtimes! Error: {e}"); return Err(e); } @@ -296,7 +299,7 @@ async fn main() -> Result<()> { // Spawn a task to listen for SIGHUP, SIGINT, and SIGTERM signals tokio::spawn(async move { if let Err(e) = signal_handler(shutdown_notify_clone).await { - error!("Failed to handle signal: {:?}", e); + error!("Failed to handle signal (SIGHUP, SIGINT, or SIGTERM)! Error: {e}"); } });