From bb7a0f1eb4f9e30d5fc7b620a10cbd83877c3a53 Mon Sep 17 00:00:00 2001 From: agou <2513281693@qq.com> Date: Thu, 9 May 2024 16:51:43 +0800 Subject: [PATCH 1/5] Fix Android compilation errors --- Cargo.toml | 4 +++- src/config.rs | 20 ++++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dfbc70da..02137e12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,13 +45,15 @@ lazy_static = "1.4.0" md5 = "0.7.0" chrono = "0.4" async-trait = "0.1" -local-ip-address = "0.6" redis = { version = "0.25", features = ["tokio-comp", "cluster", "json"] } libwish = { path = "libs/libwish" } signal = { path = "libs/signal" } live777-http = { path = "libs/live777-http" } live777-storage = { path = "libs/live777-storage" } +[target.'cfg(not(target_os = "android"))'.dependencies] +local-ip-address = "0.6" + # cargo install cargo-deb # Reference: https://github.com/kornelski/cargo-deb [package.metadata.deb] diff --git a/src/config.rs b/src/config.rs index 84c04380..c31bddbc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,7 @@ use base64::engine::general_purpose::STANDARD; use base64::Engine; +// 使用条件编译排除在Android平台使用local_ip_address +#[cfg(not(target_os = "android"))] use local_ip_address::local_ip; use serde::{Deserialize, Serialize}; use std::{env, fs}; @@ -9,6 +11,20 @@ use webrtc::{ Error, }; +#[cfg(target_os = "android")] +fn get_local_ip_address() -> Option { + // 对于Android平台,直接返回None或者一个固定的IP地址 + None +} + +#[cfg(not(target_os = "android"))] +fn get_local_ip_address() -> Option { + // 在支持的平台上使用local_ip_address包获取IP地址,并将IpAddr转换为String + local_ip().map(|ip| ip.to_string()).ok() +} + + + #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Config { #[serde(default)] @@ -149,14 +165,14 @@ fn default_http_listen() -> String { ) } +// 修改这里来使用上面定义的 get_local_ip_address 函数 fn default_registry_ip_port() -> String { format!( "{}:{}", - local_ip().unwrap(), + get_local_ip_address().unwrap_or_else(|| "127.0.0.1".to_string()), env::var("PORT").unwrap_or(String::from("7777")) ) } - impl Default for Http { fn default() -> Self { Self { From bf94c0ebbec685aa7c5b90dfcc2a22746c210a99 Mon Sep 17 00:00:00 2001 From: agou <2513281693@qq.com> Date: Thu, 9 May 2024 16:55:19 +0800 Subject: [PATCH 2/5] Revert "Fix Android compilation errors" This reverts commit bb7a0f1eb4f9e30d5fc7b620a10cbd83877c3a53. --- Cargo.toml | 4 +--- src/config.rs | 20 ++------------------ 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 02137e12..dfbc70da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,15 +45,13 @@ lazy_static = "1.4.0" md5 = "0.7.0" chrono = "0.4" async-trait = "0.1" +local-ip-address = "0.6" redis = { version = "0.25", features = ["tokio-comp", "cluster", "json"] } libwish = { path = "libs/libwish" } signal = { path = "libs/signal" } live777-http = { path = "libs/live777-http" } live777-storage = { path = "libs/live777-storage" } -[target.'cfg(not(target_os = "android"))'.dependencies] -local-ip-address = "0.6" - # cargo install cargo-deb # Reference: https://github.com/kornelski/cargo-deb [package.metadata.deb] diff --git a/src/config.rs b/src/config.rs index c31bddbc..84c04380 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,5 @@ use base64::engine::general_purpose::STANDARD; use base64::Engine; -// 使用条件编译排除在Android平台使用local_ip_address -#[cfg(not(target_os = "android"))] use local_ip_address::local_ip; use serde::{Deserialize, Serialize}; use std::{env, fs}; @@ -11,20 +9,6 @@ use webrtc::{ Error, }; -#[cfg(target_os = "android")] -fn get_local_ip_address() -> Option { - // 对于Android平台,直接返回None或者一个固定的IP地址 - None -} - -#[cfg(not(target_os = "android"))] -fn get_local_ip_address() -> Option { - // 在支持的平台上使用local_ip_address包获取IP地址,并将IpAddr转换为String - local_ip().map(|ip| ip.to_string()).ok() -} - - - #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Config { #[serde(default)] @@ -165,14 +149,14 @@ fn default_http_listen() -> String { ) } -// 修改这里来使用上面定义的 get_local_ip_address 函数 fn default_registry_ip_port() -> String { format!( "{}:{}", - get_local_ip_address().unwrap_or_else(|| "127.0.0.1".to_string()), + local_ip().unwrap(), env::var("PORT").unwrap_or(String::from("7777")) ) } + impl Default for Http { fn default() -> Self { Self { From 69fafff038d1b6dd29910f889f46e24a0307cfe6 Mon Sep 17 00:00:00 2001 From: agou <2513281693@qq.com> Date: Mon, 6 Jan 2025 16:53:19 +0800 Subject: [PATCH 3/5] Add pull stream feature --- conf/livenil/liveman.toml | 4 +- liveman/src/config.rs | 19 ++++++- liveman/src/route/cascade.rs | 102 +++++++++++++++++++++-------------- liveman/src/route/utils.rs | 40 ++++++++++++++ 4 files changed, 123 insertions(+), 42 deletions(-) diff --git a/conf/livenil/liveman.toml b/conf/livenil/liveman.toml index f4068515..1cdb2091 100644 --- a/conf/livenil/liveman.toml +++ b/conf/livenil/liveman.toml @@ -1,2 +1,4 @@ [http] -listen = "0.0.0.0:8888" +listen = "127.0.0.1:8888" +[cascade] +mode = "pull" \ No newline at end of file diff --git a/liveman/src/config.rs b/liveman/src/config.rs index 8a32b87b..1102f077 100644 --- a/liveman/src/config.rs +++ b/liveman/src/config.rs @@ -1,5 +1,4 @@ use std::{env, net::SocketAddr, str::FromStr}; - use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, Deserialize, Serialize)] @@ -163,6 +162,21 @@ fn default_log_level() -> String { }) } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CascadeMode { + Push, + Pull, +} + +impl Default for CascadeMode { + fn default() -> Self { + CascadeMode::Push + } +} + + #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct Cascade { #[serde(default)] @@ -173,6 +187,9 @@ pub struct Cascade { pub maximum_idle_time: u64, #[serde(default)] pub close_other_sub: bool, + + #[serde(default)] + pub mode: CascadeMode, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/liveman/src/route/cascade.rs b/liveman/src/route/cascade.rs index 202fd723..7d46ba09 100644 --- a/liveman/src/route/cascade.rs +++ b/liveman/src/route/cascade.rs @@ -2,7 +2,8 @@ use std::collections::HashSet; use tracing::{error, info}; -use crate::route::utils::{cascade_push, force_check_times, session_delete}; +use crate::config::CascadeMode; +use crate::route::utils::{cascade_pull, cascade_push, force_check_times, session_delete}; use crate::store::Server; use crate::{error::AppError, result::Result, AppState}; @@ -11,54 +12,75 @@ pub async fn cascade_new_node( nodes: Vec, stream: String, ) -> Result { - let set_all: HashSet = state.storage.nodes().await.into_iter().clone().collect(); + + let set_all: HashSet = state.storage.nodes().await.into_iter().collect(); let set_src: HashSet = nodes.clone().into_iter().collect(); let set_dst: HashSet<&Server> = set_all.difference(&set_src).collect(); - let arr = set_dst.into_iter().collect::>(); + let server_src = nodes.first().unwrap().clone(); let server_ds0 = *arr.first().unwrap(); let server_dst = server_ds0.clone(); - info!("cascade from: {:?}, to: {:?}", server_src, server_dst); - tokio::spawn(async move { - match cascade_push( - state.config.http.public.clone(), - state.client.clone(), - server_src.clone(), - server_dst.clone(), - stream.clone(), - ) - .await - { - Ok(()) => { - match force_check_times( - state.client.clone(), - server_dst.clone(), - stream.clone(), - state.config.cascade.check_attempts.0, - ) - .await - { - Ok(count) => { - if state.config.cascade.close_other_sub { - cascade_close_other_sub(state, server_src, stream).await - } - info!("cascade success, checked attempts: {}", count) - } - Err(e) => error!("cascade check error: {:?}", e), - } - Ok(server_dst.clone()) - } - Err(e) => { - error!("cascade error: {:?}", e); - Err(AppError::InternalServerError(e)) - } - } - }); - Ok(server_ds0.clone()) + let mode = state.config.cascade.mode.clone(); + let public = state.config.http.public.clone(); + let client = state.client.clone(); + + info!("cascade mode: {:?}, from: {:?}, to: {:?}", mode, server_src, server_dst); + + tokio::spawn(async move { + + let cascade_result = match mode { + CascadeMode::Push => { + cascade_push( + public, + client.clone(), + server_src.clone(), + server_dst.clone(), + stream.clone(), + ) + .await + } + CascadeMode::Pull => { + cascade_pull( + state.client.clone(), + server_src.clone(), + server_dst.clone(), + stream.clone(), + ) + .await + } + }; + match cascade_result { + Ok(()) => { + match force_check_times( + state.client.clone(), + server_dst.clone(), + stream.clone(), + state.config.cascade.check_attempts.0, + ) + .await + { + Ok(count) => { + if state.config.cascade.close_other_sub { + cascade_close_other_sub(state, server_src, stream).await + } + info!( + "cascade {:?} success, checked attempts: {}", + mode, + count + ) + } + Err(e) => error!("cascade check error: {:?}", e), + } + } + Err(e) => error!("cascade {:?} error: {:?}", mode, e), + } + }); + + Ok(server_ds0.clone()) } async fn cascade_close_other_sub(mut state: AppState, server: Server, stream: String) { diff --git a/liveman/src/route/utils.rs b/liveman/src/route/utils.rs index 44ae8495..5bacd318 100644 --- a/liveman/src/route/utils.rs +++ b/liveman/src/route/utils.rs @@ -117,3 +117,43 @@ pub async fn session_delete( Err(anyhow!("http status not success")) } } +pub async fn cascade_pull( + client: reqwest::Client, + server_src: Server, + server_dst: Server, + stream: String, +) -> Result<(), Error> { + let mut headers = HeaderMap::new(); + headers.append("Content-Type", "application/json".parse().unwrap()); + + let url = format!("{}{}", server_dst.url, &api::path::cascade(&stream)); + + + let body = serde_json::to_string(&Cascade { + source_url: Some(format!("{}/whep/{}", server_src.url, stream)), + token: Some(server_src.token.clone()), + target_url: None, + }) + .unwrap(); + + trace!("cascade pull request: {:?}", body); + + let response = client + .post(url.clone()) + .headers(headers) + .body(body) + .send() + .await?; + + if response.status().is_success() { + Ok(()) + } else { + error!( + "url: {:?}, [{:?}], response: {:?}", + url, + response.status(), + response.text().await? + ); + Err(anyhow!("http status not success")) + } +} \ No newline at end of file From 25c81e8160ac6f75fac6388f7732e8bc3315cd4c Mon Sep 17 00:00:00 2001 From: agou <2513281693@qq.com> Date: Tue, 7 Jan 2025 15:14:10 +0800 Subject: [PATCH 4/5] Fix Clippy warnings and improve CascadeMode default implementation --- conf/liveman.toml | 4 ++ conf/livenil/liveman.toml | 2 - libs/net4mqtt/src/utils.rs | 4 +- liveman/src/config.rs | 14 ++--- liveman/src/route/cascade.rs | 111 +++++++++++++++++------------------ liveman/src/route/utils.rs | 7 +-- 6 files changed, 66 insertions(+), 76 deletions(-) diff --git a/conf/liveman.toml b/conf/liveman.toml index 0b5b0ce3..c2012e89 100644 --- a/conf/liveman.toml +++ b/conf/liveman.toml @@ -39,6 +39,10 @@ # maximum_idle_time = 60000 # When cascade is working, close src server not cascade push session subscription # close_other_sub = false +# Cascade operating mode +# Options: "push" or "pull". Determines whether cascade operates in push mode or pull mode. +# Default is "pull" +# mode = "pull" # [net4mqtt] # Global unique alias diff --git a/conf/livenil/liveman.toml b/conf/livenil/liveman.toml index 1cdb2091..acd6e79d 100644 --- a/conf/livenil/liveman.toml +++ b/conf/livenil/liveman.toml @@ -1,4 +1,2 @@ [http] listen = "127.0.0.1:8888" -[cascade] -mode = "pull" \ No newline at end of file diff --git a/libs/net4mqtt/src/utils.rs b/libs/net4mqtt/src/utils.rs index aa3b7c8b..e21a97f4 100644 --- a/libs/net4mqtt/src/utils.rs +++ b/libs/net4mqtt/src/utils.rs @@ -21,11 +21,11 @@ fn strip_slashes(path: &str) -> &str { let mut start = 0; let mut end = path.len(); - if path.starts_with("/") { + if path.starts_with('/') { start = 1; } - if path.ends_with("/") { + if path.ends_with('/') { end -= 1; } diff --git a/liveman/src/config.rs b/liveman/src/config.rs index 1102f077..1bcc6e1e 100644 --- a/liveman/src/config.rs +++ b/liveman/src/config.rs @@ -1,5 +1,5 @@ -use std::{env, net::SocketAddr, str::FromStr}; use serde::{Deserialize, Serialize}; +use std::{env, net::SocketAddr, str::FromStr}; #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Config { @@ -162,21 +162,15 @@ fn default_log_level() -> String { }) } - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] +#[derive(Default)] pub enum CascadeMode { + #[default] Push, Pull, } -impl Default for CascadeMode { - fn default() -> Self { - CascadeMode::Push - } -} - - #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct Cascade { #[serde(default)] @@ -187,7 +181,7 @@ pub struct Cascade { pub maximum_idle_time: u64, #[serde(default)] pub close_other_sub: bool, - + #[serde(default)] pub mode: CascadeMode, } diff --git a/liveman/src/route/cascade.rs b/liveman/src/route/cascade.rs index 7d46ba09..d5fbdc43 100644 --- a/liveman/src/route/cascade.rs +++ b/liveman/src/route/cascade.rs @@ -5,82 +5,77 @@ use tracing::{error, info}; use crate::config::CascadeMode; use crate::route::utils::{cascade_pull, cascade_push, force_check_times, session_delete}; use crate::store::Server; -use crate::{error::AppError, result::Result, AppState}; +use crate::{result::Result, AppState}; pub async fn cascade_new_node( mut state: AppState, nodes: Vec, stream: String, ) -> Result { - let set_all: HashSet = state.storage.nodes().await.into_iter().collect(); let set_src: HashSet = nodes.clone().into_iter().collect(); let set_dst: HashSet<&Server> = set_all.difference(&set_src).collect(); let arr = set_dst.into_iter().collect::>(); - let server_src = nodes.first().unwrap().clone(); let server_ds0 = *arr.first().unwrap(); let server_dst = server_ds0.clone(); + let mode = state.config.cascade.mode.clone(); + let public = state.config.http.public.clone(); + let client = state.client.clone(); - let mode = state.config.cascade.mode.clone(); - let public = state.config.http.public.clone(); - let client = state.client.clone(); + info!( + "cascade mode: {:?}, from: {:?}, to: {:?}", + mode, server_src, server_dst + ); - info!("cascade mode: {:?}, from: {:?}, to: {:?}", mode, server_src, server_dst); - - tokio::spawn(async move { - - let cascade_result = match mode { - CascadeMode::Push => { - cascade_push( - public, - client.clone(), - server_src.clone(), - server_dst.clone(), - stream.clone(), - ) - .await - } - CascadeMode::Pull => { - cascade_pull( - state.client.clone(), - server_src.clone(), - server_dst.clone(), - stream.clone(), - ) - .await - } - }; - match cascade_result { - Ok(()) => { - match force_check_times( - state.client.clone(), - server_dst.clone(), - stream.clone(), - state.config.cascade.check_attempts.0, - ) - .await - { - Ok(count) => { - if state.config.cascade.close_other_sub { - cascade_close_other_sub(state, server_src, stream).await - } - info!( - "cascade {:?} success, checked attempts: {}", - mode, - count - ) - } - Err(e) => error!("cascade check error: {:?}", e), - } - } - Err(e) => error!("cascade {:?} error: {:?}", mode, e), - } - }); + tokio::spawn(async move { + let cascade_result = match mode { + CascadeMode::Push => { + cascade_push( + public, + client.clone(), + server_src.clone(), + server_dst.clone(), + stream.clone(), + ) + .await + } + CascadeMode::Pull => { + cascade_pull( + state.client.clone(), + server_src.clone(), + server_dst.clone(), + stream.clone(), + ) + .await + } + }; + match cascade_result { + Ok(()) => { + match force_check_times( + state.client.clone(), + server_dst.clone(), + stream.clone(), + state.config.cascade.check_attempts.0, + ) + .await + { + Ok(count) => { + if state.config.cascade.close_other_sub { + cascade_close_other_sub(state, server_src, stream).await + } + info!("cascade {:?} success, checked attempts: {}", mode, count) + } + Err(e) => error!("cascade check error: {:?}", e), + } + } + Err(e) => error!("cascade {:?} error: {:?}", mode, e), + } + }); - Ok(server_ds0.clone()) + Ok(server_ds0.clone()) } async fn cascade_close_other_sub(mut state: AppState, server: Server, stream: String) { diff --git a/liveman/src/route/utils.rs b/liveman/src/route/utils.rs index 5bacd318..10f8af6e 100644 --- a/liveman/src/route/utils.rs +++ b/liveman/src/route/utils.rs @@ -128,11 +128,10 @@ pub async fn cascade_pull( let url = format!("{}{}", server_dst.url, &api::path::cascade(&stream)); - let body = serde_json::to_string(&Cascade { source_url: Some(format!("{}/whep/{}", server_src.url, stream)), - token: Some(server_src.token.clone()), - target_url: None, + token: Some(server_src.token.clone()), + target_url: None, }) .unwrap(); @@ -156,4 +155,4 @@ pub async fn cascade_pull( ); Err(anyhow!("http status not success")) } -} \ No newline at end of file +} From 065adb4d01e2414237e1c3d15fc5b46a60b198e1 Mon Sep 17 00:00:00 2001 From: agou <2513281693@qq.com> Date: Tue, 7 Jan 2025 15:28:26 +0800 Subject: [PATCH 5/5] default is push --- conf/liveman.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/liveman.toml b/conf/liveman.toml index c2012e89..75d6103f 100644 --- a/conf/liveman.toml +++ b/conf/liveman.toml @@ -41,7 +41,7 @@ # close_other_sub = false # Cascade operating mode # Options: "push" or "pull". Determines whether cascade operates in push mode or pull mode. -# Default is "pull" +# Default is "push" # mode = "pull" # [net4mqtt]