From ce18a1ac925df91c9c5cffce09dfe452352c7a14 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Mon, 23 Jan 2023 08:33:27 -0300 Subject: [PATCH] Ignore data before first processed block after startup message (#33) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/plugin.rs | 147 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 91 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a5e2d65..a891f069 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2554,7 +2554,7 @@ dependencies = [ [[package]] name = "solana-geyser-grpc" -version = "0.4.0+solana.1.14.10" +version = "0.4.1+solana.1.14.10" dependencies = [ "anyhow", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 7d415b55..7dc196f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "solana-geyser-grpc" -version = "0.4.0+solana.1.14.10" +version = "0.4.1+solana.1.14.10" authors = ["Triton One"] edition = "2021" diff --git a/src/plugin.rs b/src/plugin.rs index 893171e6..c6dfdddc 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -4,6 +4,7 @@ use { grpc::{GrpcService, Message, MessageTransaction, MessageTransactionInfo}, prom::{self, PrometheusService}, }, + log::*, solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, @@ -18,6 +19,8 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, + startup_received: bool, + startup_processed_received: bool, grpc_channel: mpsc::UnboundedSender, grpc_shutdown_tx: oneshot::Sender<()>, prometheus: PrometheusService, @@ -29,6 +32,21 @@ pub struct Plugin { inner: Option, } +impl Plugin { + fn with_inner(&mut self, f: F) -> PluginResult<()> + where + F: FnOnce(&mut PluginInner) -> PluginResult<()>, + { + // Before processed slot after end of startup message we will fail to construct full block + let inner = self.inner.as_mut().expect("initialized"); + if inner.startup_received && inner.startup_processed_received { + f(inner) + } else { + Ok(()) + } + } +} + impl GeyserPlugin for Plugin { fn name(&self) -> &'static str { "GeyserGrpcPublic" @@ -52,6 +70,8 @@ impl GeyserPlugin for Plugin { self.inner = Some(PluginInner { runtime, + startup_received: false, + startup_processed_received: false, grpc_channel, grpc_shutdown_tx, prometheus, @@ -70,17 +90,23 @@ impl GeyserPlugin for Plugin { } } + fn notify_end_of_startup(&mut self) -> PluginResult<()> { + let inner = self.inner.as_mut().expect("initialized"); + inner.startup_received = true; + Ok(()) + } + fn update_account( &mut self, account: ReplicaAccountInfoVersions, slot: u64, is_startup: bool, ) -> PluginResult<()> { - let inner = self.inner.as_ref().expect("initialized"); - let message = Message::Account((account, slot, is_startup).into()); - let _ = inner.grpc_channel.send(message); - - Ok(()) + self.with_inner(|inner| { + let message = Message::Account((account, slot, is_startup).into()); + let _ = inner.grpc_channel.send(message); + Ok(()) + }) } fn update_slot_status( @@ -89,13 +115,20 @@ impl GeyserPlugin for Plugin { parent: Option, status: SlotStatus, ) -> PluginResult<()> { - let inner = self.inner.as_ref().expect("initialized"); - let message = Message::Slot((slot, parent, status).into()); - let _ = inner.grpc_channel.send(message); - - prom::update_slot_status(slot, status); + let inner = self.inner.as_mut().expect("initialized"); + if inner.startup_received + && !inner.startup_processed_received + && status == SlotStatus::Processed + { + inner.startup_processed_received = true; + } - Ok(()) + self.with_inner(|inner| { + let message = Message::Slot((slot, parent, status).into()); + let _ = inner.grpc_channel.send(message); + prom::update_slot_status(slot, status); + Ok(()) + }) } fn notify_transaction( @@ -103,65 +136,63 @@ impl GeyserPlugin for Plugin { transaction: ReplicaTransactionInfoVersions<'_>, slot: u64, ) -> PluginResult<()> { - let inner = self.inner.as_mut().expect("initialized"); - - let msg_tx: MessageTransaction = (transaction, slot).into(); - match &mut inner.transactions { - Some((current_slot, transactions)) if *current_slot == slot => { - transactions.push(msg_tx.transaction.clone()); - } - Some((current_slot, _)) => { - prom::block_transactions::inc_tx(); - let msg = format!( - "got tx from block {}, while current block is {}", - slot, current_slot - ); - log::error!("{}", msg); - return Err(GeyserPluginError::Custom(msg.into())); + self.with_inner(|inner| { + let msg_tx: MessageTransaction = (transaction, slot).into(); + match &mut inner.transactions { + Some((current_slot, transactions)) if *current_slot == slot => { + transactions.push(msg_tx.transaction.clone()); + } + Some((current_slot, _)) => { + prom::block_transactions::inc_tx(); + let msg = format!( + "got tx from block {}, while current block is {}", + slot, current_slot + ); + error!("{}", msg); + return Err(GeyserPluginError::Custom(msg.into())); + } + None => { + inner.transactions = Some((slot, vec![msg_tx.transaction.clone()])); + } } - None => { - inner.transactions = Some((slot, vec![msg_tx.transaction.clone()])); - } - } - let message = Message::Transaction(msg_tx); - let _ = inner.grpc_channel.send(message); + let message = Message::Transaction(msg_tx); + let _ = inner.grpc_channel.send(message); - Ok(()) + Ok(()) + }) } fn notify_block_metadata( &mut self, blockinfo: ReplicaBlockInfoVersions<'_>, ) -> PluginResult<()> { - let inner = self.inner.as_mut().expect("initialized"); - - let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo; - let transactions = match inner.transactions.take() { - Some((slot, transactions)) if slot == block.slot => transactions, - Some((slot, _)) => { - prom::block_transactions::inc_block(); - let msg = format!( + self.with_inner(|inner| { + let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo; + let transactions = match inner.transactions.take() { + Some((slot, transactions)) if slot == block.slot => Ok(transactions), + Some((slot, _)) => Err(format!( "invalid transactions for block {}, found {}", block.slot, slot - ); - log::error!("{}", msg); - return Err(GeyserPluginError::Custom(msg.into())); - } - None => { - prom::block_transactions::inc_block(); - let msg = format!("no transactions for block {}", block.slot); - log::error!("{}", msg); - return Err(GeyserPluginError::Custom(msg.into())); + )), + None => Err(format!("no transactions for block {}", block.slot)), + }; + + match transactions { + Ok(transactions) => { + let message = Message::Block((&blockinfo, transactions).into()); + let _ = inner.grpc_channel.send(message); + let message = Message::BlockMeta((&blockinfo).into()); + let _ = inner.grpc_channel.send(message); + Ok(()) + } + Err(msg) => { + prom::block_transactions::inc_block(); + error!("{msg}"); + Err(GeyserPluginError::Custom(msg.into())) + } } - }; - - let message = Message::Block((&blockinfo, transactions).into()); - let _ = inner.grpc_channel.send(message); - let message = Message::BlockMeta((&blockinfo).into()); - let _ = inner.grpc_channel.send(message); - - Ok(()) + }) } fn transaction_notifications_enabled(&self) -> bool {