Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liquidator: forcefully exit process if snapshot job dies #924

Merged
merged 2 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bin/cli/src/save_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn save_snapshot(
.await?;

// Getting solana account snapshots via jsonrpc
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
Expand All @@ -79,6 +79,11 @@ pub async fn save_snapshot(
extra_accounts,
account_update_sender,
);
tokio::spawn(async move {
let res = snapshot_job.await;
tracing::error!("Snapshot job exited, terminating process.. ({:?})", res);
std::process::exit(-1);
});

let mut chain_data = chain_data::ChainData::new();

Expand Down
18 changes: 11 additions & 7 deletions bin/liquidator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async fn main() -> anyhow::Result<()> {

// Getting solana account snapshots via jsonrpc
// FUTURE: what to fetch a snapshot of should probably be taken as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
Expand Down Expand Up @@ -456,12 +456,16 @@ async fn main() -> anyhow::Result<()> {
spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater);
let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone());

let mut jobs: futures::stream::FuturesUnordered<_> =
vec![data_job, token_swap_info_job, check_changes_for_abort_job]
.into_iter()
.chain(optional_jobs)
.chain(prio_jobs.into_iter())
.collect();
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
snapshot_job,
data_job,
token_swap_info_job,
check_changes_for_abort_job,
]
.into_iter()
.chain(optional_jobs)
.chain(prio_jobs.into_iter())
.collect();
jobs.next().await;

error!("a critical job aborted, exiting");
Expand Down
3 changes: 2 additions & 1 deletion bin/service-mango-health/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job];
let mut jobs = vec![exit_processor.job, health_processor.job];
jobs.extend(data_processor.jobs);

if let Some(logger) = logger {
jobs.push(logger.job)
Expand Down
14 changes: 8 additions & 6 deletions bin/service-mango-health/src/processors/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::warn;

pub struct DataProcessor {
pub channel: tokio::sync::broadcast::Sender<DataEvent>,
pub job: JoinHandle<()>,
pub jobs: Vec<JoinHandle<()>>,
pub chain_data: Arc<RwLock<chain_data::ChainData>>,
}

Expand Down Expand Up @@ -52,7 +52,7 @@ impl DataProcessor {
) -> anyhow::Result<DataProcessor> {
let mut retry_counter = RetryCounter::new(2);
let mango_group = Pubkey::from_str(&configuration.mango_group)?;
let mango_stream =
let (mango_stream, snapshot_job) =
fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?;
let (sender, _) = tokio::sync::broadcast::channel(8192);
let sender_clone = sender.clone();
Expand Down Expand Up @@ -98,7 +98,7 @@ impl DataProcessor {

let result = DataProcessor {
channel: sender,
job,
jobs: vec![job, snapshot_job],
chain_data,
};

Expand Down Expand Up @@ -147,7 +147,9 @@ impl DataProcessor {
return Some(Other);
}

async fn init_mango_source(configuration: &Configuration) -> anyhow::Result<Receiver<Message>> {
async fn init_mango_source(
configuration: &Configuration,
) -> anyhow::Result<(Receiver<Message>, JoinHandle<()>)> {
//
// Client setup
//
Expand Down Expand Up @@ -192,7 +194,7 @@ impl DataProcessor {

// Getting solana account snapshots via jsonrpc
// FUTURE: what to fetch a snapshot of should probably be taken as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: configuration.rpc_http_url.clone(),
mango_group,
Expand All @@ -205,6 +207,6 @@ impl DataProcessor {
account_update_sender,
);

Ok(account_update_receiver)
Ok((account_update_receiver, snapshot_job))
}
}
3 changes: 2 additions & 1 deletion bin/settler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {

// Getting solana account snapshots via jsonrpc
// FUTURE: what to fetch a snapshot of should probably be taken as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
Expand Down Expand Up @@ -353,6 +353,7 @@ async fn main() -> anyhow::Result<()> {

use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
snapshot_job,
data_job,
settle_job,
tcs_start_job,
Expand Down
11 changes: 9 additions & 2 deletions lib/client/src/snapshot_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use std::str::FromStr;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing::*;

Expand Down Expand Up @@ -223,11 +224,15 @@ async fn feed_snapshots(
Ok(())
}

pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
pub fn start(
config: Config,
mango_oracles: Vec<Pubkey>,
sender: async_channel::Sender<Message>,
) -> JoinHandle<()> {
let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2));
let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval);

tokio::spawn(async move {
let snapshot_job = tokio::spawn(async move {
let rpc_client = http::connect_with_options::<MinimalClient>(&config.rpc_http_url, true)
.await
.expect("always Ok");
Expand Down Expand Up @@ -260,4 +265,6 @@ pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::
};
}
});

snapshot_job
}
Loading