diff --git a/client/transaction-pool/graph/src/lib.rs b/client/transaction-pool/graph/src/lib.rs index b4646c6055bf6..bf220ce22973a 100644 --- a/client/transaction-pool/graph/src/lib.rs +++ b/client/transaction-pool/graph/src/lib.rs @@ -32,6 +32,7 @@ mod pool; mod ready; mod rotator; mod validated_pool; +mod tracked_map; pub mod base_pool; pub mod watcher; diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index b5807ffce4480..47289f26f02a9 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -25,15 +25,17 @@ use std::{ use serde::Serialize; use log::trace; -use parking_lot::RwLock; use sp_runtime::traits::Member; use sp_runtime::transaction_validity::{ TransactionTag as Tag, }; use sp_transaction_pool::error; -use crate::future::WaitingTransaction; -use crate::base_pool::Transaction; +use crate::{ + base_pool::Transaction, + future::WaitingTransaction, + tracked_map::{self, ReadOnlyTrackedMap, TrackedMap}, +}; /// An in-pool transaction reference. /// @@ -113,11 +115,17 @@ pub struct ReadyTransactions { /// tags that are provided by Ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) - ready: Arc>>>, + ready: TrackedMap>, /// Best transactions that are ready to be included to the block without any other previous transaction. best: BTreeSet>, } +impl tracked_map::Size for ReadyTx { + fn size(&self) -> usize { + self.transaction.transaction.bytes + } +} + impl Default for ReadyTransactions { fn default() -> Self { ReadyTransactions { @@ -468,18 +476,18 @@ impl ReadyTransactions { /// Returns number of transactions in this queue. pub fn len(&self) -> usize { - self.ready.read().len() + self.ready.len() } /// Returns sum of encoding lengths of all transactions in this queue. pub fn bytes(&self) -> usize { - self.ready.read().values().fold(0, |acc, tx| acc + tx.transaction.transaction.bytes) + self.ready.bytes() } } /// Iterator of ready transactions ordered by priority. pub struct BestIterator { - all: Arc>>>, + all: ReadOnlyTrackedMap>, awaiting: HashMap)>, best: BTreeSet>, } diff --git a/client/transaction-pool/graph/src/tracked_map.rs b/client/transaction-pool/graph/src/tracked_map.rs new file mode 100644 index 0000000000000..c799eb0b96ea1 --- /dev/null +++ b/client/transaction-pool/graph/src/tracked_map.rs @@ -0,0 +1,189 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::{ + collections::HashMap, + sync::{Arc, atomic::{AtomicIsize, Ordering as AtomicOrdering}}, +}; +use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard}; + +/// Something that can report it's size. +pub trait Size { + fn size(&self) -> usize; +} + +/// Map with size tracking. +/// +/// Size reported might be slightly off and only approximately true. +#[derive(Debug, parity_util_mem::MallocSizeOf)] +pub struct TrackedMap { + index: Arc>>, + bytes: AtomicIsize, + length: AtomicIsize, +} + +impl Default for TrackedMap { + fn default() -> Self { + Self { + index: Arc::new(HashMap::default().into()), + bytes: 0.into(), + length: 0.into(), + } + } +} + +impl TrackedMap { + /// Current tracked length of the content. + pub fn len(&self) -> usize { + std::cmp::max(self.length.load(AtomicOrdering::Relaxed), 0) as usize + } + + /// Current sum of content length. + pub fn bytes(&self) -> usize { + std::cmp::max(self.bytes.load(AtomicOrdering::Relaxed), 0) as usize + } + + /// Read-only clone of the interior. + pub fn clone(&self) -> ReadOnlyTrackedMap { + ReadOnlyTrackedMap(self.index.clone()) + } + + /// Lock map for read. + pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> { + TrackedMapReadAccess { + inner_guard: self.index.read(), + } + } + + /// Lock map for write. + pub fn write<'a>(&'a self) -> TrackedMapWriteAccess<'a, K, V> { + TrackedMapWriteAccess { + inner_guard: self.index.write(), + bytes: &self.bytes, + length: &self.length, + } + } +} + +/// Read-only access to map. +/// +/// The only thing can be done is .read(). +pub struct ReadOnlyTrackedMap(Arc>>); + +impl ReadOnlyTrackedMap +where + K: Eq + std::hash::Hash +{ + /// Lock map for read. + pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> { + TrackedMapReadAccess { + inner_guard: self.0.read(), + } + } +} + +pub struct TrackedMapReadAccess<'a, K, V> { + inner_guard: RwLockReadGuard<'a, HashMap>, +} + +impl<'a, K, V> TrackedMapReadAccess<'a, K, V> +where + K: Eq + std::hash::Hash +{ + /// Returns true if map contains key. + pub fn contains_key(&self, key: &K) -> bool { + self.inner_guard.contains_key(key) + } + + /// Returns reference to the contained value by key, if exists. + pub fn get(&self, key: &K) -> Option<&V> { + self.inner_guard.get(key) + } + + /// Returns iterator over all values. + pub fn values(&self) -> std::collections::hash_map::Values { + self.inner_guard.values() + } +} + +pub struct TrackedMapWriteAccess<'a, K, V> { + bytes: &'a AtomicIsize, + length: &'a AtomicIsize, + inner_guard: RwLockWriteGuard<'a, HashMap>, +} + +impl<'a, K, V> TrackedMapWriteAccess<'a, K, V> +where + K: Eq + std::hash::Hash, V: Size +{ + /// Insert value and return previous (if any). + pub fn insert(&mut self, key: K, val: V) -> Option { + let new_bytes = val.size(); + self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed); + self.length.fetch_add(1, AtomicOrdering::Relaxed); + self.inner_guard.insert(key, val).and_then(|old_val| { + self.bytes.fetch_sub(old_val.size() as isize, AtomicOrdering::Relaxed); + self.length.fetch_sub(1, AtomicOrdering::Relaxed); + Some(old_val) + }) + } + + /// Remove value by key. + pub fn remove(&mut self, key: &K) -> Option { + let val = self.inner_guard.remove(key); + if let Some(size) = val.as_ref().map(Size::size) { + self.bytes.fetch_sub(size as isize, AtomicOrdering::Relaxed); + self.length.fetch_sub(1, AtomicOrdering::Relaxed); + } + val + } + + /// Returns mutable reference to the contained value by key, if exists. + pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { + self.inner_guard.get_mut(key) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + impl Size for i32 { + fn size(&self) -> usize { *self as usize / 10 } + } + + #[test] + fn basic() { + let map = TrackedMap::default(); + map.write().insert(5, 10); + map.write().insert(6, 20); + + assert_eq!(map.bytes(), 3); + assert_eq!(map.len(), 2); + + map.write().insert(6, 30); + + assert_eq!(map.bytes(), 4); + assert_eq!(map.len(), 2); + + map.write().remove(&6); + assert_eq!(map.bytes(), 1); + assert_eq!(map.len(), 1); + } +} \ No newline at end of file