diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index e865be9e71..30ab5607df 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -6,6 +6,7 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use futures::{FutureExt, StreamExt}; +use nimiq_primitives::account::AccountType; use parking_lot::RwLock; use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender}; use tokio::time::Sleep; @@ -13,7 +14,7 @@ use tokio_stream::wrappers::BroadcastStream; use nimiq_blockchain::{AbstractBlockchain, Blockchain}; use nimiq_database::Environment; -use nimiq_mempool::mempool::TransactionTopic; +use nimiq_mempool::mempool::{ControlTransactionTopic, TransactionTopic}; use nimiq_network_interface::network::Network; use nimiq_transaction::Transaction; @@ -42,6 +43,9 @@ impl Clone for ConsensusProxy { impl ConsensusProxy { pub async fn send_transaction(&self, tx: Transaction) -> Result<(), N::Error> { + if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking { + return self.network.publish::(tx).await; + } self.network.publish::(tx).await } diff --git a/mempool/src/executor.rs b/mempool/src/executor.rs index a5bb41a84e..8113d407c5 100644 --- a/mempool/src/executor.rs +++ b/mempool/src/executor.rs @@ -13,7 +13,7 @@ use nimiq_primitives::networks::NetworkId; use nimiq_transaction::Transaction; use crate::filter::MempoolFilter; -use crate::mempool::{MempoolState, TransactionTopic}; +use crate::mempool::{ControlTransactionTopic, MempoolState, TransactionTopic}; use crate::verify::{verify_tx, VerifyErr}; const CONCURRENT_VERIF_TASKS: u32 = 1000; @@ -31,7 +31,7 @@ pub(crate) struct MempoolExecutor { // Ongoing verification tasks counter verification_tasks: Arc, - // Reference to the network, to alow for message validation + // Reference to the network, to allow for message validation network: Arc, // Network ID, used for tx verification @@ -114,3 +114,100 @@ impl Future for MempoolExecutor { Poll::Ready(()) } } + +pub(crate) struct ControlMempoolExecutor { + // Blockchain reference + blockchain: Arc>, + + // The mempool state: the data structure where the transactions are stored + state: Arc>, + + // Mempool filter + filter: Arc>, + + // Ongoing verification tasks counter + verification_tasks: Arc, + + // Reference to the network, to allow for message validation + network: Arc, + + // Network ID, used for tx verification + network_id: Arc, + + // Transaction stream that is used to listen to transactions from the network + txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, +} + +impl ControlMempoolExecutor { + pub fn new( + blockchain: Arc>, + state: Arc>, + filter: Arc>, + network: Arc, + txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + ) -> Self { + Self { + blockchain: blockchain.clone(), + state, + filter, + network, + network_id: Arc::new(blockchain.read().network_id), + verification_tasks: Arc::new(AtomicU32::new(0)), + txn_stream, + } + } +} + +impl Future for ControlMempoolExecutor { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + while let Some((tx, pubsub_id)) = ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) { + if self.verification_tasks.fetch_add(0, AtomicOrdering::SeqCst) + >= CONCURRENT_VERIF_TASKS + { + log::debug!("Reached the max number of verification tasks"); + continue; + } + + let blockchain = Arc::clone(&self.blockchain); + let mempool_state = Arc::clone(&self.state); + let filter = Arc::clone(&self.filter); + let tasks_count = Arc::clone(&self.verification_tasks); + let network_id = Arc::clone(&self.network_id); + let network = Arc::clone(&self.network); + + // Spawn the transaction verification task + tokio::task::spawn(async move { + tasks_count.fetch_add(1, AtomicOrdering::SeqCst); + + // Verifying and pushing the TX in a separate scope to drop the lock that is returned by + // the verify_tx function immediately + let acceptance = { + let verify_tx_ret = + verify_tx(&tx, blockchain, network_id, &mempool_state, filter).await; + + match verify_tx_ret { + Ok(mempool_state_lock) => { + RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx); + MsgAcceptance::Accept + } + // Reject the message if signature verification fails or transaction is invalid + // for current validation window + Err(VerifyErr::InvalidSignature) => MsgAcceptance::Reject, + Err(VerifyErr::InvalidTxWindow) => MsgAcceptance::Reject, + Err(_) => MsgAcceptance::Ignore, + } + }; + + network.validate_message::(pubsub_id, acceptance); + + tasks_count.fetch_sub(1, AtomicOrdering::SeqCst); + }); + } + + // We have exited the loop, so poll_next() must have returned Poll::Ready(None). + // Thus, we terminate the executor future. + Poll::Ready(()) + } +} diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 78868e8827..eed47ad577 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -23,7 +23,7 @@ use nimiq_transaction::account::staking_contract::{ use nimiq_transaction::Transaction; use crate::config::MempoolConfig; -use crate::executor::MempoolExecutor; +use crate::executor::{ControlMempoolExecutor, MempoolExecutor}; use crate::filter::{MempoolFilter, MempoolRules}; #[cfg(feature = "metrics")] use crate::mempool_metrics::MempoolMetrics; @@ -41,6 +41,18 @@ impl Topic for TransactionTopic { const VALIDATE: bool = true; } +/// Control Transaction topic for the Mempool to request transactions from the network +#[derive(Clone, Debug, Default)] +pub struct ControlTransactionTopic; + +impl Topic for ControlTransactionTopic { + type Item = Transaction; + + const BUFFER_SIZE: usize = 100; + const NAME: &'static str = "Controltransactions"; + const VALIDATE: bool = true; +} + /// Struct defining the Mempool pub struct Mempool { /// Blockchain reference @@ -54,6 +66,9 @@ pub struct Mempool { /// Mempool executor handle used to stop the executor pub(crate) executor_handle: Mutex>, + + /// Mempool executor handle used to stop the control mempool executor + pub(crate) control_executor_handle: Mutex>, } impl Mempool { @@ -63,18 +78,13 @@ impl Mempool { /// Creates a new mempool pub fn new(blockchain: Arc>, config: MempoolConfig) -> Self { let state = MempoolState { - transactions: HashMap::new(), - best_transactions: KeyedPriorityQueue::new(), - worst_transactions: KeyedPriorityQueue::new(), - oldest_transactions: KeyedPriorityQueue::new(), + regular_transactions: MempoolTransactions::new(config.size_limit), + control_transactions: MempoolTransactions::new(config.size_limit), state_by_sender: HashMap::new(), outgoing_validators: HashMap::new(), outgoing_stakers: HashMap::new(), creating_validators: HashMap::new(), creating_stakers: HashMap::new(), - total_size_limit: config.size_limit, - total_size: 0, - tx_counter: 0, #[cfg(feature = "metrics")] metrics: Default::default(), }; @@ -89,18 +99,25 @@ impl Mempool { config.filter_limit, ))), executor_handle: Mutex::new(None), + control_executor_handle: Mutex::new(None), } } - /// Starts the mempool executor + /// Starts the mempool executors /// - /// Once this function is called, the mempool executor is spawned. - /// The executor will subscribe to the transaction topic from the the network. - pub async fn start_executor(&self, network: Arc, monitor: Option) { + /// Once this function is called, the mempool executors are spawned. + /// Both the regular and control txn executors are subscribed to their respective txn topics. + pub async fn start_executors( + &self, + network: Arc, + monitor: Option, + control_monitor: Option, + ) { let mut executor_handle = self.executor_handle.lock().await; + let mut control_executor_handle = self.control_executor_handle.lock().await; - if executor_handle.is_some() { - // If we already have an executor running, don't do anything + if executor_handle.is_some() && control_executor_handle.is_some() { + // If we already have both executors running dont do anything return; } @@ -127,6 +144,34 @@ impl Mempool { // Set the executor handle *executor_handle = Some(abort_handle); + + // Subscribe to the control transaction topic + let txn_stream = network + .subscribe::() + .await + .unwrap(); + + let control_mempool_executor = ControlMempoolExecutor::new( + Arc::clone(&self.blockchain), + Arc::clone(&self.state), + Arc::clone(&self.filter), + Arc::clone(&network), + txn_stream, + ); + + // Start the executor and obtain its handle + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + + let future = Abortable::new(control_mempool_executor, abort_registration); + + if let Some(monitor) = control_monitor { + tokio::spawn(monitor.instrument(future)); + } else { + tokio::spawn(future); + } + + // Set the control mempool executor handle + *control_executor_handle = Some(abort_handle); } /// Starts the mempool executor with a custom transaction stream @@ -162,14 +207,48 @@ impl Mempool { *executor_handle = Some(abort_handle); } - /// Stops the mempool executor + /// Starts the control mempool executor with a custom transaction stream /// - /// This functions should only be called only after one of the functions to start the executor is called. - pub async fn stop_executor(&self, network: Arc) { + /// Once this function is called, the mempool executor is spawned. + /// The executor won't subscribe to the transaction topic from the network but will use the provided transaction + /// stream instead. + pub async fn start_control_executor_with_txn_stream( + &self, + txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + network: Arc, + ) { + let mut control_executor_handle = self.control_executor_handle.lock().await; + + if control_executor_handle.is_some() { + // If we already have an executor running, don't do anything + return; + } + + let mempool_executor = ControlMempoolExecutor::::new( + Arc::clone(&self.blockchain), + Arc::clone(&self.state), + Arc::clone(&self.filter), + Arc::clone(&network), + txn_stream, + ); + + // Start the executor and obtain its handle + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + tokio::spawn(Abortable::new(mempool_executor, abort_registration)); + + // Set the executor handle + *control_executor_handle = Some(abort_handle); + } + + /// Stops the mempool executors + /// + /// This functions should only be called only after one of the functions to start the executors is called. + pub async fn stop_executors(&self, network: Arc) { let mut handle = self.executor_handle.lock().await; + let mut control_executor_handle = self.control_executor_handle.lock().await; - if handle.is_none() { - // If there isn't any executor running we return + if handle.is_none() && control_executor_handle.is_none() { + // If there isn't any executors running we return return; } @@ -178,6 +257,18 @@ impl Mempool { // Stop the executor handle.take().expect("Expected an executor handle").abort(); + + // Unsubscribe to the network control TX topic before killing the executor + network + .unsubscribe::() + .await + .unwrap(); + + // Stop the control executor + control_executor_handle + .take() + .expect("Expected a control executor handle") + .abort(); } /// Stops the mempool executor without TX stream @@ -198,6 +289,27 @@ impl Mempool { handle.take().expect("Expected an executor handle").abort(); } + /// Stops the control mempool executor without TX stream + /// + /// This function is used for testing purposes (along with the start_executor_with_txn_stream function) + /// it is the responsibility of the caller to subscribe and unsubscribe from the topic accordingly + /// + /// This functions should only be called only after one of the functions to start the executor is called. + pub async fn stop_control_executor_without_unsubscribe(&self) { + let mut handle = self.control_executor_handle.lock().await; + + if handle.is_none() { + // If there isn't any executor running we return + return; + } + + // Stop the executor + handle + .take() + .expect("Expected a control executor handle") + .abort(); + } + /// Updates the mempool given a set of reverted and adopted blocks. /// /// During a Blockchain extend event a new block is mined which implies that: @@ -224,32 +336,20 @@ impl Mempool { let block_height = blockchain.block_number() + 1; // First remove the transactions that are no longer valid due to age. - loop { - // Get the hash of the oldest transaction. - let tx_hash = match mempool_state.oldest_transactions.peek() { - None => { - break; - } - Some((tx_hash, _)) => tx_hash.clone(), - }; + let expired_regular_txns = mempool_state + .regular_transactions + .get_expired_txns(block_height); - // Get a reference to the transaction. - let tx = mempool_state.get(&tx_hash).unwrap(); + for tx_hash in expired_regular_txns { + mempool_state.remove(&tx_hash, EvictionReason::Expired); + } - // Check if it is still valid. - if tx.is_valid_at(block_height) { - // No need to process more transactions, since we arrived to the oldest one that is valid - break; - } else { - // Remove the transaction from the mempool. - trace!( - reason = "TX is old and it is no longer valid at block height", - block_height = block_height, - "Mempool-update removing tx {} from mempool", - tx_hash - ); - mempool_state.remove(&tx_hash, EvictionReason::Expired); - } + let expired_control_txns = mempool_state + .control_transactions + .get_expired_txns(block_height); + + for tx_hash in expired_control_txns { + mempool_state.remove(&tx_hash, EvictionReason::Expired); } // Now iterate over the transactions in the adopted blocks: @@ -468,14 +568,78 @@ impl Mempool { /// Returns a vector with accepted transactions from the mempool. /// /// Returns the highest fee per byte up to max_bytes transactions and removes them from the mempool - pub fn get_transactions_for_block(&self, max_bytes: usize) -> Vec { + /// It also return the sum of the serialied size of the returned transactions + pub fn get_transactions_for_block(&self, max_bytes: usize) -> (Vec, usize) { + let mut tx_vec = vec![]; + + let state = self.state.upgradable_read(); + + if state.regular_transactions.is_empty() { + log::debug!("Requesting regular txns and there are no txns in the mempool "); + return (tx_vec, 0_usize); + } + + let mut size = 0_usize; + + let mut mempool_state_upgraded = RwLockUpgradableReadGuard::upgrade(state); + + loop { + // Get the hash of the highest paying regular transaction. + let tx_hash = match mempool_state_upgraded + .regular_transactions + .best_transactions + .peek() + { + None => { + break; + } + Some((tx_hash, _)) => tx_hash.clone(), + }; + + // Get the transaction. + let tx = mempool_state_upgraded.get(&tx_hash).unwrap().clone(); + + // Calculate size. If we can't fit the transaction in the block, then we stop here. + // TODO: We can optimize this. There might be a smaller transaction that still fits. + size += tx.serialized_size(); + + if size > max_bytes { + break; + } + + // Remove the transaction from the mempool. + mempool_state_upgraded.remove(&tx_hash, EvictionReason::BlockBuilding); + + // Push the transaction to our output vector. + tx_vec.push(tx); + } + + debug!( + returned_txs = tx_vec.len(), + remaining_txs = mempool_state_upgraded + .regular_transactions + .transactions + .len(), + "Returned regular transactions from mempool" + ); + + (tx_vec, size) + } + + /// Returns a vector with accepted control transactions from the mempool. + /// + /// Returns the highest fee per byte up to max_bytes transactions and removes them from the mempool + pub fn get_control_transactions_for_block( + &self, + max_bytes: usize, + ) -> (Vec, usize) { let mut tx_vec = vec![]; let state = self.state.upgradable_read(); - if state.transactions.is_empty() { - log::debug!("Requesting txns and there are no txns in the mempool "); - return tx_vec; + if state.control_transactions.is_empty() { + log::debug!("Requesting control txns and there are no txns in the mempool "); + return (tx_vec, 0_usize); } let mut size = 0_usize; @@ -483,8 +647,12 @@ impl Mempool { let mut mempool_state_upgraded = RwLockUpgradableReadGuard::upgrade(state); loop { - // Get the hash of the highest paying transaction. - let tx_hash = match mempool_state_upgraded.best_transactions.peek() { + // Get the hash of the highest paying control transactions. + let tx_hash = match mempool_state_upgraded + .control_transactions + .best_transactions + .peek() + { None => { break; } @@ -511,11 +679,14 @@ impl Mempool { debug!( returned_txs = tx_vec.len(), - remaining_txs = mempool_state_upgraded.transactions.len(), - "Returned transactions from mempool" + remaining_txs = mempool_state_upgraded + .control_transactions + .transactions + .len(), + "Returned control transactions from mempool" ); - tx_vec + (tx_vec, size) } /// Adds a transaction to the Mempool. @@ -558,17 +729,35 @@ impl Mempool { /// Gets all transaction hashes in the mempool. pub fn get_transaction_hashes(&self) -> Vec { - self.state.read().transactions.keys().cloned().collect() + let state = self.state.read(); + + state + .regular_transactions + .transactions + .keys() + .cloned() + .chain(state.control_transactions.transactions.keys().cloned()) + .collect() } /// Returns the number of pending transactions in mempool. pub fn num_transactions(&self) -> usize { - self.state.read().transactions.len() + let state = self.state.read(); + state.regular_transactions.transactions.len() + + state.control_transactions.transactions.len() } /// Gets all transactions in the mempool. pub fn get_transactions(&self) -> Vec { - self.state.read().transactions.values().cloned().collect() + let state = self.state.read(); + + state + .regular_transactions + .transactions + .values() + .cloned() + .chain(state.control_transactions.transactions.values().cloned()) + .collect() } /// Returns the current metrics @@ -588,7 +777,10 @@ impl TransactionVerificationCache for Mempool { } } -pub(crate) struct MempoolState { +// This is a container where all mempool transactions are stored. +// It provides simple functions to insert/delete/get transactions +// And mantains internal structures to keep track of the best/worst transactions +pub(crate) struct MempoolTransactions { // A hashmap containing the transactions indexed by their hash. pub(crate) transactions: HashMap, @@ -604,21 +796,6 @@ pub(crate) struct MempoolState { // This ordering is used to evict expired transactions from the mempool. pub(crate) oldest_transactions: KeyedPriorityQueue>, - // The pending balance per sender. - pub(crate) state_by_sender: HashMap, - - // The sets of all senders of staking transactions. For simplicity, each validator/staker can - // only have one outgoing staking transaction in the mempool. This makes sure that the outgoing - // staking transaction can actually pay its fee. - pub(crate) outgoing_validators: HashMap, - pub(crate) outgoing_stakers: HashMap, - - // The sets of all recipients of creation staking transactions. For simplicity, each - // validator/staker can only have one creation staking transaction in the mempool. This makes - // sure that the creation staking transactions do not interfere with one another. - pub(crate) creating_validators: HashMap, - pub(crate) creating_stakers: HashMap, - // Maximum allowed total size (in bytes) of all transactions in the mempool. pub(crate) total_size_limit: usize, @@ -627,21 +804,62 @@ pub(crate) struct MempoolState { // Counter that increases for every added transaction, to order them for removal. pub(crate) tx_counter: u64, - - #[cfg(feature = "metrics")] - pub(crate) metrics: Arc, } -impl MempoolState { - pub fn contains(&self, hash: &Blake2bHash) -> bool { +impl MempoolTransactions { + pub fn new(size_limit: usize) -> Self { + Self { + transactions: HashMap::new(), + best_transactions: KeyedPriorityQueue::new(), + worst_transactions: KeyedPriorityQueue::new(), + oldest_transactions: KeyedPriorityQueue::new(), + total_size_limit: size_limit, + total_size: 0, + tx_counter: 0, + } + } + + pub fn contains_key(&self, hash: &Blake2bHash) -> bool { self.transactions.contains_key(hash) } + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } + + // This function is used to remove the transactions that are no longer valid at a given block height + pub fn get_expired_txns(&mut self, block_height: u32) -> Vec { + let mut expired_txns = vec![]; + loop { + // Get the hash of the oldest transaction. + let tx_hash = match self.oldest_transactions.peek() { + None => { + break; + } + Some((tx_hash, _)) => tx_hash.clone(), + }; + + // Get a reference to the transaction. + let tx = self.get(&tx_hash).unwrap(); + + // Check if it is still valid. + if tx.is_valid_at(block_height) { + // No need to process more transactions, since we arrived to the oldest one that is valid + break; + } else { + // Remove the transaction from the oldest struct to continue collecting expired txns. + self.oldest_transactions.remove(&tx_hash); + expired_txns.push(tx_hash); + } + } + expired_txns + } + pub fn get(&self, hash: &Blake2bHash) -> Option<&Transaction> { self.transactions.get(hash) } - pub(crate) fn put(&mut self, tx: &Transaction) -> bool { + pub(crate) fn insert(&mut self, tx: &Transaction) -> bool { let tx_hash = tx.hash(); if self.transactions.contains_key(&tx_hash) { @@ -664,11 +882,90 @@ impl MempoolState { insertion_order: self.tx_counter, }, ); + self.tx_counter += 1; self.oldest_transactions - .push(tx_hash.clone(), Reverse(tx.validity_start_height)); + .push(tx_hash, Reverse(tx.validity_start_height)); + + // Update total tx size + self.total_size += tx.serialized_size(); + + true + } + + pub(crate) fn delete(&mut self, tx_hash: &Blake2bHash) -> Option { + let tx = self.transactions.remove(tx_hash)?; + + self.best_transactions.remove(tx_hash); + self.worst_transactions.remove(tx_hash); + self.oldest_transactions.remove(tx_hash); + + self.total_size -= tx.serialized_size(); + + Some(tx) + } +} + +pub(crate) struct MempoolState { + // Container where the regular transactions are stored + pub(crate) regular_transactions: MempoolTransactions, + + // Container where the control transactions are stored + pub(crate) control_transactions: MempoolTransactions, + + // The pending balance per sender. + pub(crate) state_by_sender: HashMap, + // The sets of all senders of staking transactions. For simplicity, each validator/staker can + // only have one outgoing staking transaction in the mempool. This makes sure that the outgoing + // staking transaction can actually pay its fee. + pub(crate) outgoing_validators: HashMap, + pub(crate) outgoing_stakers: HashMap, + + // The sets of all recipients of creation staking transactions. For simplicity, each + // validator/staker can only have one creation staking transaction in the mempool. This makes + // sure that the creation staking transactions do not interfere with one another. + pub(crate) creating_validators: HashMap, + pub(crate) creating_stakers: HashMap, + + #[cfg(feature = "metrics")] + pub(crate) metrics: Arc, +} + +impl MempoolState { + pub fn contains(&self, hash: &Blake2bHash) -> bool { + self.regular_transactions.contains_key(hash) || self.control_transactions.contains_key(hash) + } + + pub fn get(&self, hash: &Blake2bHash) -> Option<&Transaction> { + if let Some(transaction) = self.regular_transactions.get(hash) { + Some(transaction) + } else if let Some(transaction) = self.control_transactions.get(hash) { + Some(transaction) + } else { + None + } + } + + pub(crate) fn put(&mut self, tx: &Transaction) -> bool { + let tx_hash = tx.hash(); + + if self.regular_transactions.contains_key(&tx_hash) + || self.control_transactions.contains_key(&tx_hash) + { + return false; + } + + // If we are adding a stacking transaction we insert it into the control txns container + // Staking txns are control txns + if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking { + self.control_transactions.insert(tx); + } else { + self.regular_transactions.insert(tx); + } + + // Update the per sender state match self.state_by_sender.get_mut(&tx.sender) { None => { let mut txns = HashSet::new(); @@ -738,11 +1035,14 @@ impl MempoolState { _ => {} } } + // After inserting the new txn, check if we need to remove txns + while self.regular_transactions.total_size > self.regular_transactions.total_size_limit { + let (tx_hash, _) = self.regular_transactions.worst_transactions.pop().unwrap(); + self.remove(&tx_hash, EvictionReason::TooFull); + } - // Update total tx size and remove the cheapest ones if we have too many txs. - self.total_size += tx.serialized_size(); - while self.total_size > self.total_size_limit { - let (tx_hash, _) = self.worst_transactions.pop().unwrap(); + while self.control_transactions.total_size > self.control_transactions.total_size_limit { + let (tx_hash, _) = self.control_transactions.worst_transactions.pop().unwrap(); self.remove(&tx_hash, EvictionReason::TooFull); } @@ -754,39 +1054,52 @@ impl MempoolState { tx_hash: &Blake2bHash, reason: EvictionReason, ) -> Option { - let tx = self.transactions.remove(tx_hash)?; + if let Some(tx) = self.regular_transactions.delete(tx_hash) { + let sender_state = self.state_by_sender.get_mut(&tx.sender).unwrap(); - self.best_transactions.remove(tx_hash); - self.worst_transactions.remove(tx_hash); - self.oldest_transactions.remove(tx_hash); + sender_state.total -= tx.total_value(); + sender_state.txns.remove(tx_hash); - let sender_state = self.state_by_sender.get_mut(&tx.sender).unwrap(); + if sender_state.txns.is_empty() { + self.state_by_sender.remove(&tx.sender); + } - sender_state.total -= tx.total_value(); - sender_state.txns.remove(tx_hash); + self.remove_from_staking_state(&tx); - if sender_state.txns.is_empty() { - self.state_by_sender.remove(&tx.sender); - } + #[cfg(feature = "metrics")] + self.metrics.note_evicted(reason); - self.remove_from_staking_state(&tx); + Some(tx) + } else if let Some(tx) = self.control_transactions.delete(tx_hash) { + let sender_state = self.state_by_sender.get_mut(&tx.sender).unwrap(); - self.total_size -= tx.serialized_size(); + sender_state.total -= tx.total_value(); + sender_state.txns.remove(tx_hash); - #[cfg(feature = "metrics")] - self.metrics.note_evicted(reason); + if sender_state.txns.is_empty() { + self.state_by_sender.remove(&tx.sender); + } - Some(tx) + self.remove_from_staking_state(&tx); + + #[cfg(feature = "metrics")] + self.metrics.note_evicted(reason); + + Some(tx) + } else { + None + } } // Removes all the transactions sent by some specific address pub(crate) fn remove_sender_txns(&mut self, sender_address: &Address) { if let Some(sender_state) = &self.state_by_sender.remove(sender_address) { for tx_hash in &sender_state.txns { - self.best_transactions.remove(tx_hash); - self.worst_transactions.remove(tx_hash); - self.oldest_transactions.remove(tx_hash); - if let Some(tx) = self.transactions.remove(tx_hash) { + if let Some(tx) = self.regular_transactions.delete(tx_hash) { + self.remove_from_staking_state(&tx); + } + + if let Some(tx) = self.control_transactions.delete(tx_hash) { self.remove_from_staking_state(&tx); } } @@ -838,6 +1151,7 @@ impl MempoolState { } } +#[derive(Clone)] pub(crate) enum EvictionReason { BlockBuilding, Expired, diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 576214f85b..13957dcccd 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -59,7 +59,8 @@ async fn send_get_mempool_txns( send_txn_to_mempool(&mempool, mock_network, mock_id, transactions).await; // Get the transactions from the mempool - mempool.get_transactions_for_block(txn_len) + let (mempool_txns, _) = mempool.get_transactions_for_block(txn_len); + mempool_txns } async fn send_txn_to_mempool( @@ -96,6 +97,40 @@ async fn send_txn_to_mempool( mempool.stop_executor_without_unsubscribe().await; } +async fn send_control_txn_to_mempool( + mempool: &Mempool, + mock_network: Arc, + mock_id: MockId, + transactions: Vec, +) { + // Create a MPSC channel to directly send transactions to the mempool + let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); + + // Subscribe mempool with the mpsc stream created + mempool + .start_control_executor_with_txn_stream::( + Box::pin(ReceiverStream::new(txn_stream_rx)), + mock_network, + ) + .await; + + // Send the transactions + tokio::task::spawn(async move { + for txn in transactions { + txn_stream_tx + .send((txn.clone(), mock_id.clone())) + .await + .unwrap(); + } + }) + .await + .expect("Send failed"); + + let timeout = tokio::time::Duration::from_secs(1); + tokio::time::sleep(timeout).await; + mempool.stop_control_executor_without_unsubscribe().await; +} + async fn multiple_start_stop_send( blockchain: Arc>, transactions: Vec, @@ -137,7 +172,7 @@ async fn multiple_start_stop_send( mempool.stop_executor_without_unsubscribe().await; // Get the transactions from the mempool - let obtained_txns = mempool.get_transactions_for_block(usize::MAX); + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); // We should obtain the same amount of transactions assert_eq!(obtained_txns.len(), NUM_TXNS_START_STOP); @@ -162,7 +197,7 @@ async fn multiple_start_stop_send( mempool.stop_executor_without_unsubscribe().await; // We should not obtain any, since the executor should not be running. - let obtained_txns = mempool.get_transactions_for_block(usize::MAX); + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); // We should obtain 0 transactions assert_eq!(obtained_txns.len(), 0_usize); @@ -203,7 +238,7 @@ async fn multiple_start_stop_send( mempool.stop_executor_without_unsubscribe().await; // Get the transactions from the mempool - let obtained_txns = mempool.get_transactions_for_block(usize::MAX); + let (obtained_txns, _) = mempool.get_transactions_for_block(usize::MAX); // We should obtain same number of txns assert_eq!(obtained_txns.len(), NUM_TXNS_START_STOP); @@ -934,7 +969,7 @@ async fn mempool_update() { mempool.mempool_update(&adopted_micro_blocks[..], &reverted_micro_blocks[..]); // Get txns from mempool - let updated_txns = mempool.get_transactions_for_block(10_000); + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); // Expect at least the original 30 transactions plus or minus: // - minus 5 from the adopted blocks since 5/10 transactions were in the mempool and they need to be dropped. @@ -1077,7 +1112,7 @@ async fn mempool_update_aged_transaction() { ); // Get txns from mempool - let updated_txns = mempool.get_transactions_for_block(10_000); + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); // Should obtain 0 txns, as they are no longer valid due to aging assert_eq!( @@ -1212,7 +1247,7 @@ async fn mempool_update_not_enough_balance() { mempool.mempool_update(&adopted_micro_blocks[..], &[].to_vec()); // Get txns from mempool - let updated_txns = mempool.get_transactions_for_block(10_000); + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); // Expect only 20 transations because in the adopted blocks we included 10 txns that would cause the senders // to not have enough balance to pay for all txns already in the mempool @@ -1369,7 +1404,7 @@ async fn mempool_update_pruned_account() { mempool.mempool_update(&adopted_micro_blocks[..], &[].to_vec()); // Get txns from mempool - let updated_txns = mempool.get_transactions_for_block(10_000); + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); assert_eq!( updated_txns.len(), @@ -1454,7 +1489,7 @@ async fn mempool_update_create_staker_twice() { let mock_network = Arc::new(hub.new_network()); // Send txns to mempool - send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; + send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; assert_eq!( mempool.num_transactions(), @@ -1488,7 +1523,7 @@ async fn mempool_update_create_staker_twice() { mempool.mempool_update(&adopted_micro_blocks[..], &[].to_vec()); // Get txns from mempool - let updated_txns = mempool.get_transactions_for_block(10_000); + let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000); assert_eq!( updated_txns.len(), @@ -1515,6 +1550,203 @@ async fn mempool_update_create_staker_twice() { ); } +#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +async fn mempool_basic_control_tx() { + let time = Arc::new(OffsetTime::new()); + let env = VolatileEnvironment::new(10).unwrap(); + + let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY); + let address = Address::from_any_str(STAKER_ADDRESS).unwrap(); + + // This is the transaction produced in the block + let tx = TransactionBuilder::new_create_staker( + &key_pair, + &key_pair, + Some(address.clone()), + 100_000_000.try_into().unwrap(), + 100.try_into().unwrap(), + 1, + NetworkId::UnitAlbatross, + ) + .unwrap(); + + let txns = vec![tx]; + + let blockchain = Arc::new(RwLock::new( + Blockchain::new(env, NetworkId::UnitAlbatross, time).unwrap(), + )); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // Send txns to mempool + send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; + + assert_eq!( + mempool.num_transactions(), + 1, + "Number of txns in mempool is not what is expected" + ); + + // Get regular txns from mempool + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); + + //We should obtain 0 regular txns since we only have 1 control tx in the mempool + assert_eq!( + updated_txns.len(), + 0, + "Number of txns is not what is expected" + ); + + // Get control txns from mempool + let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000); + + //Now we should obtain one control transaction + assert_eq!( + updated_txns.len(), + 1, + "Number of txns is not what is expected" + ); + + //Now the mempool should have 0 total txns + assert_eq!( + mempool.num_transactions(), + 0, + "Number of txns in mempool is not what is expected" + ); +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] +async fn mempool_regular_and_control_tx() { + let mut rng = StdRng::seed_from_u64(0); + let balance = 100_000_000; + let num_txns = 4; + let mut mempool_transactions = vec![]; + let sender_balances = vec![balance + num_txns * 3; 1]; + let recipient_balances = vec![0; num_txns as usize]; + let mut genesis_builder = GenesisBuilder::default(); + + // Generate recipient accounts + let recipient_accounts = generate_accounts(recipient_balances, &mut genesis_builder, false); + // Generate sender accounts + let sender_accounts = generate_accounts(sender_balances, &mut genesis_builder, true); + + // Generate transactions + for i in 0..num_txns { + let mempool_transaction = TestTransaction { + fee: (i + 1) as u64, + value: 1, + recipient: recipient_accounts[i as usize].clone(), + sender: sender_accounts[0].clone(), + }; + mempool_transactions.push(mempool_transaction); + } + let (txns, _) = generate_transactions(mempool_transactions, true); + log::debug!("Done generating transactions and accounts"); + + let time = Arc::new(OffsetTime::new()); + let env = VolatileEnvironment::new(10).unwrap(); + + // Add a validator to genesis + genesis_builder.with_genesis_validator( + Address::from(&SchnorrKeyPair::generate(&mut rng)), + SchnorrPublicKey::from([0u8; 32]), + BlsKeyPair::generate(&mut rng).public_key, + Address::default(), + ); + + let genesis_info = genesis_builder.generate(env.clone()).unwrap(); + + let blockchain = Arc::new(RwLock::new( + Blockchain::with_genesis( + env.clone(), + time, + NetworkId::UnitAlbatross, + genesis_info.block, + genesis_info.accounts, + ) + .unwrap(), + )); + + // Create mempool and subscribe with a custom txn stream. + let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + + // This is the transaction produced in the block + let control_tx = TransactionBuilder::new_create_staker( + &sender_accounts[0].keypair, + &sender_accounts[0].keypair, + None, + 100.try_into().unwrap(), + 1.try_into().unwrap(), + 1, + NetworkId::UnitAlbatross, + ) + .unwrap(); + + let control_txns = vec![control_tx]; + + // Send txns to mempool + send_control_txn_to_mempool( + &mempool, + mock_network.clone(), + mock_id.clone(), + control_txns, + ) + .await; + + assert_eq!( + mempool.num_transactions(), + 1, + "Number of txns in mempool is not what is expected" + ); + + // Get regular txns from mempool + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); + + //We should obtain 0 regular txns since we only have control txns in the mempool + assert_eq!( + updated_txns.len(), + 0, + "Number of txns is not what is expected" + ); + + //Send regular txns to mempool + send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; + + // Get control txns from mempool + let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000); + + //Now we should obtain one control transaction + assert_eq!( + updated_txns.len(), + 1, + "Number of txns is not what is expected" + ); + + // Get regular txns from mempool + let (updated_txns, _) = mempool.get_transactions_for_block(10_000); + + //Now we should obtain all regular txns + assert_eq!( + updated_txns.len(), + num_txns as usize, + "Number of txns is not what is expected" + ); + + //Now the mempool should have 0 total txns + assert_eq!( + mempool.num_transactions(), + 0, + "Number of txns in mempool is not what is expected" + ); +} + #[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] async fn mempool_update_create_staker_non_existant_delegation_addr() { let mut rng = StdRng::seed_from_u64(0); @@ -1574,7 +1806,7 @@ async fn mempool_update_create_staker_non_existant_delegation_addr() { let mock_network = Arc::new(hub.new_network()); // Send txns to mempool - send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; + send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; // The transaction should be rejected by the mempool, since the delegation address does not exist assert_eq!( @@ -1646,7 +1878,7 @@ async fn applies_total_tx_size_limits() { mempool.add_transaction(tx).await.unwrap(); } - let mempool_txns = mempool.get_transactions_for_block(txns_len); + let (mempool_txns, _) = mempool.get_transactions_for_block(txns_len); // We expect that the tx with the lowest fee did not stay in the mempool for tx in &mempool_txns { diff --git a/validator/src/micro.rs b/validator/src/micro.rs index dcadeeb1fd..c8829d0db6 100644 --- a/validator/src/micro.rs +++ b/validator/src/micro.rs @@ -240,9 +240,20 @@ impl NextProduceMicroBlockEvent @@ -211,6 +213,8 @@ impl mempool_state, #[cfg(feature = "metrics")] mempool_monitor: TaskMonitor::new(), + #[cfg(feature = "metrics")] + control_mempool_monitor: TaskMonitor::new(), }; this.init(); @@ -231,6 +235,11 @@ impl self.mempool_monitor.clone() } + #[cfg(feature = "metrics")] + pub fn get_control_mempool_monitor(&self) -> TaskMonitor { + self.control_mempool_monitor.clone() + } + fn init(&mut self) { self.init_epoch(); self.init_block_producer(None); @@ -684,14 +693,21 @@ impl Future #[cfg(not(feature = "metrics"))] tokio::spawn({ async move { - mempool.start_executor(network, None).await; + mempool.start_executors(network, None, None).await; } }); #[cfg(feature = "metrics")] tokio::spawn({ let mempool_monitor = self.mempool_monitor.clone(); + let ctrl_mempool_monitor = self.control_mempool_monitor.clone(); async move { - mempool.start_executor(network, Some(mempool_monitor)).await; + mempool + .start_executors( + network, + Some(mempool_monitor), + Some(ctrl_mempool_monitor), + ) + .await; } }); self.mempool_state = MempoolState::Active; @@ -702,7 +718,7 @@ impl Future let mempool = Arc::clone(&self.mempool); let network = Arc::clone(&self.consensus.network); tokio::spawn(async move { - mempool.stop_executor(network).await; + mempool.stop_executors(network).await; }); self.mempool_state = MempoolState::Inactive; }