Skip to content

Commit

Permalink
add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Apr 5, 2024
1 parent ae907bb commit 08f44b8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 19 deletions.
5 changes: 2 additions & 3 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ impl Endpoint {
) -> Self {
let (client_tx, client_rx) = tokio::sync::watch::channel(None);
let on_client_ready = Arc::new(tokio::sync::Notify::new());
let url_ = url.clone();
let health = Arc::new(Health::new(url_, health_config));
let health = Arc::new(Health::new(url.clone(), health_config));

let url_ = url.clone();
let health_ = health.clone();
Expand All @@ -51,7 +50,7 @@ impl Endpoint {
let connect_backoff_counter = Arc::new(AtomicU32::new(0));

loop {
tracing::info!("Connecting to endpoint: {url_}");
tracing::info!("Connecting endpoint: {url_}");

let client = WsClientBuilder::default()
.request_timeout(request_timeout.unwrap_or(Duration::from_secs(30)))
Expand Down
48 changes: 34 additions & 14 deletions src/extensions/client/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,26 @@ pub enum Event {
StaleChain,
}

impl Event {
pub fn update_score(&self, current: u32) -> u32 {
u32::min(
match self {
Event::ResponseOk => current.saturating_add(2),
Event::SlowResponse => current.saturating_sub(5),
Event::RequestTimeout | Event::ConnectionFailed | Event::StaleChain => 0,
Event::ConnectionSuccessful => MAX_SCORE / 5 * 4, // 80% of max score
},
MAX_SCORE,
)
}
}

#[derive(Debug, Default)]
pub struct Health {
pub url: String,
pub config: HealthCheckConfig,
pub score: AtomicU32,
pub unhealthy: tokio::sync::Notify,
url: String,
config: HealthCheckConfig,
score: AtomicU32,
unhealthy: tokio::sync::Notify,
}

const MAX_SCORE: u32 = 100;
Expand All @@ -34,7 +48,7 @@ impl Health {
Self {
url,
config,
score: AtomicU32::new(100),
score: AtomicU32::new(0),
unhealthy: tokio::sync::Notify::new(),
}
}
Expand All @@ -45,23 +59,25 @@ impl Health {

pub fn update(&self, event: Event) {
let current_score = self.score.load(Ordering::Relaxed);
let new_score = u32::min(
match event {
Event::ResponseOk => current_score.saturating_add(5),
Event::SlowResponse => current_score.saturating_sub(5),
Event::RequestTimeout | Event::ConnectionFailed | Event::StaleChain => 0,
Event::ConnectionSuccessful => 100,
},
MAX_SCORE,
);
let new_score = event.update_score(current_score);
if new_score == current_score {
return;
}
self.score.store(new_score, Ordering::Relaxed);
log::trace!(
"Endpoint {:?} score updated from: {current_score} to: {new_score}",
self.url
);

// Notify waiters if the score has dropped below the threshold
if current_score >= THRESHOLD && new_score < THRESHOLD {
log::warn!("Endpoint {:?} became unhealthy", self.url);
self.unhealthy.notify_waiters();
}
}

pub fn on_error(&self, err: &jsonrpsee::core::Error) {
log::warn!("Endpoint {:?} responded with error: {err:?}", self.url);
match err {
jsonrpsee::core::Error::RequestTimeout => {
self.update(Event::RequestTimeout);
Expand All @@ -74,6 +90,10 @@ impl Health {
_ => {}
};
}

pub async fn unhealthy(&self) {
self.unhealthy.notified().await;
}
}

impl Health {
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ impl Client {

loop {
tokio::select! {
_ = selected_endpoint.health.unhealthy.notified() => {
_ = selected_endpoint.health.unhealthy() => {
// Current selected endpoint is unhealthy, try to rotate to another one.
// In case of all endpoints are unhealthy, we don't want to keep rotating but stick with the healthiest one.
let new_selected_endpoint = healthiest_endpoint(None).await;
if new_selected_endpoint.url != selected_endpoint.url {
tracing::info!("Endpoint {current_url} is unhealthy, switch to endpoint: {new_url}", current_url = selected_endpoint.url, new_url=new_selected_endpoint.url);
tracing::warn!("Switch to endpoint: {new_url}", new_url=new_selected_endpoint.url);
selected_endpoint = new_selected_endpoint;
rotation_notify_bg.notify_waiters();
}
Expand Down

0 comments on commit 08f44b8

Please sign in to comment.