Skip to content

Commit

Permalink
Feat: add apollo datasource (#117)
Browse files Browse the repository at this point in the history
* add apollo datasource
  • Loading branch information
flearc authored Dec 12, 2023
1 parent 1c3d3c8 commit df1433d
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 2 deletions.
117 changes: 117 additions & 0 deletions examples/datasources/apollo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#![allow(unreachable_code)]
use apollo_client::conf::{requests::WatchRequest, ApolloConfClientBuilder};
use sentinel_core::{
base,
datasource::{new_flow_rule_handler, rule_json_array_parser, ApolloDatasource},
flow, EntryBuilder, Result,
};
use std::sync::Arc;
use tokio::{
task::JoinHandle,
time::{sleep, Duration},
};
use url::Url;

// An example on apollo config service data source.
// Run this example by following steps:
// 1. Set up apollo
// (Quick start see https://github.com/apolloconfig/apollo-quick-start)
// 2. Run this example
// 3. Publish flow rule below at apollo-portal
// key: flow-apollo-example
// value:
// [
// {
// "id":"1",
// "resource":"task",
// "ref_resource":"",
// "calculate_strategy":"Direct",
// "control_strategy":"Reject",
// "relation_strategy":"Current",
// "threshold":1.0,
// "warm_up_period_sec":0,
// "warm_up_cold_factor":0,
// "max_queueing_time_ms":0,
// "stat_interval_ms":0,
// "low_mem_usage_threshold":0,
// "high_mem_usage_threshold":0,
// "mem_low_water_mark":0,
// "mem_high_water_mark":0
// }
// ]
// You will find that QPS number is restricted to 10 at first. But after publish the new flow rule,
// it will be restricted to 1.
#[tokio::main]
async fn main() -> Result<()> {
let handlers = basic_flow_example().await;
// println!("{:?}", sentinel_core::flow::get_rules_of_resource(&"task".to_string()));

// Create apollo client
let client =
ApolloConfClientBuilder::new_via_config_service(Url::parse("http://localhost:8080")?)?
.build()?;

// Request apollo notification api, and fetch configuration when notified.
let watch_request = WatchRequest {
app_id: "SampleApp".to_string(),
namespace_names: vec![
"application.properties".into(),
"application.json".into(),
"application.yml".into(),
],
..Default::default()
};

// Sleep 3 seconds and then read the apollo
sentinel_core::utils::sleep_for_ms(3000);

let property = "flow-apollo-example";
// Create a data source and change the rule.
let h = new_flow_rule_handler(rule_json_array_parser);
let mut ds = ApolloDatasource::new(client, property.into(), watch_request, vec![h]);
ds.initialize().await?;
for h in handlers {
h.await.expect("Couldn't join on the associated thread");
}
Ok(())
}

async fn basic_flow_example() -> Vec<JoinHandle<()>> {
// Init sentienl configurations
sentinel_core::init_default().unwrap_or_else(|err| sentinel_core::logging::error!("{:?}", err));
let resource_name = String::from("task");
// Load sentinel rules
flow::load_rules(vec![Arc::new(flow::Rule {
resource: resource_name.clone(),
threshold: 10.0,
calculate_strategy: flow::CalculateStrategy::Direct,
control_strategy: flow::ControlStrategy::Reject,
..Default::default()
})]);
let mut handlers = Vec::new();
for _ in 0..20 {
let res_name = resource_name.clone();
handlers.push(tokio::spawn(async move {
loop {
let entry_builder = EntryBuilder::new(res_name.clone())
.with_traffic_type(base::TrafficType::Inbound);
if let Ok(entry) = entry_builder.build() {
// Passed, wrap the logic here.
task().await;
// Be sure the entry is exited finally.
entry.exit()
} else {
sleep(Duration::from_millis(100)).await;
}
}
}));
}
handlers
}

// todo: Cannot sentinel-macros now. It will append rules,
// which is conflicts with the dynamic datasource
async fn task() {
println!("{}: passed", sentinel_core::utils::curr_time_millis());
sleep(Duration::from_millis(100)).await;
}
9 changes: 9 additions & 0 deletions sentinel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ metric_log = ["directories", "regex"]
ds_etcdv3 = ["etcd-rs", "futures"]
ds_consul = ["consul", "base64"]
ds_k8s = ["kube", "k8s-openapi", "schemars", "futures"]
ds_apollo = ["apollo-client", "futures-util"]

[dependencies]
sentinel-macros = { version = "0.1.0", path = "../sentinel-macros", optional = true }
Expand Down Expand Up @@ -66,6 +67,8 @@ k8s-openapi = { version = "0.16.0", default-features = false, features = [
"v1_25",
], optional = true }
schemars = { version = "0.8.8", optional = true }
apollo-client = { version = "0.7.5", optional = true }
futures-util = { version = "0.3.29", optional = true }
dirs = "5.0.1"

[target.'cfg(not(target_arch="wasm32"))'.dependencies]
Expand All @@ -77,6 +80,7 @@ uuid = { version = "1.2", features = ["serde", "v4"] }
mockall = "0.11.0"
rand = "0.8.4"
tokio = { version = "1", features = ["full"] }
url = "2.5.0"

[lib]
doctest = false
Expand Down Expand Up @@ -211,3 +215,8 @@ required-features = ["full", "ds_consul"]
name = "k8s"
path = "../examples/datasources/k8s.rs"
required-features = ["full", "ds_k8s"]

[[example]]
name = "apollo"
path = "../examples/datasources/apollo.rs"
required-features = ["full", "ds_apollo"]
93 changes: 93 additions & 0 deletions sentinel-core/src/datasource/adapters/ds_apollo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use super::*;
use crate::{logging, utils::sleep_for_ms};
use apollo_client::conf::{requests::WatchRequest, ApolloConfClient};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use futures_util::{future, pin_mut, stream::StreamExt};

pub struct ApolloDatasource<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> {
ds: DataSourceBase<P, H>,
property: String,
watch_request: WatchRequest,
client: ApolloConfClient,
closed: AtomicBool,
}

impl<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> ApolloDatasource<P, H> {
pub fn new(client: ApolloConfClient, property: String, watch_request: WatchRequest,
handlers: Vec<Arc<H>>) -> Self {
let mut ds = DataSourceBase::default();
for h in handlers {
ds.add_property_handler(h);
}
ApolloDatasource {
ds,
property,
client,
watch_request,
closed: AtomicBool::new(false),
}
}

pub async fn initialize(&mut self) -> Result<()> {
self.watch().await
}

async fn watch(&mut self) -> Result<()> {
logging::info!(
"[Apollo] Apollo datasource is watching property {}",
self.property
);

let stream = self.client.watch(self.watch_request.clone())
.take_while(|_| future::ready(!self.closed.load(Ordering::SeqCst)));

pin_mut!(stream);

while let Some(response) = stream.next().await {
match response {
Ok(responses) => {
// Load rules
// One namespace for one response
for (_, value) in responses {
match value {
Ok(r) => {
let rule = r.configurations.get(&self.property);
if let Err(e) = self.ds.update(rule) {
logging::error!("[Apollo] Failed to update rules, {:?}", e);
}
},
Err(e) => logging::error!("[Apollo] Fail to fetch response from apollo, {:?}", e),
};
}
},
// retry
Err(e) => {
logging::error!("[Apollo] Client yield an error, {:?}", e);
sleep_for_ms(1000);
}
}
}

Ok(())
}

pub fn close(&self) -> Result<()> {
self.closed.store(true, Ordering::SeqCst);
logging::info!(
"[Apollo] Apollo data source has been closed. Stop watch the key {:?} from apollo.",
self.property
);
Ok(())
}
}

impl<P: SentinelRule + PartialEq + DeserializeOwned, H: PropertyHandler<P>> DataSource<P, H>
for ApolloDatasource<P, H>
{
fn get_base(&mut self) -> &mut DataSourceBase<P, H> {
&mut self.ds
}
}
4 changes: 4 additions & 0 deletions sentinel-core/src/datasource/adapters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub use ds_etcdv3::*;
pub mod ds_consul;
#[cfg(feature = "ds_consul")]
pub use ds_consul::*;
#[cfg(feature = "ds_apollo")]
pub mod ds_apollo;
#[cfg(feature = "ds_apollo")]
pub use ds_apollo::*;
cfg_k8s! {
pub mod ds_k8s;
pub use ds_k8s::*;
Expand Down
4 changes: 2 additions & 2 deletions sentinel-core/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ macro_rules! cfg_exporter {
macro_rules! cfg_datasource {
($($item:item)*) => {
$(
#[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s"))))]
#[cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "ds_etcdv3", feature = "ds_consul", feature = "ds_k8s", feature = "ds_apollo"))))]
$item
)*
}
Expand Down

0 comments on commit df1433d

Please sign in to comment.