Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Substrate companion: Authority discovery multiple peer ids (#4295)
Browse files Browse the repository at this point in the history
* Substrate companion: Authority discovery multiple peer ids

Authority discovery before had a fixed mapping from `PeerId` to
`AuthorityId`. This wasn't correct, as a `PeerId` can actually map to
multiple `AuthorityId`s. The linked Substrate pr fixes this.

paritytech/substrate#10259

* Update node/network/availability-distribution/src/requester/mod.rs

* Update node/network/collator-protocol/src/validator_side/mod.rs

* Update node/network/statement-distribution/src/tests.rs

* Update guide

* Adapt to Substrate pr

* Update Substrate
  • Loading branch information
bkchr authored Nov 17, 2021
1 parent e0295ed commit 7ea6595
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 256 deletions.
326 changes: 163 additions & 163 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
};

let maybe_authority =
authority_discovery_service.get_authority_id_by_peer_id(peer).await;
authority_discovery_service.get_authority_ids_by_peer_id(peer).await;

match peer_set {
PeerSet::Validation => {
Expand Down
6 changes: 3 additions & 3 deletions node/network/bridge/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
_authority: AuthorityDiscoveryId,
) -> Option<Vec<Multiaddr>> {
) -> Option<HashSet<Multiaddr>> {
None
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
_peer_id: PeerId,
) -> Option<AuthorityDiscoveryId> {
) -> Option<HashSet<AuthorityDiscoveryId>> {
None
}
}
Expand Down
34 changes: 21 additions & 13 deletions node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ mod tests {
use polkadot_node_network_protocol::{request_response::outgoing::Requests, PeerId};
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use std::{borrow::Cow, collections::HashMap};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
};

fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
Expand All @@ -176,8 +179,8 @@ mod tests {

#[derive(Default, Clone, Debug)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
by_authority_id: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,
by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
}

impl TestAuthorityDiscovery {
Expand All @@ -187,12 +190,15 @@ mod tests {
let multiaddr = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
|(mut addr, peer_id)| {
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
addr
HashSet::from([addr])
},
);
Self {
by_authority_id: authorities.iter().cloned().zip(multiaddr).collect(),
by_peer_id: peer_ids.into_iter().zip(authorities.into_iter()).collect(),
by_peer_id: peer_ids
.into_iter()
.zip(authorities.into_iter().map(|a| HashSet::from([a])))
.collect(),
}
}
}
Expand Down Expand Up @@ -246,14 +252,14 @@ mod tests {
async fn get_addresses_by_authority_id(
&mut self,
authority: AuthorityDiscoveryId,
) -> Option<Vec<Multiaddr>> {
self.by_authority_id.get(&authority).cloned().map(|addr| vec![addr])
) -> Option<HashSet<Multiaddr>> {
self.by_authority_id.get(&authority).cloned()
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
peer_id: PeerId,
) -> Option<AuthorityDiscoveryId> {
) -> Option<HashSet<AuthorityDiscoveryId>> {
self.by_peer_id.get(&peer_id).cloned()
}
}
Expand Down Expand Up @@ -283,7 +289,8 @@ mod tests {

let (ns, ads) = new_network();

let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
let authority_ids: Vec<_> =
ads.by_peer_id.values().map(|v| v.iter()).flatten().cloned().collect();

futures::executor::block_on(async move {
let (failed, _) = oneshot::channel();
Expand All @@ -299,7 +306,7 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
let peer_1 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[1]).unwrap().clone()].into_iter(),
ads.by_authority_id.get(&authority_ids[1]).unwrap().clone().into_iter(),
)
.iter()
.cloned()
Expand All @@ -315,7 +322,8 @@ mod tests {

let (ns, ads) = new_network();

let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
let authority_ids: Vec<_> =
ads.by_peer_id.values().map(|v| v.iter()).flatten().cloned().collect();

futures::executor::block_on(async move {
let (failed, failed_rx) = oneshot::channel();
Expand All @@ -333,7 +341,7 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
let peer_0 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[0]).unwrap().clone()].into_iter(),
ads.by_authority_id.get(&authority_ids[0]).unwrap().clone().into_iter(),
)
.iter()
.cloned()
Expand Down
22 changes: 12 additions & 10 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,25 @@ impl ValidatorGroup {
/// Returns `true` if we should advertise our collation to the given peer.
fn should_advertise_to(
&self,
peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) -> bool {
match peer_ids.get(peer) {
Some(discovery_id) => !self.advertised_to.contains(discovery_id),
Some(discovery_ids) => !discovery_ids.iter().any(|d| self.advertised_to.contains(d)),
None => false,
}
}

/// Should be called after we advertised our collation to the given `peer` to keep track of it.
fn advertised_to_peer(
&mut self,
peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: &HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
peer: &PeerId,
) {
if let Some(validator_id) = peer_ids.get(peer) {
self.advertised_to.insert(validator_id.clone());
if let Some(authority_ids) = peer_ids.get(peer) {
authority_ids.iter().for_each(|a| {
self.advertised_to.insert(a.clone());
});
}
}
}
Expand Down Expand Up @@ -274,9 +276,9 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,

/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s
/// The mapping from [`PeerId`] to [`HashSet<AuthorityDiscoveryId>`]. This is filled over time as we learn the [`PeerId`]'s
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,

/// Metrics.
metrics: Metrics,
Expand Down Expand Up @@ -907,14 +909,14 @@ where
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, "Peer connected");
if let Some(authority) = maybe_authority {
if let Some(authority_ids) = maybe_authority {
tracing::trace!(
target: LOG_TARGET,
?authority,
?authority_ids,
?peer_id,
"Connected to requested validator"
);
state.peer_ids.insert(peer_id, authority);
state.peer_ids.insert(peer_id, authority_ids);

declare(ctx, state, peer_id).await;
}
Expand Down
4 changes: 2 additions & 2 deletions node/network/collator-protocol/src/collator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use super::*;

use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};

use assert_matches::assert_matches;
use futures::{executor, future, Future, SinkExt};
Expand Down Expand Up @@ -405,7 +405,7 @@ async fn connect_peer(
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
authority_id,
authority_id.map(|v| HashSet::from([v])),
)),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion node/network/dispute-distribution/src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ where
let peer = incoming.peer;

// Only accept messages from validators:
if self.authority_discovery.get_authority_id_by_peer_id(peer).await.is_none() {
if self.authority_discovery.get_authority_ids_by_peer_id(peer).await.is_none() {
incoming
.send_outgoing_response(OutgoingResponse {
result: Err(()),
Expand Down
17 changes: 12 additions & 5 deletions node/network/dispute-distribution/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! Mock data and utility functions for unit tests in this subsystem.

use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -171,19 +174,23 @@ impl AuthorityDiscovery for MockAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
_authority: polkadot_primitives::v1::AuthorityDiscoveryId,
) -> Option<Vec<sc_network::Multiaddr>> {
) -> Option<HashSet<sc_network::Multiaddr>> {
panic!("Not implemented");
}

async fn get_authority_id_by_peer_id(
async fn get_authority_ids_by_peer_id(
&mut self,
peer_id: polkadot_node_network_protocol::PeerId,
) -> Option<polkadot_primitives::v1::AuthorityDiscoveryId> {
) -> Option<HashSet<polkadot_primitives::v1::AuthorityDiscoveryId>> {
for (a, p) in self.peer_ids.iter() {
if p == &peer_id {
return Some(MOCK_VALIDATORS_DISCOVERY_KEYS.get(&a).unwrap().clone())
return Some(HashSet::from([MOCK_VALIDATORS_DISCOVERY_KEYS
.get(&a)
.unwrap()
.clone()]))
}
}

None
}
}
23 changes: 14 additions & 9 deletions node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! the `NetworkBridgeMessage::NewGossipTopology` message.

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -94,14 +94,14 @@ pub struct GossipSupport<AD> {
/// Successfully resolved connections
///
/// waiting for actual connection.
resolved_authorities: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
resolved_authorities: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>,

/// Actually connected authorities.
connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
/// By `PeerId`.
///
/// Needed for efficient handling of disconnect events.
connected_authorities_by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
connected_authorities_by_peer_id: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
/// Authority discovery service.
authority_discovery: AD,
}
Expand Down Expand Up @@ -299,14 +299,19 @@ where
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSuppportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => {
if let Some(authority) = o_authority {
self.connected_authorities.insert(authority.clone(), peer_id);
self.connected_authorities_by_peer_id.insert(peer_id, authority);
if let Some(authority_ids) = o_authority {
authority_ids.iter().for_each(|a| {
self.connected_authorities.insert(a.clone(), peer_id);
});
self.connected_authorities_by_peer_id.insert(peer_id, authority_ids);
}
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) {
self.connected_authorities.remove(&authority);
if let Some(authority_ids) = self.connected_authorities_by_peer_id.remove(&peer_id)
{
authority_ids.into_iter().for_each(|a| {
self.connected_authorities.remove(&a);
});
}
},
NetworkBridgeEvent::OurViewChange(_) => {},
Expand Down Expand Up @@ -474,7 +479,7 @@ struct PrettyAuthorities<I>(I);

impl<'a, I> fmt::Display for PrettyAuthorities<I>
where
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a Vec<Multiaddr>)> + Clone,
I: Iterator<Item = (&'a AuthorityDiscoveryId, &'a HashSet<Multiaddr>)> + Clone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut authorities = self.0.clone().peekable();
Expand Down
Loading

0 comments on commit 7ea6595

Please sign in to comment.