Skip to content

Commit

Permalink
Migrate remaining routing tests to network crate (#8168)
Browse files Browse the repository at this point in the history
Follow up to #8073, #8109, and #8110 for the two remaining tests in `integration-tests/src/tests/network/routing.rs`.
  • Loading branch information
saketh-are authored Dec 7, 2022
1 parent 8c83455 commit a0def6d
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 128 deletions.
42 changes: 42 additions & 0 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ impl ActorHandler {
}
}

pub async fn send_outbound_connect(&self, peer_info: &PeerInfo, tier: tcp::Tier) {
let addr = self.actix.addr.clone();
let peer_info = peer_info.clone();
let stream = tcp::Stream::connect(&peer_info, tier).await.unwrap();
addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(stream).with_span_context());
}

pub fn connect_to(
&self,
peer_info: &PeerInfo,
Expand Down Expand Up @@ -352,6 +359,25 @@ impl ActorHandler {
}
}

pub async fn wait_for_direct_connection(&self, target_peer_id: PeerId) {
let mut events = self.events.from_now();
loop {
let connections =
self.with_state(|s| async move { s.tier2.load().ready.clone() }).await;

if connections.contains_key(&target_peer_id) {
return;
}

events
.recv_until(|ev| match ev {
Event::PeerManager(PME::HandshakeCompleted { .. }) => Some(()),
_ => None,
})
.await;
}
}

// Awaits until the routing_table matches `want`.
pub async fn wait_for_routing_table(&self, want: &[(PeerId, Vec<PeerId>)]) {
let mut events = self.events.from_now();
Expand Down Expand Up @@ -391,6 +417,22 @@ impl ActorHandler {
}
}

pub async fn wait_for_num_connected_peers(&self, wanted: usize) {
let mut events = self.events.from_now();
loop {
let got = self.with_state(|s| async move { s.tier2.load().ready.len() }).await;
if got == wanted {
return;
}
events
.recv_until(|ev| match ev {
Event::PeerManager(PME::EdgesAdded { .. }) => Some(()),
_ => None,
})
.await;
}
}

/// Executes `NetworkState::tier1_connect` method.
pub async fn tier1_connect(&self, clock: &time::Clock) {
let clock = clock.clone();
Expand Down
78 changes: 78 additions & 0 deletions chain/network/src/peer_manager/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use crate::time;
use crate::types::PeerInfo;
use crate::types::PeerMessage;
use near_o11y::testonly::init_test_logger;
use near_primitives::network::PeerId;
use near_store::db::TestDB;
use pretty_assertions::assert_eq;
use rand::seq::IteratorRandom;
use rand::Rng as _;
use std::collections::HashSet;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -1222,3 +1224,79 @@ async fn do_not_block_announce_account_broadcast() {
pm1.announce_account(aa.clone()).await;
assert_eq!(&aa.peer_id, &pm2.wait_for_account_owner(&aa.account_id).await);
}

/// Check that two archival nodes keep connected after network rebalance. Nodes 0 and 1 are archival nodes, others aren't.
/// Initially connect 2, 3, 4 to 0. Then connect 1 to 0, this connection should persist, even after other nodes tries
/// to connect to node 0 again.
///
/// Do four rounds where 2, 3, 4 tries to connect to 0 and check that connection between 0 and 1 was never dropped.
#[tokio::test]
async fn archival_node() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

let mut cfgs = make_configs(&chain, rng, 5, 5, false);
for config in cfgs.iter_mut() {
config.max_num_peers = 3;
config.ideal_connections_lo = 2;
config.ideal_connections_hi = 2;
config.safe_set_size = 1;
config.minimum_outbound_peers = 0;
}
cfgs[0].archive = true;
cfgs[1].archive = true;

tracing::info!(target:"test", "start five nodes, the first two of which are archival nodes");
let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;
let pm2 = start_pm(clock.clock(), TestDB::new(), cfgs[2].clone(), chain.clone()).await;
let pm3 = start_pm(clock.clock(), TestDB::new(), cfgs[3].clone(), chain.clone()).await;
let pm4 = start_pm(clock.clock(), TestDB::new(), cfgs[4].clone(), chain.clone()).await;

let id1 = pm1.cfg.node_id();

// capture pm0 event stream
let mut pm0_ev = pm0.events.from_now();

tracing::info!(target:"test", "connect node 2 to node 0");
pm2.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await;
tracing::info!(target:"test", "connect node 3 to node 0");
pm3.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await;

tracing::info!(target:"test", "connect node 4 to node 0 and wait for pm0 to close a connection");
pm4.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await;
wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await;

tracing::info!(target:"test", "connect node 1 to node 0 and wait for pm0 to close a connection");
pm1.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await;
wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await;

tracing::info!(target:"test", "check that node 0 and node 1 are still connected");
pm0.wait_for_direct_connection(id1.clone()).await;

for _step in 0..10 {
tracing::info!(target:"test", "[{_step}] select a node which node 0 is not connected to");
let pm0_connections: HashSet<PeerId> =
pm0.with_state(|s| async move { s.tier2.load().ready.keys().cloned().collect() }).await;

let chosen = vec![&pm2, &pm3, &pm4]
.iter()
.filter(|&pm| !pm0_connections.contains(&pm.cfg.node_id()))
.choose(rng)
.unwrap()
.clone();

tracing::info!(target:"test", "[{_step}] wait for the chosen node to finish disconnecting from node 0");
chosen.wait_for_num_connected_peers(0).await;

tracing::info!(target:"test", "[{_step}] connect the chosen node to node 0 and wait for pm0 to close a connection");
chosen.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await;
wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await;

tracing::info!(target:"test", "[{_step}] check that node 0 and node 1 are still connected");
pm0.wait_for_direct_connection(id1.clone()).await;
}
}
1 change: 0 additions & 1 deletion integration-tests/src/tests/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ mod ban_peers;
mod churn_attack;
mod full_network;
mod peer_handshake;
mod routing;
mod runner;
mod stress_network;
57 changes: 0 additions & 57 deletions integration-tests/src/tests/network/routing.rs

This file was deleted.

70 changes: 0 additions & 70 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ pub enum Action {
force: bool,
},
CheckRoutingTable(usize, Vec<(usize, Vec<usize>)>),
CheckAccountId(usize, Vec<usize>),
// Send stop signal to some node.
Stop(usize),
// Wait time in milliseconds
Expand Down Expand Up @@ -172,29 +171,6 @@ async fn check_routing_table(
Ok(ControlFlow::Continue(()))
}

async fn check_account_id(
info: &mut RunningInfo,
source: usize,
known_validators: Vec<usize>,
) -> anyhow::Result<ControlFlow> {
let mut expected_known = vec![];
for u in known_validators.clone() {
expected_known.push(info.runner.test_config[u].account_id.clone());
}
let pm = &info.get_node(source)?.actix.addr;
let rt = match pm.send(PeerManagerMessageRequest::FetchRoutingTable.with_span_context()).await?
{
PeerManagerMessageResponse::FetchRoutingTable(rt) => rt,
_ => bail!("bad response"),
};
for v in &expected_known {
if !rt.account_peers.contains_key(v) {
return Ok(ControlFlow::Continue(()));
}
}
Ok(ControlFlow::Break(()))
}

impl StateMachine {
fn new() -> Self {
Self { actions: vec![] }
Expand Down Expand Up @@ -245,11 +221,6 @@ impl StateMachine {
Box::pin(check_routing_table(info, u, expected.clone()))
}));
}
Action::CheckAccountId(source, known_validators) => {
self.actions.push(Box::new(move |info| {
Box::pin(check_account_id(info, source, known_validators.clone()))
}));
}
Action::Stop(source) => {
self.actions.push(Box::new(move |info: &mut RunningInfo| Box::pin(async move {
debug!(target: "test", num_prev_actions, action = ?action_clone, "runner.rs: Action");
Expand Down Expand Up @@ -351,12 +322,6 @@ impl Runner {
self
}

/// Set node `u` as archival node.
pub fn set_as_archival(mut self, u: usize) -> Self {
self.test_config[u].archive = true;
self
}

/// Specify boot nodes. By default there are no boot nodes.
pub fn use_boot_nodes(mut self, boot_nodes: Vec<usize>) -> Self {
self.apply_all(move |test_config| {
Expand Down Expand Up @@ -592,41 +557,6 @@ pub fn check_expected_connections(
})
}

async fn check_direct_connection_inner(
info: &mut RunningInfo,
node_id: usize,
target_id: usize,
) -> anyhow::Result<ControlFlow> {
let target_peer_id = info.runner.test_config[target_id].peer_id();
debug!(target: "test", node_id, ?target_id, "runner.rs: check_direct_connection");
let pm = &info.get_node(node_id)?.actix.addr;
let rt = match pm.send(PeerManagerMessageRequest::FetchRoutingTable.with_span_context()).await?
{
PeerManagerMessageResponse::FetchRoutingTable(rt) => rt,
_ => bail!("bad response"),
};
let routes = if let Some(routes) = rt.next_hops.get(&target_peer_id) {
routes
} else {
debug!(target: "test", ?target_peer_id, node_id, target_id,
"runner.rs: check_direct_connection NO ROUTES!",
);
return Ok(ControlFlow::Continue(()));
};
debug!(target: "test", ?target_peer_id, ?routes, node_id, target_id,
"runner.rs: check_direct_connection",
);
if !routes.contains(&target_peer_id) {
return Ok(ControlFlow::Continue(()));
}
Ok(ControlFlow::Break(()))
}

/// Check that `node_id` has a direct connection to `target_id`.
pub fn check_direct_connection(node_id: usize, target_id: usize) -> ActionFn {
Box::new(move |info| Box::pin(check_direct_connection_inner(info, node_id, target_id)))
}

/// Restart a node that was already stopped.
pub fn restart(node_id: usize) -> ActionFn {
Box::new(move |info: &mut RunningInfo| {
Expand Down

0 comments on commit a0def6d

Please sign in to comment.