From 5c9800cb1cc5c11fc10071f090b17687bf7b7874 Mon Sep 17 00:00:00 2001 From: dev0 Date: Tue, 12 Sep 2023 22:45:03 +1000 Subject: [PATCH 1/3] wip --- Cargo.lock | 7 + clash_lib/Cargo.toml | 1 + clash_lib/src/app/api/handlers/provider.rs | 8 +- clash_lib/src/app/mod.rs | 2 +- clash_lib/src/app/outbound/manager.rs | 19 ++- .../proxy_manager/providers/rule_provider.rs | 1 - .../healthcheck.rs | 0 .../http_client.rs | 0 .../mod.rs | 9 +- .../providers/fetcher.rs | 2 +- .../providers/file_vehicle.rs | 0 .../providers/http_vehicle.rs | 2 +- .../providers/mod.rs | 9 +- .../providers/proxy_provider/mod.rs | 8 ++ .../proxy_provider}/plain_provider.rs | 13 +- .../proxy_provider}/proxy_provider.rs | 4 +- .../proxy_provider}/proxy_set_provider.rs | 15 +- .../providers/rule_provider/cidr_trie.rs | 34 +++++ .../providers/rule_provider/mod.rs | 2 + .../providers/rule_provider/rule_provider.rs | 135 +++++++++++++++++ clash_lib/src/app/router/mod.rs | 136 +++++++++--------- clash_lib/src/common/trie.rs | 1 + clash_lib/src/proxy/fallback/mod.rs | 4 +- clash_lib/src/proxy/loadbalance/mod.rs | 2 +- clash_lib/src/proxy/mocks.rs | 2 +- clash_lib/src/proxy/relay/mod.rs | 2 +- clash_lib/src/proxy/selector/mod.rs | 2 +- clash_lib/src/proxy/urltest/mod.rs | 4 +- clash_lib/src/proxy/utils/provider_helper.rs | 2 +- 29 files changed, 312 insertions(+), 114 deletions(-) delete mode 100644 clash_lib/src/app/proxy_manager/providers/rule_provider.rs rename clash_lib/src/app/{proxy_manager => remote_content_manager}/healthcheck.rs (100%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/http_client.rs (100%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/mod.rs (98%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/providers/fetcher.rs (98%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/providers/file_vehicle.rs (100%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/providers/http_vehicle.rs (98%) rename clash_lib/src/app/{proxy_manager => remote_content_manager}/providers/mod.rs (90%) create mode 100644 clash_lib/src/app/remote_content_manager/providers/proxy_provider/mod.rs rename clash_lib/src/app/{proxy_manager/providers => remote_content_manager/providers/proxy_provider}/plain_provider.rs (89%) rename clash_lib/src/app/{proxy_manager/providers => remote_content_manager/providers/proxy_provider}/proxy_provider.rs (81%) rename clash_lib/src/app/{proxy_manager/providers => remote_content_manager/providers/proxy_provider}/proxy_set_provider.rs (95%) create mode 100644 clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs create mode 100644 clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs create mode 100644 clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs diff --git a/Cargo.lock b/Cargo.lock index f0599672c..5ed10b488 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -800,6 +800,7 @@ dependencies = [ "httparse", "hyper", "hyper-boring", + "ip_network_table-deps-treebitmap", "ipnet", "libc", "lru_time_cache", @@ -1899,6 +1900,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c429fffa658f288669529fc26565f728489a2e39bc7b24a428aaaf51355182e" +[[package]] +name = "ip_network_table-deps-treebitmap" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e537132deb99c0eb4b752f0346b6a836200eaaa3516dd7e5514b63930a09e5d" + [[package]] name = "ipconfig" version = "0.3.2" diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 660df71b6..bbd45bc87 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -39,6 +39,7 @@ boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" } boring-sys = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" } hyper-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" } tokio-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" } +ip_network_table-deps-treebitmap = "0.5.0" crc32fast = "1.3.2" brotli = "3.3.4" diff --git a/clash_lib/src/app/api/handlers/provider.rs b/clash_lib/src/app/api/handlers/provider.rs index 3a51ea6c8..d6346df14 100644 --- a/clash_lib/src/app/api/handlers/provider.rs +++ b/clash_lib/src/app/api/handlers/provider.rs @@ -11,11 +11,11 @@ use axum::{ }; use serde::Deserialize; -use crate::app::{api::AppState, outbound::manager::ThreadSafeOutboundManager}; -use crate::{ - app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, - proxy::AnyOutboundHandler, +use crate::app::{ + api::AppState, outbound::manager::ThreadSafeOutboundManager, + remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }; +use crate::proxy::AnyOutboundHandler; #[derive(Clone)] struct ProviderState { outbound_manager: ThreadSafeOutboundManager, diff --git a/clash_lib/src/app/mod.rs b/clash_lib/src/app/mod.rs index 5e99463d2..3806acda1 100644 --- a/clash_lib/src/app/mod.rs +++ b/clash_lib/src/app/mod.rs @@ -5,5 +5,5 @@ pub mod inbound; pub mod logging; pub mod outbound; pub mod profile; -pub mod proxy_manager; +pub mod remote_content_manager; pub mod router; diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index b83e96573..ac157a013 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -5,19 +5,18 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; -use tracing::debug; + use tracing::info; use crate::app::dns::ThreadSafeDNSResolver; -use crate::app::profile::ThreadSafeCacheFile; -use crate::app::proxy_manager::healthcheck::HealthCheck; -use crate::app::proxy_manager::providers::file_vehicle; -use crate::app::proxy_manager::providers::http_vehicle; -use crate::app::proxy_manager::providers::plain_provider::PlainProvider; -use crate::app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider; -use crate::app::proxy_manager::providers::proxy_set_provider::ProxySetProvider; -use crate::app::proxy_manager::ProxyManager; - +use crate::app::remote_content_manager::healthcheck::HealthCheck; +use crate::app::remote_content_manager::providers::file_vehicle; +use crate::app::remote_content_manager::providers::http_vehicle; +use crate::app::remote_content_manager::ProxyManager; + +use crate::app::remote_content_manager::providers::proxy_provider::PlainProvider; +use crate::app::remote_content_manager::providers::proxy_provider::ProxySetProvider; +use crate::app::remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider; use crate::config::internal::proxy::PROXY_GLOBAL; use crate::config::internal::proxy::{OutboundProxyProvider, PROXY_DIRECT, PROXY_REJECT}; use crate::proxy::fallback; diff --git a/clash_lib/src/app/proxy_manager/providers/rule_provider.rs b/clash_lib/src/app/proxy_manager/providers/rule_provider.rs deleted file mode 100644 index 8b1378917..000000000 --- a/clash_lib/src/app/proxy_manager/providers/rule_provider.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/clash_lib/src/app/proxy_manager/healthcheck.rs b/clash_lib/src/app/remote_content_manager/healthcheck.rs similarity index 100% rename from clash_lib/src/app/proxy_manager/healthcheck.rs rename to clash_lib/src/app/remote_content_manager/healthcheck.rs diff --git a/clash_lib/src/app/proxy_manager/http_client.rs b/clash_lib/src/app/remote_content_manager/http_client.rs similarity index 100% rename from clash_lib/src/app/proxy_manager/http_client.rs rename to clash_lib/src/app/remote_content_manager/http_client.rs diff --git a/clash_lib/src/app/proxy_manager/mod.rs b/clash_lib/src/app/remote_content_manager/mod.rs similarity index 98% rename from clash_lib/src/app/proxy_manager/mod.rs rename to clash_lib/src/app/remote_content_manager/mod.rs index 4e8eef1fe..9c3ba44d8 100644 --- a/clash_lib/src/app/proxy_manager/mod.rs +++ b/clash_lib/src/app/remote_content_manager/mod.rs @@ -10,7 +10,7 @@ use std::{ use boring::ssl::{SslConnector, SslMethod}; use chrono::{DateTime, Utc}; -use futures::StreamExt; + use http::{Request, Version}; use hyper_boring::HttpsConnector; use serde::Serialize; @@ -246,11 +246,10 @@ impl ProxyManager { mod tests { use std::{net::Ipv4Addr, sync::Arc, time::Duration}; - use anyhow::Chain; use futures::TryFutureExt; use crate::{ - app::{dispatcher::ChainedStreamWrapper, dns::MockClashResolver}, + app::{dispatcher::ChainedStreamWrapper, dns::MockClashResolver, remote_content_manager}, config::internal::proxy::PROXY_DIRECT, proxy::mocks::MockDummyOutboundHandler, }; @@ -262,7 +261,7 @@ mod tests { .expect_resolve() .returning(|_, _| Ok(Some(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))))); - let manager = super::ProxyManager::new(Arc::new(mock_resolver)); + let manager = remote_content_manager::ProxyManager::new(Arc::new(mock_resolver)); let mut mock_handler = MockDummyOutboundHandler::new(); mock_handler @@ -317,7 +316,7 @@ mod tests { .expect_resolve() .returning(|_, _| Ok(Some(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))))); - let manager = super::ProxyManager::new(Arc::new(mock_resolver)); + let manager = remote_content_manager::ProxyManager::new(Arc::new(mock_resolver)); let mut mock_handler = MockDummyOutboundHandler::new(); mock_handler diff --git a/clash_lib/src/app/proxy_manager/providers/fetcher.rs b/clash_lib/src/app/remote_content_manager/providers/fetcher.rs similarity index 98% rename from clash_lib/src/app/proxy_manager/providers/fetcher.rs rename to clash_lib/src/app/remote_content_manager/providers/fetcher.rs index eff1a0c01..e8ce3bb32 100644 --- a/clash_lib/src/app/proxy_manager/providers/fetcher.rs +++ b/clash_lib/src/app/remote_content_manager/providers/fetcher.rs @@ -242,7 +242,7 @@ mod tests { use futures::future::BoxFuture; use tokio::time::sleep; - use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType}; + use crate::app::remote_content_manager::providers::{MockProviderVehicle, ProviderVehicleType}; use super::Fetcher; diff --git a/clash_lib/src/app/proxy_manager/providers/file_vehicle.rs b/clash_lib/src/app/remote_content_manager/providers/file_vehicle.rs similarity index 100% rename from clash_lib/src/app/proxy_manager/providers/file_vehicle.rs rename to clash_lib/src/app/remote_content_manager/providers/file_vehicle.rs diff --git a/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs b/clash_lib/src/app/remote_content_manager/providers/http_vehicle.rs similarity index 98% rename from clash_lib/src/app/proxy_manager/providers/http_vehicle.rs rename to clash_lib/src/app/remote_content_manager/providers/http_vehicle.rs index 718d86517..81f3aaf3a 100644 --- a/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs +++ b/clash_lib/src/app/remote_content_manager/providers/http_vehicle.rs @@ -51,7 +51,7 @@ impl ProviderVehicle for Vehicle { } fn typ(&self) -> ProviderVehicleType { - ProviderVehicleType::HTTP + ProviderVehicleType::Http } } diff --git a/clash_lib/src/app/proxy_manager/providers/mod.rs b/clash_lib/src/app/remote_content_manager/providers/mod.rs similarity index 90% rename from clash_lib/src/app/proxy_manager/providers/mod.rs rename to clash_lib/src/app/remote_content_manager/providers/mod.rs index b62003116..2e4db4eda 100644 --- a/clash_lib/src/app/proxy_manager/providers/mod.rs +++ b/clash_lib/src/app/remote_content_manager/providers/mod.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use erased_serde::Serialize; +use serde::Deserialize; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::io; @@ -8,18 +9,16 @@ use std::sync::Arc; pub mod fetcher; pub mod file_vehicle; pub mod http_vehicle; -pub mod plain_provider; pub mod proxy_provider; -pub mod proxy_set_provider; pub mod rule_provider; #[cfg(test)] use mockall::automock; -#[derive(PartialEq, Clone, Copy, Debug)] +#[derive(Deserialize, PartialEq, Clone, Copy, Debug)] pub enum ProviderVehicleType { File, - HTTP, + Http, Compatible, } @@ -27,7 +26,7 @@ impl Display for ProviderVehicleType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { ProviderVehicleType::File => write!(f, "File"), - ProviderVehicleType::HTTP => write!(f, "HTTP"), + ProviderVehicleType::Http => write!(f, "HTTP"), ProviderVehicleType::Compatible => write!(f, "Compatible"), } } diff --git a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/mod.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/mod.rs new file mode 100644 index 000000000..55b3fb70e --- /dev/null +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/mod.rs @@ -0,0 +1,8 @@ +pub mod plain_provider; +pub mod proxy_provider; +pub mod proxy_set_provider; + +pub use plain_provider::PlainProvider; +pub use proxy_provider::ProxyProvider; +pub use proxy_provider::ThreadSafeProxyProvider; +pub use proxy_set_provider::ProxySetProvider; diff --git a/clash_lib/src/app/proxy_manager/providers/plain_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs similarity index 89% rename from clash_lib/src/app/proxy_manager/providers/plain_provider.rs rename to clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs index 6288872dc..1e37524f5 100644 --- a/clash_lib/src/app/proxy_manager/providers/plain_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs @@ -5,9 +5,16 @@ use erased_serde::Serialize; use tokio::sync::Mutex; use tracing::debug; -use crate::{app::proxy_manager::healthcheck::HealthCheck, proxy::AnyOutboundHandler, Error}; - -use super::{proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType}; +use crate::{ + app::remote_content_manager::{ + healthcheck::HealthCheck, + providers::{Provider, ProviderType, ProviderVehicleType}, + }, + proxy::AnyOutboundHandler, + Error, +}; + +use super::proxy_provider::ProxyProvider; struct Inner { hc: Arc, diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_provider.rs similarity index 81% rename from clash_lib/src/app/proxy_manager/providers/proxy_provider.rs rename to clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_provider.rs index 0e5fb45ba..8972c0bd0 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_provider.rs @@ -3,9 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use tokio::sync::RwLock; -use crate::proxy::AnyOutboundHandler; - -use super::Provider; +use crate::{app::remote_content_manager::providers::Provider, proxy::AnyOutboundHandler}; pub type ThreadSafeProxyProvider = Arc>; diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs similarity index 95% rename from clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs rename to clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs index 05550f7a0..fdf336149 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs @@ -7,12 +7,13 @@ use serde::{Deserialize, Serialize}; use serde_yaml::Value; use tracing::debug; -use super::{ - fetcher::Fetcher, proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType, - ThreadSafeProviderVehicle, -}; +use super::proxy_provider::ProxyProvider; use crate::{ - app::proxy_manager::healthcheck::HealthCheck, + app::remote_content_manager::{ + healthcheck::HealthCheck, + providers::{fetcher::Fetcher, ThreadSafeProviderVehicle}, + providers::{Provider, ProviderType, ProviderVehicleType}, + }, common::errors::map_io_error, config::internal::proxy::OutboundProxyProtocol, proxy::{direct, reject, AnyOutboundHandler}, @@ -204,10 +205,10 @@ mod tests { use crate::app::{ dns::MockClashResolver, - proxy_manager::{ + remote_content_manager::{ healthcheck::HealthCheck, providers::{ - proxy_provider::ProxyProvider, proxy_set_provider::ProxySetProvider, + proxy_provider::{proxy_set_provider::ProxySetProvider, ProxyProvider}, MockProviderVehicle, Provider, ProviderVehicleType, }, ProxyManager, diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs new file mode 100644 index 000000000..329640fbe --- /dev/null +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs @@ -0,0 +1,34 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; + +use ip_network_table_deps_treebitmap::IpLookupTable; + +pub struct CidrTrie { + v4: IpLookupTable, + v6: IpLookupTable, +} + +impl CidrTrie { + pub fn new() -> Self { + Self { + v4: IpLookupTable::new(), + v6: IpLookupTable::new(), + } + } + + pub fn insert(&mut self, cidr: &str) -> bool { + if let Ok(cidr) = cidr.parse::() { + match cidr { + ipnet::IpNet::V4(v4) => { + self.v4.insert(v4.addr(), v4.prefix_len() as _, true); + true + } + ipnet::IpNet::V6(v6) => { + self.v6.insert(v6.addr(), v6.prefix_len() as _, true); + true + } + } + } else { + false + } + } +} diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs new file mode 100644 index 000000000..3401df113 --- /dev/null +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs @@ -0,0 +1,2 @@ +mod cidr_trie; +mod rule_provider; diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs new file mode 100644 index 000000000..541c5f15b --- /dev/null +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs @@ -0,0 +1,135 @@ +use std::{sync::Arc, time::Duration}; + +use async_trait::async_trait; +use futures::future::BoxFuture; +use serde::{Deserialize, Serialize}; +use tracing::trace; + +use crate::{ + app::{ + remote_content_manager::providers::{ + fetcher::Fetcher, Provider, ThreadSafeProviderVehicle, + }, + router::RuleMatcher, + }, + common::trie, + config::internal::rule::RuleType, + session::Session, + Error, +}; + +use super::cidr_trie::CidrTrie; + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct ProviderScheme { + pub payload: Vec, +} + +pub enum RuleSetBehavior { + Domain, + IPCIDR, + Classical, +} + +enum RuleContent { + Domain(trie::StringTrie), + IPCIDR(CidrTrie), + Classical(Vec>), +} + +struct Inner { + content: RuleContent, + count: usize, +} + +#[async_trait] +pub trait RuleProvider: Provider { + async fn rules(&self) -> Vec; + async fn search(&self, sess: &Session) -> bool; + async fn rule_count(&self) -> usize; + fn behavior(&self) -> RuleSetBehavior; +} + +pub struct RuleProviderImpl { + fetcher: Fetcher< + Box BoxFuture<'static, ()> + Send + Sync + 'static>, + Box anyhow::Result + Send + Sync + 'static>, + >, + inner: std::sync::Arc>, + behavior: RuleSetBehavior, +} + +impl RuleProviderImpl { + pub fn new( + name: String, + behovior: RuleSetBehavior, + interval: Duration, + vehicle: ThreadSafeProviderVehicle, + ) -> Self { + let inner = Arc::new(tokio::sync::RwLock::new(Inner { + content: match behovior { + RuleSetBehavior::Domain => RuleContent::Domain(trie::StringTrie::new()), + RuleSetBehavior::IPCIDR => RuleContent::IPCIDR(CidrTrie::new()), + RuleSetBehavior::Classical => RuleContent::Classical(vec![]), + }, + count: 0, + })); + + let inner_clone = inner.clone(); + + let n = name.clone(); + let updater: Box BoxFuture<'static, ()> + Send + Sync + 'static> = + Box::new(move |input: RuleContent| -> BoxFuture<'static, ()> { + let n = n.clone(); + let inner: Arc> = inner_clone.clone(); + Box::pin(async move { + let mut inner = inner.write().await; + trace!("updated rules for: {}", n); + inner.content = input; + }) + }); + + let n = name.clone(); + let parser: Box anyhow::Result + Send + Sync + 'static> = + Box::new(move |input: &[u8]| -> anyhow::Result { + let scheme: ProviderScheme = serde_yaml::from_slice(input).map_err(|x| { + Error::InvalidConfig(format!("proxy provider parse error {}: {}", n, x)) + })?; + let rules = make_rules(behovior, scheme.payload)?; + Ok(rules) + }); + } +} + +fn make_rules(behavior: RuleSetBehavior, rules: Vec) -> Result { + match behavior { + RuleSetBehavior::Domain => todo!(), + RuleSetBehavior::IPCIDR => todo!(), + RuleSetBehavior::Classical => todo!(), + } +} + +fn make_domain_rules(rules: Vec) -> Result, Error> { + let mut trie = trie::StringTrie::new(); + for rule in rules { + trie.insert(&rule, Arc::new(true)); + } + Ok(trie) +} + +fn make_ip_cidr_rules(rules: Vec) -> Result { + let mut trie = CidrTrie::new(); + for rule in rules { + trie.insert(&rule); + } + Ok(trie) +} + +fn make_classical_rules(rules: Vec) -> Result>, Error> { + let mut rv = vec![]; + for rule in rules { + let rule_type = rule.parse::()?; + rv.push(matcher); + } + Ok(rv) +} diff --git a/clash_lib/src/app/router/mod.rs b/clash_lib/src/app/router/mod.rs index 61e68970b..a758a8f37 100644 --- a/clash_lib/src/app/router/mod.rs +++ b/clash_lib/src/app/router/mod.rs @@ -13,6 +13,8 @@ use std::sync::Arc; use tracing::info; +use self::mmdb::MMDB; + use super::dns::ThreadSafeDNSResolver; mod mmdb; @@ -45,72 +47,7 @@ impl Router { Self { rules: rules .into_iter() - .map(|r| match r { - RuleType::Domain { domain, target } => { - Box::new(Domain { domain, target }) as Box - } - RuleType::DomainSuffix { - domain_suffix, - target, - } => Box::new(DomainSuffix { - suffix: domain_suffix, - target, - }), - RuleType::DomainKeyword { - domain_keyword, - target, - } => Box::new(DomainKeyword { - keyword: domain_keyword, - target, - }), - RuleType::IPCIDR { - ipnet, - target, - no_resolve, - } => Box::new(IPCIDR { - ipnet, - target, - no_resolve, - match_src: false, - }), - RuleType::SRCIPCIDR { - ipnet, - target, - no_resolve, - } => Box::new(IPCIDR { - ipnet, - target, - no_resolve, - match_src: true, - }), - - RuleType::GeoIP { - target, - country_code, - no_resolve, - } => Box::new(rules::geoip::GeoIP { - target, - country_code, - no_resolve, - mmdb: mmdb.clone(), - }), - RuleType::SRCPort { target, port } => Box::new(rules::port::Port { - port, - target, - is_src: true, - }), - RuleType::DSTPort { target, port } => Box::new(rules::port::Port { - port, - target, - is_src: false, - }), - RuleType::ProcessName => todo!(), - RuleType::ProcessPath => todo!(), - RuleType::RuleSet { rule_set, target } => { - Box::new(RuleSet { rule_set, target }) - } - RuleType::Match { target } => Box::new(Final { target }), - }) + .map(|r| map_rule_type(r, mmdb.clone())) .collect(), dns_resolver, } @@ -151,3 +88,70 @@ impl Router { &self.rules } } + +fn map_rule_type(rule_type: RuleType, mmdb: Arc) -> Box { + match rule_type { + RuleType::Domain { domain, target } => { + Box::new(Domain { domain, target }) as Box + } + RuleType::DomainSuffix { + domain_suffix, + target, + } => Box::new(DomainSuffix { + suffix: domain_suffix, + target, + }), + RuleType::DomainKeyword { + domain_keyword, + target, + } => Box::new(DomainKeyword { + keyword: domain_keyword, + target, + }), + RuleType::IPCIDR { + ipnet, + target, + no_resolve, + } => Box::new(IPCIDR { + ipnet, + target, + no_resolve, + match_src: false, + }), + RuleType::SRCIPCIDR { + ipnet, + target, + no_resolve, + } => Box::new(IPCIDR { + ipnet, + target, + no_resolve, + match_src: true, + }), + + RuleType::GeoIP { + target, + country_code, + no_resolve, + } => Box::new(rules::geoip::GeoIP { + target, + country_code, + no_resolve, + mmdb: mmdb.clone(), + }), + RuleType::SRCPort { target, port } => Box::new(rules::port::Port { + port, + target, + is_src: true, + }), + RuleType::DSTPort { target, port } => Box::new(rules::port::Port { + port, + target, + is_src: false, + }), + RuleType::ProcessName => todo!(), + RuleType::ProcessPath => todo!(), + RuleType::RuleSet { rule_set, target } => Box::new(RuleSet { rule_set, target }), + RuleType::Match { target } => Box::new(Final { target }), + } +} diff --git a/clash_lib/src/common/trie.rs b/clash_lib/src/common/trie.rs index 3a3b8987c..6327ed72c 100644 --- a/clash_lib/src/common/trie.rs +++ b/clash_lib/src/common/trie.rs @@ -16,6 +16,7 @@ pub struct StringTrie { #[derive(Clone)] pub struct Node { children: HashMap>, + // TODO: maybe we only need RefCell here data: Option>, } diff --git a/clash_lib/src/proxy/fallback/mod.rs b/clash_lib/src/proxy/fallback/mod.rs index 54ba35be4..e2fac8053 100644 --- a/clash_lib/src/proxy/fallback/mod.rs +++ b/clash_lib/src/proxy/fallback/mod.rs @@ -7,7 +7,9 @@ use crate::{ app::{ dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver, - proxy_manager::{providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager}, + remote_content_manager::{ + providers::proxy_provider::proxy_provider::ThreadSafeProxyProvider, ProxyManager, + }, }, session::{Session, SocksAddr}, }; diff --git a/clash_lib/src/proxy/loadbalance/mod.rs b/clash_lib/src/proxy/loadbalance/mod.rs index 7765b5d69..fb606b986 100644 --- a/clash_lib/src/proxy/loadbalance/mod.rs +++ b/clash_lib/src/proxy/loadbalance/mod.rs @@ -9,7 +9,7 @@ use tracing::debug; use crate::{ app::{ dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver, - proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, + remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, config::internal::proxy::LoadBalanceStrategy, session::{Session, SocksAddr}, diff --git a/clash_lib/src/proxy/mocks.rs b/clash_lib/src/proxy/mocks.rs index dafc76bad..33a61a64f 100644 --- a/clash_lib/src/proxy/mocks.rs +++ b/clash_lib/src/proxy/mocks.rs @@ -7,7 +7,7 @@ use crate::{ app::{ dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver, - proxy_manager::providers::{ + remote_content_manager::providers::{ proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType, }, }, diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/relay/mod.rs index 708dd0240..25ad551d0 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/relay/mod.rs @@ -8,7 +8,7 @@ use crate::{ app::{ dispatcher::{BoxedChainedStream, ChainedStream, ChainedStreamWrapper}, dns::ThreadSafeDNSResolver, - proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, + remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, common::errors::new_io_error, proxy::utils::new_tcp_stream, diff --git a/clash_lib/src/proxy/selector/mod.rs b/clash_lib/src/proxy/selector/mod.rs index b5db6a8af..7c862b0e9 100644 --- a/clash_lib/src/proxy/selector/mod.rs +++ b/clash_lib/src/proxy/selector/mod.rs @@ -8,7 +8,7 @@ use tracing::debug; use crate::{ app::{ dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver, - proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, + remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, p_debug, session::{Session, SocksAddr}, diff --git a/clash_lib/src/proxy/urltest/mod.rs b/clash_lib/src/proxy/urltest/mod.rs index 14d9e48ed..054d48a76 100644 --- a/clash_lib/src/proxy/urltest/mod.rs +++ b/clash_lib/src/proxy/urltest/mod.rs @@ -8,7 +8,9 @@ use crate::{ app::{ dispatcher::BoxedChainedStream, dns::ThreadSafeDNSResolver, - proxy_manager::{providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager}, + remote_content_manager::{ + providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager, + }, }, p_debug, session::{Session, SocksAddr}, diff --git a/clash_lib/src/proxy/utils/provider_helper.rs b/clash_lib/src/proxy/utils/provider_helper.rs index ca23612f3..7cc938f99 100644 --- a/clash_lib/src/proxy/utils/provider_helper.rs +++ b/clash_lib/src/proxy/utils/provider_helper.rs @@ -1,5 +1,5 @@ use crate::{ - app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, + app::remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, proxy::AnyOutboundHandler, }; From b83d117b5dbfa1ad4dadf0409b28f8131c352b9a Mon Sep 17 00:00:00 2001 From: dev0 Date: Thu, 14 Sep 2023 04:26:22 +1000 Subject: [PATCH 2/3] rebase --- clash_lib/src/app/outbound/manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index ac157a013..1305ccf8d 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -9,6 +9,7 @@ use tokio::sync::{Mutex, RwLock}; use tracing::info; use crate::app::dns::ThreadSafeDNSResolver; +use crate::app::profile::ThreadSafeCacheFile; use crate::app::remote_content_manager::healthcheck::HealthCheck; use crate::app::remote_content_manager::providers::file_vehicle; use crate::app::remote_content_manager::providers::http_vehicle; From 62251dfab0215b9ebb17b0fb4816f4f914231af3 Mon Sep 17 00:00:00 2001 From: dev0 Date: Thu, 14 Sep 2023 23:26:04 +1000 Subject: [PATCH 3/3] rule-set --- clash/tests/data/config/rule-set.yaml | 2 + clash/tests/data/config/rules.yaml | 9 +- clash_lib/src/app/dispatcher/dispatcher.rs | 4 +- clash_lib/src/app/outbound/manager.rs | 23 ++- .../providers/fetcher.rs | 47 +++--- .../remote_content_manager/providers/mod.rs | 2 +- .../proxy_provider/plain_provider.rs | 2 +- .../proxy_provider/proxy_set_provider.rs | 2 +- .../providers/rule_provider/cidr_trie.rs | 9 +- .../providers/rule_provider/mod.rs | 3 + .../providers/rule_provider/rule_provider.rs | 157 ++++++++++++++++-- clash_lib/src/app/router/mod.rs | 130 ++++++++++++++- clash_lib/src/app/router/rules/process.rs | 26 +++ clash_lib/src/app/router/rules/ruleset.rs | 16 +- clash_lib/src/config/def.rs | 3 + clash_lib/src/config/internal/config.rs | 71 +++++++- clash_lib/src/config/internal/proxy.rs | 8 +- clash_lib/src/config/internal/rule.rs | 40 ++++- clash_lib/src/lib.rs | 1 + clash_lib/src/proxy/mocks.rs | 2 +- 20 files changed, 471 insertions(+), 86 deletions(-) create mode 100644 clash/tests/data/config/rule-set.yaml diff --git a/clash/tests/data/config/rule-set.yaml b/clash/tests/data/config/rule-set.yaml new file mode 100644 index 000000000..1d2ecf1d6 --- /dev/null +++ b/clash/tests/data/config/rule-set.yaml @@ -0,0 +1,2 @@ +payload: + - 'httpbin.yba.dev' \ No newline at end of file diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index 09755d6bd..12e12187a 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -195,9 +195,16 @@ proxy-providers: url: http://www.gstatic.com/generate_204 interval: 300 +rule-providers: + file-provider: + type: file + path: ./rule-set.yaml + interval: 300 + behavior: domain + rules: - DOMAIN,ipinfo.io,relay - - DOMAIN-KEYWORD,httpbin,h2-vmess + - RULE-SET,file-provider,h2-vmess - GEOIP,CN,relay - DOMAIN-SUFFIX,facebook.com,REJECT - DOMAIN-KEYWORD,google,select diff --git a/clash_lib/src/app/dispatcher/dispatcher.rs b/clash_lib/src/app/dispatcher/dispatcher.rs index 1e783b2bc..555c3d290 100644 --- a/clash_lib/src/app/dispatcher/dispatcher.rs +++ b/clash_lib/src/app/dispatcher/dispatcher.rs @@ -104,7 +104,7 @@ impl Dispatcher { }; let mode = self.mode.lock().await; - info!("dispatching {} with mode {}", sess, mode); + debug!("dispatching {} with mode {}", sess, mode); let (outbound_name, rule) = match *mode { RunMode::Global => (PROXY_GLOBAL, None), RunMode::Rule => self.router.match_route(&sess).await, @@ -120,7 +120,7 @@ impl Dispatcher { match handler.connect_stream(&sess, self.resolver.clone()).await { Ok(rhs) => { - info!("remote connection established {}", sess); + debug!("remote connection established {}", sess); let mut rhs = Box::new( TrackedStream::new(rhs, self.manager.clone(), sess.clone(), rule).await, ); diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index 1305ccf8d..7f0096cfb 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -5,6 +5,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; +use tracing::error; +use tracing::warn; use tracing::info; @@ -19,7 +21,7 @@ use crate::app::remote_content_manager::providers::proxy_provider::PlainProvider use crate::app::remote_content_manager::providers::proxy_provider::ProxySetProvider; use crate::app::remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider; use crate::config::internal::proxy::PROXY_GLOBAL; -use crate::config::internal::proxy::{OutboundProxyProvider, PROXY_DIRECT, PROXY_REJECT}; +use crate::config::internal::proxy::{OutboundProxyProviderDef, PROXY_DIRECT, PROXY_REJECT}; use crate::proxy::fallback; use crate::proxy::loadbalance; use crate::proxy::selector; @@ -52,7 +54,7 @@ impl OutboundManager { pub async fn new( outbounds: Vec, outbound_groups: Vec, - proxy_providers: HashMap, + proxy_providers: HashMap, proxy_names: Vec, dns_resolver: ThreadSafeDNSResolver, cache_store: ThreadSafeCacheFile, @@ -548,14 +550,14 @@ impl OutboundManager { } async fn load_proxy_providers( - proxy_providers: HashMap, + proxy_providers: HashMap, proxy_manager: ProxyManager, resolver: ThreadSafeDNSResolver, provider_registry: &mut HashMap, ) -> Result<(), Error> { for (name, provider) in proxy_providers.into_iter() { match provider { - OutboundProxyProvider::Http(http) => { + OutboundProxyProviderDef::Http(http) => { let vehicle = http_vehicle::Vehicle::new( http.url .parse::() @@ -581,7 +583,7 @@ impl OutboundManager { provider_registry.insert(name, Arc::new(RwLock::new(provider))); } - OutboundProxyProvider::File(file) => { + OutboundProxyProviderDef::File(file) => { let vehicle = file_vehicle::Vehicle::new(&file.path); let hc = HealthCheck::new( vec![], @@ -607,7 +609,16 @@ impl OutboundManager { for p in provider_registry.values() { info!("initializing provider {}", p.read().await.name()); - p.write().await.initialize().await?; + match p.write().await.initialize().await { + Ok(_) => {} + Err(err) => { + error!( + "failed to initialize proxy provider {}: {}", + p.read().await.name(), + err + ); + } + } } Ok(()) diff --git a/clash_lib/src/app/remote_content_manager/providers/fetcher.rs b/clash_lib/src/app/remote_content_manager/providers/fetcher.rs index e8ce3bb32..12ef5ee78 100644 --- a/clash_lib/src/app/remote_content_manager/providers/fetcher.rs +++ b/clash_lib/src/app/remote_content_manager/providers/fetcher.rs @@ -7,10 +7,8 @@ use std::{ use chrono::{DateTime, Utc}; use futures::future::BoxFuture; -use tokio::{ - sync::{Mutex, RwLock}, - time::Instant, -}; +use serde::de; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, trace, warn}; use crate::common::utils; @@ -20,14 +18,15 @@ use super::{ProviderVehicleType, ThreadSafeProviderVehicle}; struct Inner { updated_at: SystemTime, hash: [u8; 16], + + thread_handle: Option>, } pub struct Fetcher { name: String, interval: Duration, vehicle: ThreadSafeProviderVehicle, - thread_handle: Option>, - ticker: Option, + ticker_interval: Duration, inner: std::sync::Arc>, parser: Arc>, pub on_update: Option>>, @@ -50,17 +49,11 @@ where name, interval, vehicle, - thread_handle: None, - ticker: match interval.as_secs() { - 0 => None, - _ => Some(tokio::time::interval_at( - Instant::now() + interval, - interval, - )), - }, + ticker_interval: interval, inner: Arc::new(tokio::sync::RwLock::new(Inner { updated_at: SystemTime::UNIX_EPOCH, hash: [0; 16], + thread_handle: None, })), parser: Arc::new(Mutex::new(parser)), on_update: on_update.map(|f| Arc::new(Mutex::new(f))), @@ -78,7 +71,7 @@ where self.inner.read().await.updated_at.into() } - pub async fn initial(&mut self) -> anyhow::Result { + pub async fn initial(&self) -> anyhow::Result { let mut is_local = false; let mut immediately_update = false; @@ -100,14 +93,16 @@ where Err(_) => self.vehicle.read().await?, }; - let proxies = match (self.parser.lock().await)(&content) { + let parser_guard = self.parser.lock().await; + + let proxies = match (parser_guard)(&content) { Ok(proxies) => proxies, Err(e) => { if !is_local { return Err(e); } let content = self.vehicle.read().await?; - (self.parser.lock().await)(&content)? + (parser_guard)(&content)? } }; @@ -127,8 +122,12 @@ where drop(inner); - if let Some(ticker) = self.ticker.take() { - self.pull_loop(immediately_update, ticker); + if !self.ticker_interval.is_zero() { + self.pull_loop( + immediately_update, + tokio::time::interval(self.ticker_interval), + ) + .await; } Ok(proxies) @@ -180,13 +179,13 @@ where Ok((proxies, false)) } - pub fn destroy(&mut self) { - if let Some(handle) = self.thread_handle.take() { + pub async fn destroy(&mut self) { + if let Some(handle) = self.inner.write().await.thread_handle.take() { handle.abort(); } } - fn pull_loop(&mut self, immediately_update: bool, mut ticker: tokio::time::Interval) { + async fn pull_loop(&self, immediately_update: bool, mut ticker: tokio::time::Interval) { let inner = self.inner.clone(); let vehicle = self.vehicle.clone(); let parser = self.parser.clone(); @@ -194,7 +193,7 @@ where let name = self.name.clone(); let fire_immediately = immediately_update; - self.thread_handle = Some(tokio::spawn(async move { + let thread_handle = Some(tokio::spawn(async move { debug!("fetcher {} started", &name); loop { let inner = inner.clone(); @@ -232,6 +231,8 @@ where } } })); + + self.inner.write().await.thread_handle = thread_handle; } } diff --git a/clash_lib/src/app/remote_content_manager/providers/mod.rs b/clash_lib/src/app/remote_content_manager/providers/mod.rs index 2e4db4eda..c2535b6ea 100644 --- a/clash_lib/src/app/remote_content_manager/providers/mod.rs +++ b/clash_lib/src/app/remote_content_manager/providers/mod.rs @@ -61,7 +61,7 @@ pub trait Provider { fn name(&self) -> &str; fn vehicle_type(&self) -> ProviderVehicleType; fn typ(&self) -> ProviderType; - async fn initialize(&mut self) -> io::Result<()>; + async fn initialize(&self) -> io::Result<()>; async fn update(&self) -> io::Result<()>; async fn as_map(&self) -> HashMap>; diff --git a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs index 1e37524f5..701dfcd96 100644 --- a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/plain_provider.rs @@ -65,7 +65,7 @@ impl Provider for PlainProvider { fn typ(&self) -> ProviderType { ProviderType::Proxy } - async fn initialize(&mut self) -> std::io::Result<()> { + async fn initialize(&self) -> std::io::Result<()> { Ok(()) } async fn update(&self) -> std::io::Result<()> { diff --git a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs index fdf336149..50d546ae1 100644 --- a/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs @@ -132,7 +132,7 @@ impl Provider for ProxySetProvider { ProviderType::Proxy } - async fn initialize(&mut self) -> std::io::Result<()> { + async fn initialize(&self) -> std::io::Result<()> { let ele = self.fetcher.initial().await.map_err(map_io_error)?; debug!("{} initialized with {} proxies", self.name(), ele.len()); if let Some(updater) = self.fetcher.on_update.as_ref() { diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs index 329640fbe..137440674 100644 --- a/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/cidr_trie.rs @@ -1,4 +1,4 @@ -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use ip_network_table_deps_treebitmap::IpLookupTable; @@ -31,4 +31,11 @@ impl CidrTrie { false } } + + pub fn contains(&self, ip: IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => self.v4.longest_match(v4).is_some(), + IpAddr::V6(v6) => self.v6.longest_match(v6).is_some(), + } + } } diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs index 3401df113..97dd8af3c 100644 --- a/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/mod.rs @@ -1,2 +1,5 @@ mod cidr_trie; mod rule_provider; + +pub use rule_provider::ThreadSafeRuleProvider; +pub use rule_provider::{RuleProvider, RuleProviderImpl, RuleSetBehavior}; diff --git a/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs b/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs index 541c5f15b..33247b905 100644 --- a/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs +++ b/clash_lib/src/app/remote_content_manager/providers/rule_provider/rule_provider.rs @@ -1,18 +1,27 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + fmt::Display, + net::{IpAddr, Ipv4Addr}, + sync::Arc, + time::Duration, +}; use async_trait::async_trait; +use erased_serde::Serialize as ESerialize; use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; -use tracing::trace; +use tracing::{debug, trace}; +use tracing_subscriber::field::debug; use crate::{ app::{ remote_content_manager::providers::{ - fetcher::Fetcher, Provider, ThreadSafeProviderVehicle, + fetcher::Fetcher, Provider, ProviderType, ProviderVehicleType, + ThreadSafeProviderVehicle, }, - router::RuleMatcher, + router::{map_rule_type, RuleMatcher, MMDB}, }, - common::trie, + common::{errors::map_io_error, trie}, config::internal::rule::RuleType, session::Session, Error, @@ -25,12 +34,24 @@ struct ProviderScheme { pub payload: Vec, } +#[derive(Deserialize, Serialize, Debug, Clone, Copy)] +#[serde(rename_all = "lowercase")] pub enum RuleSetBehavior { Domain, IPCIDR, Classical, } +impl Display for RuleSetBehavior { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RuleSetBehavior::Domain => write!(f, "Domain"), + RuleSetBehavior::IPCIDR => write!(f, "IPCIDR"), + RuleSetBehavior::Classical => write!(f, "Classical"), + } + } +} + enum RuleContent { Domain(trie::StringTrie), IPCIDR(CidrTrie), @@ -39,17 +60,15 @@ enum RuleContent { struct Inner { content: RuleContent, - count: usize, } -#[async_trait] pub trait RuleProvider: Provider { - async fn rules(&self) -> Vec; - async fn search(&self, sess: &Session) -> bool; - async fn rule_count(&self) -> usize; + fn search(&self, sess: &Session) -> bool; fn behavior(&self) -> RuleSetBehavior; } +pub type ThreadSafeRuleProvider = Arc; + pub struct RuleProviderImpl { fetcher: Fetcher< Box BoxFuture<'static, ()> + Send + Sync + 'static>, @@ -65,6 +84,7 @@ impl RuleProviderImpl { behovior: RuleSetBehavior, interval: Duration, vehicle: ThreadSafeProviderVehicle, + mmdb: Arc, ) -> Self { let inner = Arc::new(tokio::sync::RwLock::new(Inner { content: match behovior { @@ -72,7 +92,6 @@ impl RuleProviderImpl { RuleSetBehavior::IPCIDR => RuleContent::IPCIDR(CidrTrie::new()), RuleSetBehavior::Classical => RuleContent::Classical(vec![]), }, - count: 0, })); let inner_clone = inner.clone(); @@ -95,17 +114,115 @@ impl RuleProviderImpl { let scheme: ProviderScheme = serde_yaml::from_slice(input).map_err(|x| { Error::InvalidConfig(format!("proxy provider parse error {}: {}", n, x)) })?; - let rules = make_rules(behovior, scheme.payload)?; + let rules = make_rules(behovior, scheme.payload, mmdb.clone())?; Ok(rules) }); + + let fetcher = Fetcher::new(name, interval, vehicle, parser, Some(updater)); + + Self { + fetcher, + inner, + behavior: behovior, + } + } +} + +#[async_trait] +impl RuleProvider for RuleProviderImpl { + fn search(&self, sess: &Session) -> bool { + let inner = self.inner.try_read(); + + match inner { + Ok(inner) => match &inner.content { + RuleContent::Domain(trie) => trie.search(&sess.destination.host()).is_some(), + RuleContent::IPCIDR(trie) => trie.contains( + sess.destination + .ip() + .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))), + ), + RuleContent::Classical(rules) => { + for rule in rules.iter() { + if rule.apply(sess) { + return true; + } + } + false + } + }, + Err(_) => { + debug!("rule provider {} is busy", self.name()); + false + } + } + } + fn behavior(&self) -> RuleSetBehavior { + self.behavior } } -fn make_rules(behavior: RuleSetBehavior, rules: Vec) -> Result { +#[async_trait] +impl Provider for RuleProviderImpl { + fn name(&self) -> &str { + self.fetcher.name() + } + fn vehicle_type(&self) -> ProviderVehicleType { + self.fetcher.vehicle_type() + } + fn typ(&self) -> ProviderType { + ProviderType::Rule + } + async fn initialize(&self) -> std::io::Result<()> { + let ele = self.fetcher.initial().await.map_err(map_io_error)?; + debug!("initializing rule provider {}", self.name()); + if let Some(updater) = self.fetcher.on_update.as_ref() { + updater.lock().await(ele).await; + } + Ok(()) + } + async fn update(&self) -> std::io::Result<()> { + let (ele, same) = self.fetcher.update().await.map_err(map_io_error)?; + debug!("rule provider {} updated. same? {}", self.name(), same); + if !same { + if let Some(updater) = self.fetcher.on_update.as_ref() { + updater.lock().await(ele); + } + } + Ok(()) + } + + async fn as_map(&self) -> HashMap> { + let mut m: HashMap> = HashMap::new(); + + m.insert("name".to_owned(), Box::new(self.name().to_string())); + m.insert("type".to_owned(), Box::new(self.typ().to_string())); + m.insert( + "vehicleType".to_owned(), + Box::new(self.vehicle_type().to_string()), + ); + + m.insert( + "updatedAt".to_owned(), + Box::new(self.fetcher.updated_at().await), + ); + + m.insert("behavior".to_owned(), Box::new(self.behavior().to_string())); + + m + } +} + +fn make_rules( + behavior: RuleSetBehavior, + rules: Vec, + mmdb: Arc, +) -> Result { match behavior { - RuleSetBehavior::Domain => todo!(), - RuleSetBehavior::IPCIDR => todo!(), - RuleSetBehavior::Classical => todo!(), + RuleSetBehavior::Domain => Ok(RuleContent::Domain(make_domain_rules(rules)?)), + RuleSetBehavior::IPCIDR => Ok(RuleContent::IPCIDR(make_ip_cidr_rules(rules)?)), + RuleSetBehavior::Classical => { + Ok(RuleContent::Classical(make_classical_rules(rules, mmdb)?)) + } } } @@ -125,11 +242,15 @@ fn make_ip_cidr_rules(rules: Vec) -> Result { Ok(trie) } -fn make_classical_rules(rules: Vec) -> Result>, Error> { +fn make_classical_rules( + rules: Vec, + mmdb: Arc, +) -> Result>, Error> { let mut rv = vec![]; for rule in rules { let rule_type = rule.parse::()?; - rv.push(matcher); + let rule_matcher = map_rule_type(rule_type, mmdb.clone(), None); + rv.push(rule_matcher); } Ok(rv) } diff --git a/clash_lib/src/app/router/mod.rs b/clash_lib/src/app/router/mod.rs index a758a8f37..d9d297411 100644 --- a/clash_lib/src/app/router/mod.rs +++ b/clash_lib/src/app/router/mod.rs @@ -3,19 +3,28 @@ use crate::app::router::rules::domain_keyword::DomainKeyword; use crate::app::router::rules::domain_suffix::DomainSuffix; use crate::app::router::rules::ipcidr::IPCIDR; use crate::app::router::rules::ruleset::RuleSet; +use crate::Error; use crate::common::http::new_http_client; +use crate::config::internal::config::RuleProviderDef; use crate::config::internal::rule::RuleType; use crate::session::{Session, SocksAddr}; use crate::app::router::rules::final_::Final; +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; -use tracing::info; +use http::Uri; +use tracing::{error, info, warn}; -use self::mmdb::MMDB; +pub use self::mmdb::MMDB; use super::dns::ThreadSafeDNSResolver; +use super::remote_content_manager::providers::rule_provider::{ + RuleProviderImpl, ThreadSafeRuleProvider, +}; +use super::remote_content_manager::providers::{file_vehicle, http_vehicle}; mod mmdb; mod rules; @@ -23,6 +32,7 @@ pub use rules::RuleMatcher; pub struct Router { rules: Vec>, + rule_provider_registry: HashMap, dns_resolver: ThreadSafeDNSResolver, } @@ -33,6 +43,7 @@ const MATCH: &str = "MATCH"; impl Router { pub async fn new( rules: Vec, + rule_providers: HashMap, dns_resolver: ThreadSafeDNSResolver, mmdb_path: String, mmdb_download_url: Option, @@ -44,12 +55,24 @@ impl Router { .expect("failed to load mmdb"), ); + let mut rule_provider_registry = HashMap::new(); + + Self::load_rule_providers( + rule_providers, + &mut rule_provider_registry, + dns_resolver.clone(), + mmdb.clone(), + ) + .await + .ok(); + Self { rules: rules .into_iter() - .map(|r| map_rule_type(r, mmdb.clone())) + .map(|r| map_rule_type(r, mmdb.clone(), Some(&rule_provider_registry))) .collect(), dns_resolver, + rule_provider_registry, } } @@ -75,7 +98,12 @@ impl Router { } if r.apply(&sess_dup) { - info!("matched {} to target {}", &sess_dup, r.target()); + info!( + "matched {} to target {}[{}]", + &sess_dup, + r.target(), + r.type_name() + ); return (r.target(), Some(r)); } } @@ -83,13 +111,75 @@ impl Router { (MATCH, None) } + async fn load_rule_providers( + rule_providers: HashMap, + rule_provider_registry: &mut HashMap, + resolver: ThreadSafeDNSResolver, + mmdb: Arc, + ) -> Result<(), Error> { + for (name, provider) in rule_providers.into_iter() { + match provider { + RuleProviderDef::Http(http) => { + let vehicle = http_vehicle::Vehicle::new( + http.url + .parse::() + .expect(format!("invalid provider url: {}", http.url).as_str()), + http.path, + resolver.clone(), + ); + + let provider = RuleProviderImpl::new( + name.clone(), + http.behavior, + Duration::from_secs(http.interval), + Arc::new(vehicle), + mmdb.clone(), + ); + + rule_provider_registry.insert(name, Arc::new(provider)); + } + RuleProviderDef::File(file) => { + let vehicle = file_vehicle::Vehicle::new(&file.path); + + let provider = RuleProviderImpl::new( + name.clone(), + file.behavior, + Duration::from_secs(file.interval.unwrap_or_default()), + Arc::new(vehicle), + mmdb.clone(), + ); + + rule_provider_registry.insert(name, Arc::new(provider)); + } + } + } + + for p in rule_provider_registry.values() { + info!("initializing rule provider {}", p.name()); + match p.initialize().await { + Ok(_) => { + info!("rule provider {} initialized", p.name()); + } + Err(err) => { + error!("failed to initialize rule provider {}: {}", p.name(), err); + } + } + } + + Ok(()) + } + /// API handlers pub fn get_all_rules(&self) -> &Vec> { &self.rules } } -fn map_rule_type(rule_type: RuleType, mmdb: Arc) -> Box { +pub fn map_rule_type( + rule_type: RuleType, + mmdb: Arc, + rule_provider_registry: Option<&HashMap>, +) -> Box { match rule_type { RuleType::Domain { domain, target } => { Box::new(Domain { domain, target }) as Box @@ -149,9 +239,33 @@ fn map_rule_type(rule_type: RuleType, mmdb: Arc) -> Box { target, is_src: false, }), - RuleType::ProcessName => todo!(), - RuleType::ProcessPath => todo!(), - RuleType::RuleSet { rule_set, target } => Box::new(RuleSet { rule_set, target }), + RuleType::ProcessName { + process_name, + target, + } => Box::new(rules::process::Process { + name: process_name, + target, + name_only: true, + }), + RuleType::ProcessPath { + process_path, + target, + } => Box::new(rules::process::Process { + name: process_path, + target, + name_only: false, + }), + RuleType::RuleSet { rule_set, target } => match rule_provider_registry { + Some(rule_provider_registry) => Box::new(RuleSet::new( + rule_set.clone(), + target, + rule_provider_registry + .get(&rule_set) + .expect(format!("rule provider {} not found", rule_set).as_str()) + .clone(), + )), + None => unreachable!("you shouldn't next rule-set within another rule-set"), + }, RuleType::Match { target } => Box::new(Final { target }), } } diff --git a/clash_lib/src/app/router/rules/process.rs b/clash_lib/src/app/router/rules/process.rs index 8b1378917..30697a76b 100644 --- a/clash_lib/src/app/router/rules/process.rs +++ b/clash_lib/src/app/router/rules/process.rs @@ -1 +1,27 @@ +use super::RuleMatcher; +pub struct Process { + pub name: String, + pub target: String, + #[allow(dead_code)] + pub name_only: bool, +} + +impl RuleMatcher for Process { + fn apply(&self, _sess: &crate::session::Session) -> bool { + // TODO: implement this + false + } + + fn target(&self) -> &str { + &self.target + } + + fn payload(&self) -> String { + self.name.clone() + } + + fn type_name(&self) -> &str { + "Process" + } +} diff --git a/clash_lib/src/app/router/rules/ruleset.rs b/clash_lib/src/app/router/rules/ruleset.rs index 82dc564e0..74cc817fe 100644 --- a/clash_lib/src/app/router/rules/ruleset.rs +++ b/clash_lib/src/app/router/rules/ruleset.rs @@ -1,3 +1,4 @@ +use crate::app::remote_content_manager::providers::rule_provider::ThreadSafeRuleProvider; use crate::app::router::rules::RuleMatcher; use crate::session::Session; @@ -5,11 +6,22 @@ use crate::session::Session; pub struct RuleSet { pub rule_set: String, pub target: String, + pub rule_provider: ThreadSafeRuleProvider, +} + +impl RuleSet { + pub fn new(rule_set: String, target: String, rule_provider: ThreadSafeRuleProvider) -> Self { + Self { + rule_set, + target, + rule_provider, + } + } } impl RuleMatcher for RuleSet { - fn apply(&self, _sess: &Session) -> bool { - false + fn apply(&self, sess: &Session) -> bool { + self.rule_provider.search(sess) } fn target(&self) -> &str { diff --git a/clash_lib/src/config/def.rs b/clash_lib/src/config/def.rs index baf2ca1c5..56ef23ee3 100644 --- a/clash_lib/src/config/def.rs +++ b/clash_lib/src/config/def.rs @@ -119,6 +119,8 @@ pub struct Config { pub routing_mask: Option, #[serde(rename = "proxy-providers")] pub proxy_provider: Option>>, + #[serde(rename = "rule-providers")] + pub rule_provider: Option>>, pub experimental: Option, pub tun: Option>, @@ -176,6 +178,7 @@ impl Default for Config { interface: Default::default(), routing_mask: Default::default(), proxy_provider: Default::default(), + rule_provider: Default::default(), hosts: Default::default(), dns: Default::default(), experimental: Default::default(), diff --git a/clash_lib/src/config/internal/config.rs b/clash_lib/src/config/internal/config.rs index 02a426081..45fd50383 100644 --- a/clash_lib/src/config/internal/config.rs +++ b/clash_lib/src/config/internal/config.rs @@ -6,7 +6,9 @@ use std::str::FromStr; use serde::de::value::MapDeserializer; use serde::{Deserialize, Serialize}; +use serde_yaml::Value; +use crate::app::remote_content_manager::providers::rule_provider::RuleSetBehavior; use crate::common::auth; use crate::config::def::{self}; use crate::config::internal::proxy::{OutboundProxy, PROXY_DIRECT, PROXY_REJECT}; @@ -18,7 +20,7 @@ use crate::{ Error, }; -use super::proxy::{OutboundProxyProtocol, OutboundProxyProvider}; +use super::proxy::{map_serde_error, OutboundProxyProtocol, OutboundProxyProviderDef}; pub struct Config { pub general: General, @@ -27,12 +29,13 @@ pub struct Config { pub experimental: Option, pub profile: Profile, pub rules: Vec, + pub rule_providers: HashMap, pub users: Vec, /// a list maintaining the order from the config file pub proxy_names: Vec, pub proxies: HashMap, pub proxy_groups: HashMap, - pub proxy_providers: HashMap, + pub proxy_providers: HashMap, } impl Config { @@ -103,6 +106,23 @@ impl TryFrom for Config { .map_err(|x| Error::InvalidConfig(x.to_string())) }) .collect::, _>>()?, + rule_providers: c + .rule_provider + .map(|m| { + m.into_iter() + .try_fold(HashMap::new(), |mut rv, (name, body)| { + let provider = RuleProviderDef::try_from(body).map_err(|x| { + Error::InvalidConfig(format!( + "invalid rule provider {}: {}", + name, x + )) + })?; + rv.insert(name, provider); + Ok::, Error>(rv) + }) + .expect("proxy provider parse error") + }) + .unwrap_or_default(), users: c .authentication .into_iter() @@ -166,14 +186,15 @@ impl TryFrom for Config { .map(|m| { m.into_iter() .try_fold(HashMap::new(), |mut rv, (name, body)| { - let provider = OutboundProxyProvider::try_from(body).map_err(|x| { - Error::InvalidConfig(format!( - "invalid proxy provider {}: {}", - name, x - )) - })?; + let provider = + OutboundProxyProviderDef::try_from(body).map_err(|x| { + Error::InvalidConfig(format!( + "invalid proxy provider {}: {}", + name, x + )) + })?; rv.insert(name, provider); - Ok::, Error>(rv) + Ok::, Error>(rv) }) .expect("proxy provider parse error") }) @@ -286,3 +307,35 @@ pub struct Controller { pub external_ui: Option, pub secret: Option, } + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename_all = "kebab-case")] +pub enum RuleProviderDef { + Http(HttpRuleProvider), + File(FileRuleProvider), +} + +#[derive(Serialize, Deserialize)] +pub struct HttpRuleProvider { + pub url: String, + pub interval: u64, + pub behavior: RuleSetBehavior, + pub path: String, +} + +#[derive(Serialize, Deserialize)] +pub struct FileRuleProvider { + pub path: String, + pub interval: Option, + pub behavior: RuleSetBehavior, +} + +impl TryFrom> for RuleProviderDef { + type Error = crate::Error; + + fn try_from(mapping: HashMap) -> Result { + RuleProviderDef::deserialize(MapDeserializer::new(mapping.into_iter())) + .map_err(map_serde_error) + } +} diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index df8896f11..26e1fab30 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -25,7 +25,7 @@ impl OutboundProxy { } } -fn map_serde_error(x: serde_yaml::Error) -> crate::Error { +pub fn map_serde_error(x: serde_yaml::Error) -> crate::Error { Error::InvalidConfig(if let Some(loc) = x.location() { format!( "{}, line, {}, column: {}", @@ -301,7 +301,7 @@ pub struct OutboundGroupSelect { #[derive(serde::Serialize, serde::Deserialize, Debug)] #[serde(tag = "type")] #[serde(rename_all = "kebab-case")] -pub enum OutboundProxyProvider { +pub enum OutboundProxyProviderDef { Http(OutboundHttpProvider), File(OutboundFileProvider), } @@ -335,11 +335,11 @@ pub struct HealthCheck { pub lazy: Option, } -impl TryFrom> for OutboundProxyProvider { +impl TryFrom> for OutboundProxyProviderDef { type Error = crate::Error; fn try_from(mapping: HashMap) -> Result { - OutboundProxyProvider::deserialize(MapDeserializer::new(mapping.into_iter())) + OutboundProxyProviderDef::deserialize(MapDeserializer::new(mapping.into_iter())) .map_err(map_serde_error) } } diff --git a/clash_lib/src/config/internal/rule.rs b/clash_lib/src/config/internal/rule.rs index deaee9c6a..36977ea5a 100644 --- a/clash_lib/src/config/internal/rule.rs +++ b/clash_lib/src/config/internal/rule.rs @@ -37,8 +37,14 @@ pub enum RuleType { target: String, port: u16, }, - ProcessName, - ProcessPath, + ProcessName { + process_name: String, + target: String, + }, + ProcessPath { + process_path: String, + target: String, + }, RuleSet { rule_set: String, target: String, @@ -77,8 +83,14 @@ impl RuleType { } => target, RuleType::SRCPort { target, port } => target, RuleType::DSTPort { target, port } => target, - RuleType::ProcessName => todo!(), - RuleType::ProcessPath => todo!(), + RuleType::ProcessName { + process_name, + target, + } => target, + RuleType::ProcessPath { + process_path, + target, + } => target, RuleType::RuleSet { rule_set, target } => target, RuleType::Match { target } => target, } @@ -114,8 +126,14 @@ impl Display for RuleType { } => write!(f, "SRC-IP-CIDR"), RuleType::SRCPort { target, port } => write!(f, "SRC-PORT"), RuleType::DSTPort { target, port } => write!(f, "DST-PORT"), - RuleType::ProcessName => write!(f, "PROCESS-NAME"), - RuleType::ProcessPath => write!(f, "PROCESS-PATH"), + RuleType::ProcessName { + process_name, + target, + } => write!(f, "PROCESS-NAME"), + RuleType::ProcessPath { + process_path, + target, + } => write!(f, "PROCESS-PATH"), RuleType::RuleSet { rule_set, target } => write!(f, "RULE-SET"), RuleType::Match { target } => write!(f, "MATCH"), } @@ -181,8 +199,14 @@ impl RuleType { .parse() .expect(format!("invalid port: {}", payload).as_str()), }), - "PROCESS-NAME" => todo!(), - "PROCESS-PATH" => todo!(), + "PROCESS-NAME" => Ok(RuleType::ProcessName { + process_name: payload.to_string(), + target: target.to_string(), + }), + "PROCESS-PATH" => Ok(RuleType::ProcessPath { + process_path: payload.to_string(), + target: target.to_string(), + }), "RULE-SET" => Ok(RuleType::RuleSet { rule_set: payload.to_string(), target: target.to_string(), diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index 20568629a..c3ce9e23d 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -164,6 +164,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let router = Arc::new( Router::new( config.rules, + config.rule_providers, dns_resolver.clone(), config.general.mmdb, config.general.mmdb_download_url, diff --git a/clash_lib/src/proxy/mocks.rs b/clash_lib/src/proxy/mocks.rs index 33a61a64f..e442797c7 100644 --- a/clash_lib/src/proxy/mocks.rs +++ b/clash_lib/src/proxy/mocks.rs @@ -24,7 +24,7 @@ mock! { fn name(&self) -> &str; fn vehicle_type(&self) -> ProviderVehicleType; fn typ(&self) -> ProviderType; - async fn initialize(&mut self) -> std::io::Result<()>; + async fn initialize(&self) -> std::io::Result<()>; async fn update(&self) -> std::io::Result<()>; async fn as_map(&self) -> HashMap>;