diff --git a/src/extensions/client/endpoint.rs b/src/extensions/client/endpoint.rs
index 2663702..8167f98 100644
--- a/src/extensions/client/endpoint.rs
+++ b/src/extensions/client/endpoint.rs
@@ -39,8 +39,7 @@ impl Endpoint {
     ) -> Self {
         let (client_tx, client_rx) = tokio::sync::watch::channel(None);
         let on_client_ready = Arc::new(tokio::sync::Notify::new());
-        let url_ = url.clone();
-        let health = Arc::new(Health::new(url_, health_config));
+        let health = Arc::new(Health::new(url.clone(), health_config));
 
         let url_ = url.clone();
         let health_ = health.clone();
@@ -51,7 +50,7 @@ impl Endpoint {
             let connect_backoff_counter = Arc::new(AtomicU32::new(0));
 
             loop {
-                tracing::info!("Connecting to endpoint: {url_}");
+                tracing::info!("Connecting endpoint: {url_}");
 
                 let client = WsClientBuilder::default()
                     .request_timeout(request_timeout.unwrap_or(Duration::from_secs(30)))
diff --git a/src/extensions/client/health.rs b/src/extensions/client/health.rs
index 2ca8265..b7ce98c 100644
--- a/src/extensions/client/health.rs
+++ b/src/extensions/client/health.rs
@@ -18,12 +18,26 @@ pub enum Event {
     StaleChain,
 }
 
+impl Event {
+    pub fn update_score(&self, current: u32) -> u32 {
+        u32::min(
+            match self {
+                Event::ResponseOk => current.saturating_add(2),
+                Event::SlowResponse => current.saturating_sub(5),
+                Event::RequestTimeout | Event::ConnectionFailed | Event::StaleChain => 0,
+                Event::ConnectionSuccessful => MAX_SCORE / 5 * 4, // 80% of max score
+            },
+            MAX_SCORE,
+        )
+    }
+}
+
 #[derive(Debug, Default)]
 pub struct Health {
-    pub url: String,
-    pub config: HealthCheckConfig,
-    pub score: AtomicU32,
-    pub unhealthy: tokio::sync::Notify,
+    url: String,
+    config: HealthCheckConfig,
+    score: AtomicU32,
+    unhealthy: tokio::sync::Notify,
 }
 
 const MAX_SCORE: u32 = 100;
@@ -34,7 +48,7 @@ impl Health {
         Self {
             url,
             config,
-            score: AtomicU32::new(100),
+            score: AtomicU32::new(0),
             unhealthy: tokio::sync::Notify::new(),
         }
     }
@@ -45,23 +59,25 @@ impl Health {
 
     pub fn update(&self, event: Event) {
         let current_score = self.score.load(Ordering::Relaxed);
-        let new_score = u32::min(
-            match event {
-                Event::ResponseOk => current_score.saturating_add(5),
-                Event::SlowResponse => current_score.saturating_sub(5),
-                Event::RequestTimeout | Event::ConnectionFailed | Event::StaleChain => 0,
-                Event::ConnectionSuccessful => 100,
-            },
-            MAX_SCORE,
-        );
+        let new_score = event.update_score(current_score);
+        if new_score == current_score {
+            return;
+        }
         self.score.store(new_score, Ordering::Relaxed);
+        log::trace!(
+            "Endpoint {:?} score updated from: {current_score} to: {new_score}",
+            self.url
+        );
+
         // Notify waiters if the score has dropped below the threshold
         if current_score >= THRESHOLD && new_score < THRESHOLD {
+            log::warn!("Endpoint {:?} became unhealthy", self.url);
             self.unhealthy.notify_waiters();
         }
     }
 
     pub fn on_error(&self, err: &jsonrpsee::core::Error) {
+        log::warn!("Endpoint {:?} responded with error: {err:?}", self.url);
         match err {
             jsonrpsee::core::Error::RequestTimeout => {
                 self.update(Event::RequestTimeout);
@@ -74,6 +90,10 @@ impl Health {
             _ => {}
         };
     }
+
+    pub async fn unhealthy(&self) {
+        self.unhealthy.notified().await;
+    }
 }
 
 impl Health {
diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs
index 38e41be..bca5fc0 100644
--- a/src/extensions/client/mod.rs
+++ b/src/extensions/client/mod.rs
@@ -338,12 +338,12 @@ impl Client {
 
             loop {
                 tokio::select! {
-                    _ = selected_endpoint.health.unhealthy.notified() => {
+                    _ = selected_endpoint.health.unhealthy() => {
                         // Current selected endpoint is unhealthy, try to rotate to another one.
                         // In case of all endpoints are unhealthy, we don't want to keep rotating but stick with the healthiest one.
                         let new_selected_endpoint = healthiest_endpoint(None).await;
                         if new_selected_endpoint.url != selected_endpoint.url {
-                            tracing::info!("Endpoint {current_url} is unhealthy, switch to endpoint: {new_url}", current_url = selected_endpoint.url, new_url=new_selected_endpoint.url);
+                            tracing::warn!("Switch to endpoint: {new_url}", new_url=new_selected_endpoint.url);
                             selected_endpoint = new_selected_endpoint;
                             rotation_notify_bg.notify_waiters();
                         }