From c4be2f56a168bbf2fe34b7c97cf281ea205a8d06 Mon Sep 17 00:00:00 2001
From: Andronik
Date: Sun, 13 Feb 2022 15:00:37 +0100
Subject: [PATCH] Revert "collator-protocol: short-term fixes for connectivity
 (#4640)"

This reverts commit 5b74841ea76195fb804a02c0336d05d9a2d3091b.
---
 .../src/collator_side/mod.rs                  | 122 +++++---------
 .../src/collator_side/tests.rs                | 159 +++++++++---------
 node/network/collator-protocol/src/lib.rs     |   2 +-
 node/network/protocol/src/peer_set.rs         |   2 +-
 4 files changed, 127 insertions(+), 158 deletions(-)

diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs
index 58d6898bc2ac..6834d6ffa4b7 100644
--- a/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/node/network/collator-protocol/src/collator_side/mod.rs
@@ -321,14 +321,15 @@ impl State {
 
 /// Distribute a collation.
 ///
-/// If the para is not scheduled on any core, at the relay parent,
-/// or the relay parent isn't in our view or we already collated on the relay parent,
-/// we ignore the message as it must be invalid in that case -
-/// although this indicates a logic error elsewhere in the node.
-///
-/// Otherwise, start advertising the collation to interested peers.
+/// Figure out the core our para is assigned to and the relevant validators.
+/// Issue a connection request to these validators.
+/// If the para is not scheduled or next up on any core, at the relay-parent,
+/// or the relay-parent isn't in the active-leaves set, we ignore the message
+/// as it must be invalid in that case - although this indicates a logic error
+/// elsewhere in the node.
 async fn distribute_collation<Context>(
 	ctx: &mut Context,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	id: ParaId,
 	receipt: CandidateReceipt,
@@ -357,8 +358,32 @@ where
 		return Ok(())
 	}
 
-	if !state.our_validators_groups.contains_key(&relay_parent) {
-		tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");
+	// Determine which core the para collated-on is assigned to.
+	// If it is not scheduled then ignore the message.
+	let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
+		Some(core) => core,
+		None => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				para_id = %id,
+				?relay_parent,
+				"looks like no core is assigned to {} at {}", id, relay_parent,
+			);
+
+			return Ok(())
+		},
+	};
+
+	// Determine the group on that core.
+	let current_validators =
+		determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
+
+	if current_validators.validators.is_empty() {
+		tracing::warn!(
+			target: LOG_TARGET,
+			core = ?our_core,
+			"there are no validators assigned to core",
+		);
 
 		return Ok(())
 	}
@@ -369,9 +394,16 @@ where
 		relay_parent = %relay_parent,
 		candidate_hash = ?receipt.hash(),
 		pov_hash = ?pov.hash(),
-		"Accepted collation",
+		core = ?our_core,
+		?current_validators,
+		"Accepted collation, connecting to validators."
 	);
 
+	// Issue a discovery request for the validators of the current group:
+	connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
+
+	state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
+
 	if let Some(result_sender) = result_sender {
 		state.collation_result_senders.insert(receipt.hash(), result_sender);
 	}
@@ -490,7 +522,7 @@ where
 	Context: overseer::SubsystemContext,
 {
 	// ignore address resolution failure
-	// will reissue a new request on new relay parent
+	// will reissue a new request on new collation
 	let (failed, _) = oneshot::channel();
 	ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
 		validator_ids,
@@ -601,7 +633,8 @@ where
 			);
 		},
 		Some(id) => {
-			distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
+			distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
+				.await?;
 		},
 		None => {
 			tracing::warn!(
@@ -886,7 +919,7 @@ where
 		},
 		OurViewChange(view) => {
 			tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
-			handle_our_view_change(ctx, runtime, state, view).await?;
+			handle_our_view_change(state, view).await?;
 		},
 		PeerMessage(remote, msg) => {
 			handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
@@ -900,16 +933,7 @@ where
 }
 
 /// Handles our view changes.
-async fn handle_our_view_change<Context>(
-	ctx: &mut Context,
-	runtime: &mut RuntimeInfo,
-	state: &mut State,
-	view: OurView,
-) -> Result<()>
-where
-	Context: SubsystemContext,
-	Context: overseer::SubsystemContext,
-{
+async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
 	for removed in state.view.difference(&view) {
 		tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");
 
@@ -943,60 +967,6 @@ where
 	}
 
 	state.view = view;
-	if state.view.is_empty() {
-		return Ok(())
-	}
-
-	let id = match state.collating_on {
-		Some(id) => id,
-		None => return Ok(()),
-	};
-
-	// all validators assigned to the core
-	// across all active leaves
-	// this is typically our current group
-	// but can also include the previous group at
-	// rotation boundaries and considering forks
-	let mut group_validators = HashSet::new();
-
-	for relay_parent in state.view.iter().cloned() {
-		tracing::debug!(
-			target: LOG_TARGET,
-			?relay_parent,
-			para_id = ?id,
-			"Processing relay parent.",
-		);
-
-		// Determine our assigned core.
-		// If it is not scheduled then ignore the relay parent.
-		let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
-			Some(core) => core,
-			None => continue,
-		};
-
-		// Determine the group on that core.
-		let current_validators =
-			determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
-
-		let validators = current_validators.validators;
-		group_validators.extend(validators);
-
-		state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
-	}
-
-	let validators: Vec<_> = group_validators.into_iter().collect();
-	let no_one_is_assigned = validators.is_empty();
-	if no_one_is_assigned {
-		tracing::warn!(target: LOG_TARGET, "No validators assigned to our core.",);
-		return Ok(())
-	}
-	tracing::debug!(
-		target: LOG_TARGET,
-		?validators,
-		para_id = ?id,
-		"Connecting to validators.",
-	);
-	connect_to_validators(ctx, validators).await;
 
 	Ok(())
 }
diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs
index 86d5639ad610..526cfab04e19 100644
--- a/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/node/network/collator-protocol/src/collator_side/tests.rs
@@ -29,7 +29,7 @@ use sp_core::crypto::Pair;
 use sp_keyring::Sr25519Keyring;
 use sp_runtime::traits::AppVerify;
 
-use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view, OurView};
+use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view};
 use polkadot_node_primitives::BlockData;
 use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::{
@@ -172,7 +172,13 @@ impl TestState {
 			our_view![self.relay_parent]
 		};
 
-		set_our_view(virtual_overseer, &self, our_view).await;
+		overseer_send(
+			virtual_overseer,
+			CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
+				our_view,
+			)),
+		)
+		.await;
 	}
 }
 
@@ -272,83 +278,13 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
 	)
 	.await;
 
-	set_our_view(virtual_overseer, test_state, our_view![test_state.relay_parent]).await;
-}
-
-/// Check our view change triggers the right messages
-async fn set_our_view(
-	virtual_overseer: &mut VirtualOverseer,
-	test_state: &TestState,
-	our_view: OurView,
-) {
 	overseer_send(
 		virtual_overseer,
 		CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
-			our_view.clone(),
+			our_view![test_state.relay_parent],
 		)),
 	)
 	.await;
-
-	for parent in our_view.iter().cloned() {
-		// obtain the availability cores.
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::AvailabilityCores(tx)
-			)) => {
-				assert_eq!(relay_parent, parent);
-				tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
-			}
-		);
-
-		// We don't know precisely what is going to come as session info might be cached:
-		loop {
-			match overseer_recv(virtual_overseer).await {
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::SessionIndexForChild(tx),
-				)) => {
-					assert_eq!(relay_parent, relay_parent);
-					tx.send(Ok(test_state.current_session_index())).unwrap();
-				},
-
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::SessionInfo(index, tx),
-				)) => {
-					assert_eq!(relay_parent, parent);
-					assert_eq!(index, test_state.current_session_index());
-
-					tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
-				},
-
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::ValidatorGroups(tx),
-				)) => {
-					assert_eq!(relay_parent, parent);
-					tx.send(Ok((
-						test_state.session_info.validator_groups.clone(),
-						test_state.group_rotation_info.clone(),
-					)))
-					.unwrap();
-					// This call is mandatory - we are done:
-					break
-				},
-				other => panic!("Unexpected message received: {:?}", other),
-			}
-		}
-	}
-
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::NetworkBridge(
-			NetworkBridgeMessage::ConnectToValidators {
-				..
-			}
-		) => {}
-	);
 }
 
 /// Result of [`distribute_collation`]
@@ -361,6 +297,8 @@ struct DistributeCollation {
 async fn distribute_collation(
 	virtual_overseer: &mut VirtualOverseer,
 	test_state: &TestState,
+	// Whether or not we expect a connection request.
+	should_connect: bool,
 ) -> DistributeCollation {
 	// Now we want to distribute a `PoVBlock`
 	let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };
@@ -381,6 +319,67 @@ async fn distribute_collation(
 	)
 	.await;
 
+	// obtain the availability cores.
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+			relay_parent,
+			RuntimeApiRequest::AvailabilityCores(tx)
+		)) => {
+			assert_eq!(relay_parent, test_state.relay_parent);
+			tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+		}
+	);
+
+	// We don't know precisely what is going to come as session info might be cached:
+	loop {
+		match overseer_recv(virtual_overseer).await {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::SessionIndexForChild(tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				tx.send(Ok(test_state.current_session_index())).unwrap();
+			},
+
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::SessionInfo(index, tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				assert_eq!(index, test_state.current_session_index());
+
+				tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
+			},
+
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorGroups(tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				tx.send(Ok((
+					test_state.session_info.validator_groups.clone(),
+					test_state.group_rotation_info.clone(),
+				)))
+				.unwrap();
+				// This call is mandatory - we are done:
+				break
+			},
+			other => panic!("Unexpected message received: {:?}", other),
+		}
+	}
+
+	if should_connect {
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::NetworkBridge(
+				NetworkBridgeMessage::ConnectToValidators {
+					..
+				}
+			) => {}
+		);
+	}
+
 	DistributeCollation { candidate, pov_block }
 }
 
@@ -509,7 +508,7 @@ fn advertise_and_send_collation() {
 		setup_system(&mut virtual_overseer, &test_state).await;
 
 		let DistributeCollation { candidate, pov_block } =
-			distribute_collation(&mut virtual_overseer, &test_state).await;
+			distribute_collation(&mut virtual_overseer, &test_state, true).await;
 
 		for (val, peer) in test_state
 			.current_group_validator_authority_ids()
@@ -626,7 +625,7 @@ fn advertise_and_send_collation() {
 
 		assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
 
-		distribute_collation(&mut virtual_overseer, &test_state).await;
+		distribute_collation(&mut virtual_overseer, &test_state, true).await;
 
 		// Send info about peer's view.
 		overseer_send(
@@ -714,7 +713,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() {
 		// And let it tell us that it has the same view.
 		send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent).await;
 
@@ -753,14 +752,14 @@ fn collate_on_two_different_relay_chain_blocks() {
 		expect_declare_msg(virtual_overseer, &test_state, &peer).await;
 		expect_declare_msg(virtual_overseer, &test_state, &peer2).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		let old_relay_parent = test_state.relay_parent;
 
 		// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
 		test_state.advance_to_new_round(virtual_overseer, true).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await;
 		expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent).await;
@@ -790,7 +789,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() {
 		connect_peer(virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
 		expect_declare_msg(virtual_overseer, &test_state, &peer).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await;
 		expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent).await;
@@ -875,7 +874,7 @@ where
 		setup_system(virtual_overseer, &test_state).await;
 
 		let DistributeCollation { candidate, pov_block } =
-			distribute_collation(virtual_overseer, &test_state).await;
+			distribute_collation(virtual_overseer, &test_state, true).await;
 
 		for (val, peer) in test_state
 			.current_group_validator_authority_ids()
diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs
index 769b1448690b..0aa53156e759 100644
--- a/node/network/collator-protocol/src/lib.rs
+++ b/node/network/collator-protocol/src/lib.rs
@@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy {
 impl Default for CollatorEvictionPolicy {
 	fn default() -> Self {
 		CollatorEvictionPolicy {
-			inactive_collator: Duration::from_secs(5),
+			inactive_collator: Duration::from_secs(24),
 			undeclared: Duration::from_secs(1),
 		}
 	}
diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs
index 7856e3fd9f96..3d2f133163f6 100644
--- a/node/network/protocol/src/peer_set.rs
+++ b/node/network/protocol/src/peer_set.rs
@@ -75,7 +75,7 @@ impl PeerSet {
 				max_notification_size,
 				set_config: SetConfig {
 					// Non-authority nodes don't need to accept incoming connections on this peer set:
-					in_peers: if is_authority == IsAuthority::Yes { 100 } else { 0 },
+					in_peers: if is_authority == IsAuthority::Yes { 25 } else { 0 },
 					out_peers: 0,
 					reserved_nodes: Vec::new(),
 					non_reserved_mode: if is_authority == IsAuthority::Yes {