From d9760dfd82462207be057035426020ecce17ba27 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Wed, 31 Aug 2022 08:00:55 -0500 Subject: [PATCH 1/4] collect min prioritization fees when replaying sanitized transactions (#26709) * Collect blocks' minimum prioritization fees when replaying sanitized transactions * Limits block min-fee metrics reporting to top 10 writable accounts * Add service thread to asynchronously update and finalize prioritization fee cache * Add bench test for prioritization_fee_cache Co-authored-by: Tyera Eulberg (cherry picked from commit 8bb039d08d3a90eef0b51c9ca9ed0ad3f020eaa2) # Conflicts: # core/src/immutable_deserialized_packet.rs # core/src/replay_stage.rs # core/src/tvu.rs # core/src/unprocessed_packet_batches.rs # core/src/validator.rs --- Cargo.lock | 1 + .../src/forward_packet_batches_by_accounts.rs | 6 +- core/src/immutable_deserialized_packet.rs | 137 +++ core/src/lib.rs | 1 - core/src/replay_stage.rs | 58 ++ core/src/tvu.rs | 19 +- core/src/unprocessed_packet_batches.rs | 5 + core/src/validator.rs | 10 + ledger/src/blockstore_processor.rs | 17 + programs/bpf/Cargo.lock | 1 + runtime/Cargo.toml | 4 + runtime/benches/prioritization_fee_cache.rs | 113 +++ runtime/src/lib.rs | 3 + runtime/src/prioritization_fee.rs | 324 +++++++ runtime/src/prioritization_fee_cache.rs | 811 ++++++++++++++++++ .../src/transaction_priority_details.rs | 0 16 files changed, 1504 insertions(+), 6 deletions(-) create mode 100644 core/src/immutable_deserialized_packet.rs create mode 100644 runtime/benches/prioritization_fee_cache.rs create mode 100644 runtime/src/prioritization_fee.rs create mode 100644 runtime/src/prioritization_fee_cache.rs rename {core => runtime}/src/transaction_priority_details.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 491a196edfad7e..d739aaf72bf3ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5995,6 +5995,7 @@ dependencies = [ "lazy_static", "libsecp256k1", "log", + "lru", "lz4", "memmap2", "num-derive", diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index 14fcfe486fa8d6..caeda3fae2bfb1 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -187,14 +187,12 @@ impl ForwardPacketBatchesByAccounts { mod tests { use { super::*, - crate::{ - transaction_priority_details::TransactionPriorityDetails, - unprocessed_packet_batches::DeserializedPacket, - }, + crate::unprocessed_packet_batches::DeserializedPacket, solana_runtime::{ bank::Bank, bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, + transaction_priority_details::TransactionPriorityDetails, }, solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, std::sync::RwLock, diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs new file mode 100644 index 00000000000000..0a12fcad44e17d --- /dev/null +++ b/core/src/immutable_deserialized_packet.rs @@ -0,0 +1,137 @@ +use { + solana_perf::packet::Packet, + solana_runtime::transaction_priority_details::{ + GetTransactionPriorityDetails, TransactionPriorityDetails, + }, + solana_sdk::{ + hash::Hash, + message::Message, + sanitize::SanitizeError, + short_vec::decode_shortu16_len, + signature::Signature, + transaction::{SanitizedVersionedTransaction, VersionedTransaction}, + }, + std::{cmp::Ordering, mem::size_of}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum DeserializedPacketError { + #[error("ShortVec Failed to Deserialize")] + // short_vec::decode_shortu16_len() currently returns () on error + ShortVecError(()), + #[error("Deserialization Error: {0}")] + DeserializationError(#[from] bincode::Error), + #[error("overflowed on signature size {0}")] + SignatureOverflowed(usize), + #[error("packet failed sanitization {0}")] + SanitizeError(#[from] SanitizeError), + #[error("transaction failed prioritization")] + PrioritizationFailure, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct ImmutableDeserializedPacket { + original_packet: Packet, + transaction: SanitizedVersionedTransaction, + message_hash: Hash, + is_simple_vote: bool, + priority_details: TransactionPriorityDetails, +} + +impl ImmutableDeserializedPacket { + pub fn new( + packet: Packet, + priority_details: Option, + ) -> Result { + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; + let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; + let message_bytes = packet_message(&packet)?; + let message_hash = Message::hash_raw_message(message_bytes); + let is_simple_vote = packet.meta.is_simple_vote_tx(); + + // drop transaction if prioritization fails. + let priority_details = priority_details + .or_else(|| sanitized_transaction.get_transaction_priority_details()) + .ok_or(DeserializedPacketError::PrioritizationFailure)?; + + Ok(Self { + original_packet: packet, + transaction: sanitized_transaction, + message_hash, + is_simple_vote, + priority_details, + }) + } + + pub fn original_packet(&self) -> &Packet { + &self.original_packet + } + + pub fn transaction(&self) -> &SanitizedVersionedTransaction { + &self.transaction + } + + pub fn message_hash(&self) -> &Hash { + &self.message_hash + } + + pub fn is_simple_vote(&self) -> bool { + self.is_simple_vote + } + + pub fn priority(&self) -> u64 { + self.priority_details.priority + } + + pub fn compute_unit_limit(&self) -> u64 { + self.priority_details.compute_unit_limit + } +} + +impl PartialOrd for ImmutableDeserializedPacket { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ImmutableDeserializedPacket { + fn cmp(&self, other: &Self) -> Ordering { + self.priority().cmp(&other.priority()) + } +} + +/// Read the transaction message from packet data +fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> { + let (sig_len, sig_size) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(DeserializedPacketError::ShortVecError(()))?; + sig_len + .checked_mul(size_of::()) + .and_then(|v| v.checked_add(sig_size)) + .and_then(|msg_start| packet.data(msg_start..)) + .ok_or(DeserializedPacketError::SignatureOverflowed(sig_size)) +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{signature::Keypair, system_transaction}, + }; + + #[test] + fn simple_deserialized_packet() { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); + let deserialized_packet = ImmutableDeserializedPacket::new(packet, None); + + assert!(matches!(deserialized_packet, Ok(_))); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 496a114500d1ae..33efb883df8281 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -69,7 +69,6 @@ mod tower1_7_14; pub mod tower_storage; pub mod tpu; pub mod tracer_packet_stats; -pub mod transaction_priority_details; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 508a57157b4d16..646cebaf44cba4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -57,6 +57,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, + prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{ @@ -397,7 +398,12 @@ impl ReplayStage { drop_bank_sender: Sender>>, block_metadata_notifier: Option, log_messages_bytes_limit: Option, +<<<<<<< HEAD ) -> Self { +======= + prioritization_fee_cache: Arc, + ) -> Result { +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower(); info!("Tower state: {:?}", tower); @@ -599,6 +605,14 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, +<<<<<<< HEAD +======= + block_metadata_notifier.clone(), + &mut replay_timing, + log_messages_bytes_limit, + replay_slots_concurrently, + &prioritization_fee_cache, +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) ); } process_duplicate_slots_time.stop(); @@ -1757,6 +1771,7 @@ impl ReplayStage { replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result { let mut w_replay_stats = replay_stats.write().unwrap(); let mut w_replay_progress = replay_progress.write().unwrap(); @@ -1776,6 +1791,7 @@ impl ReplayStage { verify_recyclers, false, log_messages_bytes_limit, + prioritization_fee_cache, )?; let tx_count_after = w_replay_progress.num_txs; let tx_count = tx_count_after - tx_count_before; @@ -2278,6 +2294,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, active_bank_slots: &[Slot], + prioritization_fee_cache: &PrioritizationFeeCache, ) -> Vec { // Make mutable shared structures thread safe. let progress = RwLock::new(progress); @@ -2353,6 +2370,7 @@ impl ReplayStage { &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, + prioritization_fee_cache, ); replay_blockstore_time.stop(); replay_result.replay_result = Some(blockstore_result); @@ -2383,6 +2401,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, bank_slot: Slot, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> ReplaySlotFromBlockstore { let mut replay_result = ReplaySlotFromBlockstore { is_slot_dead: false, @@ -2432,6 +2451,7 @@ impl ReplayStage { &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, + prioritization_fee_cache, ); replay_blockstore_time.stop(); replay_result.replay_result = Some(blockstore_result); @@ -2463,6 +2483,7 @@ impl ReplayStage { ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, replay_result_vec: &[ReplaySlotFromBlockstore], + prioritization_fee_cache: &PrioritizationFeeCache, ) -> bool { // TODO: See if processing of blockstore replay results and bank completion can be made thread safe. let mut did_complete_bank = false; @@ -2535,6 +2556,9 @@ impl ReplayStage { warn!("cost_update_sender failed sending bank stats: {:?}", err) }); + // finalize block's minimum prioritization fee cache for this bank + prioritization_fee_cache.finalize_priority_fee(bank.slot()); + assert_ne!(bank.hash(), Hash::default()); // Needs to be updated before `check_slot_agrees_with_cluster()` so that // any updates in `check_slot_agrees_with_cluster()` on fork choice take @@ -2652,7 +2676,13 @@ impl ReplayStage { block_metadata_notifier: Option, replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, +<<<<<<< HEAD ) -> bool { +======= + replay_slots_concurrently: bool, + prioritization_fee_cache: &PrioritizationFeeCache, + ) -> bool /* completed a bank */ { +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) let active_bank_slots = bank_forks.read().unwrap().active_bank_slots(); let num_active_banks = active_bank_slots.len(); trace!( @@ -2714,8 +2744,34 @@ impl ReplayStage { replay_vote_sender, replay_timing, log_messages_bytes_limit, +<<<<<<< HEAD active_bank_slots[0], )] +======= + &active_bank_slots, + prioritization_fee_cache, + ) + } else { + active_bank_slots + .iter() + .map(|bank_slot| { + Self::replay_active_bank( + blockstore, + bank_forks, + my_pubkey, + vote_account, + progress, + transaction_status_sender, + verify_recyclers, + replay_vote_sender, + replay_timing, + log_messages_bytes_limit, + *bank_slot, + prioritization_fee_cache, + ) + }) + .collect() +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) }; Self::process_replay_results( @@ -2739,6 +2795,7 @@ impl ReplayStage { ancestor_hashes_replay_update_sender, block_metadata_notifier, &replay_result_vec, + prioritization_fee_cache, ) } else { false @@ -4278,6 +4335,7 @@ pub(crate) mod tests { &replay_vote_sender, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 990b943f7478f0..c0ff81cb95ac04 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -43,7 +43,7 @@ use { solana_runtime::{ accounts_background_service::AbsRequestSender, bank_forks::BankForks, commitment::BlockCommitmentCache, cost_model::CostModel, - vote_sender_types::ReplayVoteSender, + prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, std::{ @@ -129,7 +129,12 @@ impl Tvu { accounts_background_request_sender: AbsRequestSender, log_messages_bytes_limit: Option, connection_cache: &Arc, +<<<<<<< HEAD ) -> Self { +======= + prioritization_fee_cache: &Arc, + ) -> Result { +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) let TvuSockets { repair: repair_socket, fetch: fetch_sockets, @@ -288,7 +293,12 @@ impl Tvu { drop_bank_sender, block_metadata_notifier, log_messages_bytes_limit, +<<<<<<< HEAD ); +======= + prioritization_fee_cache.clone(), + )?; +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { LedgerCleanupService::new( @@ -401,6 +411,7 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -450,7 +461,13 @@ pub mod tests { AbsRequestSender::default(), None, &Arc::new(ConnectionCache::default()), +<<<<<<< HEAD ); +======= + &_ignored_prioritization_fee_cache, + ) + .expect("assume success"); +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); poh_service.join().unwrap(); diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 1973b8f9dc91ba..5d60e3a0550fd9 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,9 +1,14 @@ use { +<<<<<<< HEAD crate::transaction_priority_details::{ GetTransactionPriorityDetails, TransactionPriorityDetails, }, +======= + crate::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, + solana_runtime::transaction_priority_details::TransactionPriorityDetails, solana_sdk::{ feature_set, hash::Hash, diff --git a/core/src/validator.rs b/core/src/validator.rs index c6880e9ec706ab..bb0832335762d5 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -79,6 +79,7 @@ use { commitment::BlockCommitmentCache, cost_model::CostModel, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + prioritization_fee_cache::PrioritizationFeeCache, runtime_config::RuntimeConfig, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, @@ -793,6 +794,10 @@ impl Validator { false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), }; + // block min prioritization fee cache should be readable by RPC, and writable by validator + // (for now, by replay stage) + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let ( json_rpc_service, @@ -1015,7 +1020,12 @@ impl Validator { accounts_background_request_sender, config.runtime_config.log_messages_bytes_limit, &connection_cache, +<<<<<<< HEAD ); +======= + &prioritization_fee_cache, + )?; +>>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) let tpu = Tpu::new( &cluster_info, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a51612886c001c..0fbf49c7b487bb 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -31,6 +31,7 @@ use { block_cost_limits::*, commitment::VOTE_THRESHOLD_SIZE, cost_model::CostModel, + prioritization_fee_cache::PrioritizationFeeCache, runtime_config::RuntimeConfig, transaction_batch::TransactionBatch, vote_account::VoteAccountsHashMap, @@ -515,6 +516,7 @@ pub fn process_entries_for_tests( }) .collect(); + let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries_with_callback( bank, &mut replay_entries, @@ -525,6 +527,7 @@ pub fn process_entries_for_tests( &mut confirmation_timing, Arc::new(RwLock::new(BlockCostCapacityMeter::default())), None, + &_ignored_prioritization_fee_cache, ); debug!("process_entries: {:?}", confirmation_timing); @@ -543,6 +546,7 @@ fn process_entries_with_callback( confirmation_timing: &mut ConfirmationTiming, cost_capacity_meter: Arc>, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -605,6 +609,9 @@ fn process_entries_with_callback( batch, transaction_indexes, }); + // entry is scheduled to be processed, transactions in it can be used to + // update prioritization fee cache asynchronously. + prioritization_fee_cache.update(bank.clone(), transactions.iter()); // done with this entry break; } @@ -940,6 +947,8 @@ fn confirm_full_slot( ) -> result::Result<(), BlockstoreProcessorError> { let mut confirmation_timing = ConfirmationTiming::default(); let skip_verification = !opts.poh_verify; + let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); + confirm_slot( blockstore, bank, @@ -952,6 +961,7 @@ fn confirm_full_slot( recyclers, opts.allow_dead_slots, opts.runtime_config.log_messages_bytes_limit, + &_ignored_prioritization_fee_cache, )?; timing.accumulate(&confirmation_timing.execute_timings); @@ -1078,6 +1088,7 @@ pub fn confirm_slot( recyclers: &VerifyRecyclers, allow_dead_slots: bool, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); @@ -1106,6 +1117,7 @@ pub fn confirm_slot( entry_callback, recyclers, log_messages_bytes_limit, + prioritization_fee_cache, ) } @@ -1121,6 +1133,7 @@ fn confirm_slot_entries( entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); let (entries, num_shreds, slot_full) = slot_entries_load_result; @@ -1222,6 +1235,7 @@ fn confirm_slot_entries( timing, cost_capacity_meter, log_messages_bytes_limit, + prioritization_fee_cache, ) .map_err(BlockstoreProcessorError::from); replay_elapsed.stop(); @@ -4131,6 +4145,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) } @@ -4274,6 +4289,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) .unwrap(); assert_eq!(progress.num_txs, 2); @@ -4319,6 +4335,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) .unwrap(); assert_eq!(progress.num_txs, 5); diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 86e940f0497ed3..815decab2bf6bb 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -5390,6 +5390,7 @@ dependencies = [ "itertools", "lazy_static", "log", + "lru", "lz4", "memmap2", "num-derive", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 318fa6d7733f39..0437f0e99ec108 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -27,6 +27,7 @@ index_list = "0.2.7" itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.17" +lru = "0.7.7" lz4 = "1.24.0" memmap2 = "0.5.3" num-derive = { version = "0.3" } @@ -78,3 +79,6 @@ targets = ["x86_64-unknown-linux-gnu"] [build-dependencies] rustc_version = "0.4" + +[[bench]] +name = "prioritization_fee_cache" diff --git a/runtime/benches/prioritization_fee_cache.rs b/runtime/benches/prioritization_fee_cache.rs new file mode 100644 index 00000000000000..e04e783b99d43e --- /dev/null +++ b/runtime/benches/prioritization_fee_cache.rs @@ -0,0 +1,113 @@ +#![feature(test)] +extern crate test; + +use { + rand::{thread_rng, Rng}, + solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + prioritization_fee_cache::*, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + message::Message, + pubkey::Pubkey, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + std::sync::Arc, + test::Bencher, +}; +const TRANSFER_TRANSACTION_COMPUTE_UNIT: u32 = 200; + +fn build_sanitized_transaction( + compute_unit_price: u64, + signer_account: &Pubkey, + write_account: &Pubkey, +) -> SanitizedTransaction { + let transfer_lamports = 1; + let transaction = Transaction::new_unsigned(Message::new( + &[ + system_instruction::transfer(signer_account, write_account, transfer_lamports), + ComputeBudgetInstruction::set_compute_unit_limit(TRANSFER_TRANSACTION_COMPUTE_UNIT), + ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), + ], + Some(signer_account), + )); + + SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap() +} + +#[bench] +#[ignore] +fn bench_process_transactions_single_slot(bencher: &mut Bencher) { + let prioritization_fee_cache = PrioritizationFeeCache::default(); + + let bank = Arc::new(Bank::default_for_tests()); + + // build test transactions + let transactions: Vec<_> = (0..5000) + .map(|n| { + let compute_unit_price = n % 7; + build_sanitized_transaction( + compute_unit_price, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ) + }) + .collect(); + + bencher.iter(|| { + prioritization_fee_cache.update(bank.clone(), transactions.iter()); + }); +} + +fn process_transactions_multiple_slots(banks: &[Arc], num_slots: usize, num_threads: usize) { + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + + // each threads updates a slot a batch of 50 transactions, for 100 times + for _ in 0..100 { + pool.install(|| { + let transactions: Vec<_> = (0..50) + .map(|n| { + let compute_unit_price = n % 7; + build_sanitized_transaction( + compute_unit_price, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ) + }) + .collect(); + + let index = thread_rng().gen_range(0, num_slots); + + prioritization_fee_cache.update(banks[index].clone(), transactions.iter()); + }) + } +} + +#[bench] +#[ignore] +fn bench_process_transactions_multiple_slots(bencher: &mut Bencher) { + const NUM_SLOTS: usize = 5; + const NUM_THREADS: usize = 3; + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let banks = (1..=NUM_SLOTS) + .map(|n| Arc::new(Bank::new_from_parent(&bank, &collector, n as u64))) + .collect::>(); + + bencher.iter(|| { + process_transactions_multiple_slots(&banks, NUM_SLOTS, NUM_THREADS); + }); +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 45d338bfe58127..30a7aea527dc15 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -46,6 +46,8 @@ pub mod loader_utils; pub mod message_processor; pub mod non_circulating_supply; mod nonce_keyed_account; +pub mod prioritization_fee; +pub mod prioritization_fee_cache; mod pubkey_bins; mod read_only_accounts_cache; pub mod rent_collector; @@ -72,6 +74,7 @@ mod storable_accounts; mod system_instruction_processor; pub mod transaction_batch; pub mod transaction_error_metrics; +pub mod transaction_priority_details; mod verify_accounts_hash_in_background; pub mod vote_account; pub mod vote_parser; diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs new file mode 100644 index 00000000000000..3c225090278989 --- /dev/null +++ b/runtime/src/prioritization_fee.rs @@ -0,0 +1,324 @@ +use { + solana_measure::measure, + solana_sdk::{clock::Slot, pubkey::Pubkey, saturating_add_assign}, + std::collections::HashMap, +}; + +#[derive(Debug, Default)] +struct PrioritizationFeeMetrics { + // Count of writable accounts in slot + total_writable_accounts_count: u64, + + // Count of writeable accounts with a minimum prioritization fee higher than the minimum transaction + // fee for this slot. + relevant_writable_accounts_count: u64, + + // Total prioritization fees included in this slot. + total_prioritization_fee: u64, + + // Accumulated time spent on tracking prioritization fee for each slot. + total_update_elapsed_us: u64, +} + +impl PrioritizationFeeMetrics { + fn accumulate_total_prioritization_fee(&mut self, val: u64) { + saturating_add_assign!(self.total_prioritization_fee, val); + } + + fn accumulate_total_update_elapsed_us(&mut self, val: u64) { + saturating_add_assign!(self.total_update_elapsed_us, val); + } + + fn report(&self, slot: Slot) { + datapoint_info!( + "block_prioritization_fee", + ("slot", slot as i64, i64), + ( + "total_writable_accounts_count", + self.total_writable_accounts_count as i64, + i64 + ), + ( + "relevant_writable_accounts_count", + self.relevant_writable_accounts_count as i64, + i64 + ), + ( + "total_prioritization_fee", + self.total_prioritization_fee as i64, + i64 + ), + ( + "total_update_elapsed_us", + self.total_update_elapsed_us as i64, + i64 + ), + ); + } +} + +pub enum PrioritizationFeeError { + // Not able to get account locks from sanitized transaction, which is required to update block + // minimum fees. + FailGetTransactionAccountLocks, + + // Not able to read priority details, including compute-unit price, from transaction. + // Compute-unit price is required to update block minimum fees. + FailGetTransactionPriorityDetails, + + // Block is already finalized, trying to finalize it again is usually unexpected + BlockIsAlreadyFinalized, +} + +/// Block minimum prioritization fee stats, includes the minimum prioritization fee for a transaction in this +/// block; and the minimum fee for each writable account in all transactions in this block. The only relevant +/// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land +/// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...) +#[derive(Debug)] +pub struct PrioritizationFee { + // The minimum prioritization fee of transactions that landed in this block. + min_transaction_fee: u64, + + // The minimum prioritization fee of each writable account in transactions in this block. + min_writable_account_fees: HashMap, + + // Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded + // are finalized, and can be made available for use (e.g., RPC query) + is_finalized: bool, + + // slot prioritization fee metrics + metrics: PrioritizationFeeMetrics, +} + +impl Default for PrioritizationFee { + fn default() -> Self { + PrioritizationFee { + min_transaction_fee: u64::MAX, + min_writable_account_fees: HashMap::new(), + is_finalized: false, + metrics: PrioritizationFeeMetrics::default(), + } + } +} + +impl PrioritizationFee { + /// Update self for minimum transaction fee in the block and minimum fee for each writable account. + pub fn update( + &mut self, + transaction_fee: u64, + writable_accounts: &[Pubkey], + ) -> Result<(), PrioritizationFeeError> { + let (_, update_time) = measure!( + { + if transaction_fee < self.min_transaction_fee { + self.min_transaction_fee = transaction_fee; + } + + for write_account in writable_accounts.iter() { + self.min_writable_account_fees + .entry(*write_account) + .and_modify(|write_lock_fee| { + *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) + }) + .or_insert(transaction_fee); + } + + self.metrics + .accumulate_total_prioritization_fee(transaction_fee); + }, + "update_time", + ); + + self.metrics + .accumulate_total_update_elapsed_us(update_time.as_us()); + Ok(()) + } + + /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are + /// removed to reduce memory footprint when mark_block_completed() is called. + fn prune_irrelevant_writable_accounts(&mut self) { + self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; + self.min_writable_account_fees + .retain(|_, account_fee| account_fee > &mut self.min_transaction_fee); + self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; + } + + pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> { + if self.is_finalized { + return Err(PrioritizationFeeError::BlockIsAlreadyFinalized); + } + self.prune_irrelevant_writable_accounts(); + self.is_finalized = true; + Ok(()) + } + + pub fn get_min_transaction_fee(&self) -> Option { + (self.min_transaction_fee != u64::MAX).then_some(self.min_transaction_fee) + } + + pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { + self.min_writable_account_fees.get(key).copied() + } + + pub fn get_writable_account_fees(&self) -> impl Iterator { + self.min_writable_account_fees.iter() + } + + pub fn get_writable_accounts_count(&self) -> usize { + self.min_writable_account_fees.len() + } + + pub fn is_finalized(&self) -> bool { + self.is_finalized + } + + pub fn report_metrics(&self, slot: Slot) { + self.metrics.report(slot); + + // report this slot's min_transaction_fee and top 10 min_writable_account_fees + let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0); + let mut accounts_fees: Vec<_> = self.get_writable_account_fees().collect(); + accounts_fees.sort_by(|lh, rh| rh.1.cmp(lh.1)); + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", "block", String), + ("min_prioritization_fee", min_transaction_fee as i64, i64), + ); + for (account_key, fee) in accounts_fees.iter().take(10) { + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", account_key.to_string(), String), + ("min_prioritization_fee", **fee as i64, i64), + ); + } + } +} + +#[cfg(test)] +mod tests { + use {super::*, solana_sdk::pubkey::Pubkey}; + + #[test] + fn test_update_prioritization_fee() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + let mut prioritization_fee = PrioritizationFee::default(); + assert!(prioritization_fee.get_min_transaction_fee().is_none()); + + // Assert for 1st transaction + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [5, a, b ] --> [5, 5, 5, nil ] + { + assert!(prioritization_fee + .update(5, &[write_account_a, write_account_b]) + .is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_c) + .is_none()); + } + + // Assert for second transaction: + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [9, b, c ] --> [5, 5, 5, 9 ] + { + assert!(prioritization_fee + .update(9, &[write_account_b, write_account_c]) + .is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 9, + prioritization_fee + .get_writable_account_fee(&write_account_c) + .unwrap() + ); + } + + // Assert for third transaction: + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [2, a, c ] --> [2, 2, 5, 2 ] + { + assert!(prioritization_fee + .update(2, &[write_account_a, write_account_c]) + .is_ok()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 2, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 2, + prioritization_fee + .get_writable_account_fee(&write_account_c) + .unwrap() + ); + } + + // assert after prune, account a and c should be removed from cache to save space + { + prioritization_fee.prune_irrelevant_writable_accounts(); + assert_eq!(1, prioritization_fee.min_writable_account_fees.len()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_a) + .is_none()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_c) + .is_none()); + } + } + + #[test] + fn test_mark_block_completed() { + let mut prioritization_fee = PrioritizationFee::default(); + + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert!(prioritization_fee.mark_block_completed().is_err()); + } +} diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs new file mode 100644 index 00000000000000..3ecf11cdc61b7f --- /dev/null +++ b/runtime/src/prioritization_fee_cache.rs @@ -0,0 +1,811 @@ +use { + crate::{ + bank::Bank, prioritization_fee::*, + transaction_priority_details::GetTransactionPriorityDetails, + }, + crossbeam_channel::{unbounded, Receiver, Sender}, + log::*, + lru::LruCache, + solana_measure::measure, + solana_sdk::{ + clock::Slot, pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction, + }, + std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, RwLock, + }, + thread::{Builder, JoinHandle}, + }, +}; + +/// The maximum number of blocks to keep in `PrioritizationFeeCache`, ie. +/// the amount of history generally desired to estimate the prioritization fee needed to +/// land a transaction in the current block. +const MAX_NUM_RECENT_BLOCKS: u64 = 150; + +#[derive(Debug, Default)] +struct PrioritizationFeeCacheMetrics { + // Count of transactions that successfully updated each slot's prioritization fee cache. + successful_transaction_update_count: AtomicU64, + + // Accumulated time spent on tracking prioritization fee for each slot. + total_update_elapsed_us: AtomicU64, + + // Accumulated time spent on acquiring cache write lock. + total_cache_lock_elapsed_us: AtomicU64, + + // Accumulated time spent on acquiring each block entry's lock.. + total_entry_lock_elapsed_us: AtomicU64, + + // Accumulated time spent on updating block prioritization fees. + total_entry_update_elapsed_us: AtomicU64, + + // Accumulated time spent on finalizing block prioritization fees. + total_block_finalize_elapsed_us: AtomicU64, +} + +impl PrioritizationFeeCacheMetrics { + fn accumulate_successful_transaction_update_count(&self, val: u64) { + self.successful_transaction_update_count + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_update_elapsed_us(&self, val: u64) { + self.total_update_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_cache_lock_elapsed_us(&self, val: u64) { + self.total_cache_lock_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_entry_lock_elapsed_us(&self, val: u64) { + self.total_entry_lock_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_entry_update_elapsed_us(&self, val: u64) { + self.total_entry_update_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_block_finalize_elapsed_us(&self, val: u64) { + self.total_block_finalize_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn report(&self, slot: Slot) { + datapoint_info!( + "block_prioritization_fee_counters", + ("slot", slot as i64, i64), + ( + "successful_transaction_update_count", + self.successful_transaction_update_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_update_elapsed_us", + self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_cache_lock_elapsed_us", + self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_entry_lock_elapsed_us", + self.total_entry_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_entry_update_elapsed_us", + self.total_entry_update_elapsed_us + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_block_finalize_elapsed_us", + self.total_block_finalize_elapsed_us + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ); + } +} + +enum CacheServiceUpdate { + TransactionUpdate { + slot: Slot, + transaction_fee: u64, + writable_accounts: Arc>, + }, + BankFrozen { + slot: Slot, + }, + Exit, +} + +/// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, +/// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, +/// and collecting stats and reporting metrics. +pub struct PrioritizationFeeCache { + cache: Arc>>>>, + service_thread: Option>, + sender: Sender, + metrics: Arc, +} + +impl Default for PrioritizationFeeCache { + fn default() -> Self { + Self::new(MAX_NUM_RECENT_BLOCKS) + } +} + +impl Drop for PrioritizationFeeCache { + fn drop(&mut self) { + let _ = self.sender.send(CacheServiceUpdate::Exit); + self.service_thread + .take() + .unwrap() + .join() + .expect("Prioritization fee cache servicing thread failed to join"); + } +} + +impl PrioritizationFeeCache { + pub fn new(capacity: u64) -> Self { + let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); + let (sender, receiver) = unbounded(); + let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize))); + + let cache_clone = cache.clone(); + let metrics_clone = metrics.clone(); + let service_thread = Some( + Builder::new() + .name("prioritization-fee-cache-servicing-thread".to_string()) + .spawn(move || { + Self::service_loop(cache_clone, receiver, metrics_clone); + }) + .unwrap(), + ); + + PrioritizationFeeCache { + cache, + service_thread, + sender, + metrics, + } + } + + /// Get prioritization fee entry, create new entry if necessary + fn get_prioritization_fee( + cache: Arc>>>>, + slot: &Slot, + ) -> Arc> { + let mut cache = cache.write().unwrap(); + match cache.get(slot) { + Some(entry) => Arc::clone(entry), + None => { + let entry = Arc::new(Mutex::new(PrioritizationFee::default())); + cache.put(*slot, Arc::clone(&entry)); + entry + } + } + } + + /// Update with a list of transactions' tx_priority_details and tx_account_locks; Only + /// transactions have both valid priority_detail and account_locks will be used to update + /// fee_cache asynchronously. + pub fn update<'a>(&self, bank: Arc, txs: impl Iterator) { + let mut successful_transaction_update_count: u64 = 0; + let (_, send_updates_time) = measure!( + { + for sanitized_transaction in txs { + let priority_details = sanitized_transaction.get_transaction_priority_details(); + let account_locks = sanitized_transaction + .get_account_locks(bank.get_transaction_account_lock_limit()); + + if priority_details.is_none() || account_locks.is_err() { + continue; + } + + let writable_accounts = Arc::new( + account_locks + .unwrap() + .writable + .iter() + .map(|key| **key) + .collect::>(), + ); + + self.sender + .send(CacheServiceUpdate::TransactionUpdate { + slot: bank.slot(), + transaction_fee: priority_details.unwrap().priority, + writable_accounts, + }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); + }); + saturating_add_assign!(successful_transaction_update_count, 1) + } + }, + "send_updates", + ); + + self.metrics + .accumulate_total_update_elapsed_us(send_updates_time.as_us()); + self.metrics + .accumulate_successful_transaction_update_count(successful_transaction_update_count); + } + + /// Finalize prioritization fee when it's bank is completely replayed from blockstore, + /// by pruning irrelevant accounts to save space, and marking its availability for queries. + pub fn finalize_priority_fee(&self, slot: Slot) { + self.sender + .send(CacheServiceUpdate::BankFrozen { slot }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache signalling bank frozen failed: {:?}", + err + ) + }); + } + + /// Internal function is invoked by worker thread to update slot's minimum prioritization fee, + /// Cache lock contends here. + fn update_cache( + cache: Arc>>>>, + slot: &Slot, + transaction_fee: u64, + writable_accounts: Arc>, + metrics: Arc, + ) { + let (block_prioritization_fee, cache_lock_time) = + measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + + let (mut block_prioritization_fee, entry_lock_time) = + measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); + + let (_, entry_update_time) = measure!( + block_prioritization_fee.update(transaction_fee, &writable_accounts), + "entry_update_time" + ); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); + metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); + } + + fn finalize_slot( + cache: Arc>>>>, + slot: &Slot, + metrics: Arc, + ) { + let (block_prioritization_fee, cache_lock_time) = + measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + + let (mut block_prioritization_fee, entry_lock_time) = + measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); + + // prune cache by evicting write account entry from prioritization fee if its fee is less + // or equal to block's minimum transaction fee, because they are irrelevant in calculating + // block minimum fee. + let (_, slot_finalize_time) = measure!( + block_prioritization_fee.mark_block_completed(), + "slot_finalize_time" + ); + block_prioritization_fee.report_metrics(*slot); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); + metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); + } + + fn service_loop( + cache: Arc>>>>, + receiver: Receiver, + metrics: Arc, + ) { + for update in receiver.iter() { + match update { + CacheServiceUpdate::TransactionUpdate { + slot, + transaction_fee, + writable_accounts, + } => Self::update_cache( + cache.clone(), + &slot, + transaction_fee, + writable_accounts, + metrics.clone(), + ), + CacheServiceUpdate::BankFrozen { slot } => { + Self::finalize_slot(cache.clone(), &slot, metrics.clone()); + + metrics.report(slot); + } + CacheServiceUpdate::Exit => { + break; + } + } + } + } + + /// Returns number of blocks that have finalized minimum fees collection + pub fn available_block_count(&self) -> usize { + self.cache + .read() + .unwrap() + .iter() + .filter(|(_slot, prioritization_fee)| prioritization_fee.lock().unwrap().is_finalized()) + .count() + } + + /// Query block minimum fees from finalized blocks in cache, + /// Returns a vector of fee; call site can use it to produce + /// average, or top 5% etc. + pub fn get_prioritization_fees(&self) -> Vec { + self.cache + .read() + .unwrap() + .iter() + .filter_map(|(_slot, prioritization_fee)| { + let prioritization_fee_read = prioritization_fee.lock().unwrap(); + prioritization_fee_read + .is_finalized() + .then(|| prioritization_fee_read.get_min_transaction_fee()) + }) + .flatten() + .collect() + } + + /// Query given account minimum fees from finalized blocks in cache, + /// Returns a vector of fee; call site can use it to produce + /// average, or top 5% etc. + pub fn get_account_prioritization_fees(&self, account_key: &Pubkey) -> Vec { + self.cache + .read() + .unwrap() + .iter() + .filter_map(|(_slot, prioritization_fee)| { + let prioritization_fee_read = prioritization_fee.lock().unwrap(); + prioritization_fee_read + .is_finalized() + .then(|| prioritization_fee_read.get_writable_account_fee(account_key)) + }) + .flatten() + .collect() + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + message::Message, + pubkey::Pubkey, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + }; + + fn build_sanitized_transaction_for_test( + compute_unit_price: u64, + signer_account: &Pubkey, + write_account: &Pubkey, + ) -> SanitizedTransaction { + let transaction = Transaction::new_unsigned(Message::new( + &[ + system_instruction::transfer(signer_account, write_account, 1), + ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), + ], + Some(signer_account), + )); + + SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap() + } + + // update fee cache is asynchronous, this test helper blocks until update is completed. + fn sync_update<'a>( + prioritization_fee_cache: &mut PrioritizationFeeCache, + bank: Arc, + txs: impl Iterator, + ) { + prioritization_fee_cache.update(bank.clone(), txs); + + let block_fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank.slot(), + ); + + // wait till update is done + while block_fee + .lock() + .unwrap() + .get_min_transaction_fee() + .is_none() + { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } + + // finalization is asynchronous, this test helper blocks until finalization is completed. + fn sync_finalize_priority_fee_for_test( + prioritization_fee_cache: &mut PrioritizationFeeCache, + slot: Slot, + ) { + prioritization_fee_cache.finalize_priority_fee(slot); + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + + // wait till finalization is done + while !fee.lock().unwrap().is_finalized() { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } + + #[test] + fn test_prioritization_fee_cache_update() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + // Set up test with 3 transactions, in format of [fee, write-accounts...], + // Shall expect fee cache is updated in following sequence: + // transaction block minimum prioritization fee cache + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [5, a, b ] --> [5, 5, 5, nil ] + // [9, b, c ] --> [5, 5, 5, 9 ] + // [2, a, c ] --> [2, 2, 5, 2 ] + // + let txs = vec![ + build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b), + build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c), + build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c), + ]; + + let bank = Arc::new(Bank::default_for_tests()); + let slot = bank.slot(); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + prioritization_fee_cache.update(bank, txs.iter()); + + // assert block minimum fee and account a, b, c fee accordingly + { + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + let fee = fee.lock().unwrap(); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); + assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); + assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap()); + // assert unknown account d fee + assert!(fee + .get_writable_account_fee(&Pubkey::new_unique()) + .is_none()); + } + + // assert after prune, account a and c should be removed from cache to save space + { + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, slot); + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + let fee = fee.lock().unwrap(); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert!(fee.get_writable_account_fee(&write_account_a).is_none()); + assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); + assert!(fee.get_writable_account_fee(&write_account_c).is_none()); + } + } + + #[test] + fn test_available_block_count() { + let prioritization_fee_cache = PrioritizationFeeCache::default(); + + assert!(PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &1 + ) + .lock() + .unwrap() + .mark_block_completed() + .is_ok()); + assert!(PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &2 + ) + .lock() + .unwrap() + .mark_block_completed() + .is_ok()); + // add slot 3 entry to cache, but not finalize it + PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3); + + // assert available block count should be 2 finalized blocks + assert_eq!(2, prioritization_fee_cache.available_block_count()); + } + + fn assert_vec_eq(expected: &mut Vec, actual: &mut Vec) { + expected.sort_unstable(); + actual.sort_unstable(); + assert_eq!(expected, actual); + } + + #[test] + fn test_get_prioritization_fees() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1)); + let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2)); + let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3)); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + + // Assert no minimum fee from empty cache + assert!(prioritization_fee_cache + .get_prioritization_fees() + .is_empty()); + + // Assert after add one transaction for slot 1 + { + let txs = vec![build_sanitized_transaction_for_test( + 5, + &write_account_a, + &write_account_b, + )]; + sync_update(&mut prioritization_fee_cache, bank1.clone(), txs.iter()); + assert_eq!( + 5, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank1.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert!(prioritization_fee_cache + .get_prioritization_fees() + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot()); + assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees()); + } + + // Assert after add one transaction for slot 2 + { + let txs = vec![build_sanitized_transaction_for_test( + 9, + &write_account_b, + &write_account_c, + )]; + sync_update(&mut prioritization_fee_cache, bank2.clone(), txs.iter()); + assert_eq!( + 9, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank2.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot()); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + } + + // Assert after add one transaction for slot 3 + { + let txs = vec![build_sanitized_transaction_for_test( + 2, + &write_account_a, + &write_account_c, + )]; + sync_update(&mut prioritization_fee_cache, bank3.clone(), txs.iter()); + assert_eq!( + 2, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank3.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot()); + assert_vec_eq( + &mut vec![5, 9, 2], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + } + } + + #[test] + fn test_get_account_prioritization_fees() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1)); + let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2)); + let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3)); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + + // Assert no minimum fee from empty cache + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_a) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_b) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + + // Assert after add one transaction for slot 1 + { + let txs = vec![ + build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank1.clone(), txs.iter()); + // before block is marked as completed + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_a) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_b) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot()); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_b) + ); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + } + + // Assert after add one transaction for slot 2 + { + let txs = vec![ + build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank2.clone(), txs.iter()); + // before block is marked as completed + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_b) + ); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot()); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_eq!( + vec![9], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_c) + ); + } + + // Assert after add one transaction for slot 3 + { + let txs = vec![ + build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank3.clone(), txs.iter()); + // before block is marked as completed + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_eq!( + vec![9], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_c) + ); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot()); + assert_vec_eq( + &mut vec![5, 2], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_a), + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_vec_eq( + &mut vec![9, 2], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_c), + ); + } + } +} diff --git a/core/src/transaction_priority_details.rs b/runtime/src/transaction_priority_details.rs similarity index 100% rename from core/src/transaction_priority_details.rs rename to runtime/src/transaction_priority_details.rs From 216880f55271314216d7ef5f75c0d051ae40463c Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 5 Jan 2023 11:59:20 -0700 Subject: [PATCH 2/4] Fix conflicts --- core/src/replay_stage.rs | 51 +++----------------------- core/src/tvu.rs | 19 ++-------- core/src/unprocessed_packet_batches.rs | 11 ++---- core/src/validator.rs | 6 +-- 4 files changed, 13 insertions(+), 74 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 646cebaf44cba4..ba9158e1249194 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -398,12 +398,8 @@ impl ReplayStage { drop_bank_sender: Sender>>, block_metadata_notifier: Option, log_messages_bytes_limit: Option, -<<<<<<< HEAD - ) -> Self { -======= prioritization_fee_cache: Arc, - ) -> Result { ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ) -> Self { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower(); info!("Tower state: {:?}", tower); @@ -536,6 +532,7 @@ impl ReplayStage { block_metadata_notifier.clone(), &mut replay_timing, log_messages_bytes_limit, + &prioritization_fee_cache, ); replay_active_banks_time.stop(); @@ -605,14 +602,6 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, -<<<<<<< HEAD -======= - block_metadata_notifier.clone(), - &mut replay_timing, - log_messages_bytes_limit, - replay_slots_concurrently, - &prioritization_fee_cache, ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) ); } process_duplicate_slots_time.stop(); @@ -2676,13 +2665,8 @@ impl ReplayStage { block_metadata_notifier: Option, replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, -<<<<<<< HEAD - ) -> bool { -======= - replay_slots_concurrently: bool, prioritization_fee_cache: &PrioritizationFeeCache, - ) -> bool /* completed a bank */ { ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ) -> bool { let active_bank_slots = bank_forks.read().unwrap().active_bank_slots(); let num_active_banks = active_bank_slots.len(); trace!( @@ -2711,6 +2695,7 @@ impl ReplayStage { replay_timing, log_messages_bytes_limit, &active_bank_slots, + prioritization_fee_cache, ) } else { active_bank_slots @@ -2728,6 +2713,7 @@ impl ReplayStage { replay_timing, log_messages_bytes_limit, *bank_slot, + prioritization_fee_cache, ) }) .collect() @@ -2744,34 +2730,9 @@ impl ReplayStage { replay_vote_sender, replay_timing, log_messages_bytes_limit, -<<<<<<< HEAD active_bank_slots[0], - )] -======= - &active_bank_slots, prioritization_fee_cache, - ) - } else { - active_bank_slots - .iter() - .map(|bank_slot| { - Self::replay_active_bank( - blockstore, - bank_forks, - my_pubkey, - vote_account, - progress, - transaction_status_sender, - verify_recyclers, - replay_vote_sender, - replay_timing, - log_messages_bytes_limit, - *bank_slot, - prioritization_fee_cache, - ) - }) - .collect() ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + )] }; Self::process_replay_results( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c0ff81cb95ac04..4ec82901447b99 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -129,12 +129,8 @@ impl Tvu { accounts_background_request_sender: AbsRequestSender, log_messages_bytes_limit: Option, connection_cache: &Arc, -<<<<<<< HEAD - ) -> Self { -======= prioritization_fee_cache: &Arc, - ) -> Result { ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ) -> Self { let TvuSockets { repair: repair_socket, fetch: fetch_sockets, @@ -293,12 +289,8 @@ impl Tvu { drop_bank_sender, block_metadata_notifier, log_messages_bytes_limit, -<<<<<<< HEAD - ); -======= prioritization_fee_cache.clone(), - )?; ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { LedgerCleanupService::new( @@ -461,13 +453,8 @@ pub mod tests { AbsRequestSender::default(), None, &Arc::new(ConnectionCache::default()), -<<<<<<< HEAD - ); -======= &_ignored_prioritization_fee_cache, - ) - .expect("assume success"); ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); poh_service.join().unwrap(); diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 5d60e3a0550fd9..c732a9e2052380 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,14 +1,9 @@ use { -<<<<<<< HEAD - crate::transaction_priority_details::{ - GetTransactionPriorityDetails, TransactionPriorityDetails, - }, -======= - crate::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, - solana_runtime::transaction_priority_details::TransactionPriorityDetails, + solana_runtime::transaction_priority_details::{ + GetTransactionPriorityDetails, TransactionPriorityDetails, + }, solana_sdk::{ feature_set, hash::Hash, diff --git a/core/src/validator.rs b/core/src/validator.rs index bb0832335762d5..818e835d435ac9 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1020,12 +1020,8 @@ impl Validator { accounts_background_request_sender, config.runtime_config.log_messages_bytes_limit, &connection_cache, -<<<<<<< HEAD - ); -======= &prioritization_fee_cache, - )?; ->>>>>>> 8bb039d08 (collect min prioritization fees when replaying sanitized transactions (#26709)) + ); let tpu = Tpu::new( &cluster_info, From f74a52f0bc83e139e47c5cdf4d6f56dc3488491d Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 5 Jan 2023 13:05:47 -0700 Subject: [PATCH 3/4] Don't use api from the future --- runtime/src/prioritization_fee.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index 3c225090278989..bd847dd4224163 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -153,7 +153,11 @@ impl PrioritizationFee { } pub fn get_min_transaction_fee(&self) -> Option { - (self.min_transaction_fee != u64::MAX).then_some(self.min_transaction_fee) + if self.min_transaction_fee != u64::MAX { + Some(self.min_transaction_fee) + } else { + None + } } pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { From 669ecc1f9ed9180fdfe6bdf7a0171812aee978f3 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Fri, 2 Sep 2022 10:02:22 -0500 Subject: [PATCH 4/4] fix a flacky test (#27572) --- runtime/src/prioritization_fee_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 3ecf11cdc61b7f..1399544af81dff 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -485,7 +485,7 @@ mod tests { let slot = bank.slot(); let mut prioritization_fee_cache = PrioritizationFeeCache::default(); - prioritization_fee_cache.update(bank, txs.iter()); + sync_update(&mut prioritization_fee_cache, bank, txs.iter()); // assert block minimum fee and account a, b, c fee accordingly {