diff --git a/mempool/src/executor.rs b/mempool/src/executor.rs index 2f9efb64f9..4030f94d45 100644 --- a/mempool/src/executor.rs +++ b/mempool/src/executor.rs @@ -15,6 +15,7 @@ use nimiq_transaction::Transaction; use crate::filter::MempoolFilter; use crate::mempool_state::MempoolState; +use crate::mempool_transactions::TxPriority; use crate::verify::{verify_tx, VerifyErr}; const CONCURRENT_VERIF_TASKS: u32 = 1000; @@ -98,7 +99,8 @@ impl Future for MempoolExecutor { match verify_tx_ret { Ok(mempool_state_lock) => { - RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx); + RwLockUpgradableReadGuard::upgrade(mempool_state_lock) + .put(&tx, TxPriority::MediumPriority); MsgAcceptance::Accept } // Reject the message if signature verification fails or transaction is invalid diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index ad2c3c834f..0bd27cc77b 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -7,10 +7,6 @@ //! transactions that should be included in a block. #[macro_use] extern crate log; - -/// Mempool transaction module -mod mempool_transactions; - /// Mempool state module mod mempool_state; @@ -18,6 +14,7 @@ mod mempool_state; pub mod config; /// Mempool executor module pub mod executor; + /// Mempool filter module pub mod filter; /// Main mempool module @@ -25,5 +22,7 @@ pub mod mempool; /// Mempool metrics #[cfg(feature = "metrics")] mod mempool_metrics; +/// Mempool transaction module +pub mod mempool_transactions; /// Verify transaction module pub mod verify; diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 6a72dc4056..588c98827c 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -25,6 +25,7 @@ use crate::filter::{MempoolFilter, MempoolRules}; #[cfg(feature = "metrics")] use crate::mempool_metrics::MempoolMetrics; use crate::mempool_state::{EvictionReason, MempoolState}; +use crate::mempool_transactions::TxPriority; use crate::verify::{verify_tx, VerifyErr}; /// Transaction topic for the Mempool to request transactions from the network @@ -46,7 +47,7 @@ pub struct ControlTransactionTopic; impl Topic for ControlTransactionTopic { type Item = Transaction; - const BUFFER_SIZE: usize = 100; + const BUFFER_SIZE: usize = 1024; const NAME: &'static str = "Controltransactions"; const VALIDATE: bool = true; } @@ -520,7 +521,8 @@ impl Mempool { let pending_balance = tx.total_value() + sender_total; if pending_balance <= sender_balance { - mempool_state.put(tx); + //TODO: This could be improved by re-adding unpark txns with high priority + mempool_state.put(tx, TxPriority::MediumPriority); } else { debug!( block_number = block.block_number(), @@ -658,7 +660,11 @@ impl Mempool { } /// Adds a transaction to the Mempool. - pub async fn add_transaction(&self, transaction: Transaction) -> Result<(), VerifyErr> { + pub async fn add_transaction( + &self, + transaction: Transaction, + tx_priority: Option, + ) -> Result<(), VerifyErr> { let blockchain = Arc::clone(&self.blockchain); let mempool_state = Arc::clone(&self.state); let filter = Arc::clone(&self.filter); @@ -668,7 +674,11 @@ impl Mempool { match verify_tx_ret { Ok(mempool_state_lock) => { - RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&transaction); + RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put( + &transaction, + tx_priority.unwrap_or(TxPriority::MediumPriority), + ); + Ok(()) } Err(e) => Err(e), diff --git a/mempool/src/mempool_state.rs b/mempool/src/mempool_state.rs index 841ab825cd..c716259c7b 100644 --- a/mempool/src/mempool_state.rs +++ b/mempool/src/mempool_state.rs @@ -5,7 +5,7 @@ use std::{ #[cfg(feature = "metrics")] use crate::mempool_metrics::MempoolMetrics; -use crate::mempool_transactions::MempoolTransactions; +use crate::mempool_transactions::{MempoolTransactions, TxPriority}; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; use nimiq_primitives::{account::AccountType, coin::Coin}; @@ -69,7 +69,7 @@ impl MempoolState { } } - pub(crate) fn put(&mut self, tx: &Transaction) -> bool { + pub(crate) fn put(&mut self, tx: &Transaction, priority: TxPriority) -> bool { let tx_hash = tx.hash(); if self.regular_transactions.contains_key(&tx_hash) @@ -81,9 +81,9 @@ impl MempoolState { // If we are adding a stacking transaction we insert it into the control txns container // Staking txns are control txns if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking { - self.control_transactions.insert(tx); + self.control_transactions.insert(tx, priority); } else { - self.regular_transactions.insert(tx); + self.regular_transactions.insert(tx, priority); } // Update the per sender state diff --git a/mempool/src/mempool_transactions.rs b/mempool/src/mempool_transactions.rs index 8ffab87b38..4618f585e3 100644 --- a/mempool/src/mempool_transactions.rs +++ b/mempool/src/mempool_transactions.rs @@ -9,6 +9,18 @@ use keyed_priority_queue::KeyedPriorityQueue; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_transaction::Transaction; +/// TxPriority that is used when adding transactions into the mempool +/// Higher Priority transactions are returned first from the mempool +#[derive(Copy, Clone, PartialEq)] +pub enum TxPriority { + /// Low Priority transactions + LowPriority = 1, + /// Medium Priority transactions, this is thed default + MediumPriority = 2, + /// High Priority transactions, + HighPriority = 3, +} + /// Ordering in which transactions removed from the mempool to be included in blocks. /// This is stored on a max-heap, so the greater transaction comes first. /// Compares by fee per byte (higher first), then by insertion order (lower i.e. older first). @@ -17,6 +29,7 @@ use nimiq_transaction::Transaction; // we might prefer basic transactions over staking contract transactions, etc, etc. #[derive(PartialEq)] pub struct BestTxOrder { + priority: TxPriority, fee_per_byte: f64, insertion_order: u64, } @@ -31,9 +44,14 @@ impl PartialOrd for BestTxOrder { impl Ord for BestTxOrder { fn cmp(&self, other: &Self) -> Ordering { - self.fee_per_byte - .partial_cmp(&other.fee_per_byte) - .expect("fees can't be NaN") + (self.priority as u8) + .partial_cmp(&(other.priority as u8)) + .expect("TX Priority is required") + .then( + self.fee_per_byte + .partial_cmp(&other.fee_per_byte) + .expect("fees can't be NaN"), + ) .then(self.insertion_order.cmp(&other.insertion_order).reverse()) } } @@ -43,6 +61,7 @@ impl Ord for BestTxOrder { /// Compares by fee per byte (lower first), then by insertion order (higher i.e. newer first). #[derive(PartialEq)] pub struct WorstTxOrder { + priority: TxPriority, fee_per_byte: f64, insertion_order: u64, } @@ -57,11 +76,17 @@ impl PartialOrd for WorstTxOrder { impl Ord for WorstTxOrder { fn cmp(&self, other: &Self) -> Ordering { - self.fee_per_byte - .partial_cmp(&other.fee_per_byte) - .expect("fees can't be NaN") + (self.priority as u8) + .partial_cmp(&(other.priority as u8)) + .expect("TX Priority is required") .reverse() - .then(self.insertion_order.cmp(&other.insertion_order)) + .then( + self.fee_per_byte + .partial_cmp(&other.fee_per_byte) + .expect("fees can't be NaN") + .reverse() + .then(self.insertion_order.cmp(&other.insertion_order)), + ) } } @@ -147,7 +172,7 @@ impl MempoolTransactions { self.transactions.get(hash) } - pub(crate) fn insert(&mut self, tx: &Transaction) -> bool { + pub(crate) fn insert(&mut self, tx: &Transaction, priority: TxPriority) -> bool { let tx_hash = tx.hash(); if self.transactions.contains_key(&tx_hash) { @@ -159,6 +184,7 @@ impl MempoolTransactions { self.best_transactions.push( tx_hash.clone(), BestTxOrder { + priority, fee_per_byte: tx.fee_per_byte(), insertion_order: self.tx_counter, }, @@ -166,6 +192,7 @@ impl MempoolTransactions { self.worst_transactions.push( tx_hash.clone(), WorstTxOrder { + priority, fee_per_byte: tx.fee_per_byte(), insertion_order: self.tx_counter, }, diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 13957dcccd..06e8f1c478 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -18,8 +18,8 @@ use nimiq_keys::{ Address, KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey, PublicKey as SchnorrPublicKey, SecureGenerate, }; -use nimiq_mempool::config::MempoolConfig; use nimiq_mempool::mempool::Mempool; +use nimiq_mempool::{config::MempoolConfig, mempool_transactions::TxPriority}; use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId}; use nimiq_primitives::{networks::NetworkId, policy}; use nimiq_test_log::test; @@ -37,7 +37,11 @@ const NUM_TXNS_START_STOP: usize = 100; pub const ACCOUNT_SECRET_KEY: &str = "6c9320ac201caf1f8eaa5b05f5d67a9e77826f3f6be266a0ecccc20416dc6587"; +pub const VALIDATOR_SECRET_KEY: &str = + "041580cc67e66e9e08b68fd9e4c9deb68737168fbe7488de2638c2e906c2f5ad"; + const STAKER_ADDRESS: &str = "NQ20TSB0DFSMUH9C15GQGAGJTTE4D3MA859E"; +const VALIDATOR_ADDRESS: &str = "NQ20 TSB0 DFSM UH9C 15GQ GAGJ TTE4 D3MA 859E"; fn ed25519_key_pair(secret_key: &str) -> SchnorrKeyPair { let priv_key: SchnorrPrivateKey = @@ -1551,12 +1555,24 @@ async fn mempool_update_create_staker_twice() { } #[test(tokio::test(flavor = "multi_thread", worker_threads = 10))] -async fn mempool_basic_control_tx() { +async fn mempool_basic_prioritization_control_tx() { let time = Arc::new(OffsetTime::new()); let env = VolatileEnvironment::new(10).unwrap(); let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY); + let validator_signing_key = ed25519_key_pair(VALIDATOR_SECRET_KEY); let address = Address::from_any_str(STAKER_ADDRESS).unwrap(); + let validator_address = Address::from_any_str(VALIDATOR_ADDRESS).unwrap(); + + let unpark = TransactionBuilder::new_unpark_validator( + &key_pair, + validator_address, + &validator_signing_key, + 1.try_into().unwrap(), + 1, + NetworkId::UnitAlbatross, + ) + .unwrap(); // This is the transaction produced in the block let tx = TransactionBuilder::new_create_staker( @@ -1583,7 +1599,7 @@ async fn mempool_basic_control_tx() { let mock_network = Arc::new(hub.new_network()); // Send txns to mempool - send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; + send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns.clone()).await; assert_eq!( mempool.num_transactions(), @@ -1591,27 +1607,26 @@ async fn mempool_basic_control_tx() { "Number of txns in mempool is not what is expected" ); - // Get regular txns from mempool - let (updated_txns, _) = mempool.get_transactions_for_block(10_000); - - //We should obtain 0 regular txns since we only have 1 control tx in the mempool - assert_eq!( - updated_txns.len(), - 0, - "Number of txns is not what is expected" - ); + // Insert unpark with high priority + mempool + .add_transaction(unpark.clone(), Some(TxPriority::HighPriority)) + .await + .unwrap(); // Get control txns from mempool let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000); - //Now we should obtain one control transaction + // Now we should obtain one control transaction assert_eq!( updated_txns.len(), - 1, + 2, "Number of txns is not what is expected" ); - //Now the mempool should have 0 total txns + // We should obtain the txns in the reversed ordered as the unpark should have been prioritized. + assert_eq!(updated_txns[0], unpark); + + // Now the mempool should have 0 total txns assert_eq!( mempool.num_transactions(), 0, @@ -1875,7 +1890,7 @@ async fn applies_total_tx_size_limits() { let worst_tx = txns[1].hash::(); for tx in txns { - mempool.add_transaction(tx).await.unwrap(); + mempool.add_transaction(tx, None).await.unwrap(); } let (mempool_txns, _) = mempool.get_transactions_for_block(txns_len); diff --git a/rpc-interface/src/mempool.rs b/rpc-interface/src/mempool.rs index 889aa20cc8..9036316fc2 100644 --- a/rpc-interface/src/mempool.rs +++ b/rpc-interface/src/mempool.rs @@ -10,6 +10,11 @@ pub trait MempoolInterface { async fn push_transaction(&mut self, raw_tx: String) -> Result; + async fn push_high_priority_transaction( + &mut self, + raw_tx: String, + ) -> Result; + async fn mempool_content( &mut self, include_transactions: bool, diff --git a/rpc-server/src/dispatchers/mempool.rs b/rpc-server/src/dispatchers/mempool.rs index be4ba70993..8fa8ba8331 100644 --- a/rpc-server/src/dispatchers/mempool.rs +++ b/rpc-server/src/dispatchers/mempool.rs @@ -6,6 +6,7 @@ use beserial::Deserialize; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_mempool::mempool::Mempool; +use nimiq_mempool::mempool_transactions::TxPriority; use nimiq_rpc_interface::mempool::MempoolInterface; use nimiq_rpc_interface::types::{HashOrTx, MempoolInfo}; @@ -33,7 +34,26 @@ impl MempoolInterface for MempoolDispatcher { Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?; let txid = tx.hash::(); - match self.mempool.add_transaction(tx).await { + match self.mempool.add_transaction(tx, None).await { + Ok(_) => Ok(txid), + Err(e) => Err(Error::MempoolError(e)), + } + } + + /// Pushes the given serialized transaction to the local mempool with high priority + async fn push_high_priority_transaction( + &mut self, + raw_tx: String, + ) -> Result { + let tx: nimiq_transaction::Transaction = + Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?; + let txid = tx.hash::(); + + match self + .mempool + .add_transaction(tx, Some(TxPriority::HighPriority)) + .await + { Ok(_) => Ok(txid), Err(e) => Err(Error::MempoolError(e)), } diff --git a/spammer/src/main.rs b/spammer/src/main.rs index 2b350a36e1..5e9fbd0b5d 100644 --- a/spammer/src/main.rs +++ b/spammer/src/main.rs @@ -424,7 +424,7 @@ async fn spam( let consensus1 = consensus.clone(); let mp = std::sync::Arc::clone(&mempool); tokio::spawn(async move { - if let Err(e) = mp.add_transaction(tx.clone()).await { + if let Err(e) = mp.add_transaction(tx.clone(), None).await { log::warn!("Mempool rejected transaction: {:?} - {:#?}", e, tx); } if let Err(e) = consensus1.send_transaction(tx).await { diff --git a/validator/src/validator.rs b/validator/src/validator.rs index 9089efaae3..b1b5ca811f 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -22,7 +22,7 @@ use nimiq_database::{Database, Environment, ReadTransaction, WriteTransaction}; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::{Address, KeyPair as SchnorrKeyPair}; use nimiq_macros::store_waker; -use nimiq_mempool::{config::MempoolConfig, mempool::Mempool}; +use nimiq_mempool::{config::MempoolConfig, mempool::Mempool, mempool_transactions::TxPriority}; use nimiq_network_interface::network::{Network, PubsubId, Topic}; use nimiq_primitives::coin::Coin; use nimiq_primitives::policy; @@ -629,11 +629,15 @@ impl .unwrap(); // TODO: Handle transaction creation error let tx_hash = unpark_transaction.hash(); - let cn = self.consensus.clone(); + let mempool = self.mempool.clone(); tokio::spawn(async move { - debug!("Sending unpark transaction"); - if cn.send_transaction(unpark_transaction).await.is_err() { - error!("Failed to send unpark transaction"); + debug!("Adding unpark transaction to mempool"); + if mempool + .add_transaction(unpark_transaction, Some(TxPriority::HighPriority)) + .await + .is_err() + { + error!("Failed adding unpark transaction into mempool"); } });