Skip to content

Commit

Permalink
Rollforward "deprecating AnnounceAccount (near#8182)" (near#8440)
Browse files Browse the repository at this point in the history
This is a roll forward of near#8182,
which was reverted by near#8314.

The issue that near#8182 caused has been fixed in near#8407.
nayduck run: https://nayduck.near.org/#/run/2834
Before the fix, tests/sanity/switch_node_key.py was failing; now it succeeds.
No nayduck tests are newly failing with this PR, compared to master.
  • Loading branch information
pompon0 authored and marcelo-gonzalez committed Jan 26, 2023
1 parent 220bec3 commit f109153
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 57 deletions.
43 changes: 29 additions & 14 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,11 @@ impl NetworkState {
msg: RoutedMessageBody,
) -> bool {
let mut success = false;
let accounts_data = self.accounts_data.load();
// All TIER1 messages are being sent over both TIER1 and TIER2 connections for now,
// so that we can actually observe the latency/reliability improvements in practice:
// for each message we track over which network tier it arrived faster?
if tcp::Tier::T1.is_allowed_routed(&msg) {
let accounts_data = self.accounts_data.load();
for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
{
let data = match accounts_data.data.get(key) {
Expand All @@ -503,19 +503,34 @@ impl NetworkState {
}
}

let target = match self.graph.routing_table.account_owner(account_id) {
Some(peer_id) => peer_id,
None => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::MessageDropped::UnknownAccount.inc(&msg);
tracing::debug!(target: "network",
account_id = ?self.config.validator.as_ref().map(|v|v.account_id()),
to = ?account_id,
?msg,"Drop message: unknown account",
);
tracing::trace!(target: "network", known_peers = ?self.graph.routing_table.get_accounts_keys(), "Known peers");
return false;
}
let peer_id_from_account_data = accounts_data
.keys_by_id
.get(account_id)
.iter()
.flat_map(|keys| keys.iter())
.flat_map(|key| accounts_data.data.get(key))
.next()
.map(|data| data.peer_id.clone());
// Find the target peer_id:
// - first look it up in self.accounts_data
// - if missing, fall back to lookup in self.graph.routing_table
// We want to deprecate self.graph.routing_table.account_owner in the next release.
let target = if let Some(peer_id) = peer_id_from_account_data {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
peer_id
} else if let Some(peer_id) = self.graph.routing_table.account_owner(account_id) {
metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
peer_id
} else {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
metrics::MessageDropped::UnknownAccount.inc(&msg);
tracing::debug!(target: "network",
account_id = ?self.config.validator.as_ref().map(|v|v.account_id()),
to = ?account_id,
?msg,"Drop message: unknown account",
);
tracing::trace!(target: "network", known_peers = ?self.graph.routing_table.get_accounts_keys(), "Known peers");
return false;
};

let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/peer_manager/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ async fn max_num_peers_limit() {
drop(pm3);
}

// test that TTL is handled property.
/// Test that TTL is handled properly.
#[tokio::test]
async fn ttl() {
init_test_logger();
Expand Down Expand Up @@ -928,8 +928,8 @@ async fn ttl() {
}
}

// After the initial exchange, all subsequent SyncRoutingTable messages are
// expected to contain only the diff of the known data.
/// After the initial exchange, all subsequent SyncRoutingTable messages are
/// expected to contain only the diff of the known data.
#[tokio::test]
async fn repeated_data_in_sync_routing_table() {
init_test_logger();
Expand Down
121 changes: 81 additions & 40 deletions chain/network/src/peer_manager/tests/tier1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use crate::peer_manager::testonly::Event;
use crate::tcp;
use crate::testonly::{make_rng, Rng};
use crate::time;
use crate::types::{NetworkRequests, NetworkResponses, PeerManagerMessageRequest};
use near_o11y::testonly::init_test_logger;
use near_o11y::WithSpanContextExt;
use near_primitives::block_header::{Approval, ApprovalInner, ApprovalMessage};
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::validator_signer::ValidatorSigner;
use near_store::db::TestDB;
use rand::Rng as _;
Expand Down Expand Up @@ -48,46 +46,66 @@ async fn establish_connections(clock: &time::Clock, pms: &[&peer_manager::teston
}
}

// Sends a routed TIER1 message from `from` to `to`.
// Returns the message body that was sent, or None if the routing information was missing.
async fn send_tier1_message(
rng: &mut Rng,
clock: &time::Clock,
from: &peer_manager::testonly::ActorHandler,
to: &peer_manager::testonly::ActorHandler,
) {
) -> Option<RoutedMessageBody> {
let from_signer = from.cfg.validator.as_ref().unwrap().signer.clone();
let to_signer = to.cfg.validator.as_ref().unwrap().signer.clone();
let target = to_signer.validator_id().clone();
let want = make_block_approval(rng, from_signer.as_ref());
let req = NetworkRequests::Approval {
approval_message: ApprovalMessage { approval: want.clone(), target },
};
let want = RoutedMessageBody::BlockApproval(make_block_approval(rng, from_signer.as_ref()));
let clock = clock.clone();
from.with_state(move |s| async move {
if s.send_message_to_account(&clock, &target, want.clone()) {
Some(want)
} else {
None
}
})
.await
}

// Sends a routed TIER1 message from `from` to `to`, then waits until `to` receives it.
// `recv_tier` specifies over which network the message is expected to be actually delivered.
async fn send_and_recv_tier1_message(
rng: &mut Rng,
clock: &time::Clock,
from: &peer_manager::testonly::ActorHandler,
to: &peer_manager::testonly::ActorHandler,
recv_tier: tcp::Tier,
) {
let mut events = to.events.from_now();
let resp = from
.actix
.addr
.send(PeerManagerMessageRequest::NetworkRequests(req).with_span_context())
.await
.unwrap();
assert_eq!(NetworkResponses::NoResponse, resp.as_network_response());
let want = send_tier1_message(rng, clock, from, to).await.expect("routing info not available");
let got = events
.recv_until(|ev| match ev {
Event::PeerManager(PME::MessageProcessed(tcp::Tier::T1, PeerMessage::Routed(got))) => {
Event::PeerManager(PME::MessageProcessed(tier, PeerMessage::Routed(got)))
if tier == recv_tier =>
{
Some(got)
}
_ => None,
})
.await;
assert_eq!(from.cfg.node_id(), got.author);
assert_eq!(RoutedMessageBody::BlockApproval(want), got.body);
assert_eq!(want, got.body);
}

/// Send a message over each connection.
async fn test_clique(rng: &mut Rng, pms: &[&peer_manager::testonly::ActorHandler]) {
async fn test_clique(
rng: &mut Rng,
clock: &time::Clock,
pms: &[&peer_manager::testonly::ActorHandler],
) {
for from in pms {
for to in pms {
if from.cfg.node_id() == to.cfg.node_id() {
continue;
}
send_tier1_message(rng, from, to).await;
send_and_recv_tier1_message(rng, clock, from, to, tcp::Tier::T1).await;
}
}
}
Expand All @@ -101,7 +119,7 @@ async fn first_proxy_advertisement() {
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));
let pm = peer_manager::testonly::start(
let pm = start_pm(
clock.clock(),
near_store::db::TestDB::new(),
chain.make_config(rng),
Expand Down Expand Up @@ -132,7 +150,7 @@ async fn direct_connections() {
let mut pms = vec![];
for _ in 0..5 {
pms.push(
peer_manager::testonly::start(
start_pm(
clock.clock(),
near_store::db::TestDB::new(),
chain.make_config(rng),
Expand All @@ -159,7 +177,7 @@ async fn direct_connections() {
tracing::info!(target:"test", "Establish connections.");
establish_connections(&clock.clock(), &pms[..]).await;
tracing::info!(target:"test", "Test clique.");
test_clique(rng, &pms[..]).await;
test_clique(rng, &clock.clock(), &pms[..]).await;
}

/// Test which spawns N validators, each with 1 proxy.
Expand All @@ -178,7 +196,7 @@ async fn proxy_connections() {
let mut proxies = vec![];
for _ in 0..N {
proxies.push(
peer_manager::testonly::start(
start_pm(
clock.clock(),
near_store::db::TestDB::new(),
chain.make_config(rng),
Expand All @@ -197,20 +215,13 @@ async fn proxy_connections() {
peer_id: proxies[i].cfg.node_id(),
addr: proxies[i].cfg.node_addr.unwrap(),
}]);
validators.push(
peer_manager::testonly::start(
clock.clock(),
near_store::db::TestDB::new(),
cfg,
chain.clone(),
)
.await,
);
validators
.push(start_pm(clock.clock(), near_store::db::TestDB::new(), cfg, chain.clone()).await);
}
let validators: Vec<_> = validators.iter().collect();

// Connect validators and proxies in a star topology. Any connected graph would do.
let hub = peer_manager::testonly::start(
let hub = start_pm(
clock.clock(),
near_store::db::TestDB::new(),
chain.make_config(rng),
Expand All @@ -237,7 +248,7 @@ async fn proxy_connections() {
pm.set_chain_info(chain_info.clone()).await;
}
establish_connections(&clock.clock(), &all[..]).await;
test_clique(rng, &validators[..]).await;
test_clique(rng, &clock.clock(), &validators[..]).await;
}

#[tokio::test]
Expand All @@ -262,15 +273,15 @@ async fn account_keys_change() {
pm.set_chain_info(chain_info.clone()).await;
}
establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await;
test_clique(rng, &[&v0, &v1]).await;
test_clique(rng, &clock.clock(), &[&v0, &v1]).await;

// TIER1 nodes in 2nd epoch are {v0,v2}.
let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&v0.cfg, &v2.cfg]);
for pm in [&v0, &v1, &v2, &hub] {
pm.set_chain_info(chain_info.clone()).await;
}
establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await;
test_clique(rng, &[&v0, &v2]).await;
test_clique(rng, &clock.clock(), &[&v0, &v2]).await;

drop(v0);
drop(v1);
Expand Down Expand Up @@ -312,8 +323,6 @@ async fn proxy_change() {
hub.connect_to(&p1.peer_info(), tcp::Tier::T2).await;
hub.connect_to(&v0.peer_info(), tcp::Tier::T2).await;
hub.connect_to(&v1.peer_info(), tcp::Tier::T2).await;
tracing::info!(target:"dupa","p0 = {}",p0cfg.node_id());
tracing::info!(target:"dupa","hub = {}",hub.cfg.node_id());

tracing::info!(target:"test", "p0 goes down");
drop(p0);
Expand All @@ -325,7 +334,7 @@ async fn proxy_change() {
tracing::info!(target:"test", "TIER1 connections get established: v0 -> p1 <- v1.");
establish_connections(&clock.clock(), &[&v0, &v1, &p1, &hub]).await;
tracing::info!(target:"test", "Send message v1 -> v0 over TIER1.");
send_tier1_message(rng, &v1, &v0).await;
send_and_recv_tier1_message(rng, &clock.clock(), &v1, &v0, tcp::Tier::T1).await;

// Advance time, so that the new AccountsData has newer timestamp.
clock.advance(time::Duration::hours(1));
Expand All @@ -339,10 +348,42 @@ async fn proxy_change() {
tracing::info!(target:"test", "TIER1 connections get established: v0 -> p0 <- v1.");
establish_connections(&clock.clock(), &[&v0, &v1, &p0, &hub]).await;
tracing::info!(target:"test", "Send message v1 -> v0 over TIER1.");
send_tier1_message(rng, &v1, &v0).await;
send_and_recv_tier1_message(rng, &clock.clock(), &v1, &v0, tcp::Tier::T1).await;

drop(hub);
drop(v0);
drop(v1);
drop(p0);
}

/// Checks that AccountData-based routing works over TIER2 connections:
/// a routed message from pm0 to pm1 is undeliverable until pm1's AccountData
/// (mapping its account_id to its peer_id) has propagated to pm0, after which
/// delivery over the TIER2 connection succeeds.
#[tokio::test]
async fn tier2_routing_using_accounts_data() {
    init_test_logger();
    let mut rng = make_rng(921853233);
    let rng = &mut rng;
    let mut clock = time::FakeClock::default();
    let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

    tracing::info!(target:"test", "start 2 nodes and connect them");
    let pm0 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await;
    let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await;
    pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await;

    tracing::info!(target:"test", "Try to send a routed message pm0 -> pm1 over TIER2");
    // It should fail due to missing routing information: neither AccountData nor
    // AnnounceAccount is broadcast by default in tests.
    // TODO(gprusak): send_tier1_message sends an Approval message, which is not a valid message to
    // be sent from a non-TIER1 node. Make it more realistic by sending a Transaction message.
    assert!(send_tier1_message(rng, &clock.clock(), &pm0, &pm1).await.is_none());

    tracing::info!(target:"test", "propagate AccountsData");
    // Make pm1 a TIER1 validator so that it signs and advertises AccountData,
    // then wait until pm0 has received it.
    let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&pm1.cfg]);
    for pm in [&pm0, &pm1] {
        pm.set_chain_info(chain_info.clone()).await;
    }
    let data: HashSet<_> = pm1.tier1_advertise_proxies(&clock.clock()).await.into_iter().collect();
    pm0.wait_for_accounts_data(&data).await;

    tracing::info!(target:"test", "Send a routed message pm0 -> pm1 over TIER2.");
    // Now the lookup succeeds via AccountData and the message is delivered over TIER2.
    send_and_recv_tier1_message(rng, &clock.clock(), &pm0, &pm1, tcp::Tier::T2).await;
}
13 changes: 13 additions & 0 deletions chain/network/src/stats/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,19 @@ pub(crate) static ALREADY_CONNECTED_ACCOUNT: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

/// Counts lookups of peer_id by account_id performed when routing messages,
/// partitioned by the "source" label: "AccountData" or "AnnounceAccount".
/// AnnounceAccount is the legacy fallback that we intend to deprecate, so
/// eventually every lookup should be served from AccountData.
pub(crate) static ACCOUNT_TO_PEER_LOOKUPS: Lazy<IntCounterVec> = Lazy::new(|| {
    let counter = try_create_int_counter_vec(
        "near_account_to_peer_lookups",
        "number of lookups of peer_id by account_id (for routed messages)",
        &["source"],
    );
    counter.unwrap()
});

/// Updated the prometheus metrics about the received routed message `msg`.
/// `tier` indicates the network over which the message was transmitted.
/// `fastest` indicates whether this message is the first copy of `msg` received -
Expand Down

0 comments on commit f109153

Please sign in to comment.