diff --git a/chaindata_standalone/src/main.rs b/chaindata_standalone/src/main.rs index 931a0c9..ece6282 100644 --- a/chaindata_standalone/src/main.rs +++ b/chaindata_standalone/src/main.rs @@ -3,14 +3,14 @@ use std::env; use std::str::FromStr; use std::sync::{Arc, RwLock}; use std::time::Duration; -use async_channel::Sender; use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task}; +use itertools::Itertools; use log::{debug, info, trace, warn}; +use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; -use tokio::sync::broadcast; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, broadcast}; use tokio::task::JoinHandle; use tokio::time::sleep; use tracing_subscriber::EnvFilter; @@ -20,6 +20,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use mango_feeds_connector::chain_data::{ChainData, SlotStatus}; use mango_feeds_connector::{AccountWrite, SlotUpdate}; +use crate::account_write::account_write_from; use crate::get_program_account::get_snapshot_gpa; use crate::router_impl::{AccountOrSnapshotUpdate, spawn_updater_job}; @@ -55,13 +56,18 @@ pub async fn main() { let (_jh_grpc_source, grpc_accounts_rx) = create_geyser_autoconnection_task(grpc_source_config.clone(), raydium_accounts(), exit_sender.subscribe()); - let (account_write_sender, account_write_receiver) = async_channel::unbounded::(); + let (account_write_sender, account_write_receiver) = mpsc::channel::(100_000); let (slot_sender, slot_receiver) = async_channel::unbounded::(); let (account_update_sender, _) = broadcast::channel(524288); // 524288 - // TODO exit start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone()); + let rpc_http_url = env::var("RPC_HTTP_URL").expect("need http rpc url"); + start_gpa_snapshot_fetcher( + rpc_http_url, Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap(), + account_write_sender.clone()); + + let chain_data = Arc::new(RwLock::new(ChainData::new())); let job1 = router_impl::start_chaindata_updating( chain_data.clone(), @@ -71,9 +77,6 @@ pub async fn main() { exit_sender.subscribe(), ); - let rpc_http_url = env::var("RPC_HTTP_URL").expect("need http rpc url"); - start_gpa_snapshot_fetcher(rpc_http_url, Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap()); - let job2 = spawn_updater_job( chain_data.clone(), account_update_sender.subscribe(), @@ -93,13 +96,36 @@ pub async fn main() { info!("quitting."); } -fn start_gpa_snapshot_fetcher(rpc_http_url: String, program_id: Pubkey) { +fn start_gpa_snapshot_fetcher(rpc_http_url: String, program_id: Pubkey, account_write_sender: mpsc::Sender) { tokio::spawn(async move { info!("loading snapshot from compressed gPA RPC endpoint ..."); let rpc_http_url = rpc_http_url.clone(); // 675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8 -> 796157 accounts, 50s let snapshot = get_snapshot_gpa(&rpc_http_url, &program_id, true).await.unwrap(); info!("downloaded snapshot for slot {} with {:?} accounts", snapshot.slot, snapshot.accounts.len()); + + for update_chunk in snapshot.accounts.chunks(1024) { + let chunk = update_chunk.into_iter() + .map(|update| { + let slot = update.slot; + let pubkey = Pubkey::try_from(update.pubkey.clone()).unwrap(); + AccountWrite { + pubkey, + slot, + write_version: update.write_version, + lamports: update.lamports, + owner: Pubkey::try_from(update.owner.clone()).unwrap(), + executable: update.executable, + rent_epoch: update.rent_epoch, + data: update.data.clone(), // TODO not nice + is_selected: false, // what is that? + } + }).collect_vec(); + + info!("sending snapshot chunk with {} accounts", chunk.len()); + let _sent_res = account_write_sender.send(AccountOrSnapshotUpdate::SnapshotUpdate(chunk)).await; + } + }); } @@ -127,9 +153,9 @@ fn debug_chaindata(chain_data: Arc>, mut exit: broadcast::Rece // this is replacing the spawn_geyser_source task from router fn start_plumbing_task( - mut grpc_source_rx: Receiver, - account_write_sender: Sender, - slot_sender: Sender) { + mut grpc_source_rx: mpsc::Receiver, + account_write_sender: mpsc::Sender, + slot_sender: async_channel::Sender) { tokio::spawn(async move { info!("starting plumbing task"); loop { diff --git a/chaindata_standalone/src/router_impl.rs b/chaindata_standalone/src/router_impl.rs index 9abed2c..ed9286a 100644 --- a/chaindata_standalone/src/router_impl.rs +++ b/chaindata_standalone/src/router_impl.rs @@ -4,8 +4,9 @@ use mango_feeds_connector::{AccountWrite, SlotUpdate}; use solana_sdk::pubkey::Pubkey; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::time::{Duration, Instant}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tokio::task::JoinHandle; +use tokio::time::sleep; use tracing::{debug, error}; pub type ChainDataArcRw = Arc>; @@ -22,7 +23,7 @@ pub enum AccountOrSnapshotUpdate { pub fn start_chaindata_updating( chain_data: ChainDataArcRw, // = account_write_receiver - account_writes: async_channel::Receiver, + mut account_writes: mpsc::Receiver, slot_updates: async_channel::Receiver, account_update_sender: broadcast::Sender<(Pubkey, u64)>, mut exit: broadcast::Receiver<()>, @@ -37,23 +38,32 @@ pub fn start_chaindata_updating( break; } res = account_writes.recv() => { - let Ok(account_write) = res + let Some(update) = res else { warn!("account write channel err {res:?}"); continue; }; - // trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}", - // account_write.pubkey, account_write.slot, account_write.write_version); + + match &update { + AccountOrSnapshotUpdate::AccountUpdate(account_update) => { + trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}", + account_update.pubkey, account_update.slot, account_update.write_version); + } + AccountOrSnapshotUpdate::SnapshotUpdate(account_writes) => { + trace!("[account_write_receiver->chain_data] account update from snapshot with {} accounts", + account_writes.len()); + } + } let mut writer = chain_data.write().unwrap(); - handle_updated_account(&mut writer, account_write, &account_update_sender); + handle_updated_account(&mut writer, update, &account_update_sender); let mut batchsize: u32 = 0; let started_at = Instant::now(); - 'batch_loop: while let Ok(res) = account_writes.try_recv() { + 'batch_loop: while let Ok(bupdate) = account_writes.try_recv() { batchsize += 1; - handle_updated_account(&mut writer, res, &account_update_sender); + handle_updated_account(&mut writer, bupdate, &account_update_sender); // budget for microbatch if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) { @@ -67,6 +77,7 @@ pub fn start_chaindata_updating( warn!("slot channel err {res:?}"); continue; }; + chain_data.write().unwrap().update_slot(SlotData { slot: slot_update.slot, parent: slot_update.parent, @@ -126,6 +137,7 @@ fn handle_updated_account( one_update(chain_data, account_update_sender, account_write) } AccountOrSnapshotUpdate::SnapshotUpdate(snapshot) => { + debug!("Update from snapshot data: {}", snapshot.len()); for account_write in snapshot { one_update(chain_data, account_update_sender, account_write) }