Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Banking thredas send batch of transactions to cost_model_thread right…
Browse files Browse the repository at this point in the history
… after committed to bank, avoiding cloning individual `transaction` when send.
  • Loading branch information
tao-stones committed Jun 29, 2021
1 parent 5858678 commit d3c3412
Showing 1 changed file with 45 additions and 33 deletions.
78 changes: 45 additions & 33 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ pub enum BufferedPacketsDecision {
Hold,
}

pub struct CommittedTransactionBatch {
pub transactions: Vec<Transaction>,
pub execution_results: Vec<TransactionExecutionResult>,
}

impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
Expand Down Expand Up @@ -341,8 +346,8 @@ impl BankingStage {

// start cost model thread
let (cost_update_sender, cost_update_receiver): (
Sender<Transaction>,
Receiver<Transaction>,
Sender<CommittedTransactionBatch>,
Receiver<CommittedTransactionBatch>,
) = channel();

// TODO TAO - passing `exit` to this loop to allow the thread to end
Expand Down Expand Up @@ -470,7 +475,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_sender: Sender<Transaction>,
cost_update_sender: Sender<CommittedTransactionBatch>,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
Expand Down Expand Up @@ -621,7 +626,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_sender: Sender<Transaction>,
cost_update_sender: Sender<CommittedTransactionBatch>,
) -> BufferedPacketsDecision {
let bank_start;
let (
Expand Down Expand Up @@ -732,7 +737,7 @@ impl BankingStage {
fn cost_update_loop(
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_receiver: Receiver<Transaction>,
cost_update_receiver: Receiver<CommittedTransactionBatch>,
) {
// TODO TAO - sleep or hot spin? We dont want to delay counting TX too long
// even it is not criitcal to track the cost at micro-sec
Expand All @@ -746,15 +751,19 @@ impl BankingStage {
}
// */

for transaction in cost_update_receiver.try_iter() {
cost_model
.read()
.unwrap()
.calculate_cost_no_alloc(&transaction, &mut tx_cost);
cost_tracker.write().unwrap().add_transaction(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
for batch in cost_update_receiver.try_iter() {
for ((result, _), tx) in batch.execution_results.iter().zip(batch.transactions.iter()) {
if result.is_ok() {
cost_model
.read()
.unwrap()
.calculate_cost_no_alloc(&tx, &mut tx_cost);
cost_tracker.write().unwrap().add_transaction(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
}
}
}

thread::sleep(wait_timer);
Expand All @@ -776,7 +785,7 @@ impl BankingStage {
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_sender: Sender<Transaction>,
cost_update_sender: Sender<CommittedTransactionBatch>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand Down Expand Up @@ -919,6 +928,7 @@ impl BankingStage {
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
cost_tracker_update_sender: Sender<CommittedTransactionBatch>,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
Expand Down Expand Up @@ -1006,6 +1016,15 @@ impl BankingStage {
tx_results.rent_debits,
);
}

// track committed transactions' cost
let transactions = batch.transactions_iter().cloned().collect();
let execution_results = results.to_vec();
cost_tracker_update_sender.send(
CommittedTransactionBatch {
transactions,
execution_results,
} ).expect("send committed transactions to update cost model");
}
commit_time.stop();

Expand Down Expand Up @@ -1035,6 +1054,7 @@ impl BankingStage {
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
cost_tracker_update_sender: Sender<CommittedTransactionBatch>,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
Expand All @@ -1048,6 +1068,7 @@ impl BankingStage {
&batch,
transaction_status_sender,
gossip_vote_sender,
cost_tracker_update_sender,
);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);

Expand Down Expand Up @@ -1078,6 +1099,7 @@ impl BankingStage {
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
cost_tracker_update_sender: Sender<CommittedTransactionBatch>,
) -> (usize, Vec<usize>) {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
Expand All @@ -1094,6 +1116,7 @@ impl BankingStage {
chunk_start,
transaction_status_sender.clone(),
gossip_vote_sender,
cost_tracker_update_sender.clone(),
);
trace!("process_transactions result: {:?}", result);

Expand Down Expand Up @@ -1297,7 +1320,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_sender: Sender<Transaction>,
cost_update_sender: Sender<CommittedTransactionBatch>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) =
Expand Down Expand Up @@ -1332,6 +1355,7 @@ impl BankingStage {
poh,
transaction_status_sender,
gossip_vote_sender,
cost_update_sender,
);
process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len();
Expand All @@ -1340,32 +1364,20 @@ impl BankingStage {
unprocessed_tx_count
);

// applying cost of processed transactions to shared cost_tracker
let mut cost_tracking_time = Measure::start("cost_tracking_time");
/* TODO TAO - replaced
let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS);
// */
/* TODO TAO - doing in batch fashino right after committing, to
* save 1) nested for-loop, 2) individual transaction cloning
// applying cost of processed transactions to shared cost_tracker
{
//let cost_model_readonly = cost_model.read().unwrap();
//let mut cost_tracker_mutable = cost_tracker.write().unwrap();
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
cost_update_sender
.send(tx.transaction().clone())
.expect("send transaction to cost_model");
/* TODO TAO - replaced
cost_model
.read()
.unwrap()
.calculate_cost_no_alloc(tx.transaction(), &mut tx_cost);
cost_tracker.write().unwrap().add_transaction(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
// */
}
});
}
// */
cost_tracking_time.stop();

let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
Expand Down Expand Up @@ -1493,7 +1505,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_update_sender: Sender<Transaction>,
cost_update_sender: Sender<CommittedTransactionBatch>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
Expand Down

0 comments on commit d3c3412

Please sign in to comment.