Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
collator-protocol: fix message fallout
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Sep 9, 2022
1 parent e96f0b2 commit 07cc887
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 64 deletions.
136 changes: 101 additions & 35 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ use sp_core::Pair;

use polkadot_node_network_protocol::{
self as net_protocol,
peer_set::PeerSet,
peer_set::{PeerSet, CollationVersion, ProtocolVersion},
request_response::{
incoming::{self, OutgoingResponse},
v1::{self as request_v1, CollationFetchingRequest, CollationFetchingResponse},
IncomingRequest, IncomingRequestReceiver,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
v1 as protocol_v1, vstaging as protocol_vstaging, OurView, PeerId,
UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
use polkadot_node_subsystem::{
Expand Down Expand Up @@ -272,6 +273,11 @@ struct WaitingCollationFetches {
type ActiveCollationFetches =
FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;

struct PeerData {
view: View,
version: ProtocolVersion,
}

struct State {
/// Our network peer id.
local_peer_id: PeerId,
Expand All @@ -285,7 +291,7 @@ struct State {

/// Track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
peer_data: HashMap<PeerId, PeerData>,

/// Our own view.
view: OurView,
Expand Down Expand Up @@ -332,7 +338,7 @@ impl State {
collator_pair,
metrics,
collating_on: Default::default(),
peer_views: Default::default(),
peer_data: Default::default(),
view: Default::default(),
span_per_relay_parent: Default::default(),
collations: Default::default(),
Expand All @@ -344,12 +350,13 @@ impl State {
}
}

/// Get all peers which have the given relay parent in their view.
fn peers_interested_in_leaf(&self, relay_parent: &Hash) -> Vec<PeerId> {
self.peer_views
/// Get all peers which have the given relay parent in their view along
/// with their protocol version.
fn peers_interested_in_leaf(&self, relay_parent: &Hash) -> Vec<(PeerId, ProtocolVersion)> {
self.peer_data
.iter()
.filter(|(_, v)| v.contains(relay_parent))
.map(|(peer, _)| *peer)
.filter(|(_, data)| data.view.contains(relay_parent))
.map(|(peer, data)| (*peer, data.version))
.collect()
}
}
Expand Down Expand Up @@ -451,8 +458,8 @@ async fn distribute_collation<Context>(

let interested = state.peers_interested_in_leaf(&relay_parent);
// Make sure already connected peers get collations:
for peer_id in interested {
advertise_collation(ctx, state, relay_parent, peer_id).await;
for (peer_id, version) in interested {
advertise_collation(ctx, state, relay_parent, peer_id, version).await;
}

Ok(())
Expand Down Expand Up @@ -521,9 +528,10 @@ async fn determine_our_validators<Context>(
Ok(current_validators)
}

/// Issue a `Declare` collation message to the given `peer`.
/// Issue a `Declare` collation message to the given `peer` on protocol version
/// v1.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
async fn declare_v1<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id);

if let Some(para_id) = state.collating_on {
Expand All @@ -541,6 +549,30 @@ async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
}
}

/// Issue a `Declare` collation message to the given `peer` on protocol version
/// vstaging
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn declare_vstaging<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
let declare_signature_payload =
protocol_vstaging::declare_signature_payload(&state.local_peer_id);

if let Some(para_id) = state.collating_on {
let wire_message = protocol_vstaging::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
para_id,
state.collator_pair.sign(&declare_signature_payload),
);

ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
vec![peer],
Versioned::VStaging(protocol_vstaging::CollationProtocol::CollatorProtocol(
wire_message,
)),
))
.await;
}
}

/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
Expand Down Expand Up @@ -569,6 +601,7 @@ async fn advertise_collation<Context>(
state: &mut State,
relay_parent: Hash,
peer: PeerId,
version: ProtocolVersion,
) {
let should_advertise = state
.our_validators_groups
Expand Down Expand Up @@ -606,13 +639,26 @@ async fn advertise_collation<Context>(
},
}

let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent);
if version == CollationVersion::V1.into() {
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent);

ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
vec![peer.clone()],
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)),
))
.await;
ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
vec![peer.clone()],
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)),
))
.await;
} else if version == CollationVersion::VStaging.into() {
let wire_message =
protocol_vstaging::CollatorProtocolMessage::AdvertiseCollation(relay_parent);

ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage(
vec![peer.clone()],
Versioned::VStaging(protocol_vstaging::CollationProtocol::CollatorProtocol(
wire_message,
)),
))
.await;
}

if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
validators.advertised_to_peer(&state.peer_ids, &peer);
Expand Down Expand Up @@ -738,12 +784,11 @@ async fn handle_incoming_peer_message<Context>(
runtime: &mut RuntimeInfo,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
msg: net_protocol::CollatorProtocolMessage,
) -> Result<()> {
use protocol_v1::CollatorProtocolMessage::*;

match msg {
Declare(_, _, _) => {
Versioned::V1(protocol_v1::CollatorProtocolMessage::Declare(_, _, _)) |
Versioned::VStaging(protocol_vstaging::CollatorProtocolMessage::Declare(_, _, _)) => {
gum::trace!(
target: LOG_TARGET,
?origin,
Expand All @@ -754,7 +799,8 @@ async fn handle_incoming_peer_message<Context>(
ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation))
.await;
},
AdvertiseCollation(_) => {
Versioned::V1(protocol_v1::CollatorProtocolMessage::AdvertiseCollation(_)) |
Versioned::VStaging(protocol_vstaging::CollatorProtocolMessage::AdvertiseCollation(_)) => {
gum::trace!(
target: LOG_TARGET,
?origin,
Expand All @@ -771,7 +817,14 @@ async fn handle_incoming_peer_message<Context>(
ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation))
.await;
},
CollationSeconded(relay_parent, statement) => {
Versioned::V1(protocol_v1::CollatorProtocolMessage::CollationSeconded(
relay_parent,
statement,
)) |
Versioned::VStaging(protocol_vstaging::CollatorProtocolMessage::CollationSeconded(
relay_parent,
statement,
)) =>
if !matches!(statement.unchecked_payload(), Statement::Seconded(_)) {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -804,8 +857,7 @@ async fn handle_incoming_peer_message<Context>(
"received an unexpected `CollationSeconded`: unknown statement",
);
}
}
},
},
}

Ok(())
Expand Down Expand Up @@ -892,14 +944,23 @@ async fn handle_peer_view_change<Context>(
peer_id: PeerId,
view: View,
) {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let (added, version) = {
let peer_data = match state.peer_data.get_mut(&peer_id) {
Some(pd) => pd,
None => return,
};

let added: Vec<Hash> = view.difference(&*current).cloned().collect();
let current = &mut peer_data.view;

*current = view;
let added: Vec<Hash> = view.difference(&*current).cloned().collect();

*current = view;

(added, peer_data.version)
};

for added in added.into_iter() {
advertise_collation(ctx, state, added, peer_id.clone()).await;
advertise_collation(ctx, state, added, peer_id.clone(), version).await;
}
}

Expand All @@ -914,7 +975,7 @@ async fn handle_network_msg<Context>(
use NetworkBridgeEvent::*;

match bridge_message {
PeerConnected(peer_id, observed_role, _, maybe_authority) => {
PeerConnected(peer_id, observed_role, version, maybe_authority) => {
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
gum::trace!(target: LOG_TARGET, ?peer_id, ?observed_role, "Peer connected");
Expand All @@ -926,8 +987,13 @@ async fn handle_network_msg<Context>(
"Connected to requested validator"
);
state.peer_ids.insert(peer_id, authority_ids);
state.peer_data.insert(peer_id, PeerData { view: View::default(), version });

declare(ctx, state, peer_id).await;
if version == CollationVersion::V1.into() {
declare_v1(ctx, state, peer_id).await;
} else if version == CollationVersion::VStaging.into() {
declare_vstaging(ctx, state, peer_id).await;
}
}
},
PeerViewChange(peer_id, view) => {
Expand All @@ -936,14 +1002,14 @@ async fn handle_network_msg<Context>(
},
PeerDisconnected(peer_id) => {
gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
state.peer_views.remove(&peer_id);
state.peer_data.remove(&peer_id);
state.peer_ids.remove(&peer_id);
},
OurViewChange(view) => {
gum::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, Versioned::V1(msg)) => {
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
},
NewGossipTopology { .. } => {
Expand Down
Loading

0 comments on commit 07cc887

Please sign in to comment.