From 153e3fc7ab0ac1b87ef86139c973b6d4f8f99465 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Wed, 27 Mar 2024 14:06:28 +0100 Subject: [PATCH 1/2] liquidator: forcefully exit process if snapshot job die --- lib/client/src/snapshot_source.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/client/src/snapshot_source.rs b/lib/client/src/snapshot_source.rs index 44da48303d..bbe7198551 100644 --- a/lib/client/src/snapshot_source.rs +++ b/lib/client/src/snapshot_source.rs @@ -227,7 +227,7 @@ pub fn start(config: Config, mango_oracles: Vec, sender: async_channel:: let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2)); let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval); - tokio::spawn(async move { + let snapshot_job = tokio::spawn(async move { let rpc_client = http::connect_with_options::(&config.rpc_http_url, true) .await .expect("always Ok"); @@ -260,4 +260,10 @@ pub fn start(config: Config, mango_oracles: Vec, sender: async_channel:: }; } }); + + tokio::spawn(async move { + let res = snapshot_job.await; + tracing::error!("Snapshot job exited, terminating process.. ({:?})", res); + std::process::exit(-1); + }); } From ed1617ee915a789eb68770eb6e2f04cb6c94451c Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Mon, 1 Apr 2024 10:30:22 +0200 Subject: [PATCH 2/2] client: return snapshot_job join handle so it can be watched for early unexpected exit --- bin/cli/src/save_snapshot.rs | 7 ++++++- bin/liquidator/src/main.rs | 18 +++++++++++------- bin/service-mango-health/src/main.rs | 3 ++- .../src/processors/data.rs | 14 ++++++++------ bin/settler/src/main.rs | 3 ++- lib/client/src/snapshot_source.rs | 13 +++++++------ 6 files changed, 36 insertions(+), 22 deletions(-) diff --git a/bin/cli/src/save_snapshot.rs b/bin/cli/src/save_snapshot.rs index 575b900bbc..edd6e63e59 100644 --- a/bin/cli/src/save_snapshot.rs +++ b/bin/cli/src/save_snapshot.rs @@ -67,7 +67,7 @@ pub async fn save_snapshot( .await?; // Getting solana account snapshots via jsonrpc - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -79,6 +79,11 @@ pub async fn save_snapshot( extra_accounts, account_update_sender, ); + tokio::spawn(async move { + let res = snapshot_job.await; + tracing::error!("Snapshot job exited, terminating process.. ({:?})", res); + std::process::exit(-1); + }); let mut chain_data = chain_data::ChainData::new(); diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 1c62c9ad40..933d045fe7 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -169,7 +169,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -456,12 +456,16 @@ async fn main() -> anyhow::Result<()> { spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater); let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone()); - let mut jobs: futures::stream::FuturesUnordered<_> = - vec![data_job, token_swap_info_job, check_changes_for_abort_job] - .into_iter() - .chain(optional_jobs) - .chain(prio_jobs.into_iter()) - .collect(); + let mut jobs: futures::stream::FuturesUnordered<_> = vec![ + snapshot_job, + data_job, + token_swap_info_job, + check_changes_for_abort_job, + ] + .into_iter() + .chain(optional_jobs) + .chain(prio_jobs.into_iter()) + .collect(); jobs.next().await; error!("a critical job aborted, exiting"); diff --git a/bin/service-mango-health/src/main.rs b/bin/service-mango-health/src/main.rs index 9b3b5174a8..377415fc76 100644 --- a/bin/service-mango-health/src/main.rs +++ b/bin/service-mango-health/src/main.rs @@ -64,7 +64,8 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job]; + let mut jobs = vec![exit_processor.job, health_processor.job]; + jobs.extend(data_processor.jobs); if let Some(logger) = logger { jobs.push(logger.job) diff --git a/bin/service-mango-health/src/processors/data.rs b/bin/service-mango-health/src/processors/data.rs index 3a25923b90..feb24357f1 100644 --- a/bin/service-mango-health/src/processors/data.rs +++ b/bin/service-mango-health/src/processors/data.rs @@ -22,7 +22,7 @@ use tracing::warn; pub struct DataProcessor { pub channel: tokio::sync::broadcast::Sender, - pub job: JoinHandle<()>, + pub jobs: Vec>, pub chain_data: Arc>, } @@ -52,7 +52,7 @@ impl DataProcessor { ) -> anyhow::Result { let mut retry_counter = RetryCounter::new(2); let mango_group = Pubkey::from_str(&configuration.mango_group)?; - let mango_stream = + let (mango_stream, snapshot_job) = fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?; let (sender, _) = tokio::sync::broadcast::channel(8192); let sender_clone = sender.clone(); @@ -98,7 +98,7 @@ impl DataProcessor { let result = DataProcessor { channel: sender, - job, + jobs: vec![job, snapshot_job], chain_data, }; @@ -147,7 +147,9 @@ impl DataProcessor { return Some(Other); } - async fn init_mango_source(configuration: &Configuration) -> anyhow::Result> { + async fn init_mango_source( + configuration: &Configuration, + ) -> anyhow::Result<(Receiver, JoinHandle<()>)> { // // Client setup // @@ -192,7 +194,7 @@ impl DataProcessor { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: configuration.rpc_http_url.clone(), mango_group, @@ -205,6 +207,6 @@ impl DataProcessor { account_update_sender, ); - Ok(account_update_receiver) + Ok((account_update_receiver, snapshot_job)) } } diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 57f408a219..d1be966c40 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -353,6 +353,7 @@ async fn main() -> anyhow::Result<()> { use futures::StreamExt; let mut jobs: futures::stream::FuturesUnordered<_> = vec![ + snapshot_job, data_job, settle_job, tcs_start_job, diff --git a/lib/client/src/snapshot_source.rs b/lib/client/src/snapshot_source.rs index bbe7198551..d35ca54a95 100644 --- a/lib/client/src/snapshot_source.rs +++ b/lib/client/src/snapshot_source.rs @@ -16,6 +16,7 @@ use solana_rpc::rpc::rpc_accounts::AccountsDataClient; use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient; use std::str::FromStr; use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; use tokio::time; use tracing::*; @@ -223,7 +224,11 @@ async fn feed_snapshots( Ok(()) } -pub fn start(config: Config, mango_oracles: Vec, sender: async_channel::Sender) { +pub fn start( + config: Config, + mango_oracles: Vec, + sender: async_channel::Sender, +) -> JoinHandle<()> { let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2)); let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval); @@ -261,9 +266,5 @@ pub fn start(config: Config, mango_oracles: Vec, sender: async_channel:: } }); - tokio::spawn(async move { - let res = snapshot_job.await; - tracing::error!("Snapshot job exited, terminating process.. ({:?})", res); - std::process::exit(-1); - }); + snapshot_job }