Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Watch transactions pool #10558

Merged
merged 11 commits into from
Apr 12, 2019
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/light/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ memory-db = "0.11.0"
trie-db = "0.11.0"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
ethcore-network = { path = "../../util/network" }
ethcore-miner = { path = "../../miner" }
ethcore-io = { path = "../../util/io" }
hash-db = "0.11.0"
heapsize = "0.4"
Expand Down
1 change: 1 addition & 0 deletions ethcore/light/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extern crate ethcore_io as io;
extern crate ethcore_network as network;
extern crate parity_bytes as bytes;
extern crate ethereum_types;
extern crate ethcore_miner as miner;
extern crate ethcore;
extern crate hash_db;
extern crate heapsize;
Expand Down
21 changes: 19 additions & 2 deletions ethcore/light/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
//! address-wise manner.

use std::fmt;
use std::sync::Arc;
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;

use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction};
use ethereum_types::{H256, U256, Address};
use fastmap::H256FastMap;
use futures::sync::mpsc;
use miner::pool::TxStatus;

// Knowledge of an account's current nonce.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -134,6 +137,7 @@ pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>,
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
}

impl fmt::Debug for TransactionQueue {
Expand Down Expand Up @@ -231,7 +235,7 @@ impl TransactionQueue {
};

self.by_hash.insert(hash, tx);
self.notify(&promoted);
self.notify(&promoted, TxStatus::Added);
Ok(res)
}

Expand Down Expand Up @@ -343,6 +347,8 @@ impl TransactionQueue {
trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})",
removed_hashes.len(), address, cur_nonce);

self.notify(&removed_hashes, TxStatus::Culled);

for hash in removed_hashes {
self.by_hash.remove(&hash);
}
Expand All @@ -358,11 +364,22 @@ impl TransactionQueue {
self.listeners.push(f);
}

/// Add a transaction queue listener.
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.tx_statuses_listeners.push(sender);
receiver
}

/// Notifies all listeners about new pending transaction.
fn notify(&self, hashes: &[H256]) {
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
for listener in &self.listeners {
listener(hashes)
}

let to_send: Vec<(H256, TxStatus)> = hashes.into_iter().map(|hash| (hash.clone(), status)).collect();

self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(Arc::new(to_send.clone())).is_ok());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's incorrect, the reason we have Arc is to avoid cloning the entire Vec. So:

  1. First create the Arc
  2. Then clone Arc not Vec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomusdrw I fixed it

}
}

Expand Down
6 changes: 5 additions & 1 deletion parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,11 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
handler.extend_with(EthPubSub::to_delegate(client));
}
Api::ParityTransactionsPool => {
if !for_generic_pubsub {}
if !for_generic_pubsub {
let receiver = self.transaction_queue.write().tx_statuses_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
}
Api::Personal => {
#[cfg(feature = "accounts")]
Expand Down