Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
collect min prioritization fees when replaying sanitized transactions…
Browse files Browse the repository at this point in the history
… (backport #26709) (#29539)

* collect min prioritization fees when replaying sanitized transactions (#26709)

* Collect blocks' minimum prioritization fees when replaying sanitized transactions

* Limits block min-fee metrics reporting to top 10 writable accounts

* Add service thread to asynchronously update and finalize prioritization fee cache

* Add bench test for prioritization_fee_cache

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
(cherry picked from commit 8bb039d)

# Conflicts:
#	core/src/immutable_deserialized_packet.rs
#	core/src/replay_stage.rs
#	core/src/tvu.rs
#	core/src/unprocessed_packet_batches.rs
#	core/src/validator.rs

* Fix conflicts

* Don't use api from the future

* fix a flacky test (#27572)

Co-authored-by: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
  • Loading branch information
3 people authored Jan 7, 2023
1 parent b76b9da commit 6c66421
Show file tree
Hide file tree
Showing 16 changed files with 1,450 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions core/src/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,12 @@ impl ForwardPacketBatchesByAccounts {
mod tests {
use {
super::*,
crate::{
transaction_priority_details::TransactionPriorityDetails,
unprocessed_packet_batches::DeserializedPacket,
},
crate::unprocessed_packet_batches::DeserializedPacket,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
transaction_priority_details::TransactionPriorityDetails,
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::RwLock,
Expand Down
137 changes: 137 additions & 0 deletions core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use {
solana_perf::packet::Packet,
solana_runtime::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_sdk::{
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{SanitizedVersionedTransaction, VersionedTransaction},
},
std::{cmp::Ordering, mem::size_of},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn new(
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

Ok(Self {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
})
}

pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Read the transaction message from packet data
fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

#[cfg(test)]
mod tests {
use {
super::*,
solana_sdk::{signature::Keypair, system_transaction},
};

#[test]
fn simple_deserialized_packet() {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
let deserialized_packet = ImmutableDeserializedPacket::new(packet, None);

assert!(matches!(deserialized_packet, Ok(_)));
}
}
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
pub mod tracer_packet_stats;
pub mod transaction_priority_details;
pub mod tree_diff;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
Expand Down
19 changes: 19 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
Expand Down Expand Up @@ -397,6 +398,7 @@ impl ReplayStage {
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Self {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower();
Expand Down Expand Up @@ -530,6 +532,7 @@ impl ReplayStage {
block_metadata_notifier.clone(),
&mut replay_timing,
log_messages_bytes_limit,
&prioritization_fee_cache,
);
replay_active_banks_time.stop();

Expand Down Expand Up @@ -1757,6 +1760,7 @@ impl ReplayStage {
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<usize, BlockstoreProcessorError> {
let mut w_replay_stats = replay_stats.write().unwrap();
let mut w_replay_progress = replay_progress.write().unwrap();
Expand All @@ -1776,6 +1780,7 @@ impl ReplayStage {
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
let tx_count_after = w_replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
Expand Down Expand Up @@ -2278,6 +2283,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
active_bank_slots: &[Slot],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Vec<ReplaySlotFromBlockstore> {
// Make mutable shared structures thread safe.
let progress = RwLock::new(progress);
Expand Down Expand Up @@ -2353,6 +2359,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
Expand Down Expand Up @@ -2383,6 +2390,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
bank_slot: Slot,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> ReplaySlotFromBlockstore {
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
Expand Down Expand Up @@ -2432,6 +2440,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
Expand Down Expand Up @@ -2463,6 +2472,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_result_vec: &[ReplaySlotFromBlockstore],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false;
Expand Down Expand Up @@ -2535,6 +2545,9 @@ impl ReplayStage {
warn!("cost_update_sender failed sending bank stats: {:?}", err)
});

// finalize block's minimum prioritization fee cache for this bank
prioritization_fee_cache.finalize_priority_fee(bank.slot());

assert_ne!(bank.hash(), Hash::default());
// Needs to be updated before `check_slot_agrees_with_cluster()` so that
// any updates in `check_slot_agrees_with_cluster()` on fork choice take
Expand Down Expand Up @@ -2652,6 +2665,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len();
Expand Down Expand Up @@ -2681,6 +2695,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
&active_bank_slots,
prioritization_fee_cache,
)
} else {
active_bank_slots
Expand All @@ -2698,6 +2713,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
)
})
.collect()
Expand All @@ -2715,6 +2731,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
active_bank_slots[0],
prioritization_fee_cache,
)]
};

Expand All @@ -2739,6 +2756,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
block_metadata_notifier,
&replay_result_vec,
prioritization_fee_cache,
)
} else {
false
Expand Down Expand Up @@ -4278,6 +4296,7 @@ pub(crate) mod tests {
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
Expand Down
6 changes: 5 additions & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, cost_model::CostModel,
vote_sender_types::ReplayVoteSender,
prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
std::{
Expand Down Expand Up @@ -129,6 +129,7 @@ impl Tvu {
accounts_background_request_sender: AbsRequestSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: &Arc<ConnectionCache>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -288,6 +289,7 @@ impl Tvu {
drop_bank_sender,
block_metadata_notifier,
log_messages_bytes_limit,
prioritization_fee_cache.clone(),
);

let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
Expand Down Expand Up @@ -401,6 +403,7 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -450,6 +453,7 @@ pub mod tests {
AbsRequestSender::default(),
None,
&Arc::new(ConnectionCache::default()),
&_ignored_prioritization_fee_cache,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
min_max_heap::MinMaxHeap,
solana_perf::packet::{Packet, PacketBatch},
solana_runtime::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_sdk::{
feature_set,
hash::Hash,
Expand Down
6 changes: 6 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use {
commitment::BlockCommitmentCache,
cost_model::CostModel,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
Expand Down Expand Up @@ -793,6 +794,10 @@ impl Validator {
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};

// block min prioritization fee cache should be readable by RPC, and writable by validator
// (for now, by replay stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());

let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let (
json_rpc_service,
Expand Down Expand Up @@ -1015,6 +1020,7 @@ impl Validator {
accounts_background_request_sender,
config.runtime_config.log_messages_bytes_limit,
&connection_cache,
&prioritization_fee_cache,
);

let tpu = Tpu::new(
Expand Down
Loading

0 comments on commit 6c66421

Please sign in to comment.