Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Ensure that we fetch another collation if the first collation was invalid #3362

Merged
merged 2 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 71 additions & 28 deletions node/network/collator-protocol/src/validator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,14 @@ type PendingCollationFetch = (
);

/// The status of the collations in [`CollationsPerRelayParent`].
///
/// Advertisements that arrive while the status is `Fetching` or `WaitingOnValidation` are
/// queued as unfetched collations instead of being fetched right away.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
enum CollationStatus {
/// We are waiting for a collation to be advertised to us.
Waiting,
/// We are currently fetching a collation.
Fetching,
/// We have fetched a collation and are waiting for it to be validated.
WaitingOnValidation,
/// We have seconded a collation.
Seconded,
}
Expand All @@ -496,6 +498,16 @@ impl Default for CollationStatus {
}
}

impl CollationStatus {
    /// Resets the status to `Waiting`, unless it is `Seconded`.
    ///
    /// `Seconded` is left untouched; every other status is replaced by `Waiting`.
    fn back_to_waiting(&mut self) {
        if !matches!(*self, Self::Seconded) {
            *self = Self::Waiting;
        }
    }
}

/// Information about collations per relay parent.
#[derive(Default)]
struct CollationsPerRelayParent {
Expand All @@ -505,6 +517,25 @@ struct CollationsPerRelayParent {
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
}

impl CollationsPerRelayParent {
    /// Hands out the next pending collation from `unfetched_collations`, if fetching one is
    /// still useful.
    ///
    /// As a side effect the `status` is reset via [`CollationStatus::back_to_waiting`], so after
    /// this call it is either `Waiting` or `Seconded`.
    ///
    /// Returns `None` when a collation was already seconded or when no unfetched collation is
    /// left; otherwise the most recently queued entry is removed and returned.
    pub fn get_next_collation_to_fetch(&mut self) -> Option<(PendingCollation, CollatorId)> {
        self.status.back_to_waiting();

        let may_fetch = match self.status {
            CollationStatus::Waiting => true,
            // We don't need to fetch any other collation when we already have seconded one.
            CollationStatus::Seconded => false,
            CollationStatus::Fetching | CollationStatus::WaitingOnValidation =>
                unreachable!("We have reset the status above!"),
        };

        if may_fetch {
            self.unfetched_collations.pop()
        } else {
            None
        }
    }
}

/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
Expand Down Expand Up @@ -812,7 +843,7 @@ where
let collations = state.collations_per_relay_parent.entry(relay_parent).or_default();

match collations.status {
CollationStatus::Fetching =>
CollationStatus::Fetching | CollationStatus::WaitingOnValidation =>
collations.unfetched_collations.push((pending_collation, id)),
CollationStatus::Waiting => {
collations.status = CollationStatus::Fetching;
Expand Down Expand Up @@ -1024,14 +1055,28 @@ where
}
}
Invalid(parent, candidate_receipt) => {
if state.pending_candidates
.get(&parent)
.map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash))
.unwrap_or_default()
{
if let Some((id, _)) = state.pending_candidates.remove(&parent) {
report_collator(ctx, &state.peer_data, id).await;
let id = match state.pending_candidates.entry(parent) {
Entry::Occupied(entry)
if entry.get().1.commitments_hash == Some(candidate_receipt.commitments_hash) => entry.remove().0,
Entry::Occupied(_) => {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?parent,
candidate = ?candidate_receipt.hash(),
"Reported invalid candidate for unknown `pending_candidate`!",
);
return
}
Entry::Vacant(_) => return,
};

report_collator(ctx, &state.peer_data, id).await;

if let Some((next, id)) = state.collations_per_relay_parent
.get_mut(&parent)
.and_then(|c| c.get_next_collation_to_fetch())
{
fetch_collation(ctx, state, next, id).await;
}
}
}
Expand Down Expand Up @@ -1139,30 +1184,21 @@ async fn handle_collation_fetched_result(
"Failed to fetch collation.",
);

let (next_try, id) = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
if let Some(next_try) = collations.unfetched_collations.pop() {
next_try
} else if matches!(collations.status, CollationStatus::Fetching) {
collations.status = CollationStatus::Waiting;
return
} else {
tracing::error!(
target: LOG_TARGET,
status = ?collations.status,
"Expected status `CollationStatus::Fetching` but got unexpected status."
);
return
}
} else {
return
};

fetch_collation(ctx, state, next_try, id).await;
if let Some((next, id)) = state.collations_per_relay_parent
.get_mut(&relay_parent)
.and_then(|c| c.get_next_collation_to_fetch())
{
fetch_collation(ctx, state, next, id).await;
}

return
},
};

if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
collations.status = CollationStatus::WaitingOnValidation;
Comment on lines +1198 to +1199
Copy link
Member

@ordian ordian Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can it be None?
can the status be Seconded?
I feel like the state transition function could use more type-safety and checked assumptions/invariants built-in

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can not be none, but I also don't want to use an expect here.

I feel like the state transition function could use more type-safety and checked assumptions/invariants built-in

Not sure what you mean here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the state transition function could use more type-safety and checked assumptions/invariants built-in

Not sure what you mean here.

I don't have a suggestion how to refactor it yet, so nevermind.

}

if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) {
collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash);
ctx.send_message(
Expand All @@ -1174,6 +1210,13 @@ async fn handle_collation_fetched_result(
).await;

entry.insert(collation_event);
} else {
tracing::error!(
target: LOG_TARGET,
?relay_parent,
candidate = ?candidate_receipt.hash(),
"Trying to insert a pending candidate failed, because there is already one!",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be unreachable, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be unreachable, yes. I just added it to have some error if we fucked it up.

)
}
}

Expand Down
119 changes: 95 additions & 24 deletions node/network/collator-protocol/src/validator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
our_view, ObservedRole, request_response::{Requests, ResponseSender},
};

const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50);
const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(500);
const DECLARE_TIMEOUT: Duration = Duration::from_millis(25);

#[derive(Clone)]
Expand Down Expand Up @@ -262,15 +262,16 @@ async fn assert_candidate_backing_second(
expected_relay_parent: Hash,
expected_para_id: ParaId,
expected_pov: &PoV,
) {
) -> CandidateReceipt {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov)
) => {
assert_eq!(expected_relay_parent, relay_parent);
assert_eq!(expected_para_id, candidate_receipt.descriptor.para_id);
assert_eq!(*expected_pov, incoming_pov);
});
candidate_receipt
})
}

/// Assert that a collator got disconnected.
Expand All @@ -290,25 +291,6 @@ async fn assert_collator_disconnect(
);
}

/// Assert that the given collators got disconnected.
async fn assert_collators_disconnect(
virtual_overseer: &mut VirtualOverseer,
expected_peers: &[PeerId],
) {
for _ in expected_peers {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
peer,
peer_set,
)) => {
assert!(expected_peers.contains(&peer), "Unexpected collator disconnected: {:?}", peer);
assert_eq!(PeerSet::Collation, peer_set);
}
);
}
}

/// Assert that a fetch collation request was sent.
async fn assert_fetch_collation_request(
virtual_overseer: &mut VirtualOverseer,
Expand Down Expand Up @@ -603,8 +585,6 @@ fn fetch_collations_works() {
&pov,
).await;

assert_collators_disconnect(&mut virtual_overseer, &[peer_b.clone(), peer_c.clone()]).await;

overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer_b.clone())),
Expand Down Expand Up @@ -681,6 +661,97 @@ fn fetch_collations_works() {
});
}

// Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
//
// Scenario: two collators advertise a collation for the same relay parent and para. The first
// fetched collation is handed to candidate backing and then reported back as invalid. The
// collator that provided it must be reported and a fetch request for another collation must
// be sent.
#[test]
fn fetch_next_collation_on_invalid_collation() {
let test_state = TestState::default();

test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;

// A second, random head that is put into our view next to the test relay parent.
let second = Hash::random();

overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent, second])
),
).await;

// NOTE(review): presumably one round of core info queries per head in the view — confirm.
respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;

let peer_b = PeerId::random();
let peer_c = PeerId::random();

// Both collators declare themselves for the same para (`chain_ids[0]`).
connect_and_declare_collator(
&mut virtual_overseer,
peer_b.clone(),
test_state.collators[0].clone(),
test_state.chain_ids[0].clone(),
).await;

connect_and_declare_collator(
&mut virtual_overseer,
peer_c.clone(),
test_state.collators[1].clone(),
test_state.chain_ids[0].clone(),
).await;

// Both peers advertise a collation for the same relay parent.
advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
advertise_collation(&mut virtual_overseer, peer_c.clone(), test_state.relay_parent).await;

// Only one fetch request is expected at this point.
let response_channel = assert_fetch_collation_request(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
).await;

// Answer the first fetch request with a candidate for the advertised para and relay parent.
let pov = PoV { block_data: BlockData(vec![]) };
let mut candidate_a = CandidateReceipt::default();
candidate_a.descriptor.para_id = test_state.chain_ids[0];
candidate_a.descriptor.relay_parent = test_state.relay_parent;
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_a.clone(),
pov.clone(),
).encode()
)).expect("Sending response should succeed");

// The fetched collation is forwarded to candidate backing; keep the receipt that was sent.
let receipt = assert_candidate_backing_second(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
&pov,
).await;

// Inform that the candidate was invalid.
overseer_send(&mut virtual_overseer, CollatorProtocolMessage::Invalid(test_state.relay_parent, receipt)).await;

// The collator whose collation was fetched first (`peer_b`) must be reported.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(
peer,
rep,
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REPORT_BAD);
}
);

// We should see a request for another collation.
assert_fetch_collation_request(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
).await;

virtual_overseer
});
}

#[test]
fn inactive_disconnected() {
let test_state = TestState::default();
Expand Down