Skip to content

Commit

Permalink
Ignore data before first processed block after startup message (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Jan 23, 2023
1 parent e4c74dc commit ce18a1a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "solana-geyser-grpc"
version = "0.4.0+solana.1.14.10"
version = "0.4.1+solana.1.14.10"
authors = ["Triton One"]
edition = "2021"

Expand Down
147 changes: 89 additions & 58 deletions src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
grpc::{GrpcService, Message, MessageTransaction, MessageTransactionInfo},
prom::{self, PrometheusService},
},
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
Expand All @@ -18,6 +19,8 @@ use {
#[derive(Debug)]
pub struct PluginInner {
runtime: Runtime,
startup_received: bool,
startup_processed_received: bool,
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService,
Expand All @@ -29,6 +32,21 @@ pub struct Plugin {
inner: Option<PluginInner>,
}

impl Plugin {
fn with_inner<F>(&mut self, f: F) -> PluginResult<()>
where
F: FnOnce(&mut PluginInner) -> PluginResult<()>,
{
// Before processed slot after end of startup message we will fail to construct full block
let inner = self.inner.as_mut().expect("initialized");
if inner.startup_received && inner.startup_processed_received {
f(inner)
} else {
Ok(())
}
}
}

impl GeyserPlugin for Plugin {
fn name(&self) -> &'static str {
"GeyserGrpcPublic"
Expand All @@ -52,6 +70,8 @@ impl GeyserPlugin for Plugin {

self.inner = Some(PluginInner {
runtime,
startup_received: false,
startup_processed_received: false,
grpc_channel,
grpc_shutdown_tx,
prometheus,
Expand All @@ -70,17 +90,23 @@ impl GeyserPlugin for Plugin {
}
}

fn notify_end_of_startup(&mut self) -> PluginResult<()> {
let inner = self.inner.as_mut().expect("initialized");
inner.startup_received = true;
Ok(())
}

fn update_account(
&mut self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
let message = Message::Account((account, slot, is_startup).into());
let _ = inner.grpc_channel.send(message);

Ok(())
self.with_inner(|inner| {
let message = Message::Account((account, slot, is_startup).into());
let _ = inner.grpc_channel.send(message);
Ok(())
})
}

fn update_slot_status(
Expand All @@ -89,79 +115,84 @@ impl GeyserPlugin for Plugin {
parent: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
let message = Message::Slot((slot, parent, status).into());
let _ = inner.grpc_channel.send(message);

prom::update_slot_status(slot, status);
let inner = self.inner.as_mut().expect("initialized");
if inner.startup_received
&& !inner.startup_processed_received
&& status == SlotStatus::Processed
{
inner.startup_processed_received = true;
}

Ok(())
self.with_inner(|inner| {
let message = Message::Slot((slot, parent, status).into());
let _ = inner.grpc_channel.send(message);
prom::update_slot_status(slot, status);
Ok(())
})
}

fn notify_transaction(
&mut self,
transaction: ReplicaTransactionInfoVersions<'_>,
slot: u64,
) -> PluginResult<()> {
let inner = self.inner.as_mut().expect("initialized");

let msg_tx: MessageTransaction = (transaction, slot).into();
match &mut inner.transactions {
Some((current_slot, transactions)) if *current_slot == slot => {
transactions.push(msg_tx.transaction.clone());
}
Some((current_slot, _)) => {
prom::block_transactions::inc_tx();
let msg = format!(
"got tx from block {}, while current block is {}",
slot, current_slot
);
log::error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
self.with_inner(|inner| {
let msg_tx: MessageTransaction = (transaction, slot).into();
match &mut inner.transactions {
Some((current_slot, transactions)) if *current_slot == slot => {
transactions.push(msg_tx.transaction.clone());
}
Some((current_slot, _)) => {
prom::block_transactions::inc_tx();
let msg = format!(
"got tx from block {}, while current block is {}",
slot, current_slot
);
error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
}
None => {
inner.transactions = Some((slot, vec![msg_tx.transaction.clone()]));
}
}
None => {
inner.transactions = Some((slot, vec![msg_tx.transaction.clone()]));
}
}

let message = Message::Transaction(msg_tx);
let _ = inner.grpc_channel.send(message);
let message = Message::Transaction(msg_tx);
let _ = inner.grpc_channel.send(message);

Ok(())
Ok(())
})
}

fn notify_block_metadata(
&mut self,
blockinfo: ReplicaBlockInfoVersions<'_>,
) -> PluginResult<()> {
let inner = self.inner.as_mut().expect("initialized");

let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo;
let transactions = match inner.transactions.take() {
Some((slot, transactions)) if slot == block.slot => transactions,
Some((slot, _)) => {
prom::block_transactions::inc_block();
let msg = format!(
self.with_inner(|inner| {
let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo;
let transactions = match inner.transactions.take() {
Some((slot, transactions)) if slot == block.slot => Ok(transactions),
Some((slot, _)) => Err(format!(
"invalid transactions for block {}, found {}",
block.slot, slot
);
log::error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
}
None => {
prom::block_transactions::inc_block();
let msg = format!("no transactions for block {}", block.slot);
log::error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
)),
None => Err(format!("no transactions for block {}", block.slot)),
};

match transactions {
Ok(transactions) => {
let message = Message::Block((&blockinfo, transactions).into());
let _ = inner.grpc_channel.send(message);
let message = Message::BlockMeta((&blockinfo).into());
let _ = inner.grpc_channel.send(message);
Ok(())
}
Err(msg) => {
prom::block_transactions::inc_block();
error!("{msg}");
Err(GeyserPluginError::Custom(msg.into()))
}
}
};

let message = Message::Block((&blockinfo, transactions).into());
let _ = inner.grpc_channel.send(message);
let message = Message::BlockMeta((&blockinfo).into());
let _ = inner.grpc_channel.send(message);

Ok(())
})
}

fn transaction_notifications_enabled(&self) -> bool {
Expand Down

0 comments on commit ce18a1a

Please sign in to comment.