Skip to content

Commit

Permalink
Send unzipped AP State Event message to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Jul 24, 2024
1 parent 72f9171 commit 489ee0f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
33 changes: 31 additions & 2 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
cgw_ucentral_parser::{
cgw_ucentral_event_parse, cgw_ucentral_parse_connect_event, CGWUCentralCommandType,
CGWUCentralEventType,
CGWUCentralEventType, CGWUCentralJRPCMessage,
},
cgw_ucentral_topology_map::CGWUCentralTopologyMap,
};
Expand All @@ -19,6 +19,7 @@ use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde_json::Value;
use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::{
net::TcpStream,
Expand Down Expand Up @@ -268,6 +269,7 @@ impl CGWConnectionProcessor {
// Make sure we always track the as accurate as possible the time
// of receiving of the event (where needed).
let timestamp = Local::now();
let mut kafaka_msg: String = String::new();

match msg {
Ok(msg) => match msg {
Expand All @@ -278,7 +280,34 @@ impl CGWConnectionProcessor {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
kafaka_msg = payload.clone();
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Ok(mut origin_msg) =
serde_json::from_str::<CGWUCentralJRPCMessage>(&payload)
{
let params_value = match Value::from_str(
evt.decompressed.clone().unwrap().as_str(),
) {
Ok(val) => val,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to cast decompressed message to JSON Value",
));
}
};
if let Some(value) = origin_msg.get_mut("params") {
*value = params_value;
kafaka_msg = match serde_json::to_string(&origin_msg) {
Ok(msg) => msg,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to create decompressed Event message",
));
}
};
}
}

if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
Expand Down Expand Up @@ -313,7 +342,7 @@ impl CGWConnectionProcessor {
}

self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, payload)?;
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, kafaka_msg)?;
return Ok(CGWConnectionState::IsActive);
}
Ping(_t) => {
Expand Down
7 changes: 7 additions & 0 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: Some(unzipped_data),
};

return Ok(state_event);
Expand Down Expand Up @@ -438,6 +439,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down Expand Up @@ -655,6 +657,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
"client.leave" => {
Expand Down Expand Up @@ -734,6 +737,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
_ => {
Expand Down Expand Up @@ -775,6 +779,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -802,6 +807,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

return Ok(connect_event);
Expand All @@ -822,6 +828,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};

return Ok(reply_event);
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub enum CGWUCentralEventType {
pub struct CGWUCentralEvent {
pub serial: MacAddress,
pub evt_type: CGWUCentralEventType,
pub decompressed: Option<String>,
}

#[derive(Deserialize, Debug, Serialize)]
Expand Down Expand Up @@ -262,6 +263,7 @@ pub fn cgw_ucentral_parse_connect_event(message: Message) -> Result<CGWUCentralE
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

Ok(event)
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub fn cgw_ucentral_switch_parse_message(
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -216,6 +217,7 @@ pub fn cgw_ucentral_switch_parse_message(
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down

0 comments on commit 489ee0f

Please sign in to comment.