From a0def6d30ab7276ed8c09fa4a8637a9c9310b8fe Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 7 Dec 2022 12:36:46 -0500 Subject: [PATCH] Migrate remaining routing tests to network crate (#8168) Follow up to #8073, #8109, and #8110 for the two remaining tests in `integration-tests/src/tests/network/routing.rs`. --- chain/network/src/peer_manager/testonly.rs | 42 ++++++++++ .../network/src/peer_manager/tests/routing.rs | 78 +++++++++++++++++++ integration-tests/src/tests/network/mod.rs | 1 - .../src/tests/network/routing.rs | 57 -------------- integration-tests/src/tests/network/runner.rs | 70 ----------------- 5 files changed, 120 insertions(+), 128 deletions(-) delete mode 100644 integration-tests/src/tests/network/routing.rs diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index 653a0d92f73..98196091a68 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -137,6 +137,13 @@ impl ActorHandler { } } + pub async fn send_outbound_connect(&self, peer_info: &PeerInfo, tier: tcp::Tier) { + let addr = self.actix.addr.clone(); + let peer_info = peer_info.clone(); + let stream = tcp::Stream::connect(&peer_info, tier).await.unwrap(); + addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(stream).with_span_context()); + } + pub fn connect_to( &self, peer_info: &PeerInfo, @@ -352,6 +359,25 @@ impl ActorHandler { } } + pub async fn wait_for_direct_connection(&self, target_peer_id: PeerId) { + let mut events = self.events.from_now(); + loop { + let connections = + self.with_state(|s| async move { s.tier2.load().ready.clone() }).await; + + if connections.contains_key(&target_peer_id) { + return; + } + + events + .recv_until(|ev| match ev { + Event::PeerManager(PME::HandshakeCompleted { .. }) => Some(()), + _ => None, + }) + .await; + } + } + // Awaits until the routing_table matches `want`. pub async fn wait_for_routing_table(&self, want: &[(PeerId, Vec)]) { let mut events = self.events.from_now(); @@ -391,6 +417,22 @@ impl ActorHandler { } } + pub async fn wait_for_num_connected_peers(&self, wanted: usize) { + let mut events = self.events.from_now(); + loop { + let got = self.with_state(|s| async move { s.tier2.load().ready.len() }).await; + if got == wanted { + return; + } + events + .recv_until(|ev| match ev { + Event::PeerManager(PME::EdgesAdded { .. }) => Some(()), + _ => None, + }) + .await; + } + } + /// Executes `NetworkState::tier1_connect` method. pub async fn tier1_connect(&self, clock: &time::Clock) { let clock = clock.clone(); diff --git a/chain/network/src/peer_manager/tests/routing.rs b/chain/network/src/peer_manager/tests/routing.rs index 38b34291014..bac7f86bbdc 100644 --- a/chain/network/src/peer_manager/tests/routing.rs +++ b/chain/network/src/peer_manager/tests/routing.rs @@ -17,8 +17,10 @@ use crate::time; use crate::types::PeerInfo; use crate::types::PeerMessage; use near_o11y::testonly::init_test_logger; +use near_primitives::network::PeerId; use near_store::db::TestDB; use pretty_assertions::assert_eq; +use rand::seq::IteratorRandom; use rand::Rng as _; use std::collections::HashSet; use std::net::Ipv4Addr; @@ -1222,3 +1224,79 @@ async fn do_not_block_announce_account_broadcast() { pm1.announce_account(aa.clone()).await; assert_eq!(&aa.peer_id, &pm2.wait_for_account_owner(&aa.account_id).await); } + +/// Check that two archival nodes keep connected after network rebalance. Nodes 0 and 1 are archival nodes, others aren't. +/// Initially connect 2, 3, 4 to 0. Then connect 1 to 0, this connection should persist, even after other nodes tries +/// to connect to node 0 again. +/// +/// Do four rounds where 2, 3, 4 tries to connect to 0 and check that connection between 0 and 1 was never dropped. +#[tokio::test] +async fn archival_node() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let mut cfgs = make_configs(&chain, rng, 5, 5, false); + for config in cfgs.iter_mut() { + config.max_num_peers = 3; + config.ideal_connections_lo = 2; + config.ideal_connections_hi = 2; + config.safe_set_size = 1; + config.minimum_outbound_peers = 0; + } + cfgs[0].archive = true; + cfgs[1].archive = true; + + tracing::info!(target:"test", "start five nodes, the first two of which are archival nodes"); + let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await; + let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await; + let pm2 = start_pm(clock.clock(), TestDB::new(), cfgs[2].clone(), chain.clone()).await; + let pm3 = start_pm(clock.clock(), TestDB::new(), cfgs[3].clone(), chain.clone()).await; + let pm4 = start_pm(clock.clock(), TestDB::new(), cfgs[4].clone(), chain.clone()).await; + + let id1 = pm1.cfg.node_id(); + + // capture pm0 event stream + let mut pm0_ev = pm0.events.from_now(); + + tracing::info!(target:"test", "connect node 2 to node 0"); + pm2.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; + tracing::info!(target:"test", "connect node 3 to node 0"); + pm3.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; + + tracing::info!(target:"test", "connect node 4 to node 0 and wait for pm0 to close a connection"); + pm4.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + + tracing::info!(target:"test", "connect node 1 to node 0 and wait for pm0 to close a connection"); + pm1.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + + tracing::info!(target:"test", "check that node 0 and node 1 are still connected"); + pm0.wait_for_direct_connection(id1.clone()).await; + + for _step in 0..10 { + tracing::info!(target:"test", "[{_step}] select a node which node 0 is not connected to"); + let pm0_connections: HashSet = + pm0.with_state(|s| async move { s.tier2.load().ready.keys().cloned().collect() }).await; + + let chosen = vec![&pm2, &pm3, &pm4] + .iter() + .filter(|&pm| !pm0_connections.contains(&pm.cfg.node_id())) + .choose(rng) + .unwrap() + .clone(); + + tracing::info!(target:"test", "[{_step}] wait for the chosen node to finish disconnecting from node 0"); + chosen.wait_for_num_connected_peers(0).await; + + tracing::info!(target:"test", "[{_step}] connect the chosen node to node 0 and wait for pm0 to close a connection"); + chosen.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + + tracing::info!(target:"test", "[{_step}] check that node 0 and node 1 are still connected"); + pm0.wait_for_direct_connection(id1.clone()).await; + } +} diff --git a/integration-tests/src/tests/network/mod.rs b/integration-tests/src/tests/network/mod.rs index 0738982137d..714bb3f5fe3 100644 --- a/integration-tests/src/tests/network/mod.rs +++ b/integration-tests/src/tests/network/mod.rs @@ -2,6 +2,5 @@ mod ban_peers; mod churn_attack; mod full_network; mod peer_handshake; -mod routing; mod runner; mod stress_network; diff --git a/integration-tests/src/tests/network/routing.rs b/integration-tests/src/tests/network/routing.rs deleted file mode 100644 index 094d654bbe1..00000000000 --- a/integration-tests/src/tests/network/routing.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::tests::network::runner::*; -use near_network::time; - -#[test] -fn account_propagation() -> anyhow::Result<()> { - let mut runner = Runner::new(3, 2); - - runner.push(Action::AddEdge { from: 0, to: 1, force: true }); - runner.push(Action::CheckAccountId(1, vec![0, 1])); - runner.push(Action::AddEdge { from: 0, to: 2, force: true }); - runner.push(Action::CheckAccountId(2, vec![0, 1])); - - start_test(runner) -} - -/// Check that two archival nodes keep connected after network rebalance. Nodes 0 and 1 are archival nodes, others aren't. -/// Initially connect 2, 3, 4 to 0. Then connect 1 to 0, this connection should persist, even after other nodes tries -/// to connect to node 0 again. -/// -/// Do four rounds where 2, 3, 4 tries to connect to 0 and check that connection between 0 and 1 was never dropped. -#[test] -// TODO(#5389) fix this test, ignoring for now to unlock merging -fn archival_node() -> anyhow::Result<()> { - let mut runner = Runner::new(5, 5) - .max_num_peers(3) - .ideal_connections(2, 2) - .safe_set_size(1) - .minimum_outbound_peers(0) - .set_as_archival(0) - .set_as_archival(1); - - runner.push(Action::AddEdge { from: 2, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push(Action::AddEdge { from: 3, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push(Action::AddEdge { from: 4, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push_action(check_expected_connections(0, Some(2), Some(2))); - - runner.push(Action::AddEdge { from: 1, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push_action(check_expected_connections(0, Some(2), Some(2))); - runner.push_action(check_direct_connection(0, 1)); - - for _step in 0..4 { - runner.push(Action::AddEdge { from: 2, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push(Action::AddEdge { from: 3, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push(Action::AddEdge { from: 4, to: 0, force: true }); - runner.push(Action::Wait(time::Duration::milliseconds(50))); - runner.push_action(check_expected_connections(0, Some(2), Some(2))); - runner.push_action(check_direct_connection(0, 1)); - } - - start_test(runner) -} diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index df0a54fea49..6bf3242e2e0 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -126,7 +126,6 @@ pub enum Action { force: bool, }, CheckRoutingTable(usize, Vec<(usize, Vec)>), - CheckAccountId(usize, Vec), // Send stop signal to some node. Stop(usize), // Wait time in milliseconds @@ -172,29 +171,6 @@ async fn check_routing_table( Ok(ControlFlow::Continue(())) } -async fn check_account_id( - info: &mut RunningInfo, - source: usize, - known_validators: Vec, -) -> anyhow::Result { - let mut expected_known = vec![]; - for u in known_validators.clone() { - expected_known.push(info.runner.test_config[u].account_id.clone()); - } - let pm = &info.get_node(source)?.actix.addr; - let rt = match pm.send(PeerManagerMessageRequest::FetchRoutingTable.with_span_context()).await? - { - PeerManagerMessageResponse::FetchRoutingTable(rt) => rt, - _ => bail!("bad response"), - }; - for v in &expected_known { - if !rt.account_peers.contains_key(v) { - return Ok(ControlFlow::Continue(())); - } - } - Ok(ControlFlow::Break(())) -} - impl StateMachine { fn new() -> Self { Self { actions: vec![] } @@ -245,11 +221,6 @@ impl StateMachine { Box::pin(check_routing_table(info, u, expected.clone())) })); } - Action::CheckAccountId(source, known_validators) => { - self.actions.push(Box::new(move |info| { - Box::pin(check_account_id(info, source, known_validators.clone())) - })); - } Action::Stop(source) => { self.actions.push(Box::new(move |info: &mut RunningInfo| Box::pin(async move { debug!(target: "test", num_prev_actions, action = ?action_clone, "runner.rs: Action"); @@ -351,12 +322,6 @@ impl Runner { self } - /// Set node `u` as archival node. - pub fn set_as_archival(mut self, u: usize) -> Self { - self.test_config[u].archive = true; - self - } - /// Specify boot nodes. By default there are no boot nodes. pub fn use_boot_nodes(mut self, boot_nodes: Vec) -> Self { self.apply_all(move |test_config| { @@ -592,41 +557,6 @@ pub fn check_expected_connections( }) } -async fn check_direct_connection_inner( - info: &mut RunningInfo, - node_id: usize, - target_id: usize, -) -> anyhow::Result { - let target_peer_id = info.runner.test_config[target_id].peer_id(); - debug!(target: "test", node_id, ?target_id, "runner.rs: check_direct_connection"); - let pm = &info.get_node(node_id)?.actix.addr; - let rt = match pm.send(PeerManagerMessageRequest::FetchRoutingTable.with_span_context()).await? - { - PeerManagerMessageResponse::FetchRoutingTable(rt) => rt, - _ => bail!("bad response"), - }; - let routes = if let Some(routes) = rt.next_hops.get(&target_peer_id) { - routes - } else { - debug!(target: "test", ?target_peer_id, node_id, target_id, - "runner.rs: check_direct_connection NO ROUTES!", - ); - return Ok(ControlFlow::Continue(())); - }; - debug!(target: "test", ?target_peer_id, ?routes, node_id, target_id, - "runner.rs: check_direct_connection", - ); - if !routes.contains(&target_peer_id) { - return Ok(ControlFlow::Continue(())); - } - Ok(ControlFlow::Break(())) -} - -/// Check that `node_id` has a direct connection to `target_id`. -pub fn check_direct_connection(node_id: usize, target_id: usize) -> ActionFn { - Box::new(move |info| Box::pin(check_direct_connection_inner(info, node_id, target_id))) -} - /// Restart a node that was already stopped. pub fn restart(node_id: usize) -> ActionFn { Box::new(move |info: &mut RunningInfo| {