Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

collect min prioritization fees when replaying sanitized transactions (backport #26709) #29539

Merged
merged 4 commits into from
Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions core/src/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,12 @@ impl ForwardPacketBatchesByAccounts {
mod tests {
use {
super::*,
crate::{
transaction_priority_details::TransactionPriorityDetails,
unprocessed_packet_batches::DeserializedPacket,
},
crate::unprocessed_packet_batches::DeserializedPacket,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
transaction_priority_details::TransactionPriorityDetails,
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::RwLock,
Expand Down
137 changes: 137 additions & 0 deletions core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use {
solana_perf::packet::Packet,
solana_runtime::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_sdk::{
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{SanitizedVersionedTransaction, VersionedTransaction},
},
std::{cmp::Ordering, mem::size_of},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn new(
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

Ok(Self {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
})
}

pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Read the transaction message from packet data
fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

#[cfg(test)]
mod tests {
use {
super::*,
solana_sdk::{signature::Keypair, system_transaction},
};

#[test]
fn simple_deserialized_packet() {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
let deserialized_packet = ImmutableDeserializedPacket::new(packet, None);

assert!(matches!(deserialized_packet, Ok(_)));
}
}
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
pub mod tracer_packet_stats;
pub mod transaction_priority_details;
pub mod tree_diff;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
Expand Down
19 changes: 19 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
Expand Down Expand Up @@ -397,6 +398,7 @@ impl ReplayStage {
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Self {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower();
Expand Down Expand Up @@ -530,6 +532,7 @@ impl ReplayStage {
block_metadata_notifier.clone(),
&mut replay_timing,
log_messages_bytes_limit,
&prioritization_fee_cache,
);
replay_active_banks_time.stop();

Expand Down Expand Up @@ -1757,6 +1760,7 @@ impl ReplayStage {
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<usize, BlockstoreProcessorError> {
let mut w_replay_stats = replay_stats.write().unwrap();
let mut w_replay_progress = replay_progress.write().unwrap();
Expand All @@ -1776,6 +1780,7 @@ impl ReplayStage {
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
let tx_count_after = w_replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
Expand Down Expand Up @@ -2278,6 +2283,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
active_bank_slots: &[Slot],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Vec<ReplaySlotFromBlockstore> {
// Make mutable shared structures thread safe.
let progress = RwLock::new(progress);
Expand Down Expand Up @@ -2353,6 +2359,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
Expand Down Expand Up @@ -2383,6 +2390,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
bank_slot: Slot,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> ReplaySlotFromBlockstore {
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
Expand Down Expand Up @@ -2432,6 +2440,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
Expand Down Expand Up @@ -2463,6 +2472,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_result_vec: &[ReplaySlotFromBlockstore],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false;
Expand Down Expand Up @@ -2535,6 +2545,9 @@ impl ReplayStage {
warn!("cost_update_sender failed sending bank stats: {:?}", err)
});

// finalize block's minimum prioritization fee cache for this bank
prioritization_fee_cache.finalize_priority_fee(bank.slot());

assert_ne!(bank.hash(), Hash::default());
// Needs to be updated before `check_slot_agrees_with_cluster()` so that
// any updates in `check_slot_agrees_with_cluster()` on fork choice take
Expand Down Expand Up @@ -2652,6 +2665,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len();
Expand Down Expand Up @@ -2681,6 +2695,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
&active_bank_slots,
prioritization_fee_cache,
)
} else {
active_bank_slots
Expand All @@ -2698,6 +2713,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
)
})
.collect()
Expand All @@ -2715,6 +2731,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
active_bank_slots[0],
prioritization_fee_cache,
)]
};

Expand All @@ -2739,6 +2756,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
block_metadata_notifier,
&replay_result_vec,
prioritization_fee_cache,
)
} else {
false
Expand Down Expand Up @@ -4278,6 +4296,7 @@ pub(crate) mod tests {
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
Expand Down
6 changes: 5 additions & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, cost_model::CostModel,
vote_sender_types::ReplayVoteSender,
prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
std::{
Expand Down Expand Up @@ -129,6 +129,7 @@ impl Tvu {
accounts_background_request_sender: AbsRequestSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: &Arc<ConnectionCache>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -288,6 +289,7 @@ impl Tvu {
drop_bank_sender,
block_metadata_notifier,
log_messages_bytes_limit,
prioritization_fee_cache.clone(),
);

let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
Expand Down Expand Up @@ -401,6 +403,7 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -450,6 +453,7 @@ pub mod tests {
AbsRequestSender::default(),
None,
&Arc::new(ConnectionCache::default()),
&_ignored_prioritization_fee_cache,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
min_max_heap::MinMaxHeap,
solana_perf::packet::{Packet, PacketBatch},
solana_runtime::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_sdk::{
feature_set,
hash::Hash,
Expand Down
6 changes: 6 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use {
commitment::BlockCommitmentCache,
cost_model::CostModel,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
Expand Down Expand Up @@ -793,6 +794,10 @@ impl Validator {
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};

// block min prioritization fee cache should be readable by RPC, and writable by validator
// (for now, by replay stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let (
json_rpc_service,
Expand Down Expand Up @@ -1015,6 +1020,7 @@ impl Validator {
accounts_background_request_sender,
config.runtime_config.log_messages_bytes_limit,
&connection_cache,
&prioritization_fee_cache,
);

let tpu = Tpu::new(
Expand Down
Loading