Skip to content

Commit

Permalink
add Geyser transaction subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Lou-Kamades committed Dec 27, 2023
1 parent 45cf47a commit 5def8c9
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 10 deletions.
76 changes: 69 additions & 7 deletions connector/src/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use solana_account_decoder::UiAccount;
use solana_sdk::{account::Account, pubkey::Pubkey};

use futures::{future, future::FutureExt};
use yellowstone_grpc_proto::tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Request,
use yellowstone_grpc_proto::{
prelude::SubscribeRequestFilterTransactions,
tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Request,
},
};

use log::*;
Expand All @@ -22,6 +25,7 @@ use yellowstone_grpc_proto::prelude::{
};

use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa};
use crate::TransactionUpdate;
use crate::{
chain_data::SlotStatus,
metrics::{MetricType, Metrics},
Expand Down Expand Up @@ -87,7 +91,7 @@ async fn feed_data_geyser(
let mut accounts = HashMap::new();
let mut slots = HashMap::new();
let blocks = HashMap::new();
let transactions = HashMap::new();
let mut transactions = HashMap::new();
let blocks_meta = HashMap::new();

match &filter_config.entity_filter {
Expand All @@ -113,9 +117,28 @@ async fn feed_data_geyser(
},
);
}

EntityFilter::FilterByAccountIdsTransactions(account_ids) => {
transactions.insert(
"client".to_owned(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: account_ids.iter().map(Pubkey::to_string).collect(),
account_exclude: vec![],
account_required: vec![],
},
);
}
}

slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
slots.insert(
"client".to_owned(),
SubscribeRequestFilterSlots {
filter_by_commitment: None,
},
);

let request = SubscribeRequest {
accounts,
Expand All @@ -126,6 +149,7 @@ async fn feed_data_geyser(
slots,
transactions,
accounts_data_slice: vec![],
ping: None,
};
info!("Going to send request: {:?}", request);

Expand Down Expand Up @@ -216,6 +240,10 @@ async fn feed_data_geyser(
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse();
},
EntityFilter::FilterByAccountIdsTransactions(account_ids) => {
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse();
},
EntityFilter::FilterByProgramId(program_id) => {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse();
},
Expand Down Expand Up @@ -267,6 +295,7 @@ async fn feed_data_geyser(
UpdateOneof::BlockMeta(_) => {},
UpdateOneof::Entry(_) => {},
UpdateOneof::Ping(_) => {},
UpdateOneof::Pong(_) => {},
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
Expand Down Expand Up @@ -360,6 +389,7 @@ pub async fn process_events(
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
transaction_queue_sender: async_channel::Sender<TransactionUpdate>,
metrics_sender: Metrics,
exit: Arc<AtomicBool>,
) {
Expand Down Expand Up @@ -443,6 +473,10 @@ pub async fn process_events(
metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge);
let mut metric_slot_updates =
metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter);
let mut metric_transaction_queue =
metrics_sender.register_u64("grpc_transaction_queue".into(), MetricType::Gauge);
let mut metric_transaction_updates =
metrics_sender.register_u64("grpc_transaction_updates".into(), MetricType::Counter);
let mut metric_snapshots =
metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter);
let mut metric_snapshot_account_writes =
Expand Down Expand Up @@ -527,10 +561,38 @@ pub async fn process_events(
.expect("send success");
}
UpdateOneof::Block(_) => {}
UpdateOneof::Transaction(_) => {}
UpdateOneof::Transaction(update) => {
metric_transaction_updates.increment();
metric_transaction_queue.set(transaction_queue_sender.len() as u64);

if update.transaction.is_none() {
// TODO: handle error
continue;
}
let transaction_info = update.transaction.unwrap();
if transaction_info.transaction.is_none() || transaction_info.meta.is_none()
{
// TODO: handle error
continue;
}
let transaction = transaction_info.transaction.unwrap();
let meta = transaction_info.meta.unwrap();

let transaction_update = TransactionUpdate {
slot: update.slot,
transaction,
meta,
};

transaction_queue_sender
.send(transaction_update)
.await
.expect("send success");
}
UpdateOneof::BlockMeta(_) => {}
UpdateOneof::Entry(_) => {}
UpdateOneof::Ping(_) => {}
UpdateOneof::Pong(_) => {}
}
}
Message::Snapshot(update) => {
Expand Down
18 changes: 16 additions & 2 deletions connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ pub mod websocket_source;

use itertools::Itertools;
use std::str::FromStr;
use yellowstone_grpc_proto::prelude::{Transaction, TransactionStatusMeta};
use {
serde_derive::Deserialize,
solana_sdk::{account::Account, pubkey::Pubkey},
};

pub use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient;

pub use solana_sdk;

trait AnyhowWrap {
Expand Down Expand Up @@ -64,6 +63,13 @@ pub struct SlotUpdate {
pub status: chain_data::SlotStatus,
}

#[derive(Clone, Debug)]
pub struct TransactionUpdate {
pub slot: u64,
pub transaction: Transaction,
pub meta: TransactionStatusMeta,
}

#[derive(Clone, Debug, Deserialize)]
pub struct TlsConfig {
pub ca_cert_path: String,
Expand Down Expand Up @@ -97,6 +103,7 @@ pub struct SnapshotSourceConfig {
#[derive(Clone, Debug, Deserialize)]
pub enum EntityFilter {
FilterByAccountIds(Vec<Pubkey>),
FilterByAccountIdsTransactions(Vec<Pubkey>),
FilterByProgramId(Pubkey),
}
impl EntityFilter {
Expand All @@ -110,6 +117,13 @@ impl EntityFilter {
.collect_vec();
EntityFilter::FilterByAccountIds(accounts_ids_typed)
}
pub fn filter_by_account_ids_transactions(account_ids: Vec<&str>) -> Self {
let accounts_ids_typed = account_ids
.into_iter()
.map(|id| Pubkey::from_str(id).unwrap())
.collect_vec();
EntityFilter::FilterByAccountIdsTransactions(accounts_ids_typed)
}
}

#[derive(Clone, Debug, Deserialize)]
Expand Down
6 changes: 5 additions & 1 deletion connector/src/websocket_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ enum WebsocketMessage {
SingleUpdate(Response<RpcKeyedAccount>),
SnapshotUpdate((Slot, Vec<(String, Option<UiAccount>)>)),
SlotUpdate(Arc<solana_client::rpc_response::SlotUpdate>),
//TransactionUpdate(),
}

async fn feed_data(
Expand All @@ -50,6 +51,9 @@ async fn feed_data(
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
feed_data_by_accounts(config, account_ids_typed, sender).await
}
EntityFilter::FilterByAccountIdsTransactions(_) => {
todo!("Implement transaction subscriptions for websockets");
}
EntityFilter::FilterByProgramId(program_id) => {
feed_data_by_program(config, program_id.to_string(), sender).await
}
Expand Down Expand Up @@ -393,7 +397,7 @@ pub async fn process_events(
if let Some(message) = message {
slot_queue_sender.send(message).await.expect("send success");
}
}
} //TransactionUpdate
}
}
}

0 comments on commit 5def8c9

Please sign in to comment.