From b04586c211bcb098f666c87ddc344f5bbc01a877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 24 Jun 2021 18:54:20 +0200 Subject: [PATCH 1/2] Ensure that we fetch another collation if the first collation was invalid --- .../src/validator_side/mod.rs | 101 +++++++++++---- .../src/validator_side/tests.rs | 119 ++++++++++++++---- 2 files changed, 171 insertions(+), 49 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 51890d5fa6d7..49e25e87a559 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -480,12 +480,14 @@ type PendingCollationFetch = ( ); /// The status of the collations in [`CollationsPerRelayParent`]. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum CollationStatus { /// We are waiting for a collation to be advertised to us. Waiting, /// We are currently fetching a collation. Fetching, + /// We are waiting for a collation to be validated. + WaitingOnValidation, /// We have seconded a collation. Seconded, } @@ -496,6 +498,16 @@ impl Default for CollationStatus { } } +impl CollationStatus { + /// Downgrades to `Waiting`, but only if `self != Seconded`. + fn back_to_waiting(&mut self) { + match self { + Self::Seconded => {}, + _ => *self = Self::Waiting, + } + } +} + /// Information about collations per relay parent. #[derive(Default)] struct CollationsPerRelayParent { @@ -505,6 +517,25 @@ struct CollationsPerRelayParent { unfetched_collations: Vec<(PendingCollation, CollatorId)>, } +impl CollationsPerRelayParent { + /// Returns the next collation to fetch from the `unfetched_collations`. + /// + /// If the `status` is `Seconded`, `WaitingOnValidation`, `Fetching`, this will return `None`. + /// + /// Returns `Some(_)` if there is any collation to fetch and the `status` is correct. 
+ pub fn get_next_collation_to_fetch( + &mut self, + ) -> Option<(PendingCollation, CollatorId)> { + match self.status { + // We don't need to fetch any other collation when we already have seconded one. + CollationStatus::Seconded => None, + CollationStatus::WaitingOnValidation => None, + CollationStatus::Waiting => self.unfetched_collations.pop(), + CollationStatus::Fetching => None, + } + } +} + /// All state relevant for the validator side of the protocol lives here. #[derive(Default)] struct State { @@ -812,7 +843,7 @@ where let collations = state.collations_per_relay_parent.entry(relay_parent).or_default(); match collations.status { - CollationStatus::Fetching => + CollationStatus::Fetching | CollationStatus::WaitingOnValidation => collations.unfetched_collations.push((pending_collation, id)), CollationStatus::Waiting => { collations.status = CollationStatus::Fetching; @@ -1024,14 +1055,32 @@ where } } Invalid(parent, candidate_receipt) => { - if state.pending_candidates - .get(&parent) - .map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash)) - .unwrap_or_default() - { - if let Some((id, _)) = state.pending_candidates.remove(&parent) { - report_collator(ctx, &state.peer_data, id).await; + let id = match state.pending_candidates.entry(parent) { + Entry::Occupied(entry) + if entry.get().1.commitments_hash == Some(candidate_receipt.commitments_hash) => entry.remove().0, + Entry::Occupied(_) => { + tracing::error!( + target: LOG_TARGET, + relay_parent = ?parent, + candidate = ?candidate_receipt.hash(), + "Reported invalid candidate for unknown `pending_candidate`!", + ); + return } + Entry::Vacant(_) => return, + }; + + report_collator(ctx, &state.peer_data, id).await; + + let next = if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) { + collations.status.back_to_waiting(); + collations.get_next_collation_to_fetch() + } else { + None + }; + + if let Some((next, id)) = next { + fetch_collation(ctx, state, next, 
id).await; } } } @@ -1139,30 +1188,25 @@ async fn handle_collation_fetched_result( "Failed to fetch collation.", ); - let (next_try, id) = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) { - if let Some(next_try) = collations.unfetched_collations.pop() { - next_try - } else if matches!(collations.status, CollationStatus::Fetching) { - collations.status = CollationStatus::Waiting; - return - } else { - tracing::error!( - target: LOG_TARGET, - status = ?collations.status, - "Expected status `CollationStatus::Fetching` but got unexpected status." - ); - return - } + let fetch = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) { + collations.status.back_to_waiting(); + collations.get_next_collation_to_fetch() } else { - return + None }; - fetch_collation(ctx, state, next_try, id).await; + if let Some((next, id)) = fetch { + fetch_collation(ctx, state, next, id).await; + } return }, }; + if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) { + collations.status = CollationStatus::WaitingOnValidation; + } + if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) { collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash); ctx.send_message( @@ -1174,6 +1218,13 @@ async fn handle_collation_fetched_result( ).await; entry.insert(collation_event); + } else { + tracing::error!( + target: LOG_TARGET, + ?relay_parent, + candidate = ?candidate_receipt.hash(), + "Trying to insert a pending candidate failed, because there is already one!", + ) } } diff --git a/node/network/collator-protocol/src/validator_side/tests.rs b/node/network/collator-protocol/src/validator_side/tests.rs index dc79c575cdeb..fb49bdfa2c56 100644 --- a/node/network/collator-protocol/src/validator_side/tests.rs +++ b/node/network/collator-protocol/src/validator_side/tests.rs @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{ our_view, ObservedRole, 
request_response::{Requests, ResponseSender}, }; -const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50); +const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(500); const DECLARE_TIMEOUT: Duration = Duration::from_millis(25); #[derive(Clone)] @@ -262,7 +262,7 @@ async fn assert_candidate_backing_second( expected_relay_parent: Hash, expected_para_id: ParaId, expected_pov: &PoV, -) { +) -> CandidateReceipt { assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov) @@ -270,7 +270,8 @@ async fn assert_candidate_backing_second( assert_eq!(expected_relay_parent, relay_parent); assert_eq!(expected_para_id, candidate_receipt.descriptor.para_id); assert_eq!(*expected_pov, incoming_pov); - }); + candidate_receipt + }) } /// Assert that a collator got disconnected. @@ -290,25 +291,6 @@ async fn assert_collator_disconnect( ); } -/// Assert that the given collators got disconnected. -async fn assert_collators_disconnect( - virtual_overseer: &mut VirtualOverseer, - expected_peers: &[PeerId], -) { - for _ in expected_peers { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert!(expected_peers.contains(&peer), "Unexpected collator disconnected: {:?}", peer); - assert_eq!(PeerSet::Collation, peer_set); - } - ); - } -} - /// Assert that a fetch collation request was send. 
async fn assert_fetch_collation_request( virtual_overseer: &mut VirtualOverseer, @@ -603,8 +585,6 @@ fn fetch_collations_works() { &pov, ).await; - assert_collators_disconnect(&mut virtual_overseer, &[peer_b.clone(), peer_c.clone()]).await; - overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer_b.clone())), @@ -681,6 +661,97 @@ fn fetch_collations_works() { }); } +// Ensure that we fetch a second collation, after the first checked collation was found to be invalid. +#[test] +fn fetch_next_collation_on_invalid_collation() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let second = Hash::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent, second]) + ), + ).await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + connect_and_declare_collator( + &mut virtual_overseer, + peer_b.clone(), + test_state.collators[0].clone(), + test_state.chain_ids[0].clone(), + ).await; + + connect_and_declare_collator( + &mut virtual_overseer, + peer_c.clone(), + test_state.collators[1].clone(), + test_state.chain_ids[0].clone(), + ).await; + + advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await; + advertise_collation(&mut virtual_overseer, peer_c.clone(), test_state.relay_parent).await; + + let response_channel = assert_fetch_collation_request( + &mut virtual_overseer, + test_state.relay_parent, + test_state.chain_ids[0], + ).await; + + let pov = PoV { block_data: BlockData(vec![]) }; + let mut candidate_a = CandidateReceipt::default(); + candidate_a.descriptor.para_id = 
test_state.chain_ids[0]; + candidate_a.descriptor.relay_parent = test_state.relay_parent; + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_a.clone(), + pov.clone(), + ).encode() + )).expect("Sending response should succeed"); + + let receipt = assert_candidate_backing_second( + &mut virtual_overseer, + test_state.relay_parent, + test_state.chain_ids[0], + &pov, + ).await; + + // Inform that the candidate was invalid. + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::Invalid(test_state.relay_parent, receipt)).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REPORT_BAD); + } + ); + + // We should see a request for another collation. + assert_fetch_collation_request( + &mut virtual_overseer, + test_state.relay_parent, + test_state.chain_ids[0], + ).await; + + virtual_overseer + }); +} + #[test] fn inactive_disconnected() { let test_state = TestState::default(); From e38a2a81bdb6aa6da51f211bfeafd2bf896d9dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 28 Jun 2021 13:02:39 +0200 Subject: [PATCH 2/2] Feedback --- .../src/validator_side/mod.rs | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 49e25e87a559..709855e353af 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -520,18 +520,18 @@ struct CollationsPerRelayParent { impl CollationsPerRelayParent { /// Returns the next collation to fetch from the `unfetched_collations`. /// - /// If the `status` is `Seconded`, `WaitingOnValidation`, `Fetching`, this will return `None`. 
+ /// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`]. /// - /// Returns `Some(_)` if there is any collation to fetch and the `status` is correct. - pub fn get_next_collation_to_fetch( - &mut self, - ) -> Option<(PendingCollation, CollatorId)> { + /// Returns `Some(_)` if there is any collation to fetch and the `status` is not `Seconded`. + pub fn get_next_collation_to_fetch(&mut self) -> Option<(PendingCollation, CollatorId)> { + self.status.back_to_waiting(); + match self.status { // We don't need to fetch any other collation when we already have seconded one. CollationStatus::Seconded => None, - CollationStatus::WaitingOnValidation => None, CollationStatus::Waiting => self.unfetched_collations.pop(), - CollationStatus::Fetching => None, + CollationStatus::WaitingOnValidation | CollationStatus::Fetching => + unreachable!("We have reset the status above!"), } } } @@ -1072,14 +1072,10 @@ where report_collator(ctx, &state.peer_data, id).await; - let next = if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) { - collations.status.back_to_waiting(); - collations.get_next_collation_to_fetch() - } else { - None - }; - - if let Some((next, id)) = next { + if let Some((next, id)) = state.collations_per_relay_parent + .get_mut(&parent) + .and_then(|c| c.get_next_collation_to_fetch()) + { fetch_collation(ctx, state, next, id).await; } } @@ -1188,14 +1184,10 @@ async fn handle_collation_fetched_result( "Failed to fetch collation.", ); - let fetch = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) { - collations.status.back_to_waiting(); - collations.get_next_collation_to_fetch() - } else { - None - }; - - if let Some((next, id)) = fetch { + if let Some((next, id)) = state.collations_per_relay_parent + .get_mut(&relay_parent) + .and_then(|c| c.get_next_collation_to_fetch()) + { fetch_collation(ctx, state, next, id).await; }