Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Substrate companion: Authority discovery multiple peer ids #4295

Merged
merged 8 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 163 additions & 163 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
};

let maybe_authority =
authority_discovery_service.get_authority_id_by_peer_id(peer).await;
authority_discovery_service.get_authority_ids_by_peer_id(peer).await;

match peer_set {
PeerSet::Validation => {
Expand Down
6 changes: 3 additions & 3 deletions node/network/bridge/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
_authority: AuthorityDiscoveryId,
) -> Option<Vec<Multiaddr>> {
) -> Option<HashSet<Multiaddr>> {
None
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
_peer_id: PeerId,
) -> Option<AuthorityDiscoveryId> {
) -> Option<HashSet<AuthorityDiscoveryId>> {
None
}
}
Expand Down
34 changes: 21 additions & 13 deletions node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ mod tests {
use polkadot_node_network_protocol::{request_response::outgoing::Requests, PeerId};
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use std::{borrow::Cow, collections::HashMap};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
};

fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
Expand All @@ -176,8 +179,8 @@ mod tests {

#[derive(Default, Clone, Debug)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
by_authority_id: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
}

impl TestAuthorityDiscovery {
Expand All @@ -187,12 +190,15 @@ mod tests {
let multiaddr = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
|(mut addr, peer_id)| {
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
addr
HashSet::from([addr])
},
);
Self {
by_authority_id: authorities.iter().cloned().zip(multiaddr).collect(),
by_peer_id: peer_ids.into_iter().zip(authorities.into_iter()).collect(),
by_peer_id: peer_ids
.into_iter()
.zip(authorities.into_iter().map(|a| HashSet::from([a])))
.collect(),
}
}
}
Expand Down Expand Up @@ -246,14 +252,14 @@ mod tests {
async fn get_addresses_by_authority_id(
&mut self,
authority: AuthorityDiscoveryId,
) -> Option<Vec<Multiaddr>> {
self.by_authority_id.get(&authority).cloned().map(|addr| vec![addr])
) -> Option<HashSet<Multiaddr>> {
self.by_authority_id.get(&authority).cloned()
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
peer_id: PeerId,
) -> Option<AuthorityDiscoveryId> {
) -> Option<HashSet<AuthorityDiscoveryId>> {
self.by_peer_id.get(&peer_id).cloned()
}
}
Expand Down Expand Up @@ -283,7 +289,8 @@ mod tests {

let (ns, ads) = new_network();

let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
let authority_ids: Vec<_> =
ads.by_peer_id.values().map(|v| v.iter()).flatten().cloned().collect();

futures::executor::block_on(async move {
let (failed, _) = oneshot::channel();
Expand All @@ -299,7 +306,7 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
let peer_1 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[1]).unwrap().clone()].into_iter(),
ads.by_authority_id.get(&authority_ids[1]).unwrap().clone().into_iter(),
)
.iter()
.cloned()
Expand All @@ -315,7 +322,8 @@ mod tests {

let (ns, ads) = new_network();

let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
let authority_ids: Vec<_> =
ads.by_peer_id.values().map(|v| v.iter()).flatten().cloned().collect();

futures::executor::block_on(async move {
let (failed, failed_rx) = oneshot::channel();
Expand All @@ -333,7 +341,7 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
let peer_0 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[0]).unwrap().clone()].into_iter(),
ads.by_authority_id.get(&authority_ids[0]).unwrap().clone().into_iter(),
)
.iter()
.cloned()
Expand Down
22 changes: 12 additions & 10 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,25 @@ impl ValidatorGroup {
/// Returns `true` if we should advertise our collation to the given peer.
fn should_advertise_to(
&self,
peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) -> bool {
match peer_ids.get(peer) {
Some(discovery_id) => !self.advertised_to.contains(discovery_id),
Some(discovery_ids) => !discovery_ids.iter().any(|d| self.advertised_to.contains(d)),
None => false,
}
}

/// Should be called after we advertised our collation to the given `peer` to keep track of it.
fn advertised_to_peer(
&mut self,
peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) {
if let Some(validator_id) = peer_ids.get(peer) {
self.advertised_to.insert(validator_id.clone());
if let Some(authority_ids) = peer_ids.get(peer) {
authority_ids.iter().for_each(|a| {
self.advertised_to.insert(a.clone());
});
}
}
}
Expand Down Expand Up @@ -274,9 +276,9 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,

/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s
/// The mapping from [`PeerId`] to [`HashSet<AuthorityDiscoveryId>`]. This is filled over time as we learn the [`PeerId`]'s
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,

/// Metrics.
metrics: Metrics,
Expand Down Expand Up @@ -907,14 +909,14 @@ where
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, "Peer connected");
if let Some(authority) = maybe_authority {
if let Some(authority_ids) = maybe_authority {
tracing::trace!(
target: LOG_TARGET,
?authority,
?authority_ids,
?peer_id,
"Connected to requested validator"
);
state.peer_ids.insert(peer_id, authority);
state.peer_ids.insert(peer_id, authority_ids);

declare(ctx, state, peer_id).await;
}
Expand Down
4 changes: 2 additions & 2 deletions node/network/collator-protocol/src/collator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use super::*;

use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};

use assert_matches::assert_matches;
use futures::{executor, future, Future, SinkExt};
Expand Down Expand Up @@ -405,7 +405,7 @@ async fn connect_peer(
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
authority_id,
authority_id.map(|v| HashSet::from([v])),
)),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion node/network/dispute-distribution/src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ where
let peer = incoming.peer;

// Only accept messages from validators:
if self.authority_discovery.get_authority_id_by_peer_id(peer).await.is_none() {
if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
incoming
.send_outgoing_response(OutgoingResponse {
result: Err(()),
Expand Down
17 changes: 12 additions & 5 deletions node/network/dispute-distribution/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! Mock data and utility functions for unit tests in this subsystem.

use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -171,19 +174,23 @@ impl AuthorityDiscovery for MockAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
_authority: polkadot_primitives::v1::AuthorityDiscoveryId,
) -> Option<Vec<sc_network::Multiaddr>> {
) -> Option<HashSet<sc_network::Multiaddr>> {
panic!("Not implemented");
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
peer_id: polkadot_node_network_protocol::PeerId,
) -> Option<polkadot_primitives::v1::AuthorityDiscoveryId> {
) -> Option<HashSet<polkadot_primitives::v1::AuthorityDiscoveryId>> {
for (a, p) in self.peer_ids.iter() {
if p == &peer_id {
return Some(MOCK_VALIDATORS_DISCOVERY_KEYS.get(&a).unwrap().clone())
return Some(HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS
.get(&a)
.unwrap()
.clone()]))
}
}

None
}
}
23 changes: 14 additions & 9 deletions node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! the `NetworkBridgeMessage::NewGossipTopology` message.

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -94,14 +94,14 @@ pub struct GossipSupport<AD> {
/// Successfully resolved connections
///
/// waiting for actual connection.
resolved_authorities: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
resolved_authorities: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,

/// Actually connected authorities.
connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
/// By `PeerId`.
///
/// Needed for efficient handling of disconnect events.
connected_authorities_by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
connected_authorities_by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
/// Authority discovery service.
authority_discovery: AD,
}
Expand Down Expand Up @@ -299,14 +299,19 @@ where
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSuppportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => {
if let Some(authority) = o_authority {
self.connected_authorities.insert(authority.clone(), peer_id);
self.connected_authorities_by_peer_id.insert(peer_id, authority);
if let Some(authority_ids) = o_authority {
authority_ids.iter().for_each(|a| {
self.connected_authorities.insert(a.clone(), peer_id);
});
self.connected_authorities_by_peer_id.insert(peer_id, authority_ids);
}
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) {
self.connected_authorities.remove(&authority);
if let Some(authority_ids) = self.connected_authorities_by_peer_id.remove(&peer_id)
{
authority_ids.into_iter().for_each(|a| {
self.connected_authorities.remove(&a);
});
}
},
NetworkBridgeEvent::OurViewChange(_) => {},
Expand Down Expand Up @@ -474,7 +479,7 @@ struct PrettyAuthorities<I>(I);

impl<'a, I> fmt::Display for PrettyAuthorities<I>
where
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a Vec<Multiaddr>)> + Clone,
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a HashSet<Multiaddr>)> + Clone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut authorities = self.0.clone().peekable();
Expand Down
Loading