diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index faad90c1e76836..16ee813fa7dbc5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -481,87 +481,59 @@ impl BankingStage { Ok((chunk_start, unprocessed_txs)) } - fn filter_out_invalid_transactions( - bank: &Bank, - transactions: &[Transaction], - pending_txs: &[usize], - ) -> Vec { - let mut error_counters = ErrorCounters::default(); - let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()]; - pending_txs.iter().for_each(|x| mask[*x] = Ok(())); - - let result = bank.check_transactions( - transactions, - &mask, - MAX_RECENT_BLOCKHASHES / 2, - &mut error_counters, - ); - result - .iter() - .enumerate() - .filter_map(|(index, x)| if x.is_ok() { Some(index) } else { None }) - .collect_vec() - } - - fn process_received_packets_using_closure<'a, FnProcessTxs, FnFilterTxs>( - bank: &'a Arc, - poh: &'a Arc>, + // This function returns a vector of transactions that are not None. It also returns a vector + // with position of the transaction in the input list + fn filter_transaction_indexes( transactions: Vec>, indexes: &[usize], - fn_proc: FnProcessTxs, - fn_filter: FnFilterTxs, - ) -> Result<(usize, usize, Vec)> - where - FnProcessTxs: Fn( - &'a Bank, - &[Transaction], - &'a Arc>, - ) -> Result<(usize, Vec)>, - FnFilterTxs: Fn(&'a Bank, &[Transaction], &[usize]) -> Vec, - { - debug!("banking-stage-tx bank {}", bank.slot()); - - debug!( - "bank: {} transactions received {}", - bank.slot(), - transactions.len() - ); - let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions + ) -> (Vec, Vec) { + transactions .into_iter() .zip(indexes) .filter_map(|(tx, index)| match tx { None => None, Some(tx) => Some((tx, index)), }) - .unzip(); - - debug!( - "bank: {} verified transactions {}", - bank.slot(), - verified_transactions.len() - ); - - let tx_len = verified_transactions.len(); + .unzip() + } - let (processed, unprocessed_txs) = fn_proc(bank, &verified_transactions, poh)?; - let valid_unprocessed_txs = fn_filter(bank, &verified_transactions, &unprocessed_txs); + // This function creates a filter of transaction results with Ok() for every pending + // transaction. The non-pending transactions are marked with TransactionError + fn prepare_filter_for_pending_transactions( + transactions: &[Transaction], + pending_tx_indexes: &[usize], + ) -> Vec> { + let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()]; + pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(())); + mask + } - let unprocessed_indexes: Vec<_> = valid_unprocessed_txs + // This function returns a vector containing index of all valid transactions. A valid + // transaction has result Ok() as the value + fn filter_valid_transaction_indexes( + valid_txs: &[transaction::Result<()>], + transaction_indexes: &[usize], + ) -> Vec { + let valid_transactions = valid_txs .iter() - .map(|x| verified_indexes[*x]) - .collect(); + .enumerate() + .filter_map(|(index, x)| if x.is_ok() { Some(index) } else { None }) + .collect_vec(); - Ok((processed, tx_len, unprocessed_indexes)) + valid_transactions + .iter() + .map(|x| transaction_indexes[*x]) + .collect() } - fn process_received_packets<'a>( - bank: &'a Arc, - poh: &'a Arc>, + fn process_received_packets( + bank: &Arc, + poh: &Arc>, msgs: &Packets, - packet_indexes: Vec, + transaction_indexes: Vec, ) -> Result<(usize, usize, Vec)> { let packets = Packets::new( - packet_indexes + transaction_indexes .iter() .map(|x| msgs.packets[*x].to_owned()) .collect_vec(), @@ -569,18 +541,42 @@ impl BankingStage { let transactions = Self::deserialize_transactions(&packets); - Self::process_received_packets_using_closure( - bank, - poh, - transactions, - &packet_indexes, - |x: &'a Bank, y: &[Transaction], z: &'a Arc>| { - Self::process_transactions(x, y, z) - }, - |x: &'a Bank, y: &[Transaction], z: &[usize]| { - Self::filter_out_invalid_transactions(x, y, z) - }, - ) + debug!( + "banking-stage-tx bank: {} transactions received {}", + bank.slot(), + transactions.len() + ); + + let (transactions, transaction_indexes) = + Self::filter_transaction_indexes(transactions, &transaction_indexes); + + debug!( + "bank: {} filtered transactions {}", + bank.slot(), + transactions.len() + ); + + let tx_len = transactions.len(); + + let (processed, unprocessed_tx_indexes) = + Self::process_transactions(bank, &transactions, poh)?; + + let filter = + Self::prepare_filter_for_pending_transactions(&transactions, &unprocessed_tx_indexes); + + let mut error_counters = ErrorCounters::default(); + let result = bank.check_transactions( + &transactions, + &filter, + MAX_RECENT_BLOCKHASHES / 2, + &mut error_counters, + ); + + Ok(( + processed, + tx_len, + Self::filter_valid_transaction_indexes(&result, &transaction_indexes), + )) } /// Process the incoming packets @@ -1047,153 +1043,158 @@ mod tests { } #[test] - fn test_bank_process_received_transactions() { + fn test_bank_filter_transaction_indexes() { let (genesis_block, mint_keypair) = create_genesis_block(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); - let ledger_path = get_tmp_ledger_path!(); - { - let blocktree = - Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); - let (poh_recorder, _entry_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - None, - bank.ticks_per_slot(), - &Pubkey::default(), - &Arc::new(blocktree), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let pubkey = Pubkey::new_rand(); - let pubkey = Pubkey::new_rand(); + let transactions = vec![ + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + None, + ]; - let transactions = vec![ - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - ]; + let filtered_transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + ]; - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len(), - vec![0, 1, 2, 3, 4, 5] - )), - |_x: &Bank, _y: &[Transaction], _z: &[usize]| vec![0, 1, 2, 3, 4, 5], - ) - .ok(), - Some((6, 6, vec![0, 1, 2, 3, 4, 5])) - ); + assert_eq!( + BankingStage::filter_transaction_indexes( + transactions.clone(), + &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + ), + (filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10]) + ); - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len(), - vec![0, 1, 2, 3, 4, 5] - )), - |_x: &Bank, _y: &[Transaction], _z: &[usize]| vec![0, 1, 2, 4, 5], - ) - .ok(), - Some((6, 6, vec![0, 1, 2, 4, 5])) - ); + assert_eq!( + BankingStage::filter_transaction_indexes( + transactions, + &vec![1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15], + ), + (filtered_transactions, vec![2, 4, 5, 9, 11, 13]) + ); + } - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len() - 1, - vec![0, 1, 2, 4, 5] - )), - |_x: &Bank, _y: &[Transaction], _z: &[usize]| vec![0, 1, 2, 4, 5], - ) - .ok(), - Some((5, 6, vec![0, 1, 2, 4, 5])) - ); + #[test] + fn test_bank_prepare_filter_for_pending_transaction() { + let (genesis_block, mint_keypair) = create_genesis_block(10_000); + let pubkey = Pubkey::new_rand(); - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len() - 3, - vec![2, 4, 5] - )), - |_x: &Bank, _y: &[Transaction], _z: &[usize]| vec![2, 4, 5], - ) - .ok(), - Some((3, 6, vec![2, 4, 5])) - ); + let transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + ]; - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len() - 3, - vec![2, 4, 5] - )), - |_x: &Bank, _y: &[Transaction], _z: &[usize]| vec![2, 5], - ) - .ok(), - Some((3, 6, vec![2, 5])) - ); - } - Blocktree::destroy(&ledger_path).unwrap(); + assert_eq!( + BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![2, 4, 5],), + vec![ + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()) + ] + ); + + assert_eq!( + BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![0, 2, 3],), + vec![ + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + ] + ); + } + + #[test] + fn test_bank_filter_valid_transaction_indexes() { + assert_eq!( + BankingStage::filter_valid_transaction_indexes( + &vec![ + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()) + ], + &vec![2, 4, 5, 9, 11, 13] + ), + vec![5, 11, 13] + ); + + assert_eq!( + BankingStage::filter_valid_transaction_indexes( + &vec![ + Ok(()), + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()), + Ok(()) + ], + &vec![1, 6, 7, 9, 31, 43] + ), + vec![1, 9, 31, 43] + ); } #[test]