Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

refactor function for stable perf #19386

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 147 additions & 18 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,41 @@ impl BankingStage {
Some(&packet.data[msg_start..msg_end])
}

fn filter_transactions_by_cost(
verified_transactions_with_packet_indexes: impl Iterator<Item = (SanitizedTransaction, usize)>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) {
let (filtered_transactions_with_packet_indexes,
retryable_transactions_with_packet_indexes):
(Vec<(_,_)>, Vec<(_, _)>) = {
let cost_tracker_readonly = cost_tracker.read().unwrap();
verified_transactions_with_packet_indexes
.into_iter()
.partition(|(tx, _packet_index)| {
cost_tracker_readonly.would_transaction_fit(tx).is_ok()
}
)
};

let (filtered_transactions, filter_transaction_packet_indexes) =
filtered_transactions_with_packet_indexes
.into_iter()
.map(|(a, b)| (a, b))
.unzip();

let (_, retryable_transaction_packet_indexes): (Vec<_>, Vec<_>) =
retryable_transactions_with_packet_indexes
.into_iter()
.map(|(a, b)| (a, b))
.unzip();

(
filtered_transactions,
filter_transaction_packet_indexes,
retryable_transaction_packet_indexes,
)
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction messages,
// and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes
// and packet indexes.
Expand All @@ -1050,8 +1085,6 @@ impl BankingStage {
cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) {
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];

let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes
.iter()
.filter_map(|tx_index| {
Expand All @@ -1077,23 +1110,15 @@ impl BankingStage {
);

let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time");
let (filtered_transactions, filter_transaction_packet_indexes) = {
let cost_tracker_readonly = cost_tracker.read().unwrap();
verified_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
let result = cost_tracker_readonly.would_transaction_fit(&tx);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
retryable_transaction_packet_indexes.push(tx_index);
return None;
}
Some((tx, tx_index))
})
.unzip()
};
let (
filtered_transactions,
filter_transaction_packet_indexes,
retryable_transaction_packet_indexes,
) = Self::filter_transactions_by_cost(
verified_transactions_with_packet_indexes.into_iter(),
cost_tracker,
);
cost_tracker_check_time.stop();

banking_stage_stats
.cost_tracker_check_elapsed
.fetch_add(cost_tracker_check_time.as_us(), Ordering::Relaxed);
Expand Down Expand Up @@ -1583,6 +1608,7 @@ mod tests {
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
Arc, RwLock,
},
thread::sleep,
};
Expand Down Expand Up @@ -2952,4 +2978,107 @@ mod tests {
transaction.message_data()
);
}

#[test]
fn test_filter_transactions_by_cost_pass() {
solana_logger::setup();

// build simple TXs
let tx_count: usize = 4;
let mut simple_transactions: Vec<SanitizedTransaction> = vec![];
for i in 0..tx_count {
let keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();
let blockhash = Hash::new_unique();
let simple_transaction = SanitizedTransaction::try_from(system_transaction::transfer(
&keypair, &pubkey, i as u64, blockhash,
))
.unwrap();

simple_transactions.push(simple_transaction);
}
assert_eq!(tx_count, simple_transactions.len());

// build sanitized ransactions with packet index
let mut simple_transactions_with_indexes: Vec<(SanitizedTransaction, usize)> = vec![];
for i in 0..tx_count {
simple_transactions_with_indexes.push((simple_transactions[i].clone(), i));
}

let cost_model = Arc::new(RwLock::new(CostModel::default()));
let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model)));

let (
filtered_transactions,
filter_transaction_packet_indexes,
retryable_transaction_packet_indexes,
) = BankingStage::filter_transactions_by_cost(
simple_transactions_with_indexes.into_iter(),
&cost_tracker,
);
assert_eq!(tx_count, filtered_transactions.len());
assert_eq!(tx_count, filter_transaction_packet_indexes.len());
assert_eq!(0, retryable_transaction_packet_indexes.len());
assert_eq!(
simple_transactions[0].signature(),
filtered_transactions[0].signature()
);
assert_eq!(
simple_transactions[1].signature(),
filtered_transactions[1].signature()
);
assert_eq!(
simple_transactions[2].signature(),
filtered_transactions[2].signature()
);
assert_eq!(
simple_transactions[3].signature(),
filtered_transactions[3].signature()
);
assert_eq!(vec![0, 1, 2, 3], filter_transaction_packet_indexes);
}

#[test]
fn test_filter_transactions_by_cost_retry() {
solana_logger::setup();

// build simple TXs
let tx_count: usize = 4;
let mut simple_transactions: Vec<SanitizedTransaction> = vec![];
for i in 0..tx_count {
let keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();
let blockhash = Hash::new_unique();
let simple_transaction = SanitizedTransaction::try_from(system_transaction::transfer(
&keypair, &pubkey, i as u64, blockhash,
))
.unwrap();

simple_transactions.push(simple_transaction);
}
assert_eq!(tx_count, simple_transactions.len());

// build sanitized ransactions with packet index
let mut simple_transactions_with_indexes: Vec<(SanitizedTransaction, usize)> = vec![];
for i in 0..tx_count {
simple_transactions_with_indexes.push((simple_transactions[i].clone(), i));
}

let cost_limit = 1;
let cost_model = Arc::new(RwLock::new(CostModel::new(cost_limit, cost_limit)));
let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model)));

let (
filtered_transactions,
filter_transaction_packet_indexes,
retryable_transaction_packet_indexes,
) = BankingStage::filter_transactions_by_cost(
simple_transactions_with_indexes.into_iter(),
&cost_tracker,
);
assert_eq!(0, filtered_transactions.len());
assert_eq!(0, filter_transaction_packet_indexes.len());
assert_eq!(tx_count, retryable_transaction_packet_indexes.len());
assert_eq!(vec![0, 1, 2, 3], retryable_transaction_packet_indexes);
}
}