Skip to content

Commit

Permalink
re: refactor near-network and near-network-primitives (#5507)
Browse files Browse the repository at this point in the history
We should fix `near-network` and `near-network-primitives` issues.
  • Loading branch information
pmnoxx authored Nov 30, 2021
1 parent ceff74c commit a3a611a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 74 deletions.
4 changes: 2 additions & 2 deletions chain/network-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl AccountOrPeerIdOrHash {
match self {
AccountOrPeerIdOrHash::AccountId(_) => None,
AccountOrPeerIdOrHash::PeerId(peer_id) => Some(PeerIdOrHash::PeerId(peer_id.clone())),
AccountOrPeerIdOrHash::Hash(hash) => Some(PeerIdOrHash::Hash(hash.clone())),
AccountOrPeerIdOrHash::Hash(hash) => Some(PeerIdOrHash::Hash(*hash)),
}
}
}
Expand Down Expand Up @@ -443,7 +443,7 @@ impl RoutedMessage {
}

pub fn verify(&self) -> bool {
self.signature.verify(self.hash().as_ref(), &self.author.public_key())
self.signature.verify(self.hash().as_ref(), self.author.public_key())
}

pub fn expect_response(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ impl PeerActor {
/// Check whenever we exceeded number of transactions we got since last block.
/// If so, drop the transaction.
fn should_we_drop_msg_without_decoding(&self, msg: &Vec<u8>) -> bool {
if codec::is_forward_transaction(&msg).unwrap_or(false) {
if codec::is_forward_transaction(msg).unwrap_or(false) {
let r = self.txns_since_last_block.load(Ordering::Acquire);
if r > MAX_TRANSACTIONS_PER_BLOCK_MESSAGE {
return true;
Expand Down Expand Up @@ -587,7 +587,7 @@ impl PeerActor {
},
));
} else {
info!(target: "network", "Received invalid data {:?} from {}: {}", logging::pretty_vec(&msg), self.peer_info, err);
info!(target: "network", "Received invalid data {:?} from {}: {}", logging::pretty_vec(msg), self.peer_info, err);
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ impl PeerManagerActor {
peer_protocol_version,
{
self.initialize_routing_table_exchange(peer_id, peer_type, addr.clone(), ctx);
self.send_sync(peer_type, addr, ctx, target_peer_id.clone(), new_edge, Vec::new());
self.send_sync(peer_type, addr, ctx, target_peer_id, new_edge, Vec::new());
return;
}
);
Expand Down Expand Up @@ -2231,20 +2231,19 @@ impl Handler<PeerManagerMessageRequest> for PeerManagerActor {
#[cfg(feature = "test_features")]
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
PeerManagerMessageRequest::StartRoutingTableSync(msg) => {
PeerManagerMessageResponse::StartRoutingTableSync(
self.handle_msg_start_routing_table_sync(msg, ctx),
)
self.handle_msg_start_routing_table_sync(msg, ctx);
PeerManagerMessageResponse::StartRoutingTableSync(())
}
#[cfg(feature = "test_features")]
PeerManagerMessageRequest::SetAdvOptions(msg) => {
PeerManagerMessageResponse::SetAdvOptions(self.handle_msg_set_adv_options(msg, ctx))
self.handle_msg_set_adv_options(msg, ctx);
PeerManagerMessageResponse::SetAdvOptions(())
}
#[cfg(feature = "test_features")]
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
PeerManagerMessageRequest::SetRoutingTable(msg) => {
PeerManagerMessageResponse::SetRoutingTable(
self.handle_msg_set_routing_table(msg, ctx),
)
self.handle_msg_set_routing_table(msg, ctx);
PeerManagerMessageResponse::SetRoutingTable(())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/routing/ibf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Ibf {
}
return Err("unable to recover result");
}
return Ok(result);
Ok(result)
}

/// Try to recover elements inserted into IBF.
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Ibf {
let mask = (1 << self.k) - 1;
let pos0 = elem_hash & mask;
let mut pos1 = (elem_hash >> self.k) & mask;
let mut pos2 = (elem_hash >> 2 * self.k) & mask;
let mut pos2 = (elem_hash >> (2 * self.k)) & mask;
if pos1 >= pos0 {
pos1 = (pos1 + 1) & mask;
}
Expand Down Expand Up @@ -243,7 +243,7 @@ mod tests {

#[test]
fn create_blt_test() {
let set = 1000000_3_00000u64..1000000_301000u64;
let set = 1_000_000_300_000_u64..1_000_000_301_000_u64;

assert_eq!(1000, create_blt(set, 2048).recover().unwrap().len())
}
Expand Down
42 changes: 21 additions & 21 deletions chain/network/src/routing/ibf_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ pub const MAX_IBF_LEVEL: ValidIBFLevel = ValidIBFLevel(17);
/// Represents IbfLevel from 10 to 17.
impl ValidIBFLevel {
pub fn inc(&self) -> Option<ValidIBFLevel> {
if self.0 + 1 >= MIN_IBF_LEVEL.0 && self.0 + 1 <= MAX_IBF_LEVEL.0 {
if self.0 + 1 >= MIN_IBF_LEVEL.0 && self.0 < MAX_IBF_LEVEL.0 {
Some(ValidIBFLevel(self.0 + 1))
} else {
None
}
}

pub fn is_valid(&self) -> bool {
return self.0 >= MIN_IBF_LEVEL.0 && self.0 <= MAX_IBF_LEVEL.0;
self.0 >= MIN_IBF_LEVEL.0 && self.0 <= MAX_IBF_LEVEL.0
}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl IbfPeerSet {
}
}
let seed = ibf_set.get_seed();
self.peers.insert(peer_id.clone(), ibf_set);
self.peers.insert(peer_id, ibf_set);
seed
}

Expand All @@ -138,7 +138,7 @@ impl IbfPeerSet {
if let Some(_id) = self.slot_map.pop(edge) {
self.edges -= 1;
for (_, val) in self.peers.iter_mut() {
val.remove_edge(&edge);
val.remove_edge(edge);
}
return true;
}
Expand Down Expand Up @@ -168,7 +168,7 @@ impl IbfPeerSet {
mod test {
use crate::routing::edge::{Edge, SimpleEdge};
use crate::routing::ibf_peer_set::ValidIBFLevel;
use crate::routing::ibf_peer_set::{IbfPeerSet, SlotMap, SlotMapId};
use crate::routing::ibf_peer_set::{IbfPeerSet, SlotMap};
use crate::routing::ibf_set::IbfSet;
use crate::test_utils::random_peer_id;
use near_primitives::network::PeerId;
Expand All @@ -180,37 +180,37 @@ mod test {
let p1 = random_peer_id();
let p2 = random_peer_id();

let e0 = SimpleEdge::new(p0.clone(), p1.clone(), 0);
let e0 = SimpleEdge::new(p0, p1.clone(), 0);
let e1 = SimpleEdge::new(p1.clone(), p2.clone(), 0);
let e2 = SimpleEdge::new(p1.clone(), p2.clone(), 3);
let e2 = SimpleEdge::new(p1, p2, 3);

let mut sm = SlotMap::default();
assert_eq!(0 as SlotMapId, sm.insert(&e0).unwrap());
assert_eq!(0_u64, sm.insert(&e0).unwrap());

assert!(sm.insert(&e0).is_none());

assert_eq!(1 as SlotMapId, sm.insert(&e1).unwrap());
assert_eq!(2 as SlotMapId, sm.insert(&e2).unwrap());
assert_eq!(1_u64, sm.insert(&e1).unwrap());
assert_eq!(2_u64, sm.insert(&e2).unwrap());

assert_eq!(Some(2 as SlotMapId), sm.pop(&e2));
assert_eq!(Some(2_u64), sm.pop(&e2));
assert_eq!(None, sm.pop(&e2));
assert_eq!(Some(0 as SlotMapId), sm.pop(&e0));
assert_eq!(Some(0_u64), sm.pop(&e0));
assert_eq!(None, sm.pop(&e0));

assert_eq!(Some(1 as SlotMapId), sm.get(&e1));
assert_eq!(Some(1_u64), sm.get(&e1));

assert_eq!(Some(e1.clone()), sm.get_by_id(&(1 as SlotMapId)));
assert_eq!(None, sm.get_by_id(&(1000 as SlotMapId)));
assert_eq!(Some(e1.clone()), sm.get_by_id(&1_u64));
assert_eq!(None, sm.get_by_id(&1000_u64));

assert_eq!(Some(1 as SlotMapId), sm.pop(&e1));
assert_eq!(Some(1_u64), sm.pop(&e1));
assert_eq!(None, sm.get(&e1));
assert_eq!(None, sm.pop(&e1));

assert_eq!(3 as SlotMapId, sm.insert(&e2).unwrap());
assert_eq!(Some(3 as SlotMapId), sm.pop(&e2));
assert_eq!(3_u64, sm.insert(&e2).unwrap());
assert_eq!(Some(3_u64), sm.pop(&e2));

assert_eq!(None, sm.get_by_id(&(1 as SlotMapId)));
assert_eq!(None, sm.get_by_id(&(1000 as SlotMapId)));
assert_eq!(None, sm.get_by_id(&1_u64));
assert_eq!(None, sm.get_by_id(&1000_u64));
}

#[test]
Expand Down Expand Up @@ -238,7 +238,7 @@ mod test {
ips.add_peer(peer_id.clone(), Some(1111), &mut edges_info);

// Add edge
let e = SimpleEdge::new(peer_id.clone(), peer_id2.clone(), 111);
let e = SimpleEdge::new(peer_id.clone(), peer_id2, 111);
let se = ips.add_edge(&e).unwrap();
ibf_set.add_edge(&e, se);
assert!(ips.add_edge(&e).is_none());
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/routing/ibf_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ mod test {
a.remove_edge(&(i as u64));
}
for i in 0..10000 {
b.add_edge(&(i + 100 as u64), (i + 2000000) as SlotMapId);
b.add_edge(&(i + 100_u64), (i + 2000000) as SlotMapId);
}
for i in 10..=17 {
let mut ibf1 = a.get_ibf(ValidIBFLevel(i));
Expand Down
55 changes: 24 additions & 31 deletions chain/network/src/routing/routing_table_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ impl RoutingTableActor {
}

let (edge_hashes, unknown_edges_count) = new_ibf.try_recover();
let (known, unknown_edges) = self.split_edges_for_peer(&peer_id, &edge_hashes);
let (known, unknown_edges) = self.split_edges_for_peer(peer_id, &edge_hashes);

(known, unknown_edges, unknown_edges_count)
}
Expand Down Expand Up @@ -605,8 +605,7 @@ impl Handler<RoutingTableMessages> for RoutingTableActor {
}
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
RoutingTableMessages::AddPeerIfMissing(peer_id, ibf_set) => {
let seed =
self.peer_ibf_set.add_peer(peer_id.clone(), ibf_set, &mut self.edges_info);
let seed = self.peer_ibf_set.add_peer(peer_id, ibf_set, &mut self.edges_info);
RoutingTableMessagesResponse::AddPeerResponse { seed }
}
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
Expand All @@ -631,11 +630,11 @@ impl Handler<RoutingTableMessages> for RoutingTableActor {

let edges_for_peer = edges_for_peer
.iter()
.filter_map(|x| self.edges_info.get(&x.key()).cloned())
.filter_map(|x| self.edges_info.get(x.key()).cloned())
.collect();
// Prepare message
let ibf_msg = if unknown_edges_count == 0
&& unknown_edge_hashes.len() > 0
&& !unknown_edge_hashes.is_empty()
{
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
Expand All @@ -645,38 +644,32 @@ impl Handler<RoutingTableMessages> for RoutingTableActor {
unknown_edge_hashes,
),
}
} else if unknown_edges_count == 0 && unknown_edge_hashes.len() == 0 {
} else if unknown_edges_count == 0 && unknown_edge_hashes.is_empty() {
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
seed,
edges: edges_for_peer,
routing_state: crate::types::RoutingState::Done,
}
} else if let Some(new_ibf_level) = partial_sync.ibf_level.inc() {
let ibf_vec = ibf_set.get_ibf_vec(new_ibf_level);
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
seed,
edges: edges_for_peer,
routing_state: crate::types::RoutingState::PartialSync(
crate::types::PartialSync {
ibf_level: new_ibf_level,
ibf: ibf_vec,
},
),
}
} else {
if let Some(new_ibf_level) = partial_sync.ibf_level.inc() {
let ibf_vec = ibf_set.get_ibf_vec(new_ibf_level);
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
seed,
edges: edges_for_peer,
routing_state: crate::types::RoutingState::PartialSync(
crate::types::PartialSync {
ibf_level: new_ibf_level,
ibf: ibf_vec,
},
),
}
} else {
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
seed,
edges: self
.edges_info
.iter()
.map(|x| x.1.clone())
.collect(),
routing_state: crate::types::RoutingState::RequestAllEdges,
}
crate::types::RoutingVersion2 {
known_edges: self.edges_info.len() as u64,
seed,
edges: self.edges_info.iter().map(|x| x.1.clone()).collect(),
routing_state: crate::types::RoutingState::RequestAllEdges,
}
};
RoutingTableMessagesResponse::ProcessIbfMessageResponse {
Expand Down Expand Up @@ -722,7 +715,7 @@ impl Handler<RoutingTableMessages> for RoutingTableActor {

let edges_for_peer = edges_for_peer
.iter()
.filter_map(|x| self.edges_info.get(&x.key()).cloned())
.filter_map(|x| self.edges_info.get(x.key()).cloned())
.collect();

let ibf_msg = crate::types::RoutingVersion2 {
Expand Down
10 changes: 4 additions & 6 deletions chain/network/src/tests/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn dont_load_on_build() {
let announce0 = AnnounceAccount {
account_id: "near0".parse().unwrap(),
peer_id: peer_id0.clone(),
epoch_id: epoch_id0.clone(),
epoch_id: epoch_id0,
signature: Signature::default(),
};

Expand All @@ -68,9 +68,7 @@ fn dont_load_on_build() {
routing_table.add_account(announce0.clone());
routing_table.add_account(announce1.clone());
let accounts = routing_table.get_announce_accounts();
assert!(vec![announce0.clone(), announce1.clone()]
.iter()
.all(|announce| { accounts.contains(announce) }));
assert!(vec![announce0, announce1].iter().all(|announce| { accounts.contains(announce) }));
assert_eq!(routing_table.get_announce_accounts().len(), 2);

let mut routing_table1 = RoutingTableView::new(peer_id0, store);
Expand All @@ -85,12 +83,12 @@ fn load_from_disk() {
let epoch_id0 = random_epoch_id();

let mut routing_table = RoutingTableView::new(peer_id0.clone(), store.clone());
let mut routing_table1 = RoutingTableView::new(peer_id0.clone(), store.clone());
let mut routing_table1 = RoutingTableView::new(peer_id0.clone(), store);

let announce0 = AnnounceAccount {
account_id: "near0".parse().unwrap(),
peer_id: peer_id0.clone(),
epoch_id: epoch_id0.clone(),
epoch_id: epoch_id0,
signature: Signature::default(),
};

Expand Down

0 comments on commit a3a611a

Please sign in to comment.