Skip to content

Commit

Permalink
authorithy-discovery: Make changing of peer-id while active a bit mor…
Browse files Browse the repository at this point in the history
…e robust (#3786)

In the case when nodes don't persist their node-key or they want to
generate a new one while being in the active set, things go wrong
because both the old addresses and the new ones will still be present in
DHT, so because of the distributed nature of the DHT both will survive
in the network untill the old ones expires which is 36 hours. Nodes in
the network will randomly resolve the authorithy-id to the old address
or the new one.

More details in: #3673

This PR proposes we mitigate this problem, by:

1. Let the query for a DHT key retrieve more than one results(4), that
is also bounded by the replication factor which is 20, currently we
interrupt the querry on the first result.
~2. Modify the authority-discovery service to keep all the discovered
addresses around for 24h since they last seen an address.~
~3. Plumb through other subsystems where the assumption was that an
authorithy-id will resolve only to one PeerId. Currently, the
authorithy-discovery keeps just the last record it received from DHT and
queries the DHT every 10 minutes. But they could always receive only the
old address, only the new address or a flip-flop between them depending
on what node wins the race to provide the record~

2. Extend the `SignedAuthorityRecord` with a signed creation_time.
3. Modify authority discovery to keep track of nodes that sent us old
record and once we are made aware of a new record update the nodes we
know about with the new record.
4. Update gossip-support to try resolve authorities more often than
every session.

~This would gives us a lot more chances for the nodes in the networks to
also discover not only the old address of the node but also the new one
and should improve the time it takes for a node to be properly connected
in the network. The behaviour won't be deterministic because there is no
guarantee the all nodes will see the new record at least once, since
they could query only nodes that have the old one.~


## TODO
- [x] Add unittests for the new paths.
- [x] Make sure the implementation is backwards compatible
- [x] Evaluate if there are any bad consequence of letting the query
continue rather than finish it at first record found.
- [x] Bake in versi the new changes.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 25, 2024
1 parent 3b9c909 commit 6720279
Show file tree
Hide file tree
Showing 22 changed files with 1,511 additions and 177 deletions.
15 changes: 15 additions & 0 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static {
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;

/// Ask the network to extend the reserved set with these nodes.
async fn add_peers_to_reserved_set(
&mut self,
protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;

/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
async fn remove_from_peers_set(
&mut self,
Expand Down Expand Up @@ -240,6 +247,14 @@ impl Network for Arc<dyn NetworkService> {
<dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
}

async fn add_peers_to_reserved_set(
&mut self,
protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
<dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}

async fn remove_from_peers_set(
&mut self,
protocol: ProtocolName,
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl Network for TestNetwork {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
16 changes: 16 additions & 0 deletions polkadot/node/network/bridge/src/tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,22 @@ where
.await;
return (network_service, authority_discovery_service)
},

NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
gum::trace!(
target: LOG_TARGET,
action = "AddToResolvedValidators",
peer_set = ?peer_set,
?validator_addrs,
"Received a resolved validator connection request",
);

let all_addrs = validator_addrs.into_iter().flatten().collect();
let network_service = validator_discovery
.on_add_to_resolved_request(all_addrs, peer_set, network_service)
.await;
return (network_service, authority_discovery_service)
},
}
(network_service, authority_discovery_service)
}
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/network/bridge/src/tx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ impl Network for TestNetwork {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
47 changes: 47 additions & 0 deletions polkadot/node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
network_service
}

/// Connect to already resolved addresses.
pub async fn on_add_to_resolved_request(
&mut self,
newly_requested: HashSet<Multiaddr>,
peer_set: PeerSet,
mut network_service: N,
) -> N {
let state = &mut self.state[peer_set];
let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
let num_peers = new_peer_ids.len();

state.previously_requested.extend(new_peer_ids);

gum::debug!(
target: LOG_TARGET,
?peer_set,
?num_peers,
"New add to resolved validators request",
);

// ask the network to connect to these nodes and not disconnect
// from them until they are removed from the set.
//
// for peer-set management, the main protocol name should be used regardless of
// the negotiated version.
if let Err(e) = network_service
.add_peers_to_reserved_set(
self.peerset_protocol_names.get_main_name(peer_set),
newly_requested,
)
.await
{
gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}

network_service
}

/// On a new connection request, a peer set update will be issued.
/// It will ask the network to connect to the validators and not disconnect
/// from them at least until the next request is issued for the same peer set.
Expand Down Expand Up @@ -222,6 +260,15 @@ mod tests {
Ok(())
}

async fn add_peers_to_reserved_set(
&mut self,
_protocol: ProtocolName,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter()));
Ok(())
}

async fn remove_from_peers_set(
&mut self,
_protocol: ProtocolName,
Expand Down
127 changes: 112 additions & 15 deletions polkadot/node/network/gossip-support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const BACKOFF_DURATION: Duration = Duration::from_millis(500);

// The authorithy_discovery queries runs every ten minutes,
// so it make sense to run a bit more often than that to
// detect changes as often as we can, but not too often since
// it won't help.
#[cfg(not(test))]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60);

#[cfg(test)]
const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2);

/// Duration after which we consider low connectivity a problem.
///
/// Especially at startup low connectivity is expected (authority discovery cache needs to be
Expand All @@ -91,6 +101,14 @@ pub struct GossipSupport<AD> {
// `None` otherwise.
last_failure: Option<Instant>,

// Validators can restart during a session, so if they change
// their PeerID, we will connect to them in the best case after
// a session, so we need to try more often to resolved peers and
// reconnect to them. The authorithy_discovery queries runs every ten
// minutes, so we can't detect changes in the address more often
// that that.
last_connection_request: Option<Instant>,

/// First time we did not reach our connectivity threshold.
///
/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
Expand Down Expand Up @@ -131,6 +149,7 @@ where
keystore,
last_session_index: None,
last_failure: None,
last_connection_request: None,
failure_start: None,
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
Expand Down Expand Up @@ -196,15 +215,22 @@ where
for leaf in leaves {
let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let since_last_reconnect =
self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();

let force_request = since_failure >= BACKOFF_DURATION;
let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
_ => leaf_session,
};

let maybe_issue_connection =
if force_request { leaf_session } else { maybe_new_session };
let maybe_issue_connection = if force_request || re_resolve_authorities {
leaf_session
} else {
maybe_new_session
};

if let Some((session_index, relay_parent)) = maybe_issue_connection {
let session_info =
Expand Down Expand Up @@ -248,7 +274,7 @@ where
// connections to a much broader set of validators.
{
let mut connections = authorities_past_present_future(sender, leaf).await?;

self.last_connection_request = Some(Instant::now());
// Remove all of our locally controlled validator indices so we don't connect to
// ourself.
let connections =
Expand All @@ -259,7 +285,12 @@ where
// to clean up all connections.
Vec::new()
};
self.issue_connection_request(sender, connections).await;

if force_request || is_new_session {
self.issue_connection_request(sender, connections).await;
} else if re_resolve_authorities {
self.issue_connection_request_to_changed(sender, connections).await;
}
}

if is_new_session {
Expand Down Expand Up @@ -324,17 +355,14 @@ where
authority_check_result
}

async fn issue_connection_request<Sender>(
async fn resolve_authorities(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let num = authorities.len();
) -> (Vec<HashSet<Multiaddr>>, HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, usize) {
let mut validator_addrs = Vec::with_capacity(authorities.len());
let mut failures = 0;
let mut resolved = HashMap::with_capacity(authorities.len());
let mut failures = 0;

for authority in authorities {
if let Some(addrs) =
self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
Expand All @@ -350,6 +378,67 @@ where
);
}
}
(validator_addrs, resolved, failures)
}

async fn issue_connection_request_to_changed<Sender>(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let (_, resolved, _) = self.resolve_authorities(authorities).await;

let mut changed = Vec::new();

for (authority, new_addresses) in &resolved {
let new_peer_ids = new_addresses
.iter()
.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();
match self.resolved_authorities.get(authority) {
Some(old_addresses) => {
let old_peer_ids = old_addresses
.iter()
.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();
if !old_peer_ids.is_superset(&new_peer_ids) {
changed.push(new_addresses.clone());
}
},
None => changed.push(new_addresses.clone()),
}
}
gum::debug!(
target: LOG_TARGET,
num_changed = ?changed.len(),
?changed,
"Issuing a connection request to changed validators"
);
if !changed.is_empty() {
self.resolved_authorities = resolved;

sender
.send_message(NetworkBridgeTxMessage::AddToResolvedValidators {
validator_addrs: changed,
peer_set: PeerSet::Validation,
})
.await;
}
}

async fn issue_connection_request<Sender>(
&mut self,
sender: &mut Sender,
authorities: Vec<AuthorityDiscoveryId>,
) where
Sender: overseer::GossipSupportSenderTrait,
{
let num = authorities.len();

let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await;

self.resolved_authorities = resolved;
gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request");

Expand Down Expand Up @@ -399,16 +488,24 @@ where
{
let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
for authority in authorities {
let peer_id = self
let peer_ids = self
.authority_discovery
.get_addresses_by_authority_id(authority.clone())
.await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
.collect::<HashSet<_>>();

gum::trace!(
target: LOG_TARGET,
?peer_ids,
?authority,
"Resolved to peer ids"
);

if let Some(p) = peer_id {
authority_ids.entry(p).or_default().insert(authority);
for p in peer_ids {
authority_ids.entry(p).or_default().insert(authority.clone());
}
}

Expand Down
Loading

0 comments on commit 6720279

Please sign in to comment.