diff --git a/examples/datasources/apollo.rs b/examples/datasources/apollo.rs new file mode 100644 index 0000000..82634a0 --- /dev/null +++ b/examples/datasources/apollo.rs @@ -0,0 +1,117 @@ +#![allow(unreachable_code)] +use apollo_client::conf::{requests::WatchRequest, ApolloConfClientBuilder}; +use sentinel_core::{ + base, + datasource::{new_flow_rule_handler, rule_json_array_parser, ApolloDatasource}, + flow, EntryBuilder, Result, +}; +use std::sync::Arc; +use tokio::{ + task::JoinHandle, + time::{sleep, Duration}, +}; +use url::Url; + +// An example on apollo config service data source. +// Run this example by following steps: +// 1. Set up apollo +// (Quick start see https://github.com/apolloconfig/apollo-quick-start) +// 2. Run this example +// 3. Publish flow rule below at apollo-portal +// key: flow-apollo-example +// value: +// [ +// { +// "id":"1", +// "resource":"task", +// "ref_resource":"", +// "calculate_strategy":"Direct", +// "control_strategy":"Reject", +// "relation_strategy":"Current", +// "threshold":1.0, +// "warm_up_period_sec":0, +// "warm_up_cold_factor":0, +// "max_queueing_time_ms":0, +// "stat_interval_ms":0, +// "low_mem_usage_threshold":0, +// "high_mem_usage_threshold":0, +// "mem_low_water_mark":0, +// "mem_high_water_mark":0 +// } +// ] +// You will find that QPS number is restricted to 10 at first. But after publish the new flow rule, +// it will be restricted to 1. +#[tokio::main] +async fn main() -> Result<()> { + let handlers = basic_flow_example().await; + // println!("{:?}", sentinel_core::flow::get_rules_of_resource(&"task".to_string())); + + // Create apollo client + let client = + ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)? + .build()?; + + // Request apollo notification api, and fetch configuration when notified. + let watch_request = WatchRequest { + app_id: "SampleApp".to_string(), + namespace_names: vec![ + "application.properties".into(), + "application.json".into(), + "application.yml".into(), + ], + ..Default::default() + }; + + // Sleep 3 seconds and then read the apollo + sentinel_core::utils::sleep_for_ms(3000); + + let property = "flow-apollo-example"; + // Create a data source and change the rule. + let h = new_flow_rule_handler(rule_json_array_parser); + let mut ds = ApolloDatasource::new(client, property.into(), watch_request, vec![h]); + ds.initialize().await?; + for h in handlers { + h.await.expect("Couldn't join on the associated thread"); + } + Ok(()) +} + +async fn basic_flow_example() -> Vec> { + // Init sentienl configurations + sentinel_core::init_default().unwrap_or_else(|err| sentinel_core::logging::error!("{:?}", err)); + let resource_name = String::from("task"); + // Load sentinel rules + flow::load_rules(vec![Arc::new(flow::Rule { + resource: resource_name.clone(), + threshold: 10.0, + calculate_strategy: flow::CalculateStrategy::Direct, + control_strategy: flow::ControlStrategy::Reject, + ..Default::default() + })]); + let mut handlers = Vec::new(); + for _ in 0..20 { + let res_name = resource_name.clone(); + handlers.push(tokio::spawn(async move { + loop { + let entry_builder = EntryBuilder::new(res_name.clone()) + .with_traffic_type(base::TrafficType::Inbound); + if let Ok(entry) = entry_builder.build() { + // Passed, wrap the logic here. + task().await; + // Be sure the entry is exited finally. + entry.exit() + } else { + sleep(Duration::from_millis(100)).await; + } + } + })); + } + handlers +} + +// todo: Cannot sentinel-macros now. It will append rules, +// which is conflicts with the dynamic datasource +async fn task() { + println!("{}: passed", sentinel_core::utils::curr_time_millis()); + sleep(Duration::from_millis(100)).await; +} diff --git a/sentinel-core/Cargo.toml b/sentinel-core/Cargo.toml index f1a4a65..12d17c7 100644 --- a/sentinel-core/Cargo.toml +++ b/sentinel-core/Cargo.toml @@ -31,6 +31,7 @@ metric_log = ["directories", "regex"] ds_etcdv3 = ["etcd-rs", "futures"] ds_consul = ["consul", "base64"] ds_k8s = ["kube", "k8s-openapi", "schemars", "futures"] +ds_apollo = ["apollo-client", "futures-util"] [dependencies] sentinel-macros = { version = "0.1.0", path = "../sentinel-macros", optional = true } @@ -66,6 +67,8 @@ k8s-openapi = { version = "0.16.0", default-features = false, features = [ "v1_25", ], optional = true } schemars = { version = "0.8.8", optional = true } +apollo-client = { version = "0.7.5", optional = true } +futures-util = { version = "0.3.29", optional = true } dirs = "5.0.1" [target.'cfg(not(target_arch="wasm32"))'.dependencies] @@ -77,6 +80,7 @@ uuid = { version = "1.2", features = ["serde", "v4"] } mockall = "0.11.0" rand = "0.8.4" tokio = { version = "1", features = ["full"] } +url = "2.5.0" [lib] doctest = false @@ -211,3 +215,8 @@ required-features = ["full", "ds_consul"] name = "k8s" path = "../examples/datasources/k8s.rs" required-features = ["full", "ds_k8s"] + +[[example]] +name = "apollo" +path = "../examples/datasources/apollo.rs" +required-features = ["full", "ds_apollo"] diff --git a/sentinel-core/src/datasource/adapters/ds_apollo.rs b/sentinel-core/src/datasource/adapters/ds_apollo.rs new file mode 100644 index 0000000..0798d89 --- /dev/null +++ b/sentinel-core/src/datasource/adapters/ds_apollo.rs @@ -0,0 +1,93 @@ +use super::*; +use crate::{logging, utils::sleep_for_ms}; +use apollo_client::conf::{requests::WatchRequest, ApolloConfClient}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use futures_util::{future, pin_mut, stream::StreamExt}; + +pub struct ApolloDatasource> { + ds: DataSourceBase, + property: String, + watch_request: WatchRequest, + client: ApolloConfClient, + closed: AtomicBool, +} + +impl> ApolloDatasource { + pub fn new(client: ApolloConfClient, property: String, watch_request: WatchRequest, + handlers: Vec>) -> Self { + let mut ds = DataSourceBase::default(); + for h in handlers { + ds.add_property_handler(h); + } + ApolloDatasource { + ds, + property, + client, + watch_request, + closed: AtomicBool::new(false), + } + } + + pub async fn initialize(&mut self) -> Result<()> { + self.watch().await + } + + async fn watch(&mut self) -> Result<()> { + logging::info!( + "[Apollo] Apollo datasource is watching property {}", + self.property + ); + + let stream = self.client.watch(self.watch_request.clone()) + .take_while(|_| future::ready(!self.closed.load(Ordering::SeqCst))); + + pin_mut!(stream); + + while let Some(response) = stream.next().await { + match response { + Ok(responses) => { + // Load rules + // One namespace for one response + for (_, value) in responses { + match value { + Ok(r) => { + let rule = r.configurations.get(&self.property); + if let Err(e) = self.ds.update(rule) { + logging::error!("[Apollo] Failed to update rules, {:?}", e); + } + }, + Err(e) => logging::error!("[Apollo] Fail to fetch response from apollo, {:?}", e), + }; + } + }, + // retry + Err(e) => { + logging::error!("[Apollo] Client yield an error, {:?}", e); + sleep_for_ms(1000); + } + } + } + + Ok(()) + } + + pub fn close(&self) -> Result<()> { + self.closed.store(true, Ordering::SeqCst); + logging::info!( + "[Apollo] Apollo data source has been closed. Stop watch the key {:?} from apollo.", + self.property + ); + Ok(()) + } +} + +impl> DataSource +for ApolloDatasource +{ + fn get_base(&mut self) -> &mut DataSourceBase { + &mut self.ds + } +} \ No newline at end of file diff --git a/sentinel-core/src/datasource/adapters/mod.rs b/sentinel-core/src/datasource/adapters/mod.rs index e14e6a2..fb49081 100644 --- a/sentinel-core/src/datasource/adapters/mod.rs +++ b/sentinel-core/src/datasource/adapters/mod.rs @@ -9,6 +9,10 @@ pub use ds_etcdv3::*; pub mod ds_consul; #[cfg(feature = "ds_consul")] pub use ds_consul::*; +#[cfg(feature = "ds_apollo")] +pub mod ds_apollo; +#[cfg(feature = "ds_apollo")] +pub use ds_apollo::*; cfg_k8s! { pub mod ds_k8s; pub use ds_k8s::*; diff --git a/sentinel-core/src/macros/cfg.rs b/sentinel-core/src/macros/cfg.rs index 9f7ad3b..083a42f 100644 --- a/sentinel-core/src/macros/cfg.rs +++ b/sentinel-core/src/macros/cfg.rs @@ -13,8 +13,8 @@ macro_rules! cfg_exporter { macro_rules! cfg_datasource { ($($item:item)*) => { $( - #[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))))] + #[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))))] $item )* }