From f109153484096912d6d3feca0ce2aab6c9f3aaa8 Mon Sep 17 00:00:00 2001
From: pompon0
Date: Wed, 25 Jan 2023 18:40:18 +0100
Subject: [PATCH] Rollforward "deprecating AnnounceAccount (#8182)" (#8440)

This is a roll forward of near/nearcore#8182, which was reverted by
near/nearcore#8314. The issue that #8182 caused has been fixed in
https://github.com/near/nearcore/pull/8407.

Nayduck run: https://nayduck.near.org/#/run/2834. Before the fix,
tests/sanity/switch_node_key.py was failing; now it succeeds.
No nayduck tests are newly failing with this PR, compared to master.
---
 .../src/peer_manager/network_state/mod.rs     |  43 +++++--
 .../network/src/peer_manager/tests/routing.rs |   6 +-
 chain/network/src/peer_manager/tests/tier1.rs | 121 ++++++++++++------
 chain/network/src/stats/metrics.rs            |  13 ++
 4 files changed, 126 insertions(+), 57 deletions(-)

diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index a3eb070c76d..0b8666ab49a 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -474,11 +474,11 @@ impl NetworkState {
         msg: RoutedMessageBody,
     ) -> bool {
         let mut success = false;
+        let accounts_data = self.accounts_data.load();
         // All TIER1 messages are being sent over both TIER1 and TIER2 connections for now,
         // so that we can actually observe the latency/reliability improvements in practice:
         // for each message we track over which network tier it arrived faster?
         if tcp::Tier::T1.is_allowed_routed(&msg) {
-            let accounts_data = self.accounts_data.load();
             for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
             {
                 let data = match accounts_data.data.get(key) {
@@ -503,19 +503,34 @@ impl NetworkState {
             }
         }
 
-        let target = match self.graph.routing_table.account_owner(account_id) {
-            Some(peer_id) => peer_id,
-            None => {
-                // TODO(MarX, #1369): Message is dropped here. Define policy for this case.
-                metrics::MessageDropped::UnknownAccount.inc(&msg);
-                tracing::debug!(target: "network",
-                       account_id = ?self.config.validator.as_ref().map(|v|v.account_id()),
-                       to = ?account_id,
-                       ?msg,"Drop message: unknown account",
-                );
-                tracing::trace!(target: "network", known_peers = ?self.graph.routing_table.get_accounts_keys(), "Known peers");
-                return false;
-            }
+        let peer_id_from_account_data = accounts_data
+            .keys_by_id
+            .get(account_id)
+            .iter()
+            .flat_map(|keys| keys.iter())
+            .flat_map(|key| accounts_data.data.get(key))
+            .next()
+            .map(|data| data.peer_id.clone());
+        // Find the target peer_id:
+        // - first look it up in self.accounts_data
+        // - if missing, fall back to lookup in self.graph.routing_table
+        // We want to deprecate self.graph.routing_table.account_owner in the next release.
+        let target = if let Some(peer_id) = peer_id_from_account_data {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
+            peer_id
+        } else if let Some(peer_id) = self.graph.routing_table.account_owner(account_id) {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
+            peer_id
+        } else {
+            // TODO(MarX, #1369): Message is dropped here. Define policy for this case.
+            metrics::MessageDropped::UnknownAccount.inc(&msg);
+            tracing::debug!(target: "network",
+                   account_id = ?self.config.validator.as_ref().map(|v|v.account_id()),
+                   to = ?account_id,
+                   ?msg,"Drop message: unknown account",
+            );
+            tracing::trace!(target: "network", known_peers = ?self.graph.routing_table.get_accounts_keys(), "Known peers");
+            return false;
         };
 
         let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
diff --git a/chain/network/src/peer_manager/tests/routing.rs b/chain/network/src/peer_manager/tests/routing.rs
index 66a4b738a40..5026f0893c3 100644
--- a/chain/network/src/peer_manager/tests/routing.rs
+++ b/chain/network/src/peer_manager/tests/routing.rs
@@ -874,7 +874,7 @@ async fn max_num_peers_limit() {
     drop(pm3);
 }
 
-// test that TTL is handled property.
+/// Test that TTL is handled properly.
 #[tokio::test]
 async fn ttl() {
     init_test_logger();
@@ -928,8 +928,8 @@ async fn ttl() {
     }
 }
 
-// After the initial exchange, all subsequent SyncRoutingTable messages are
-// expected to contain only the diff of the known data.
+/// After the initial exchange, all subsequent SyncRoutingTable messages are
+/// expected to contain only the diff of the known data.
 #[tokio::test]
 async fn repeated_data_in_sync_routing_table() {
     init_test_logger();
diff --git a/chain/network/src/peer_manager/tests/tier1.rs b/chain/network/src/peer_manager/tests/tier1.rs
index 04f124151e4..40e91eaa683 100644
--- a/chain/network/src/peer_manager/tests/tier1.rs
+++ b/chain/network/src/peer_manager/tests/tier1.rs
@@ -8,10 +8,8 @@ use crate::peer_manager::testonly::Event;
 use crate::tcp;
 use crate::testonly::{make_rng, Rng};
 use crate::time;
-use crate::types::{NetworkRequests, NetworkResponses, PeerManagerMessageRequest};
 use near_o11y::testonly::init_test_logger;
-use near_o11y::WithSpanContextExt;
-use near_primitives::block_header::{Approval, ApprovalInner, ApprovalMessage};
+use near_primitives::block_header::{Approval, ApprovalInner};
 use near_primitives::validator_signer::ValidatorSigner;
 use near_store::db::TestDB;
 use rand::Rng as _;
@@ -48,46 +46,66 @@ async fn establish_connections(clock: &time::Clock, pms: &[&peer_manager::teston
     }
 }
 
+// Sends a routed TIER1 message from `from` to `to`.
+// Returns the message body that was sent, or None if the routing information was missing.
 async fn send_tier1_message(
     rng: &mut Rng,
+    clock: &time::Clock,
     from: &peer_manager::testonly::ActorHandler,
     to: &peer_manager::testonly::ActorHandler,
-) {
+) -> Option<RoutedMessageBody> {
     let from_signer = from.cfg.validator.as_ref().unwrap().signer.clone();
     let to_signer = to.cfg.validator.as_ref().unwrap().signer.clone();
     let target = to_signer.validator_id().clone();
-    let want = make_block_approval(rng, from_signer.as_ref());
-    let req = NetworkRequests::Approval {
-        approval_message: ApprovalMessage { approval: want.clone(), target },
-    };
+    let want = RoutedMessageBody::BlockApproval(make_block_approval(rng, from_signer.as_ref()));
+    let clock = clock.clone();
+    from.with_state(move |s| async move {
+        if s.send_message_to_account(&clock, &target, want.clone()) {
+            Some(want)
+        } else {
+            None
+        }
+    })
+    .await
+}
+
+// Sends a routed TIER1 message from `from` to `to`, then waits until `to` receives it.
+// `recv_tier` specifies over which network the message is expected to be actually delivered.
+async fn send_and_recv_tier1_message(
+    rng: &mut Rng,
+    clock: &time::Clock,
+    from: &peer_manager::testonly::ActorHandler,
+    to: &peer_manager::testonly::ActorHandler,
+    recv_tier: tcp::Tier,
+) {
     let mut events = to.events.from_now();
-    let resp = from
-        .actix
-        .addr
-        .send(PeerManagerMessageRequest::NetworkRequests(req).with_span_context())
-        .await
-        .unwrap();
-    assert_eq!(NetworkResponses::NoResponse, resp.as_network_response());
+    let want = send_tier1_message(rng, clock, from, to).await.expect("routing info not available");
     let got = events
         .recv_until(|ev| match ev {
-            Event::PeerManager(PME::MessageProcessed(tcp::Tier::T1, PeerMessage::Routed(got))) => {
+            Event::PeerManager(PME::MessageProcessed(tier, PeerMessage::Routed(got)))
+                if tier == recv_tier =>
+            {
                 Some(got)
             }
             _ => None,
         })
         .await;
     assert_eq!(from.cfg.node_id(), got.author);
-    assert_eq!(RoutedMessageBody::BlockApproval(want), got.body);
+    assert_eq!(want, got.body);
 }
 
 /// Send a message over each connection.
-async fn test_clique(rng: &mut Rng, pms: &[&peer_manager::testonly::ActorHandler]) {
+async fn test_clique(
+    rng: &mut Rng,
+    clock: &time::Clock,
+    pms: &[&peer_manager::testonly::ActorHandler],
+) {
     for from in pms {
         for to in pms {
             if from.cfg.node_id() == to.cfg.node_id() {
                 continue;
             }
-            send_tier1_message(rng, from, to).await;
+            send_and_recv_tier1_message(rng, clock, from, to, tcp::Tier::T1).await;
         }
     }
 }
@@ -101,7 +119,7 @@ async fn first_proxy_advertisement() {
     let rng = &mut rng;
     let mut clock = time::FakeClock::default();
     let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));
-    let pm = peer_manager::testonly::start(
+    let pm = start_pm(
         clock.clock(),
         near_store::db::TestDB::new(),
         chain.make_config(rng),
@@ -132,7 +150,7 @@ async fn direct_connections() {
     let mut pms = vec![];
     for _ in 0..5 {
         pms.push(
-            peer_manager::testonly::start(
+            start_pm(
                 clock.clock(),
                 near_store::db::TestDB::new(),
                 chain.make_config(rng),
@@ -159,7 +177,7 @@ async fn direct_connections() {
     tracing::info!(target:"test", "Establish connections.");
     establish_connections(&clock.clock(), &pms[..]).await;
     tracing::info!(target:"test", "Test clique.");
-    test_clique(rng, &pms[..]).await;
+    test_clique(rng, &clock.clock(), &pms[..]).await;
 }
 
 /// Test which spawns N validators, each with 1 proxy.
@@ -178,7 +196,7 @@ async fn proxy_connections() {
     let mut proxies = vec![];
     for _ in 0..N {
         proxies.push(
-            peer_manager::testonly::start(
+            start_pm(
                 clock.clock(),
                 near_store::db::TestDB::new(),
                 chain.make_config(rng),
@@ -197,20 +215,13 @@ async fn proxy_connections() {
             peer_id: proxies[i].cfg.node_id(),
             addr: proxies[i].cfg.node_addr.unwrap(),
         }]);
-        validators.push(
-            peer_manager::testonly::start(
-                clock.clock(),
-                near_store::db::TestDB::new(),
-                cfg,
-                chain.clone(),
-            )
-            .await,
-        );
+        validators
+            .push(start_pm(clock.clock(), near_store::db::TestDB::new(), cfg, chain.clone()).await);
     }
     let validators: Vec<_> = validators.iter().collect();
 
     // Connect validators and proxies in a star topology. Any connected graph would do.
-    let hub = peer_manager::testonly::start(
+    let hub = start_pm(
         clock.clock(),
         near_store::db::TestDB::new(),
         chain.make_config(rng),
@@ -237,7 +248,7 @@ async fn proxy_connections() {
         pm.set_chain_info(chain_info.clone()).await;
     }
     establish_connections(&clock.clock(), &all[..]).await;
-    test_clique(rng, &validators[..]).await;
+    test_clique(rng, &clock.clock(), &validators[..]).await;
 }
 
 #[tokio::test]
@@ -262,7 +273,7 @@ async fn account_keys_change() {
         pm.set_chain_info(chain_info.clone()).await;
     }
     establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await;
-    test_clique(rng, &[&v0, &v1]).await;
+    test_clique(rng, &clock.clock(), &[&v0, &v1]).await;
 
     // TIER1 nodes in 2nd epoch are {v0,v2}.
     let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&v0.cfg, &v2.cfg]);
@@ -270,7 +281,7 @@ async fn account_keys_change() {
         pm.set_chain_info(chain_info.clone()).await;
    }
     establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await;
-    test_clique(rng, &[&v0, &v2]).await;
+    test_clique(rng, &clock.clock(), &[&v0, &v2]).await;
 
     drop(v0);
     drop(v1);
@@ -312,8 +323,6 @@ async fn proxy_change() {
     hub.connect_to(&p1.peer_info(), tcp::Tier::T2).await;
     hub.connect_to(&v0.peer_info(), tcp::Tier::T2).await;
     hub.connect_to(&v1.peer_info(), tcp::Tier::T2).await;
-    tracing::info!(target:"dupa","p0 = {}",p0cfg.node_id());
-    tracing::info!(target:"dupa","hub = {}",hub.cfg.node_id());
 
     tracing::info!(target:"test", "p0 goes down");
     drop(p0);
@@ -325,7 +334,7 @@ async fn proxy_change() {
     tracing::info!(target:"test", "TIER1 connections get established: v0 -> p1 <- v1.");
     establish_connections(&clock.clock(), &[&v0, &v1, &p1, &hub]).await;
     tracing::info!(target:"test", "Send message v1 -> v0 over TIER1.");
-    send_tier1_message(rng, &v1, &v0).await;
+    send_and_recv_tier1_message(rng, &clock.clock(), &v1, &v0, tcp::Tier::T1).await;
 
     // Advance time, so that the new AccountsData has newer timestamp.
     clock.advance(time::Duration::hours(1));
@@ -339,10 +348,42 @@ async fn proxy_change() {
     tracing::info!(target:"test", "TIER1 connections get established: v0 -> p0 <- v1.");
     establish_connections(&clock.clock(), &[&v0, &v1, &p0, &hub]).await;
     tracing::info!(target:"test", "Send message v1 -> v0 over TIER1.");
-    send_tier1_message(rng, &v1, &v0).await;
+    send_and_recv_tier1_message(rng, &clock.clock(), &v1, &v0, tcp::Tier::T1).await;
 
     drop(hub);
     drop(v0);
     drop(v1);
     drop(p0);
 }
+
+#[tokio::test]
+async fn tier2_routing_using_accounts_data() {
+    init_test_logger();
+    let mut rng = make_rng(921853233);
+    let rng = &mut rng;
+    let mut clock = time::FakeClock::default();
+    let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));
+
+    tracing::info!(target:"test", "start 2 nodes and connect them");
+    let pm0 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await;
+    let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await;
+    pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await;
+
+    tracing::info!(target:"test", "Try to send a routed message pm0 -> pm1 over TIER2");
+    // It should fail due to missing routing information: neither AccountData nor AnnounceAccount
+    // is broadcast by default in tests.
+    // TODO(gprusak): send_tier1_message sends an Approval message, which is not a valid message to
+    // be sent from a non-TIER1 node. Make it more realistic by sending a Transaction message.
+    assert!(send_tier1_message(rng, &clock.clock(), &pm0, &pm1).await.is_none());
+
+    tracing::info!(target:"test", "propagate AccountsData");
+    let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&pm1.cfg]);
+    for pm in [&pm0, &pm1] {
+        pm.set_chain_info(chain_info.clone()).await;
+    }
+    let data: HashSet<_> = pm1.tier1_advertise_proxies(&clock.clock()).await.into_iter().collect();
+    pm0.wait_for_accounts_data(&data).await;
+
+    tracing::info!(target:"test", "Send a routed message pm0 -> pm1 over TIER2.");
+    send_and_recv_tier1_message(rng, &clock.clock(), &pm0, &pm1, tcp::Tier::T2).await;
+}
diff --git a/chain/network/src/stats/metrics.rs b/chain/network/src/stats/metrics.rs
index 06c5cb20a7f..49ceed27e0c 100644
--- a/chain/network/src/stats/metrics.rs
+++ b/chain/network/src/stats/metrics.rs
@@ -358,6 +358,19 @@ pub(crate) static ALREADY_CONNECTED_ACCOUNT: Lazy = Lazy::new(|| {
     .unwrap()
 });
 
+pub(crate) static ACCOUNT_TO_PEER_LOOKUPS: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_account_to_peer_lookups",
+        "number of lookups of peer_id by account_id (for routed messages)",
+        // Source is either "AnnounceAccount" or "AccountData".
+        // We want to deprecate AnnounceAccount, so eventually we want all
+        // lookups to be done via AccountData. For now AnnounceAccount is
+        // used as a fallback.
+        &["source"],
+    )
+    .unwrap()
+});
+
 /// Updated the prometheus metrics about the received routed message `msg`.
 /// `tier` indicates the network over which the message was transmitted.
 /// `fastest` indicates whether this message is the first copy of `msg` received -