diff --git a/connector/src/grpc_plugin_source.rs b/connector/src/grpc_plugin_source.rs index 9d59a78..daf521f 100644 --- a/connector/src/grpc_plugin_source.rs +++ b/connector/src/grpc_plugin_source.rs @@ -5,10 +5,13 @@ use solana_account_decoder::UiAccount; use solana_sdk::{account::Account, pubkey::Pubkey}; use futures::{future, future::FutureExt}; -use yellowstone_grpc_proto::tonic::{ - metadata::MetadataValue, - transport::{Certificate, Channel, ClientTlsConfig, Identity}, - Request, +use yellowstone_grpc_proto::{ + prelude::SubscribeRequestFilterTransactions, + tonic::{ + metadata::MetadataValue, + transport::{Certificate, Channel, ClientTlsConfig, Identity}, + Request, + }, }; use log::*; @@ -22,6 +25,7 @@ use yellowstone_grpc_proto::prelude::{ }; use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa}; +use crate::TransactionUpdate; use crate::{ chain_data::SlotStatus, metrics::{MetricType, Metrics}, @@ -87,7 +91,7 @@ async fn feed_data_geyser( let mut accounts = HashMap::new(); let mut slots = HashMap::new(); let blocks = HashMap::new(); - let transactions = HashMap::new(); + let mut transactions = HashMap::new(); let blocks_meta = HashMap::new(); match &filter_config.entity_filter { @@ -113,9 +117,28 @@ async fn feed_data_geyser( }, ); } + + EntityFilter::FilterByAccountIdsTransactions(account_ids) => { + transactions.insert( + "client".to_owned(), + SubscribeRequestFilterTransactions { + vote: Some(false), + failed: Some(false), + signature: None, + account_include: account_ids.iter().map(Pubkey::to_string).collect(), + account_exclude: vec![], + account_required: vec![], + }, + ); + } } - slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); + slots.insert( + "client".to_owned(), + SubscribeRequestFilterSlots { + filter_by_commitment: None, + }, + ); let request = SubscribeRequest { accounts, @@ -126,6 +149,7 @@ async fn feed_data_geyser( slots, transactions, accounts_data_slice: vec![], + ping: None, }; info!("Going to send request: {:?}", request); @@ -216,6 +240,10 @@ async fn feed_data_geyser( let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect(); snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse(); }, + EntityFilter::FilterByAccountIdsTransactions(account_ids) => { + let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect(); + snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse(); + }, EntityFilter::FilterByProgramId(program_id) => { snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse(); }, @@ -267,6 +295,7 @@ async fn feed_data_geyser( UpdateOneof::BlockMeta(_) => {}, UpdateOneof::Entry(_) => {}, UpdateOneof::Ping(_) => {}, + UpdateOneof::Pong(_) => {}, } sender.send(Message::GrpcUpdate(update)).await.expect("send success"); }, @@ -360,6 +389,7 @@ pub async fn process_events( filter_config: &FilterConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, + transaction_queue_sender: async_channel::Sender, metrics_sender: Metrics, exit: Arc, ) { @@ -443,6 +473,10 @@ pub async fn process_events( metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge); let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter); + let mut metric_transaction_queue = + metrics_sender.register_u64("grpc_transaction_queue".into(), MetricType::Gauge); + let mut metric_transaction_updates = + metrics_sender.register_u64("grpc_transaction_updates".into(), MetricType::Counter); let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter); let mut metric_snapshot_account_writes = @@ -527,10 +561,38 @@ pub async fn process_events( .expect("send success"); } UpdateOneof::Block(_) => {} - UpdateOneof::Transaction(_) => {} + UpdateOneof::Transaction(update) => { + metric_transaction_updates.increment(); + metric_transaction_queue.set(transaction_queue_sender.len() as u64); + + if update.transaction.is_none() { + // TODO: handle error + continue; + } + let transaction_info = update.transaction.unwrap(); + if transaction_info.transaction.is_none() || transaction_info.meta.is_none() + { + // TODO: handle error + continue; + } + let transaction = transaction_info.transaction.unwrap(); + let meta = transaction_info.meta.unwrap(); + + let transaction_update = TransactionUpdate { + slot: update.slot, + transaction, + meta, + }; + + transaction_queue_sender + .send(transaction_update) + .await + .expect("send success"); + } UpdateOneof::BlockMeta(_) => {} UpdateOneof::Entry(_) => {} UpdateOneof::Ping(_) => {} + UpdateOneof::Pong(_) => {} } } Message::Snapshot(update) => { diff --git a/connector/src/lib.rs b/connector/src/lib.rs index 62e2cde..64dab0a 100644 --- a/connector/src/lib.rs +++ b/connector/src/lib.rs @@ -7,13 +7,12 @@ pub mod websocket_source; use itertools::Itertools; use std::str::FromStr; +use yellowstone_grpc_proto::prelude::{Transaction, TransactionStatusMeta}; use { serde_derive::Deserialize, solana_sdk::{account::Account, pubkey::Pubkey}, }; -pub use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient; - pub use solana_sdk; trait AnyhowWrap { @@ -64,6 +63,13 @@ pub struct SlotUpdate { pub status: chain_data::SlotStatus, } +#[derive(Clone, Debug)] +pub struct TransactionUpdate { + pub slot: u64, + pub transaction: Transaction, + pub meta: TransactionStatusMeta, +} + #[derive(Clone, Debug, Deserialize)] pub struct TlsConfig { pub ca_cert_path: String, @@ -97,6 +103,7 @@ pub struct SnapshotSourceConfig { #[derive(Clone, Debug, Deserialize)] pub enum EntityFilter { FilterByAccountIds(Vec), + FilterByAccountIdsTransactions(Vec), FilterByProgramId(Pubkey), } impl EntityFilter { @@ -110,6 +117,13 @@ impl EntityFilter { .collect_vec(); EntityFilter::FilterByAccountIds(accounts_ids_typed) } + pub fn filter_by_account_ids_transactions(account_ids: Vec<&str>) -> Self { + let accounts_ids_typed = account_ids + .into_iter() + .map(|id| Pubkey::from_str(id).unwrap()) + .collect_vec(); + EntityFilter::FilterByAccountIdsTransactions(accounts_ids_typed) + } } #[derive(Clone, Debug, Deserialize)] diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index ad28a3d..5e3980f 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -38,6 +38,7 @@ enum WebsocketMessage { SingleUpdate(Response), SnapshotUpdate((Slot, Vec<(String, Option)>)), SlotUpdate(Arc), + //TransactionUpdate(), } async fn feed_data( @@ -50,6 +51,9 @@ async fn feed_data( let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect(); feed_data_by_accounts(config, account_ids_typed, sender).await } + EntityFilter::FilterByAccountIdsTransactions(_) => { + todo!("Implement transaction subscriptions for websockets"); + } EntityFilter::FilterByProgramId(program_id) => { feed_data_by_program(config, program_id.to_string(), sender).await } @@ -393,7 +397,7 @@ pub async fn process_events( if let Some(message) = message { slot_queue_sender.send(message).await.expect("send success"); } - } + } //TransactionUpdate } } }