Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use lru instead of cached in near-network #5512

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ actix = "=0.11.0-beta.2"
borsh = { version = "0.9", features = ["rc"] }
bytes = "1"
bytesize = "1.1"
cached = "0.23"
conqueue = "0.4.0"
futures = "0.3"
lru = "0.6.5"
near-rust-allocator-proxy = "0.3.0"
once_cell = "1.5.2"
rand = "0.7"
Expand Down
10 changes: 5 additions & 5 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use actix::{
};
use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use cached::{Cached, SizedCache};
use lru::LruCache;
use near_crypto::Signature;
use near_network_primitives::types::{
Ban, NetworkViewClientMessages, NetworkViewClientResponses, PeerChainInfo, PeerChainInfoV2,
Expand Down Expand Up @@ -103,7 +103,7 @@ pub struct PeerActor {
/// How many peer actors are created
peer_counter: Arc<AtomicUsize>,
/// Cache of recently routed messages, this allows us to drop duplicates
routed_message_cache: SizedCache<(PeerId, PeerIdOrHash, Signature), Instant>,
routed_message_cache: LruCache<(PeerId, PeerIdOrHash, Signature), Instant>,
/// A helper data structure for limiting reading
#[allow(unused)]
throttle_controller: ThrottleController,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl PeerActor {
network_metrics,
txns_since_last_block,
peer_counter,
routed_message_cache: SizedCache::with_size(ROUTED_MESSAGE_CACHE_SIZE),
routed_message_cache: LruCache::new(ROUTED_MESSAGE_CACHE_SIZE),
throttle_controller,
}
}
Expand Down Expand Up @@ -683,13 +683,13 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for PeerActor {
if let PeerMessage::Routed(msg) = &peer_msg {
let key = (msg.author.clone(), msg.target.clone(), msg.signature.clone());
let now = Clock::instant();
if let Some(time) = self.routed_message_cache.cache_get(&key) {
if let Some(time) = self.routed_message_cache.get(&key) {
if now.saturating_duration_since(*time) <= DROP_DUPLICATED_MESSAGES_PERIOD {
debug!(target: "network", "Dropping duplicated message from {} to {:?}", msg.author, msg.target);
return;
}
}
self.routed_message_cache.cache_set(key, now);
self.routed_message_cache.put(key, now);
}
if let PeerMessage::Routed(RoutedMessage {
body: RoutedMessageBody::ForwardTx(_), ..
Expand Down
68 changes: 34 additions & 34 deletions chain/network/src/routing/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::routing::utils::cache_to_hashmap;
use crate::PeerInfo;
use actix::dev::{MessageResponse, ResponseChannel};
use actix::{Actor, Message};
use cached::{Cached, SizedCache};
use lru::LruCache;
use near_network_primitives::types::{PeerIdOrHash, Ping, Pong};
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -57,7 +57,7 @@ pub struct RoutingTableView {
/// PeerId associated with this instance.
my_peer_id: PeerId,
/// PeerId associated for every known account id.
account_peers: SizedCache<AccountId, AnnounceAccount>,
account_peers: LruCache<AccountId, AnnounceAccount>,
/// Active PeerId that are part of the shortest path to each PeerId.
pub peer_forwarding: Arc<HashMap<PeerId, Vec<PeerId>>>,
/// Store last update for known edges. This is limited to list of adjacent edges to `my_peer_id`.
Expand All @@ -69,15 +69,15 @@ pub struct RoutingTableView {
/// Number of times each active connection was used to route a message.
/// If there are several options use route with minimum nonce.
/// New routes are added with minimum nonce.
route_nonce: SizedCache<PeerId, usize>,
route_nonce: LruCache<PeerId, usize>,
/// Ping received by nonce.
ping_info: SizedCache<usize, (Ping, usize)>,
ping_info: LruCache<usize, (Ping, usize)>,
/// Ping received by nonce.
pong_info: SizedCache<usize, (Pong, usize)>,
pong_info: LruCache<usize, (Pong, usize)>,
/// List of pings sent for which we haven't received any pong yet.
waiting_pong: SizedCache<PeerId, SizedCache<usize, Instant>>,
waiting_pong: LruCache<PeerId, LruCache<usize, Instant>>,
/// Last nonce sent to each peer through pings.
last_ping_nonce: SizedCache<PeerId, usize>,
last_ping_nonce: LruCache<PeerId, usize>,
}

#[derive(Debug)]
Expand All @@ -94,16 +94,16 @@ impl RoutingTableView {

Self {
my_peer_id,
account_peers: SizedCache::with_size(ANNOUNCE_ACCOUNT_CACHE_SIZE),
account_peers: LruCache::new(ANNOUNCE_ACCOUNT_CACHE_SIZE),
peer_forwarding: Default::default(),
local_edges_info: Default::default(),
route_back: RouteBackCache::default(),
store,
route_nonce: SizedCache::with_size(ROUND_ROBIN_NONCE_CACHE_SIZE),
ping_info: SizedCache::with_size(PING_PONG_CACHE_SIZE),
pong_info: SizedCache::with_size(PING_PONG_CACHE_SIZE),
waiting_pong: SizedCache::with_size(PING_PONG_CACHE_SIZE),
last_ping_nonce: SizedCache::with_size(PING_PONG_CACHE_SIZE),
route_nonce: LruCache::new(ROUND_ROBIN_NONCE_CACHE_SIZE),
ping_info: LruCache::new(PING_PONG_CACHE_SIZE),
pong_info: LruCache::new(PING_PONG_CACHE_SIZE),
waiting_pong: LruCache::new(PING_PONG_CACHE_SIZE),
last_ping_nonce: LruCache::new(PING_PONG_CACHE_SIZE),
}
}

Expand Down Expand Up @@ -132,7 +132,7 @@ impl RoutingTableView {
// max nonce - threshold.
let nonce_peer = routes
.iter()
.map(|peer_id| (self.route_nonce.cache_get(peer_id).cloned().unwrap_or(0), peer_id))
.map(|peer_id| (self.route_nonce.get(peer_id).cloned().unwrap_or(0), peer_id))
.collect::<Vec<_>>();

// Neighbor with minimum and maximum nonce respectively.
Expand All @@ -141,12 +141,12 @@ impl RoutingTableView {

if min_v.0 + ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED < max_v.0 {
self.route_nonce
.cache_set(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED);
.put(min_v.1.clone(), max_v.0 - ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED);
}

let next_hop = min_v.1;
let nonce = self.route_nonce.cache_get(next_hop).cloned();
self.route_nonce.cache_set(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1));
let nonce = self.route_nonce.get(next_hop).cloned();
self.route_nonce.put(next_hop.clone(), nonce.map_or(1, |nonce| nonce + 1));
Ok(next_hop.clone())
} else {
Err(FindRouteError::PeerNotFound)
Expand All @@ -173,7 +173,7 @@ impl RoutingTableView {
/// Note: There is at most on peer id per account id.
pub fn add_account(&mut self, announce_account: AnnounceAccount) {
let account_id = announce_account.account_id.clone();
self.account_peers.cache_set(account_id.clone(), announce_account.clone());
self.account_peers.put(account_id.clone(), announce_account.clone());

// Add account to store
let mut update = self.store.store_update();
Expand Down Expand Up @@ -213,46 +213,46 @@ impl RoutingTableView {
}

pub fn add_ping(&mut self, ping: Ping) {
let cnt = self.ping_info.cache_get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0);
let cnt = self.ping_info.get(&(ping.nonce as usize)).map(|v| v.1).unwrap_or(0);

self.ping_info.cache_set(ping.nonce as usize, (ping, cnt + 1));
self.ping_info.put(ping.nonce as usize, (ping, cnt + 1));
}

/// Return time of the round trip of ping + pong
pub fn add_pong(&mut self, pong: Pong) -> Option<f64> {
let mut res = None;

if let Some(nonces) = self.waiting_pong.cache_get_mut(&pong.source) {
res = nonces.cache_remove(&(pong.nonce as usize)).map(|sent| {
if let Some(nonces) = self.waiting_pong.get_mut(&pong.source) {
res = nonces.pop(&(pong.nonce as usize)).map(|sent| {
Clock::instant().saturating_duration_since(sent).as_secs_f64() * 1000f64
});
}

let cnt = self.pong_info.cache_get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0);
let cnt = self.pong_info.get(&(pong.nonce as usize)).map(|v| v.1).unwrap_or(0);

self.pong_info.cache_set(pong.nonce as usize, (pong, (cnt + 1)));
self.pong_info.put(pong.nonce as usize, (pong, (cnt + 1)));

res
}

// for unit tests
pub fn sending_ping(&mut self, nonce: usize, target: PeerId) {
let entry = if let Some(entry) = self.waiting_pong.cache_get_mut(&target) {
let entry = if let Some(entry) = self.waiting_pong.get_mut(&target) {
entry
} else {
self.waiting_pong.cache_set(target.clone(), SizedCache::with_size(10));
self.waiting_pong.cache_get_mut(&target).unwrap()
self.waiting_pong.put(target.clone(), LruCache::new(10));
self.waiting_pong.get_mut(&target).unwrap()
};

entry.cache_set(nonce, Clock::instant());
entry.put(nonce, Clock::instant());
}

pub fn get_ping(&mut self, peer_id: PeerId) -> usize {
if let Some(entry) = self.last_ping_nonce.cache_get_mut(&peer_id) {
if let Some(entry) = self.last_ping_nonce.get_mut(&peer_id) {
*entry += 1;
*entry - 1
} else {
self.last_ping_nonce.cache_set(peer_id, 1);
self.last_ping_nonce.put(peer_id, 1);
0
}
}
Expand All @@ -277,22 +277,22 @@ impl RoutingTableView {
///
/// Get keys currently on cache.
pub fn get_accounts_keys(&mut self) -> Vec<AccountId> {
self.account_peers.key_order().cloned().collect()
self.account_peers.iter().map(|(k, _v)| (k.clone())).collect()
}

/// Get announce accounts on cache.
pub fn get_announce_accounts(&mut self) -> Vec<AnnounceAccount> {
self.account_peers.value_order().cloned().collect()
self.account_peers.iter().map(|(_k, v)| v).cloned().collect()
}

/// Get number of accounts
pub fn get_announce_accounts_size(&mut self) -> usize {
self.account_peers.cache_size()
self.account_peers.len()
}

/// Get account announce from
pub fn get_announce(&mut self, account_id: &AccountId) -> Option<AnnounceAccount> {
if let Some(announce_account) = self.account_peers.cache_get(account_id) {
if let Some(announce_account) = self.account_peers.get(account_id) {
Some(announce_account.clone())
} else {
self.store
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/routing/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use cached::SizedCache;
use lru::LruCache;
use std::collections::HashMap;
use std::hash::Hash;

/// `cache_to_hashmap` - converts SizedCache<K, V> to HashMap<K, V>
pub fn cache_to_hashmap<K: Hash + Eq + Clone, V: Clone>(cache: &SizedCache<K, V>) -> HashMap<K, V> {
cache.key_order().cloned().zip(cache.value_order().cloned()).collect()
pub fn cache_to_hashmap<K: Hash + Eq + Clone, V: Clone>(cache: &LruCache<K, V>) -> HashMap<K, V> {
cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
}