Skip to content

Commit

Permalink
Asyc message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Xerxes-2 committed Apr 21, 2024
1 parent 7520fe2 commit ebebab3
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 156 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
base64 = "0.22.0"
hudsucker = { git = "https://github.com/omjadas/hudsucker.git" }
once_cell = "1.19.0"
prost-reflect = { version = "0.13.1", features = ["serde"] }
reqwest = { version = "0.12.3", features = ["json"] }
serde = { version = "1.0.198", features = ["derive"] }
Expand Down
277 changes: 129 additions & 148 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ use hudsucker::{
tokio_tungstenite::tungstenite::Message,
*,
};
use once_cell::sync::Lazy;
use prost_reflect::{DynamicMessage, SerializeOptions, Value};
use serde_json::{json, Map, Value as JsonValue};
use std::{
error::Error,
future::Future,
io::Read,
sync::{Arc, Mutex},
};
use std::{error::Error, future::Future, sync::Mutex};
use std::{format, net::SocketAddr};
use tracing::*;
mod parser;
mod settings;
use parser::{Action, LiqiMessage};
use parser::{Action, LiqiMessage, Parser};
use settings::Settings;

use crate::parser::my_serialize;

Expand All @@ -28,157 +25,162 @@ async fn shutdown_signal() {
}

#[derive(Clone)]
struct ActionHandler {
parser: Arc<Mutex<parser::Parser>>,
settings: Arc<settings::Settings>,
client: reqwest::Client,
}
struct ActionHandler;

pub const SERIALIZE_OPTIONS: SerializeOptions = SerializeOptions::new()
.skip_default_fields(false)
.use_proto_field_name(true);

pub const RANDOM_MD5: &str = "0123456789abcdef0123456789abcdef";

static PARSER: Mutex<Lazy<Parser>> = Mutex::new(Lazy::<Parser, _>::new(Parser::new));
static CLIENT: Lazy<reqwest::Client> = Lazy::new(|| {
reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(true)
.build()
.expect("Failed to create reqwest client")
});
static SETTINGS: Lazy<Settings> = Lazy::new(Settings::new);

impl WebSocketHandler for ActionHandler {
async fn handle_message(&mut self, _ctx: &WebSocketContext, msg: Message) -> Option<Message> {
let direction_char = match _ctx {
WebSocketContext::ClientToServer { .. } => '\u{2191}',
WebSocketContext::ServerToClient { .. } => '\u{2193}',
};
if let Message::Binary(buf) = &msg {
// convert binary message to hex string
let hex = buf
.iter()
.map(|b| {
if *b >= 0x20 && *b <= 0x7e {
format!("{}", *b as char)
} else {
format!("{:02x} ", b)
let msg_clone = msg.clone();
tokio::spawn(async move {
if let Message::Binary(buf) = msg_clone {
// convert binary message to hex string
let hex = buf
.iter()
.map(|b| {
if *b >= 0x20 && *b <= 0x7e {
format!("{}", *b as char)
} else {
format!("{:02x} ", b)
}
})
.collect::<String>();
debug!("{} {}", direction_char, hex);
let mut parser = PARSER.lock().unwrap();
let parsed = parser.parse(&buf);
let parsed = match parsed {
Ok(parsed) => parsed,
Err(e) => {
error!("Failed to parse message: {:?}", e);
return;
}
})
.collect::<String>();
debug!("{} {}", direction_char, hex);
let mut parser = self.parser.lock().unwrap();
let parsed = parser.parse(buf);
let parsed = match parsed {
Ok(parsed) => parsed,
Err(e) => {
error!("Failed to parse message: {:?}", e);
return Some(msg);
};
info!(
"监听到: {}, {}, {:?}, {}",
direction_char, parsed.id, parsed.msg_type, parsed.method_name
);
if direction_char == '\u{2193}' {
return;
}
if let Err(e) = send_message(parsed) {
error!("Failed to send message: {:?}", e);
}
};
info!(
"监听到: {}, {}, {:?}, {}",
direction_char, parsed.id, parsed.msg_type, parsed.method_name
);
if direction_char == '\u{2193}' {
return Some(msg);
}
if let Err(e) = self.send_message(parsed) {
error!("Failed to send message: {:?}", e);
}
}
});
Some(msg)
}
}

impl ActionHandler {
fn send_message(&self, mut parsed: LiqiMessage) -> Result<(), Box<dyn Error>> {
let settings = self.settings.clone();
let json_data: JsonValue;
if !settings.is_method(&parsed.method_name) {
fn send_message(mut parsed: LiqiMessage) -> Result<(), Box<dyn Error>> {
let json_data: JsonValue;
if !SETTINGS.is_method(&parsed.method_name) {
return Ok(());
}
if parsed.method_name == ".lq.ActionPrototype" {
let name = parsed
.data
.get("name")
.ok_or("No name field")?
.as_str()
.ok_or("name is not a string")?
.to_owned();
if !SETTINGS.is_action(&name) {
return Ok(());
}
if parsed.method_name == ".lq.ActionPrototype" {
let name = parsed
.data
let data = parsed.data.get_mut("data").ok_or("No data field")?;
if name == "ActionNewRound" {
data.as_object_mut()
.ok_or("data is not an object")?
.insert("md5".to_string(), json!(RANDOM_MD5));
}
json_data = data.take();
} else if parsed.method_name == ".lq.FastTest.syncGame" {
let game_restore = parsed
.data
.get("game_restore")
.ok_or("No game_restore field")?
.get("actions")
.ok_or("No actions field")?
.as_array()
.ok_or("actions is not an array")?;
let mut actions: Vec<Action> = vec![];
for item in game_restore.iter() {
let action_name = item
.get("name")
.ok_or("No name field")?
.as_str()
.ok_or("name is not a string")?
.to_owned();
if !settings.is_action(&name) {
return Ok(());
}
let data = parsed.data.get_mut("data").ok_or("No data field")?;
if name == "ActionNewRound" {
data.as_object_mut()
.ok_or("data is not an object")?
.insert("md5".to_string(), json!(RANDOM_MD5));
}
json_data = data.take();
} else if parsed.method_name == ".lq.FastTest.syncGame" {
let game_restore = parsed
.data
.get("game_restore")
.ok_or("No game_restore field")?
.get("actions")
.ok_or("No actions field")?
.as_array()
.ok_or("actions is not an array")?;
let mut actions: Vec<Action> = vec![];
for item in game_restore.iter() {
let action_name = item
.get("name")
.ok_or("No name field")?
.as_str()
.ok_or("name is not a string")?;
let action_data = item
.get("data")
.ok_or("No data field")?
.as_str()
.unwrap_or("data is not a string");
if action_data.is_empty() {
let action = Action {
name: action_name.to_string(),
data: JsonValue::Object(Map::new()),
};
actions.push(action);
} else {
let b64 = BASE64_STANDARD.decode(action_data)?;
let parser = self.parser.lock().unwrap();
let action_type = parser
.pool
.get_message_by_name(action_name)
.ok_or("Invalid action type")?;
let mut action_obj = DynamicMessage::decode(action_type, b64.as_ref())?;
if action_name == ".lq.ActionNewRound" {
action_obj.set_field_by_name("md5", Value::String(RANDOM_MD5.to_string()));
}
let value: JsonValue = my_serialize(action_obj)?;
let action = Action {
name: action_name.to_string(),
data: value,
};
actions.push(action);
.ok_or("name is not a string")?;
let action_data = item
.get("data")
.ok_or("No data field")?
.as_str()
.unwrap_or("data is not a string");
if action_data.is_empty() {
let action = Action {
name: action_name.to_string(),
data: JsonValue::Object(Map::new()),
};
actions.push(action);
} else {
let b64 = BASE64_STANDARD.decode(action_data)?;
let parser = PARSER.lock().unwrap();
let action_type = parser
.pool
.get_message_by_name(action_name)
.ok_or("Invalid action type")?;
let mut action_obj = DynamicMessage::decode(action_type, b64.as_ref())?;
if action_name == ".lq.ActionNewRound" {
action_obj.set_field_by_name("md5", Value::String(RANDOM_MD5.to_string()));
}
let value: JsonValue = my_serialize(action_obj)?;
let action = Action {
name: action_name.to_string(),
data: value,
};
actions.push(action);
}
let mut map = Map::new();
map.insert(
"sync_game_actions".to_string(),
serde_json::to_value(actions)?,
);
json_data = JsonValue::Object(map);
} else {
json_data = parsed.data;
}
let mut map = Map::new();
map.insert(
"sync_game_actions".to_string(),
serde_json::to_value(actions)?,
);
json_data = JsonValue::Object(map);
} else {
json_data = parsed.data;
}

// post data to API, no verification
let client = self.client.clone();
let future = client.post(&settings.api_url).json(&json_data).send();

handle_future(future);
info!("已发送: {}", json_data);
// post data to API, no verification
let client = CLIENT.clone();
let future = client.post(&SETTINGS.api_url).json(&json_data).send();

if let Some(liqi_data) = json_data.get("liqi") {
let res = client.post(&settings.api_url).json(liqi_data).send();
handle_future(res);
info!("已发送: {:?}", liqi_data);
}
handle_future(future);
info!("已发送: {}", json_data);

Ok(())
if let Some(liqi_data) = json_data.get("liqi") {
let res = client.post(&SETTINGS.api_url).json(liqi_data).send();
handle_future(res);
info!("已发送: {:?}", liqi_data);
}

Ok(())
}

fn handle_future(
Expand Down Expand Up @@ -221,33 +223,12 @@ async fn main() {
请遵守当地法律法规,对于使用本程序所产生的任何后果,作者概不负责!
\x1b[0m"
);
let parser = parser::Parser::new();
let settings = settings::Settings::new();
let settings = match settings {
Ok(settings) => settings,
Err(e) => {
error!("{}", e);
// press any key to exit
println!("按任意键退出");
let mut stdin = std::io::stdin();
let _ = stdin.read(&mut [0u8]).unwrap_or_default();
return;
}
};
let client = reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(true)
.build()
.expect("Failed to create reqwest client");

let proxy = Proxy::builder()
.with_addr(SocketAddr::from(([127, 0, 0, 1], 23410)))
.with_rustls_client()
.with_ca(ca)
.with_websocket_handler(ActionHandler {
parser: Arc::new(Mutex::new(parser)),
settings: Arc::new(settings),
client,
})
.with_websocket_handler(ActionHandler)
.with_graceful_shutdown(shutdown_signal())
.build();

Expand Down
18 changes: 10 additions & 8 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,29 @@ pub struct Settings {
}

impl Settings {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
let cur_exe =
std::env::current_exe().map_err(|e| format!("无法获取当前程序路径: {}", e))?;
pub fn new() -> Self {
let cur_exe = std::env::current_exe()
.expect("无法获取当前程序路径")
.canonicalize()
.expect("无法获取当前程序路径的绝对路径");
let exe_dir = cur_exe
.parent()
.ok_or("无法获取当前程序路径")?
.expect("无法获取当前程序路径的父目录")
.to_str()
.ok_or("无法转换路径为字符串")?;
.expect("无法将目录转换为UTF-8字符串");
// read settings from file
let settings = std::fs::read_to_string(std::path::Path::new(exe_dir).join("settings.json"))
.or_else(
// read pwd
|_| std::fs::read_to_string("settings.json"),
)
.map_err(|e| format!("无法读取settings.json: {}", e))?;
.expect("无法读取settings.json");
let mut settings: Settings =
serde_json::from_str(&settings).map_err(|e| format!("无法解析settings.json: {}", e))?;
serde_json::from_str(&settings).expect("无法解析settings.json");
info!("已载入配置: {:?}", settings);
settings.methods_set = settings.send_method.iter().cloned().collect();
settings.actions_set = settings.send_action.iter().cloned().collect();
Ok(settings)
settings
}

pub fn is_method(&self, method: &str) -> bool {
Expand Down

0 comments on commit ebebab3

Please sign in to comment.