From bf10f6f50562cb528ef3635eba8ac65c73257e2d Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 5 Oct 2022 10:32:22 +0300 Subject: [PATCH 1/8] Move Role(s) to `sc-network-common` --- .../src/communication/gossip.rs | 2 +- .../src/communication/tests.rs | 5 +- client/network-gossip/src/bridge.rs | 2 +- client/network-gossip/src/state_machine.rs | 2 +- client/network-gossip/src/validator.rs | 2 +- client/network/common/src/protocol.rs | 1 + client/network/common/src/protocol/event.rs | 24 +--- client/network/common/src/protocol/role.rs | 121 ++++++++++++++++++ client/network/src/behaviour.rs | 5 +- client/network/src/config.rs | 26 +--- client/network/src/lib.rs | 3 +- client/network/src/protocol.rs | 9 +- client/network/src/protocol/message.rs | 59 +-------- client/network/src/service.rs | 6 +- client/network/transactions/src/lib.rs | 5 +- 15 files changed, 143 insertions(+), 129 deletions(-) create mode 100644 client/network/common/src/protocol/role.rs diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 95efedf7b23b7..1ba5e0da33c96 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -90,7 +90,7 @@ use parity_scale_codec::{Decode, Encode}; use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64}; use rand::seq::SliceRandom; use sc_network::{PeerId, ReputationChange}; -use sc_network_common::protocol::event::ObservedRole; +use sc_network_common::protocol::role::ObservedRole; use sc_network_gossip::{MessageIntent, ValidatorContext}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index b73f6cdecdd4f..eab7bb2df50cf 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -28,10 +28,7 @@ use parity_scale_codec::Encode; use sc_network::{config::Role, Multiaddr, PeerId, ReputationChange}; use sc_network_common::{ config::MultiaddrWithPeerId, - protocol::{ - event::{Event as NetworkEvent, ObservedRole}, - ProtocolName, - }, + protocol::{event::Event as NetworkEvent, role::ObservedRole, ProtocolName}, service::{ NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkSyncForkRequest, NotificationSender, NotificationSenderError, diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 121fa6dc9a50d..5563b3be35e8d 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -317,7 +317,7 @@ mod tests { use quickcheck::{Arbitrary, Gen, QuickCheck}; use sc_network_common::{ config::MultiaddrWithPeerId, - protocol::event::ObservedRole, + protocol::role::ObservedRole, service::{ NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, NotificationSender, NotificationSenderError, diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 30a2e9d1494be..600383cf5f70d 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -22,7 +22,7 @@ use ahash::AHashSet; use libp2p::PeerId; use lru::LruCache; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; -use sc_network_common::protocol::{event::ObservedRole, ProtocolName}; +use sc_network_common::protocol::{role::ObservedRole, ProtocolName}; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use std::{collections::HashMap, iter, sync::Arc, time, time::Instant}; diff --git a/client/network-gossip/src/validator.rs b/client/network-gossip/src/validator.rs index 185c2cfeed2c7..77dcc3bdc8791 100644 --- a/client/network-gossip/src/validator.rs +++ b/client/network-gossip/src/validator.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use libp2p::PeerId; -use sc_network_common::protocol::event::ObservedRole; +use sc_network_common::protocol::role::ObservedRole; use sp_runtime::traits::Block as BlockT; /// Validates consensus messages. diff --git a/client/network/common/src/protocol.rs b/client/network/common/src/protocol.rs index 11edc373a2620..04bfaedbcac71 100644 --- a/client/network/common/src/protocol.rs +++ b/client/network/common/src/protocol.rs @@ -27,6 +27,7 @@ use std::{ use libp2p::core::upgrade; pub mod event; +pub mod role; /// The protocol name transmitted on the wire. #[derive(Debug, Clone)] diff --git a/client/network/common/src/protocol/event.rs b/client/network/common/src/protocol/event.rs index 3d8c183da495c..236913df1b120 100644 --- a/client/network/common/src/protocol/event.rs +++ b/client/network/common/src/protocol/event.rs @@ -20,6 +20,7 @@ //! events that happen on the network like DHT get/put results received. use super::ProtocolName; +use crate::protocol::role::ObservedRole; use bytes::Bytes; use libp2p::{core::PeerId, kad::record::Key}; @@ -97,26 +98,3 @@ pub enum Event { messages: Vec<(ProtocolName, Bytes)>, }, } - -/// Role that the peer sent to us during the handshake, with the addition of what our local node -/// knows about that peer. -/// -/// > **Note**: This enum is different from the `Role` enum. The `Role` enum indicates what a -/// > node says about itself, while `ObservedRole` is a `Role` merged with the -/// > information known locally about that node. -#[derive(Debug, Clone)] -pub enum ObservedRole { - /// Full node. - Full, - /// Light node. - Light, - /// Third-party authority. - Authority, -} - -impl ObservedRole { - /// Returns `true` for `ObservedRole::Light`. - pub fn is_light(&self) -> bool { - matches!(self, Self::Light) - } -} diff --git a/client/network/common/src/protocol/role.rs b/client/network/common/src/protocol/role.rs new file mode 100644 index 0000000000000..f30d3de87d293 --- /dev/null +++ b/client/network/common/src/protocol/role.rs @@ -0,0 +1,121 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use codec::{self, Encode, EncodeLike, Input, Output}; + +/// Role that the peer sent to us during the handshake, with the addition of what our local node +/// knows about that peer. +/// +/// > **Note**: This enum is different from the `Role` enum. The `Role` enum indicates what a +/// > node says about itself, while `ObservedRole` is a `Role` merged with the +/// > information known locally about that node. +#[derive(Debug, Clone)] +pub enum ObservedRole { + /// Full node. + Full, + /// Light node. + Light, + /// Third-party authority. + Authority, +} + +impl ObservedRole { + /// Returns `true` for `ObservedRole::Light`. + pub fn is_light(&self) -> bool { + matches!(self, Self::Light) + } +} + +/// Role of the local node. +#[derive(Debug, Clone)] +pub enum Role { + /// Regular full node. + Full, + /// Actual authority. + Authority, +} + +impl Role { + /// True for [`Role::Authority`]. + pub fn is_authority(&self) -> bool { + matches!(self, Self::Authority { .. }) + } +} + +impl std::fmt::Display for Role { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Full => write!(f, "FULL"), + Self::Authority { .. } => write!(f, "AUTHORITY"), + } + } +} + +bitflags::bitflags! { + /// Bitmask of the roles that a node fulfills. + pub struct Roles: u8 { + /// No network. + const NONE = 0b00000000; + /// Full node, does not participate in consensus. + const FULL = 0b00000001; + /// Light client node. + const LIGHT = 0b00000010; + /// Act as an authority + const AUTHORITY = 0b00000100; + } +} + +impl Roles { + /// Does this role represents a client that holds full chain data locally? + pub fn is_full(&self) -> bool { + self.intersects(Self::FULL | Self::AUTHORITY) + } + + /// Does this role represents a client that does not participates in the consensus? + pub fn is_authority(&self) -> bool { + *self == Self::AUTHORITY + } + + /// Does this role represents a client that does not hold full chain data locally? + pub fn is_light(&self) -> bool { + !self.is_full() + } +} + +impl<'a> From<&'a Role> for Roles { + fn from(roles: &'a Role) -> Self { + match roles { + Role::Full => Self::FULL, + Role::Authority { .. } => Self::AUTHORITY, + } + } +} + +impl Encode for Roles { + fn encode_to(&self, dest: &mut T) { + dest.push_byte(self.bits()) + } +} + +impl EncodeLike for Roles {} + +impl codec::Decode for Roles { + fn decode(input: &mut I) -> Result { + Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes")) + } +} diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 14962c837aa10..b31f36eb46692 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -19,7 +19,7 @@ use crate::{ discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, peer_info, - protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, + protocol::{CustomMessageOutcome, NotificationsSink, Protocol}, request_responses, }; @@ -41,7 +41,8 @@ use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin}; use sc_network_common::{ config::ProtocolId, protocol::{ - event::{DhtEvent, ObservedRole}, + event::DhtEvent, + role::{ObservedRole, Roles}, ProtocolName, }, request_responses::{IfDisconnected, ProtocolConfig, RequestFailure}, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index b2adfa81d065b..b8489e1d1b37f 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -23,6 +23,7 @@ pub use sc_network_common::{ config::ProtocolId, + protocol::role::Role, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -130,31 +131,6 @@ where pub request_response_protocol_configs: Vec, } -/// Role of the local node. -#[derive(Debug, Clone)] -pub enum Role { - /// Regular full node. - Full, - /// Actual authority. - Authority, -} - -impl Role { - /// True for [`Role::Authority`]. - pub fn is_authority(&self) -> bool { - matches!(self, Self::Authority { .. }) - } -} - -impl fmt::Display for Role { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Full => write!(f, "FULL"), - Self::Authority { .. } => write!(f, "AUTHORITY"), - } - } -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum SyncMode { diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index d17f47328b804..27f2a938154fe 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -260,7 +260,8 @@ pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use protocol::PeerInfo; pub use sc_network_common::{ protocol::{ - event::{DhtEvent, Event, ObservedRole}, + event::{DhtEvent, Event}, + role::ObservedRole, ProtocolName, }, request_responses::{IfDisconnected, RequestFailure}, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index fbf651de9d49a..a2977ba72d42b 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -31,10 +31,7 @@ use libp2p::{ Multiaddr, PeerId, }; use log::{debug, error, info, log, trace, warn, Level}; -use message::{ - generic::{Message as GenericMessage, Roles}, - Message, -}; +use message::{generic::Message as GenericMessage, Message}; use notifications::{Notifications, NotificationsOut}; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_client_api::HeaderBackend; @@ -44,7 +41,7 @@ use sc_consensus::import_queue::{ use sc_network_common::{ config::{NonReservedPeerMode, ProtocolId}, error, - protocol::ProtocolName, + protocol::{role::Roles, ProtocolName}, request_responses::RequestFailure, sync::{ message::{ @@ -1627,7 +1624,7 @@ where } } else { match ( - message::Roles::decode_all(&mut &received_handshake[..]), + Roles::decode_all(&mut &received_handshake[..]), self.peers.get(&peer_id), ) { (Ok(roles), _) => CustomMessageOutcome::NotificationStreamOpened { diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index 3e1281753b82c..b61228abfd273 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -21,11 +21,11 @@ pub use self::generic::{ RemoteCallRequest, RemoteChangesRequest, RemoteChangesResponse, RemoteHeaderRequest, - RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, Roles, + RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, }; use codec::{Decode, Encode}; use sc_client_api::StorageProof; -use sc_network_common::message::RequestId; +use sc_network_common::{message::RequestId, protocol::role::*}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; /// Type alias for using the message type using block type parameters. @@ -62,6 +62,7 @@ pub mod generic { use sc_client_api::StorageProof; use sc_network_common::{ message::RequestId, + protocol::role::Roles, sync::message::{ generic::{BlockRequest, BlockResponse}, BlockAnnounce, @@ -69,60 +70,6 @@ pub mod generic { }; use sp_runtime::ConsensusEngineId; - bitflags! { - /// Bitmask of the roles that a node fulfills. - pub struct Roles: u8 { - /// No network. - const NONE = 0b00000000; - /// Full node, does not participate in consensus. - const FULL = 0b00000001; - /// Light client node. - const LIGHT = 0b00000010; - /// Act as an authority - const AUTHORITY = 0b00000100; - } - } - - impl Roles { - /// Does this role represents a client that holds full chain data locally? - pub fn is_full(&self) -> bool { - self.intersects(Self::FULL | Self::AUTHORITY) - } - - /// Does this role represents a client that does not participates in the consensus? - pub fn is_authority(&self) -> bool { - *self == Self::AUTHORITY - } - - /// Does this role represents a client that does not hold full chain data locally? - pub fn is_light(&self) -> bool { - !self.is_full() - } - } - - impl<'a> From<&'a crate::config::Role> for Roles { - fn from(roles: &'a crate::config::Role) -> Self { - match roles { - crate::config::Role::Full => Self::FULL, - crate::config::Role::Authority { .. } => Self::AUTHORITY, - } - } - } - - impl codec::Encode for Roles { - fn encode_to(&self, dest: &mut T) { - dest.push_byte(self.bits()) - } - } - - impl codec::EncodeLike for Roles {} - - impl codec::Decode for Roles { - fn decode(input: &mut I) -> Result { - Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes")) - } - } - /// Consensus is mostly opaque to us #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct ConsensusMessage { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 180482e75ece2..ffedcd10c995c 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -34,10 +34,7 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - protocol::{ - self, message::generic::Roles, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, - Ready, - }, + protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready}, transport, ReputationChange, }; @@ -63,6 +60,7 @@ use sc_network_common::{ error::Error, protocol::{ event::{DhtEvent, Event}, + role::Roles, ProtocolName, }, request_responses::{IfDisconnected, RequestFailure}, diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index b75bd411b39c4..a4ddb860a1254 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -35,10 +35,7 @@ use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network_common::{ config::{NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, SetConfig}, error, - protocol::{ - event::{Event, ObservedRole}, - ProtocolName, - }, + protocol::{event::Event, role::ObservedRole, ProtocolName}, service::{NetworkEventStream, NetworkNotification, NetworkPeers}, utils::{interval, LruHashSet}, ExHashT, From 73b5e070d149b2f73c3521489d71ffd7f39306c2 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 5 Oct 2022 16:53:38 +0300 Subject: [PATCH 2/8] Introduce `NotificationHandshake` type --- client/finality-grandpa/src/lib.rs | 1 + client/network/common/src/config.rs | 25 +++++++++++++++++++++++++ client/network/src/protocol.rs | 19 ++++++++----------- client/network/src/protocol/message.rs | 5 ++--- client/network/src/service.rs | 5 ----- client/network/src/service/tests.rs | 6 ++++++ client/network/test/src/lib.rs | 1 + client/network/transactions/src/lib.rs | 1 + 8 files changed, 44 insertions(+), 19 deletions(-) diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index d5c05fea78aa2..a7326d57c2bf0 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -695,6 +695,7 @@ pub fn grandpa_peers_set_config( fallback_names: grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(), // Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot. max_notification_size: 1024 * 1024, + handshake: None, set_config: sc_network_common::config::SetConfig { in_peers: 0, out_peers: 0, diff --git a/client/network/common/src/config.rs b/client/network/common/src/config.rs index fb23cd0174922..319bb636ffb98 100644 --- a/client/network/common/src/config.rs +++ b/client/network/common/src/config.rs @@ -20,6 +20,7 @@ use crate::protocol; +use codec::Encode; use libp2p::{multiaddr, Multiaddr, PeerId}; use std::{fmt, str, str::FromStr}; @@ -199,6 +200,27 @@ impl Default for SetConfig { } } +#[derive(Debug, Clone)] +pub struct NotificationHandshake(Vec); + +impl NotificationHandshake { + pub fn new(handshake: H) -> Self { + Self(handshake.encode()) + } + + pub fn from_bytes(bytes: Vec) -> Self { + Self(bytes) + } +} + +impl std::ops::Deref for NotificationHandshake { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Extension to [`SetConfig`] for sets that aren't the default set. /// /// > **Note**: As new fields might be added in the future, please consider using the `new` method @@ -218,6 +240,8 @@ pub struct NonDefaultSetConfig { /// If a fallback is used, it will be reported in /// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback` pub fallback_names: Vec, + /// Handshake for the protocol + pub handshake: Option, /// Maximum allowed size of single notifications. pub max_notification_size: u64, /// Base configuration. @@ -231,6 +255,7 @@ impl NonDefaultSetConfig { notifications_protocol, max_notification_size, fallback_names: Vec::new(), + handshake: None, set_config: SetConfig { in_peers: 0, out_peers: 0, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a2977ba72d42b..a48592c75503d 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -268,7 +268,6 @@ where protocol_id: ProtocolId, fork_id: &Option, network_config: &config::NetworkConfiguration, - notifications_protocols_handshakes: Vec>, metrics_registry: Option<&Registry>, chain_sync: Box>, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { @@ -397,16 +396,14 @@ where Notifications::new( peerset, - iter::once(sync_protocol_config).chain( - network_config.extra_sets.iter().zip(notifications_protocols_handshakes).map( - |(s, hs)| notifications::ProtocolConfig { - name: s.notifications_protocol.clone(), - fallback_names: s.fallback_names.clone(), - handshake: hs, - max_notification_size: s.max_notification_size, - }, - ), - ), + iter::once(sync_protocol_config).chain(network_config.extra_sets.iter().map(|s| { + notifications::ProtocolConfig { + name: s.notifications_protocol.clone(), + fallback_names: s.fallback_names.clone(), + handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()), + max_notification_size: s.max_notification_size, + } + })), ) }; diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index b61228abfd273..ef652387d2c7d 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -25,7 +25,7 @@ pub use self::generic::{ }; use codec::{Decode, Encode}; use sc_client_api::StorageProof; -use sc_network_common::{message::RequestId, protocol::role::*}; +use sc_network_common::message::RequestId; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; /// Type alias for using the message type using block type parameters. @@ -57,8 +57,7 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { use super::{RemoteCallResponse, RemoteReadResponse}; - use bitflags::bitflags; - use codec::{Decode, Encode, Input, Output}; + use codec::{Decode, Encode, Input}; use sc_client_api::StorageProof; use sc_network_common::{ message::RequestId, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index ffedcd10c995c..6ffae6e566fc8 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -220,17 +220,12 @@ where local_peer_id.to_base58(), ); - let default_notif_handshake_message = Roles::from(¶ms.role).encode(); - let (protocol, peerset_handle, mut known_addresses) = Protocol::new( From::from(¶ms.role), params.chain.clone(), params.protocol_id.clone(), ¶ms.fork_id, ¶ms.network_config, - (0..params.network_config.extra_sets.len()) - .map(|_| default_notif_handshake_message.clone()) - .collect(), params.metrics_registry.as_ref(), params.chain_sync, )?; diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index c8f137f79c6dc..cd9219c9f7942 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -178,6 +178,7 @@ fn build_nodes_one_proto() -> ( notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: Default::default(), }], listen_addresses: vec![listen_addr.clone()], @@ -190,6 +191,7 @@ fn build_nodes_one_proto() -> ( notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr, @@ -368,6 +370,7 @@ fn lots_of_incoming_peers_works() { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: SetConfig { in_peers: u32::MAX, ..Default::default() }, }], transport: TransportConfig::MemoryOnly, @@ -387,6 +390,7 @@ fn lots_of_incoming_peers_works() { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr.clone(), @@ -504,6 +508,7 @@ fn fallback_name_working() { notifications_protocol: NEW_PROTOCOL_NAME.into(), fallback_names: vec![PROTOCOL_NAME.into()], max_notification_size: 1024 * 1024, + handshake: None, set_config: Default::default(), }], listen_addresses: vec![listen_addr.clone()], @@ -516,6 +521,7 @@ fn fallback_name_working() { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 9d5abf98ceff0..a007e46c5f43d 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -802,6 +802,7 @@ where notifications_protocol: p, fallback_names: Vec::new(), max_notification_size: 1024 * 1024, + handshake: None, set_config: Default::default(), }) .collect(); diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index a4ddb860a1254..5239a94ef23f3 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -142,6 +142,7 @@ impl TransactionsHandlerPrototype { notifications_protocol: self.protocol_name.clone(), fallback_names: self.fallback_protocol_names.clone(), max_notification_size: MAX_TRANSACTIONS_SIZE, + handshake: None, set_config: SetConfig { in_peers: 0, out_peers: 0, From 8a295613ef052d2280b5b9e4c959299ca31095ab Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 5 Oct 2022 20:27:13 +0300 Subject: [PATCH 3/8] Move block announce protocol config creation to `ChainSync` --- client/network/common/src/sync/message.rs | 28 +++++++- client/network/src/config.rs | 3 + client/network/src/protocol.rs | 83 +++++------------------ client/network/src/service.rs | 1 + client/network/src/service/tests.rs | 25 ++++++- client/network/sync/src/lib.rs | 82 +++++++++++++++++++--- client/network/test/src/lib.rs | 17 ++++- client/service/src/builder.rs | 22 +++++- 8 files changed, 181 insertions(+), 80 deletions(-) diff --git a/client/network/common/src/sync/message.rs b/client/network/common/src/sync/message.rs index 27ab2704e6471..04e73c0eb9077 100644 --- a/client/network/common/src/sync/message.rs +++ b/client/network/common/src/sync/message.rs @@ -19,10 +19,12 @@ //! Network packet message types. These get serialized and put into the lower level protocol //! payload. +use crate::protocol::role::Roles; + use bitflags::bitflags; use codec::{Decode, Encode, Error, Input, Output}; pub use generic::{BlockAnnounce, FromBlock}; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Type alias for using the block request type using block type parameters. pub type BlockRequest = @@ -220,3 +222,27 @@ pub mod generic { } } } + +/// Handshake sent when we open a block announces substream. +#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] +pub struct BlockAnnouncesHandshake { + /// Roles of the node. + pub roles: Roles, + /// Best block number. + pub best_number: NumberFor, + /// Best block hash. + pub best_hash: B::Hash, + /// Genesis block hash. + pub genesis_hash: B::Hash, +} + +impl BlockAnnouncesHandshake { + pub fn build( + roles: Roles, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, + ) -> Self { + Self { genesis_hash, roles, best_number, best_hash } + } +} diff --git a/client/network/src/config.rs b/client/network/src/config.rs index b8489e1d1b37f..db3e8f0b98a33 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -94,6 +94,9 @@ where /// Registry for recording prometheus metrics to. pub metrics_registry: Option, + /// Block announce protocol configuration + pub block_announce_config: NonDefaultSetConfig, + /// Request response configuration for the block request protocol. /// /// [`RequestResponseConfig::name`] is used to tag outgoing block requests with the correct diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a48592c75503d..fa819d92d89e5 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -45,7 +45,8 @@ use sc_network_common::{ request_responses::RequestFailure, sync::{ message::{ - BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState, + BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, + BlockResponse, BlockState, }, warp::{EncodedProof, WarpProofRequest}, BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, @@ -82,8 +83,6 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); /// Maximum number of known block hashes to keep for a peer. const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead -/// Maximum allowed size for a block announce. -const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; /// Maximum size used for notifications in the block announce and transaction protocols. // Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`. @@ -232,30 +231,6 @@ pub struct PeerInfo { pub best_number: ::Number, } -/// Handshake sent when we open a block announces substream. -#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] -struct BlockAnnouncesHandshake { - /// Roles of the node. - roles: Roles, - /// Best block number. - best_number: NumberFor, - /// Best block hash. - best_hash: B::Hash, - /// Genesis block hash. - genesis_hash: B::Hash, -} - -impl BlockAnnouncesHandshake { - fn build( - roles: Roles, - best_number: NumberFor, - best_hash: B::Hash, - genesis_hash: B::Hash, - ) -> Self { - Self { genesis_hash, roles, best_number, best_hash } - } -} - impl Protocol where B: BlockT, @@ -263,6 +238,7 @@ where { /// Create a new instance. pub fn new( + block_announces_protocol: sc_network_common::config::NonDefaultSetConfig, roles: Roles, chain: Arc, protocol_id: ProtocolId, @@ -361,48 +337,25 @@ where sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets }) }; - let block_announces_protocol = { - let genesis_hash = - chain.hash(0u32.into()).ok().flatten().expect("Genesis block exists; qed"); - let genesis_hash = genesis_hash.as_ref(); - if let Some(fork_id) = fork_id { - format!( - "/{}/{}/block-announces/1", - array_bytes::bytes2hex("", genesis_hash), - fork_id - ) - } else { - format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) - } - }; - - let legacy_ba_protocol_name = format!("/{}/block-announces/1", protocol_id.as_ref()); + // println!("{:#?}", network_config.extra_sets); let behaviour = { - let best_number = info.best_number; - let best_hash = info.best_hash; - let genesis_hash = info.genesis_hash; - - let block_announces_handshake = - BlockAnnouncesHandshake::::build(roles, best_number, best_hash, genesis_hash) - .encode(); - - let sync_protocol_config = notifications::ProtocolConfig { - name: block_announces_protocol.into(), - fallback_names: iter::once(legacy_ba_protocol_name.into()).collect(), - handshake: block_announces_handshake, - max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, - }; - Notifications::new( peerset, - iter::once(sync_protocol_config).chain(network_config.extra_sets.iter().map(|s| { - notifications::ProtocolConfig { - name: s.notifications_protocol.clone(), - fallback_names: s.fallback_names.clone(), - handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()), - max_notification_size: s.max_notification_size, - } + iter::once(notifications::ProtocolConfig { + name: block_announces_protocol.notifications_protocol.clone(), + fallback_names: block_announces_protocol.fallback_names.clone(), + handshake: block_announces_protocol + .handshake + .as_ref() + .map_or(roles.encode(), |h| (*h).to_vec()), + max_notification_size: block_announces_protocol.max_notification_size, + }) + .chain(network_config.extra_sets.iter().map(|s| notifications::ProtocolConfig { + name: s.notifications_protocol.clone(), + fallback_names: s.fallback_names.clone(), + handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()), + max_notification_size: s.max_notification_size, })), ) }; diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6ffae6e566fc8..159c479949b3b 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -221,6 +221,7 @@ where ); let (protocol, peerset_handle, mut known_addresses) = Protocol::new( + params.block_announce_config, From::from(¶ms.role), params.chain.clone(), params.protocol_id.clone(), diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index cd9219c9f7942..3671715592f95 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -20,10 +20,15 @@ use crate::{config, NetworkService, NetworkWorker}; use futures::prelude::*; use libp2p::PeerId; +use sc_client_api::{BlockBackend, HeaderBackend}; use sc_network_common::{ - config::{MultiaddrWithPeerId, NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig}, - protocol::event::Event, + config::{ + MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, + ProtocolId, SetConfig, TransportConfig, + }, + protocol::{event::Event, role::Roles}, service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo}, + sync::message::BlockAnnouncesHandshake, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -32,7 +37,7 @@ use sc_network_sync::{ }; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use sp_runtime::traits::{Block as BlockT, Header as _}; -use std::{sync::Arc, time::Duration}; +use std::{iter, sync::Arc, time::Duration}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; type TestNetworkService = NetworkService< @@ -132,7 +137,21 @@ fn build_test_full_node( None, ) .unwrap(); + let block_announce_config = chain_sync.get_block_announce_proto_config( + protocol_id.clone(), + &fork_id, + Roles::from(&config::Role::Full), + client.info().best_number, + client.info().best_hash, + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ); + let worker = NetworkWorker::new(config::Params { + block_announce_config, role: config::Role::Full, executor: None, network_config, diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 6ad4a8fbbdcc5..48508bbee72e4 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -50,15 +50,27 @@ use log::{debug, error, info, trace, warn}; use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; -use sc_network_common::sync::{ - message::{ - BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, - FromBlock, +use sc_network_common::{ + config::{ + NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, + }, + protocol::role::Roles, + sync::{ + message::{ + BlockAnnounce, + BlockAnnouncesHandshake, // TODO: move away from common to here + BlockAttributes, + BlockData, + BlockRequest, + BlockResponse, + Direction, + FromBlock, + }, + warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, + OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, + PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus, }, - warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, - OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, - PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus, }; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; @@ -76,6 +88,7 @@ use sp_runtime::{ }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, + iter, ops::Range, pin::Pin, sync::Arc, @@ -121,6 +134,9 @@ const MAJOR_SYNC_BLOCKS: u8 = 5; /// Number of peers that need to be connected before warp sync is started. const MIN_PEERS_TO_START_WARP_SYNC: usize = 3; +/// Maximum allowed size for a block announce. +const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; + mod rep { use sc_peerset::ReputationChange as Rep; /// Reputation change when a peer sent us a message that led to a @@ -2233,6 +2249,56 @@ where } None } + + /// Get handshake for the block announcement protocol + pub fn get_block_announce_proto_handshake(&self) -> Vec { + todo!(); + } + + /// Get config for the block announcement protocol + pub fn get_block_announce_proto_config( + &self, + protocol_id: ProtocolId, + fork_id: &Option, + roles: Roles, + best_number: NumberFor, + best_hash: B::Hash, + genesis_hash: B::Hash, + ) -> NonDefaultSetConfig { + let block_announces_protocol = { + let genesis_hash = genesis_hash.as_ref(); + if let Some(ref fork_id) = fork_id { + format!( + "/{}/{}/block-announces/1", + array_bytes::bytes2hex("", genesis_hash), + fork_id + ) + } else { + format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash)) + } + }; + + NonDefaultSetConfig { + notifications_protocol: block_announces_protocol.into(), + fallback_names: iter::once( + format!("/{}/block-announces/1", protocol_id.as_ref()).into(), + ) + .collect(), + max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, + handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( + roles, + best_number, + best_hash, + genesis_hash, + ))), + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + } + } } // This is purely during a backwards compatible transitionary period and should be removed diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index a007e46c5f43d..8f67d9e230db2 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -24,6 +24,7 @@ mod sync; use std::{ collections::HashMap, + iter, marker::PhantomData, pin::Pin, sync::Arc, @@ -55,7 +56,7 @@ use sc_network_common::{ config::{ MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, TransportConfig, }, - protocol::ProtocolName, + protocol::{role::Roles, ProtocolName}, service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, }; @@ -880,6 +881,19 @@ where Some(warp_sync), ) .unwrap(); + let block_announce_config = chain_sync.get_block_announce_proto_config( + protocol_id.clone(), + &fork_id, + Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), + client.info().best_number, + client.info().best_hash, + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ); + let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, @@ -890,6 +904,7 @@ where import_queue, chain_sync: Box::new(chain_sync), metrics_registry: None, + block_announce_config, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index dfd532a14c172..674ddf2c3cd01 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -24,6 +24,7 @@ use crate::{ metrics::MetricsService, start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter, }; +use codec::Encode; use futures::{channel::oneshot, future::ready, FutureExt, StreamExt}; use jsonrpsee::RpcModule; use log::info; @@ -40,8 +41,10 @@ use sc_keystore::LocalKeystore; use sc_network::{config::SyncMode, NetworkService}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{ + config::{NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, SetConfig}, + protocol::role::Roles, service::{NetworkStateInfo, NetworkStatusProvider}, - sync::warp::WarpSyncProvider, + sync::{message::BlockAnnouncesHandshake, warp::WarpSyncProvider}, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -71,7 +74,7 @@ use sp_runtime::{ traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}, BuildStorage, }; -use std::{str::FromStr, sync::Arc, time::SystemTime}; +use std::{iter, str::FromStr, sync::Arc, time::SystemTime}; /// Full client type. pub type TFullClient = @@ -843,6 +846,18 @@ where config.network.max_parallel_downloads, warp_sync_provider, )?; + let block_announce_config = chain_sync.get_block_announce_proto_config( + protocol_id.clone(), + &config.chain_spec.fork_id().map(ToOwned::to_owned), + Roles::from(&config.role.clone()), + client.info().best_number, + client.info().best_hash, + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ); request_response_protocol_configs.push(config.network.ipfs_server.then(|| { let (handler, protocol_config) = BitswapRequestHandler::new(client.clone()); @@ -865,6 +880,7 @@ where import_queue: Box::new(import_queue), chain_sync: Box::new(chain_sync), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), + block_announce_config, block_request_protocol_config, state_request_protocol_config, warp_sync_protocol_config, @@ -885,6 +901,8 @@ where .expect("Genesis block exists; qed"), config.chain_spec.fork_id(), ); + + // configure transaction and block announce protocols network_params .network_config .extra_sets From a59781be660c46bc357f82cf343af66891ed8abd Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Thu, 6 Oct 2022 18:24:04 +0300 Subject: [PATCH 4/8] Include block announcement into `notification_protocols` --- client/network/Cargo.toml | 2 +- client/network/src/protocol.rs | 58 ++++----------- client/network/src/service.rs | 6 +- client/network/src/service/tests.rs | 105 ++++++++++++++++++++++++---- client/network/sync/src/lib.rs | 15 +--- client/network/test/src/lib.rs | 1 - client/service/src/builder.rs | 8 +-- 7 files changed, 113 insertions(+), 82 deletions(-) diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 8e3d68851c423..0ee2f40b6c09b 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -57,7 +57,7 @@ sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } [dev-dependencies] assert_matches = "1.3" -async-std = "1.11.0" +async-std = { version = "1.11.0", features = ["attributes"] } rand = "0.7.2" tempfile = "3.1.0" sc-network-light = { version = "0.10.0-dev", path = "./light" } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index fa819d92d89e5..f47bb2d9c2461 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -39,7 +39,7 @@ use sc_consensus::import_queue::{ BlockImportError, BlockImportStatus, IncomingBlock, RuntimeOrigin, }; use sc_network_common::{ - config::{NonReservedPeerMode, ProtocolId}, + config::NonReservedPeerMode, error, protocol::{role::Roles, ProtocolName}, request_responses::RequestFailure, @@ -238,14 +238,12 @@ where { /// Create a new instance. pub fn new( - block_announces_protocol: sc_network_common::config::NonDefaultSetConfig, roles: Roles, chain: Arc, - protocol_id: ProtocolId, - fork_id: &Option, network_config: &config::NetworkConfiguration, metrics_registry: Option<&Registry>, chain_sync: Box>, + block_announces_protocol: sc_network_common::config::NonDefaultSetConfig, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); @@ -337,8 +335,6 @@ where sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets }) }; - // println!("{:#?}", network_config.extra_sets); - let behaviour = { Notifications::new( peerset, @@ -384,10 +380,8 @@ where }, peerset_handle: peerset_handle.clone(), behaviour, - notification_protocols: network_config - .extra_sets - .iter() - .map(|s| s.notifications_protocol.clone()) + notification_protocols: iter::once(block_announces_protocol.notifications_protocol) + .chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone())) .collect(), bad_handshake_substreams: Default::default(), metrics: if let Some(r) = metrics_registry { @@ -416,10 +410,7 @@ where pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) { if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name) { - self.behaviour.disconnect_peer( - peer_id, - sc_peerset::SetId::from(position + NUM_HARDCODED_PEERSETS), - ); + self.behaviour.disconnect_peer(peer_id, sc_peerset::SetId::from(position)); } else { warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name") } @@ -1042,8 +1033,7 @@ where /// Sets the list of reserved peers for the given protocol/peerset. pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle - .set_reserved_peers(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peers); + self.peerset_handle.set_reserved_peers(sc_peerset::SetId::from(index), peers); } else { error!( target: "sub-libp2p", @@ -1056,10 +1046,7 @@ where /// Removes a `PeerId` from the list of reserved peers. pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.remove_reserved_peer( - sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), - peer, - ); + self.peerset_handle.remove_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1072,8 +1059,7 @@ where /// Adds a `PeerId` to the list of reserved peers. pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle - .add_reserved_peer(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peer); + self.peerset_handle.add_reserved_peer(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1095,8 +1081,7 @@ where /// Add a peer to a peers set. pub fn add_to_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle - .add_to_peers_set(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peer); + self.peerset_handle.add_to_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1109,10 +1094,7 @@ where /// Remove a peer from a peers set. pub fn remove_from_peers_set(&self, protocol: ProtocolName, peer: PeerId) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.remove_from_peers_set( - sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), - peer, - ); + self.peerset_handle.remove_from_peers_set(sc_peerset::SetId::from(index), peer); } else { error!( target: "sub-libp2p", @@ -1579,9 +1561,7 @@ where ) { (Ok(roles), _) => CustomMessageOutcome::NotificationStreamOpened { remote: peer_id, - protocol: self.notification_protocols - [usize::from(set_id) - NUM_HARDCODED_PEERSETS] - .clone(), + protocol: self.notification_protocols[usize::from(set_id)].clone(), negotiated_fallback, roles, notifications_sink, @@ -1593,9 +1573,7 @@ where // TODO: remove this after https://github.com/paritytech/substrate/issues/5685 CustomMessageOutcome::NotificationStreamOpened { remote: peer_id, - protocol: self.notification_protocols - [usize::from(set_id) - NUM_HARDCODED_PEERSETS] - .clone(), + protocol: self.notification_protocols[usize::from(set_id)].clone(), negotiated_fallback, roles: peer.info.roles, notifications_sink, @@ -1619,9 +1597,7 @@ where } else { CustomMessageOutcome::NotificationStreamReplaced { remote: peer_id, - protocol: self.notification_protocols - [usize::from(set_id) - NUM_HARDCODED_PEERSETS] - .clone(), + protocol: self.notification_protocols[usize::from(set_id)].clone(), notifications_sink, } }, @@ -1646,9 +1622,7 @@ where } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, - protocol: self.notification_protocols - [usize::from(set_id) - NUM_HARDCODED_PEERSETS] - .clone(), + protocol: self.notification_protocols[usize::from(set_id)].clone(), } } }, @@ -1681,9 +1655,7 @@ where _ if self.bad_handshake_substreams.contains(&(peer_id, set_id)) => CustomMessageOutcome::None, _ => { - let protocol_name = self.notification_protocols - [usize::from(set_id) - NUM_HARDCODED_PEERSETS] - .clone(); + let protocol_name = self.notification_protocols[usize::from(set_id)].clone(); CustomMessageOutcome::NotificationsReceived { remote: peer_id, messages: vec![(protocol_name, message.freeze())], diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 159c479949b3b..28e479b702779 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -38,7 +38,6 @@ use crate::{ transport, ReputationChange, }; -use codec::Encode as _; use futures::{channel::oneshot, prelude::*}; use libp2p::{ core::{either::EitherError, upgrade, ConnectedPoint, Executor}, @@ -60,7 +59,6 @@ use sc_network_common::{ error::Error, protocol::{ event::{DhtEvent, Event}, - role::Roles, ProtocolName, }, request_responses::{IfDisconnected, RequestFailure}, @@ -221,14 +219,12 @@ where ); let (protocol, peerset_handle, mut known_addresses) = Protocol::new( - params.block_announce_config, From::from(¶ms.role), params.chain.clone(), - params.protocol_id.clone(), - ¶ms.fork_id, ¶ms.network_config, params.metrics_registry.as_ref(), params.chain_sync, + params.block_announce_config, )?; // List of multiaddresses that we know in the network. diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 3671715592f95..38776e9853605 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -37,7 +37,7 @@ use sc_network_sync::{ }; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use sp_runtime::traits::{Block as BlockT, Header as _}; -use std::{iter, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; type TestNetworkService = NetworkService< @@ -137,18 +137,30 @@ fn build_test_full_node( None, ) .unwrap(); - let block_announce_config = chain_sync.get_block_announce_proto_config( - protocol_id.clone(), - &fork_id, - Roles::from(&config::Role::Full), - client.info().best_number, - client.info().best_hash, - client - .block_hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ); + + let block_announce_config = NonDefaultSetConfig { + notifications_protocol: BLOCK_ANNOUNCE_PROTO_NAME.into(), + fallback_names: vec![], + max_notification_size: 1024 * 1024, + handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::< + substrate_test_runtime_client::runtime::Block, + >::build( + Roles::from(&config::Role::Full), + client.info().best_number, + client.info().best_hash, + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ))), + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + }; let worker = NetworkWorker::new(config::Params { block_announce_config, @@ -180,6 +192,7 @@ fn build_test_full_node( (service, event_stream) } +const BLOCK_ANNOUNCE_PROTO_NAME: &str = "/block-announces"; const PROTOCOL_NAME: &str = "/foo"; /// Builds two nodes and their associated events stream. @@ -586,6 +599,72 @@ fn fallback_name_working() { }); } +// Disconnect peer by calling `Protocol::disconnect_peer()` with the supplied block announcement +// protocol name and verify that `SyncDisconnected` event is emitted +#[async_std::test] +async fn disconnect_sync_peer_using_block_announcement_protocol_name() { + let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + + let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration { + extra_sets: vec![NonDefaultSetConfig { + notifications_protocol: PROTOCOL_NAME.into(), + fallback_names: vec![], + max_notification_size: 1024 * 1024, + handshake: None, + set_config: Default::default(), + }], + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + ..config::NetworkConfiguration::new_local() + }); + + let (node2, mut events_stream2) = build_test_full_node(config::NetworkConfiguration { + extra_sets: vec![NonDefaultSetConfig { + notifications_protocol: PROTOCOL_NAME.into(), + fallback_names: Vec::new(), + max_notification_size: 1024 * 1024, + handshake: None, + set_config: SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id(), + }], + ..Default::default() + }, + }], + listen_addresses: vec![], + transport: TransportConfig::MemoryOnly, + ..config::NetworkConfiguration::new_local() + }); + + loop { + match events_stream1.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => break, + _ => {}, + }; + } + + loop { + match events_stream2.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => break, + _ => {}, + }; + } + + // disconnect peer using `PROTOCOL_NAME`, verify `NotificationStreamClosed` event is emitted + node2.disconnect_peer(node1.local_peer_id(), PROTOCOL_NAME.into()); + assert!(std::matches!( + events_stream2.next().await, + Some(Event::NotificationStreamClosed { .. }) + )); + let _ = events_stream2.next().await; // ignore the reopen event + + // now disconnect using the block announcement protocol, verify that `SyncDisconnected` is + // emitted + node2.disconnect_peer(node1.local_peer_id(), BLOCK_ANNOUNCE_PROTO_NAME.into()); + assert!(std::matches!(events_stream2.next().await, Some(Event::SyncDisconnected { .. }))); +} + #[test] #[should_panic(expected = "don't match the transport")] fn ensure_listen_addresses_consistent_with_transport_memory() { diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 48508bbee72e4..8f5eec7603303 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -57,14 +57,8 @@ use sc_network_common::{ protocol::role::Roles, sync::{ message::{ - BlockAnnounce, - BlockAnnouncesHandshake, // TODO: move away from common to here - BlockAttributes, - BlockData, - BlockRequest, - BlockResponse, - Direction, - FromBlock, + BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest, + BlockResponse, Direction, FromBlock, }, warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, @@ -2250,11 +2244,6 @@ where None } - /// Get handshake for the block announcement protocol - pub fn get_block_announce_proto_handshake(&self) -> Vec { - todo!(); - } - /// Get config for the block announcement protocol pub fn get_block_announce_proto_config( &self, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 8f67d9e230db2..3c335e419defb 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -24,7 +24,6 @@ mod sync; use std::{ collections::HashMap, - iter, marker::PhantomData, pin::Pin, sync::Arc, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 674ddf2c3cd01..2d0960dc66e8d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -24,7 +24,6 @@ use crate::{ metrics::MetricsService, start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter, }; -use codec::Encode; use futures::{channel::oneshot, future::ready, FutureExt, StreamExt}; use jsonrpsee::RpcModule; use log::info; @@ -41,10 +40,9 @@ use sc_keystore::LocalKeystore; use sc_network::{config::SyncMode, NetworkService}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{ - config::{NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, SetConfig}, protocol::role::Roles, service::{NetworkStateInfo, NetworkStatusProvider}, - sync::{message::BlockAnnouncesHandshake, warp::WarpSyncProvider}, + sync::warp::WarpSyncProvider, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -74,7 +72,7 @@ use sp_runtime::{ traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}, BuildStorage, }; -use std::{iter, str::FromStr, sync::Arc, time::SystemTime}; +use std::{str::FromStr, sync::Arc, time::SystemTime}; /// Full client type. pub type TFullClient = @@ -901,8 +899,6 @@ where .expect("Genesis block exists; qed"), config.chain_spec.fork_id(), ); - - // configure transaction and block announce protocols network_params .network_config .extra_sets From e1a2d7dc9d9faf5cc3c7b757c94e39341169a33a Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Sun, 9 Oct 2022 11:17:38 +0300 Subject: [PATCH 5/8] Apply review comments --- client/network/common/src/protocol/role.rs | 6 +++--- client/network/src/protocol.rs | 5 +---- client/network/src/service/tests.rs | 2 +- client/network/sync/src/lib.rs | 3 +++ client/network/test/src/lib.rs | 4 ++-- client/service/src/builder.rs | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/client/network/common/src/protocol/role.rs b/client/network/common/src/protocol/role.rs index f30d3de87d293..ed22830fd7170 100644 --- a/client/network/common/src/protocol/role.rs +++ b/client/network/common/src/protocol/role.rs @@ -53,7 +53,7 @@ pub enum Role { impl Role { /// True for [`Role::Authority`]. pub fn is_authority(&self) -> bool { - matches!(self, Self::Authority { .. }) + matches!(self, Self::Authority) } } @@ -61,7 +61,7 @@ impl std::fmt::Display for Role { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Full => write!(f, "FULL"), - Self::Authority { .. } => write!(f, "AUTHORITY"), + Self::Authority => write!(f, "AUTHORITY"), } } } @@ -101,7 +101,7 @@ impl<'a> From<&'a Role> for Roles { fn from(roles: &'a Role) -> Self { match roles { Role::Full => Self::FULL, - Role::Authority { .. } => Self::AUTHORITY, + Role::Authority => Self::AUTHORITY, } } } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index f47bb2d9c2461..897e9085174f3 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -341,10 +341,7 @@ where iter::once(notifications::ProtocolConfig { name: block_announces_protocol.notifications_protocol.clone(), fallback_names: block_announces_protocol.fallback_names.clone(), - handshake: block_announces_protocol - .handshake - .as_ref() - .map_or(roles.encode(), |h| (*h).to_vec()), + handshake: block_announces_protocol.handshake.as_ref().unwrap().to_vec(), max_notification_size: block_announces_protocol.max_notification_size, }) .chain(network_config.extra_sets.iter().map(|s| notifications::ProtocolConfig { diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 38776e9853605..1836ec520eeee 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -149,7 +149,7 @@ fn build_test_full_node( client.info().best_number, client.info().best_hash, client - .block_hash(0u32.into()) + .block_hash(Zero::zero()) .ok() .flatten() .expect("Genesis block exists; qed"), diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 8f5eec7603303..55ee50e9ae1bc 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -31,6 +31,7 @@ pub mod block_request_handler; pub mod blocks; mod schema; +pub mod service; pub mod state; pub mod state_request_handler; pub mod warp; @@ -2280,6 +2281,8 @@ where best_hash, genesis_hash, ))), + // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement + // protocol is still hardcoded into the peerset. set_config: SetConfig { in_peers: 0, out_peers: 0, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 3c335e419defb..a7c58631dc0f7 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -77,7 +77,7 @@ use sp_core::H256; use sp_runtime::{ codec::{Decode, Encode}, generic::{BlockId, OpaqueDigestItemId}, - traits::{Block as BlockT, Header as HeaderT, NumberFor}, + traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}, Justification, Justifications, }; use substrate_test_runtime_client::AccountKeyring; @@ -887,7 +887,7 @@ where client.info().best_number, client.info().best_hash, client - .block_hash(0u32.into()) + .block_hash(Zero::zero()) .ok() .flatten() .expect("Genesis block exists; qed"), diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 2d0960dc66e8d..4301e17a8c31e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -851,7 +851,7 @@ where client.info().best_number, client.info().best_hash, client - .block_hash(0u32.into()) + .block_hash(Zero::zero()) .ok() .flatten() .expect("Genesis block exists; qed"), From ba9bf249e73a8722b5a27e250e979431f15977ff Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Sun, 9 Oct 2022 13:19:50 +0300 Subject: [PATCH 6/8] Remove unneeded include --- client/network/sync/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index cf0370ad910fb..280e530eca9a9 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -31,7 +31,6 @@ pub mod block_request_handler; pub mod blocks; mod schema; -pub mod service; pub mod state; pub mod state_request_handler; pub mod warp; From b8998cd00e15ce4709a1d1050eaf351c08293118 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Sun, 9 Oct 2022 14:03:36 +0300 Subject: [PATCH 7/8] Add missing include --- client/network/src/service/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 1836ec520eeee..7c651c675b83e 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -36,7 +36,7 @@ use sc_network_sync::{ ChainSync, }; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; -use sp_runtime::traits::{Block as BlockT, Header as _}; +use sp_runtime::traits::{Block as BlockT, Header as _, Zero}; use std::{sync::Arc, time::Duration}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; From 9e8463cb3e1193c7543b04deac6156dc87590b76 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Mon, 10 Oct 2022 09:30:17 +0300 Subject: [PATCH 8/8] Apply review comments --- client/network/common/src/config.rs | 5 ++++- client/network/src/protocol.rs | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/client/network/common/src/config.rs b/client/network/common/src/config.rs index 319bb636ffb98..e4a7f04c8d6e8 100644 --- a/client/network/common/src/config.rs +++ b/client/network/common/src/config.rs @@ -200,14 +200,17 @@ impl Default for SetConfig { } } +/// Custom handshake for the notification protocol #[derive(Debug, Clone)] pub struct NotificationHandshake(Vec); impl NotificationHandshake { + /// Create new `NotificationHandshake` from an object that implements `Encode` pub fn new(handshake: H) -> Self { Self(handshake.encode()) } + /// Create new `NotificationHandshake` from raw bytes pub fn from_bytes(bytes: Vec) -> Self { Self(bytes) } @@ -240,7 +243,7 @@ pub struct NonDefaultSetConfig { /// If a fallback is used, it will be reported in /// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback` pub fallback_names: Vec, - /// Handshake for the protocol + /// Handshake of the protocol pub handshake: Option, /// Maximum allowed size of single notifications. pub max_notification_size: u64, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 897e9085174f3..325e044527efa 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -338,6 +338,9 @@ where let behaviour = { Notifications::new( peerset, + // NOTE: Block announcement protocol is still very much hardcoded into `Protocol`. + // This protocol must be the first notification protocol given to + // `Notifications` iter::once(notifications::ProtocolConfig { name: block_announces_protocol.notifications_protocol.clone(), fallback_names: block_announces_protocol.fallback_names.clone(),