From bf458b39613c882be4be13394d790f85f1756d77 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 10 Sep 2020 14:56:50 +0200 Subject: [PATCH 1/3] Allow remotes to not open a legacy substream --- client/network/src/protocol.rs | 234 +++++++++--------- .../src/protocol/generic_proto/behaviour.rs | 17 +- .../protocol/generic_proto/handler/group.rs | 72 ++++-- .../src/protocol/generic_proto/tests.rs | 7 +- client/network/src/protocol/message.rs | 6 - client/rpc-api/src/system/helpers.rs | 5 +- client/rpc/src/system/tests.rs | 2 - client/service/src/lib.rs | 1 - 8 files changed, 190 insertions(+), 154 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a585f91145ede..f9ec8ea4c4c5a 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -36,7 +36,7 @@ use sp_consensus::{ block_validation::BlockAnnounceValidator, import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin} }; -use codec::{Decode, Encode}; +use codec::{Decode, DecodeAll, Encode}; use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification}; use sp_runtime::traits::{ Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub @@ -53,7 +53,7 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; use std::fmt::Write; -use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time}; +use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use wasm_timer::Instant; @@ -271,8 +271,6 @@ struct Peer { pub struct PeerInfo { /// Roles pub roles: Roles, - /// Protocol version - pub protocol_version: u32, /// Peer best block hash pub best_hash: B::Hash, /// Peer best block number @@ -391,14 +389,6 @@ impl Protocol { }; let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config); - let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let mut behaviour = GenericProto::new( - local_peer_id, - protocol_id.clone(), - versions, - build_status_message(&config, &chain), - peerset, - ); let mut legacy_equiv_by_name = HashMap::new(); @@ -409,7 +399,6 @@ impl Protocol { proto.push_str("/transactions/1"); proto }); - behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new()); legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions); let block_announces_protocol: Cow<'static, str> = Cow::from({ @@ -419,12 +408,23 @@ impl Protocol { proto.push_str("/block-announces/1"); proto }); - behaviour.register_notif_protocol( - block_announces_protocol.clone(), - BlockAnnouncesHandshake::build(&config, &chain).encode() - ); legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce); + let behaviour = { + let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); + let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode(); + GenericProto::new( + local_peer_id, + protocol_id.clone(), + versions, + build_status_message(&config, &chain), + peerset, + iter::once((block_announces_protocol.clone(), block_announces_handshake)) + .chain(iter::once((transactions_protocol.clone(), vec![]))), + 0, + ) + }; + let protocol = Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), @@ -829,99 +829,86 @@ impl Protocol { } } - /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. - pub fn on_peer_connected( + /// Called on the first connection between two peers, after their exchange of handshake. + fn on_peer_connected( &mut self, who: PeerId, - status: message::Status, + status: BlockAnnouncesHandshake, notifications_sink: NotificationsSink, ) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); - let _protocol_version = { - if self.context_data.peers.contains_key(&who) { - debug!(target: "sync", "Ignoring duplicate status packet from {}", who); - return CustomMessageOutcome::None; - } - if status.genesis_hash != self.genesis_hash { - log!( - target: "sync", - if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, - "Peer is on different chain (our genesis: {} theirs: {})", - self.genesis_hash, status.genesis_hash - ); - self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH); - self.behaviour.disconnect_peer(&who); - if self.boot_node_ids.contains(&who) { - error!( - target: "sync", - "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", - who, - self.genesis_hash, - status.genesis_hash, - ); - } + if self.context_data.peers.contains_key(&who) { + debug!(target: "sync", "Ignoring duplicate status packet from {}", who); + return CustomMessageOutcome::None; + } + if status.genesis_hash != self.genesis_hash { + log!( + target: "sync", + if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, + "Peer is on different chain (our genesis: {} theirs: {})", + self.genesis_hash, status.genesis_hash + ); + self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH); + self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } - if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { - log!( + if self.boot_node_ids.contains(&who) { + error!( target: "sync", - if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, - "Peer {:?} using unsupported protocol version {}", who, status.version + "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", + who, + self.genesis_hash, + status.genesis_hash, ); - self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; } - if self.config.roles.is_light() { - // we're not interested in light peers - if status.roles.is_light() { - debug!(target: "sync", "Peer {} is unable to serve light requests", who); - self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } + return CustomMessageOutcome::None; + } - // we don't interested in peers that are far behind us - let self_best_block = self - .context_data - .chain - .info() - .best_number; - let blocks_difference = self_best_block - .checked_sub(&status.best_number) - .unwrap_or_else(Zero::zero) - .saturated_into::(); - if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { - debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } + if self.config.roles.is_light() { + // we're not interested in light peers + if status.roles.is_light() { + debug!(target: "sync", "Peer {} is unable to serve light requests", who); + self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE); + self.behaviour.disconnect_peer(&who); + return CustomMessageOutcome::None; } - let peer = Peer { - info: PeerInfo { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number - }, - block_request: None, - known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) - .expect("Constant is nonzero")), - known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) - .expect("Constant is nonzero")), - next_request_id: 0, - obsolete_requests: HashMap::new(), - }; - self.context_data.peers.insert(who.clone(), peer); + // we don't interested in peers that are far behind us + let self_best_block = self + .context_data + .chain + .info() + .best_number; + let blocks_difference = self_best_block + .checked_sub(&status.best_number) + .unwrap_or_else(Zero::zero) + .saturated_into::(); + if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { + debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); + self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT); + self.behaviour.disconnect_peer(&who); + return CustomMessageOutcome::None; + } + } - debug!(target: "sync", "Connected {}", who); - status.version + let peer = Peer { + info: PeerInfo { + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }, + block_request: None, + known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) + .expect("Constant is nonzero")), + known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) + .expect("Constant is nonzero")), + next_request_id: 0, + obsolete_requests: HashMap::new(), }; + self.context_data.peers.insert(who.clone(), peer); + + debug!(target: "sync", "Connected {}", who); let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number)); @@ -1151,20 +1138,12 @@ impl Protocol { if inserted || force { let message = message::BlockAnnounce { header: header.clone(), - state: if peer.info.protocol_version >= 4 { - if is_best { - Some(message::BlockState::Best) - } else { - Some(message::BlockState::Normal) - } - } else { - None - }, - data: if peer.info.protocol_version >= 4 { - Some(data.clone()) + state: if is_best { + Some(message::BlockState::Best) } else { - None + Some(message::BlockState::Normal) }, + data: Some(data.clone()), }; self.behaviour.write_notification( @@ -1588,9 +1567,20 @@ impl NetworkBehaviour for Protocol { let outcome = match event { GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => { - match as Decode>::decode(&mut &received_handshake[..]) { - Ok(GenericMessage::Status(handshake)) => - self.on_peer_connected(peer_id, handshake, notifications_sink), + // `received_handshake` can be either a `Status` message if received from the + // legacy substream ,or a `BlockAnnouncesHandshake` if received from the block + // announces substream. + match as DecodeAll>::decode_all(&mut &received_handshake[..]) { + Ok(GenericMessage::Status(handshake)) => { + let handshake = BlockAnnouncesHandshake { + roles: handshake.roles, + best_number: handshake.best_number, + best_hash: handshake.best_hash, + genesis_hash: handshake.genesis_hash, + }; + + self.on_peer_connected(peer_id, handshake, notifications_sink) + }, Ok(msg) => { debug!( target: "sync", @@ -1602,15 +1592,23 @@ impl NetworkBehaviour for Protocol { CustomMessageOutcome::None } Err(err) => { - debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - peer_id, - received_handshake, - err.what() - ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); - CustomMessageOutcome::None + match as DecodeAll>::decode_all(&mut &received_handshake[..]) { + Ok(handshake) => { + self.on_peer_connected(peer_id, handshake, notifications_sink) + } + Err(err2) => { + debug!( + target: "sync", + "Couldn't decode handshake sent by {}: {:?}: {} & {}", + peer_id, + received_handshake, + err.what(), + err2, + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + } } } } diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 996a810605d13..22e889adc78d8 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -122,6 +122,10 @@ pub struct GenericProto { /// initial handshake. notif_protocols: Vec<(Cow<'static, str>, Arc>>)>, + /// Index within `notif_protocols` of the protocol containing the handshake to report on the + /// external API. + handshake_protocol_index: usize, + /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -336,14 +340,24 @@ impl GenericProto { versions: &[u8], handshake_message: Vec, peerset: sc_peerset::Peerset, + notif_protocols: impl Iterator, Vec)>, + handshake_protocol_index: usize, ) -> Self { + let notif_protocols = notif_protocols + .map(|(n, hs)| (n, Arc::new(RwLock::new(hs)))) + .collect::>(); + + assert!(!notif_protocols.is_empty()); + assert!(handshake_protocol_index < notif_protocols.len()); + let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); GenericProto { local_peer_id, legacy_protocol, - notif_protocols: Vec::new(), + notif_protocols, + handshake_protocol_index, peerset, peers: FnvHashMap::default(), delays: Default::default(), @@ -855,6 +869,7 @@ impl NetworkBehaviour for GenericProto { NotifsHandlerProto::new( self.legacy_protocol.clone(), self.notif_protocols.clone(), + self.handshake_protocol_index, ) } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index acb241af2ad2d..fba5f00e55fd4 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -93,6 +93,10 @@ pub struct NotifsHandlerProto { /// Prototypes for handlers for outbound substreams, and the initial handshake message we send. out_handlers: Vec<(NotifsOutHandlerProto, Arc>>)>, + /// Index within `in_handlers` and `out_handlers` of the protocol containing the handshake to + /// report on the external API. + handshake_protocol_index: usize, + /// Prototype for handler for backwards-compatibility. legacy: LegacyProtoHandlerProto, } @@ -107,16 +111,21 @@ pub struct NotifsHandler { /// Handlers for outbound substreams, and the initial handshake message we send. out_handlers: Vec<(NotifsOutHandler, Arc>>)>, + /// Index within `in_handlers` and `out_handlers` of the protocol containing the handshake to + /// report on the external API. + handshake_protocol_index: usize, + /// Whether we are the connection dialer or listener. endpoint: ConnectedPoint, /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, - /// In the situation where `legacy.is_open()` is true, but we haven't sent out any - /// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy - /// substream. - pending_legacy_handshake: Option>, + /// In the situation where either the legacy substream has been opened or the handshake-bearing + /// notifications protocol is open, but we haven't sent out any [`NotifsHandlerOut::Open`] + /// event yet, this contains the received handshake waiting to be reported through the + /// external API. + pending_handshake: Option>, /// State of this handler. enabled: EnabledState, @@ -170,9 +179,10 @@ impl IntoProtocolsHandler for NotifsHandlerProto { .into_iter() .map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg)) .collect(), + handshake_protocol_index: self.handshake_protocol_index, endpoint: connected_point.clone(), legacy: self.legacy.into_handler(remote_peer_id, connected_point), - pending_legacy_handshake: None, + pending_handshake: None, enabled: EnabledState::Initial, pending_in: Vec::new(), notifications_sink_rx: None, @@ -360,12 +370,25 @@ impl NotifsHandlerProto { /// `list` is a list of notification protocols names, and the message to send as part of the /// handshake. At the moment, the message is always the same whether we open a substream /// ourselves or respond to handshake from the remote. + /// + /// `handshake_protocol_index` is the index, within `list`, of the protocol that contains + /// the handshake to report through the [`NotifsHandlerOut::Open`] event. + /// + /// # Panic + /// + /// - Panics if `list` is empty (as `handshake_protocol_index` can't possibly be correct). + /// - Panics if `handshake_protocol_index` is >= `list.len()`. + /// pub fn new( legacy: RegisteredProtocol, list: impl Into, Arc>>)>>, + handshake_protocol_index: usize, ) -> Self { let list = list.into(); + assert!(!list.is_empty()); + assert!(handshake_protocol_index < list.len()); + let out_handlers = list .clone() .into_iter() @@ -381,6 +404,7 @@ impl NotifsHandlerProto { NotifsHandlerProto { in_handlers, out_handlers, + handshake_protocol_index, legacy: LegacyProtoHandlerProto::new(legacy), } } @@ -612,11 +636,13 @@ impl ProtocolsHandler for NotifsHandler { } } - // If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy - // substream is open but the user isn't aware yet of the substreams being open. + // If `self.pending_handshake` is `Some`, we are in a state where the handshake-bearing + // substream (either the legacy substream of the one indicated by + // `handshake_protocol_index`) is open but the user isn't aware yet of the substreams + // being open. // When that is the case, neither the legacy substream nor the incoming notifications // substreams should be polled, otherwise there is a risk of receiving messages from them. - if self.pending_legacy_handshake.is_none() { + if self.pending_handshake.is_none() { while let Poll::Ready(ev) = self.legacy.poll(cx) { match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => @@ -628,14 +654,16 @@ impl ProtocolsHandler for NotifsHandler { received_handshake, .. }) => { - self.pending_legacy_handshake = Some(received_handshake); + if self.notifications_sink_rx.is_none() { + debug_assert!(self.pending_handshake.is_none()); + self.pending_handshake = Some(received_handshake); + } cx.waker().wake_by_ref(); return Poll::Pending; }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => { // We consciously drop the receivers despite notifications being potentially // still buffered up. - debug_assert!(self.notifications_sink_rx.is_some()); self.notifications_sink_rx = None; return Poll::Ready(ProtocolsHandlerEvent::Custom( @@ -643,7 +671,6 @@ impl ProtocolsHandler for NotifsHandler { )) }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => { - debug_assert!(self.notifications_sink_rx.is_some()); return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CustomMessage { message } )) @@ -660,7 +687,7 @@ impl ProtocolsHandler for NotifsHandler { for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { loop { - let poll = if self.pending_legacy_handshake.is_none() { + let poll = if self.pending_handshake.is_none() { handler.poll(cx) } else { handler.poll_process(cx) @@ -689,7 +716,7 @@ impl ProtocolsHandler for NotifsHandler { }, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - debug_assert!(self.pending_legacy_handshake.is_none()); + debug_assert!(self.pending_handshake.is_none()); if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, @@ -712,12 +739,17 @@ impl ProtocolsHandler for NotifsHandler { }), ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - // At the moment we don't actually care whether any notifications protocol - // opens or closes. - // Whether our communications with the remote are open or closed entirely - // depends on the legacy substream, because as long as we are open the user of - // this struct might try to send legacy protocol messages which we need to - // deliver for things to work properly. + // Opened substream on the handshake-bearing notification protocol. + ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { handshake }) + if handler_num == self.handshake_protocol_index => + { + if self.notifications_sink_rx.is_none() && self.pending_handshake.is_none() { + self.pending_handshake = Some(handshake); + } + }, + + // Nothing to do in response to other notification substreams being opened + // or closed. ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { .. }) => {}, ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed) => {}, ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Refused) => {}, @@ -726,7 +758,7 @@ impl ProtocolsHandler for NotifsHandler { } if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) { - if let Some(handshake) = self.pending_legacy_handshake.take() { + if let Some(handshake) = self.pending_handshake.take() { let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index dbe02c350100f..a9bffffd6f351 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -25,7 +25,7 @@ use libp2p::swarm::{ Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters, NetworkBehaviour, NetworkBehaviourAction }; -use std::{error, io, task::Context, task::Poll, time::Duration}; +use std::{error, io, iter, task::{Context, Poll}, time::Duration}; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -80,7 +80,10 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(local_peer_id, "test", &[1], vec![], peerset), + inner: GenericProto::new( + local_peer_id, "test", &[1], vec![], peerset, + iter::once(("foo".into(), Vec::new())), 0 + ), addrs: addrs .iter() .enumerate() diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index a7fbb92387cf6..1cd78c0ed1dda 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -41,12 +41,6 @@ pub type Message = generic::Message< ::Extrinsic, >; -/// Type alias for using the status type using block type parameters. -pub type Status = generic::Status< - ::Hash, - <::Header as HeaderT>::Number, ->; - /// Type alias for using the block request type using block type parameters. pub type BlockRequest = generic::BlockRequest< ::Hash, diff --git a/client/rpc-api/src/system/helpers.rs b/client/rpc-api/src/system/helpers.rs index 5dbe93543d8e5..dd3294c243116 100644 --- a/client/rpc-api/src/system/helpers.rs +++ b/client/rpc-api/src/system/helpers.rs @@ -67,8 +67,6 @@ pub struct PeerInfo { pub peer_id: String, /// Roles pub roles: String, - /// Protocol version - pub protocol_version: u32, /// Peer best block hash pub best_hash: Hash, /// Peer best block number @@ -110,11 +108,10 @@ mod tests { ::serde_json::to_string(&PeerInfo { peer_id: "2".into(), roles: "a".into(), - protocol_version: 2, best_hash: 5u32, best_number: 6u32, }).unwrap(), - r#"{"peerId":"2","roles":"a","protocolVersion":2,"bestHash":5,"bestNumber":6}"#, + r#"{"peerId":"2","roles":"a","bestHash":5,"bestNumber":6}"#, ); } } diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index dfe1fcc415159..a4fb5548f106a 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -73,7 +73,6 @@ fn api>>(sync: T) -> System { peers.push(PeerInfo { peer_id: status.peer_id.to_base58(), roles: format!("{}", Role::Full), - protocol_version: 1, best_hash: Default::default(), best_number: 1, }); @@ -261,7 +260,6 @@ fn system_peers() { vec![PeerInfo { peer_id: peer_id.to_base58(), roles: "FULL".into(), - protocol_version: 1, best_hash: Default::default(), best_number: 1u64, }] diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index d19b9f5ea247d..9a42f0da2935e 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -272,7 +272,6 @@ async fn build_network_future< sc_rpc::system::PeerInfo { peer_id: peer_id.to_base58(), roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, best_hash: p.best_hash, best_number: p.best_number, } From d7a506d8f7cb5e52b84cc2c8d84bca22cdb63d29 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 10 Sep 2020 16:25:47 +0200 Subject: [PATCH 2/3] Misc fixes --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- client/network/src/protocol/generic_proto/tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index fba5f00e55fd4..c74afc0e495a5 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -687,7 +687,7 @@ impl ProtocolsHandler for NotifsHandler { for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { loop { - let poll = if self.pending_handshake.is_none() { + let poll = if self.notifications_sink_rx.is_some() { handler.poll(cx) } else { handler.poll_process(cx) diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index a9bffffd6f351..c9547022d836f 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm, Swarm) { let behaviour = CustomProtoWithAddr { inner: GenericProto::new( local_peer_id, "test", &[1], vec![], peerset, - iter::once(("foo".into(), Vec::new())), 0 + iter::once(("/foo".into(), Vec::new())), 0 ), addrs: addrs .iter() From 776ee3b2598d83e1ec0d5dfa1dc65ab79a5fee9a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 14 Sep 2020 15:05:37 +0200 Subject: [PATCH 3/3] Special case first protocol as the one bearing the handshake --- client/network/src/protocol.rs | 3 ++- .../src/protocol/generic_proto/behaviour.rs | 8 ------ .../protocol/generic_proto/handler/group.rs | 27 +++++-------------- .../src/protocol/generic_proto/tests.rs | 2 +- 4 files changed, 9 insertions(+), 31 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index f9ec8ea4c4c5a..c1887ce35bfdb 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -419,9 +419,10 @@ impl Protocol { versions, build_status_message(&config, &chain), peerset, + // As documented in `GenericProto`, the first protocol in the list is always the + // one carrying the handshake reported in the `CustomProtocolOpen` event. iter::once((block_announces_protocol.clone(), block_announces_handshake)) .chain(iter::once((transactions_protocol.clone(), vec![]))), - 0, ) }; diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 22e889adc78d8..e7e2cb035d65c 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -122,10 +122,6 @@ pub struct GenericProto { /// initial handshake. notif_protocols: Vec<(Cow<'static, str>, Arc>>)>, - /// Index within `notif_protocols` of the protocol containing the handshake to report on the - /// external API. - handshake_protocol_index: usize, - /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -341,14 +337,12 @@ impl GenericProto { handshake_message: Vec, peerset: sc_peerset::Peerset, notif_protocols: impl Iterator, Vec)>, - handshake_protocol_index: usize, ) -> Self { let notif_protocols = notif_protocols .map(|(n, hs)| (n, Arc::new(RwLock::new(hs)))) .collect::>(); assert!(!notif_protocols.is_empty()); - assert!(handshake_protocol_index < notif_protocols.len()); let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); @@ -357,7 +351,6 @@ impl GenericProto { local_peer_id, legacy_protocol, notif_protocols, - handshake_protocol_index, peerset, peers: FnvHashMap::default(), delays: Default::default(), @@ -869,7 +862,6 @@ impl NetworkBehaviour for GenericProto { NotifsHandlerProto::new( self.legacy_protocol.clone(), self.notif_protocols.clone(), - self.handshake_protocol_index, ) } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index c74afc0e495a5..30a1d1037d452 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -93,10 +93,6 @@ pub struct NotifsHandlerProto { /// Prototypes for handlers for outbound substreams, and the initial handshake message we send. out_handlers: Vec<(NotifsOutHandlerProto, Arc>>)>, - /// Index within `in_handlers` and `out_handlers` of the protocol containing the handshake to - /// report on the external API. - handshake_protocol_index: usize, - /// Prototype for handler for backwards-compatibility. legacy: LegacyProtoHandlerProto, } @@ -111,10 +107,6 @@ pub struct NotifsHandler { /// Handlers for outbound substreams, and the initial handshake message we send. out_handlers: Vec<(NotifsOutHandler, Arc>>)>, - /// Index within `in_handlers` and `out_handlers` of the protocol containing the handshake to - /// report on the external API. - handshake_protocol_index: usize, - /// Whether we are the connection dialer or listener. endpoint: ConnectedPoint, @@ -179,7 +171,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto { .into_iter() .map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg)) .collect(), - handshake_protocol_index: self.handshake_protocol_index, endpoint: connected_point.clone(), legacy: self.legacy.into_handler(remote_peer_id, connected_point), pending_handshake: None, @@ -371,23 +362,19 @@ impl NotifsHandlerProto { /// handshake. At the moment, the message is always the same whether we open a substream /// ourselves or respond to handshake from the remote. /// - /// `handshake_protocol_index` is the index, within `list`, of the protocol that contains - /// the handshake to report through the [`NotifsHandlerOut::Open`] event. + /// The first protocol in `list` is special-cased as the protocol that contains the handshake + /// to report through the [`NotifsHandlerOut::Open`] event. /// /// # Panic /// - /// - Panics if `list` is empty (as `handshake_protocol_index` can't possibly be correct). - /// - Panics if `handshake_protocol_index` is >= `list.len()`. + /// - Panics if `list` is empty. /// pub fn new( legacy: RegisteredProtocol, list: impl Into, Arc>>)>>, - handshake_protocol_index: usize, ) -> Self { let list = list.into(); - assert!(!list.is_empty()); - assert!(handshake_protocol_index < list.len()); let out_handlers = list .clone() @@ -404,7 +391,6 @@ impl NotifsHandlerProto { NotifsHandlerProto { in_handlers, out_handlers, - handshake_protocol_index, legacy: LegacyProtoHandlerProto::new(legacy), } } @@ -637,9 +623,8 @@ impl ProtocolsHandler for NotifsHandler { } // If `self.pending_handshake` is `Some`, we are in a state where the handshake-bearing - // substream (either the legacy substream of the one indicated by - // `handshake_protocol_index`) is open but the user isn't aware yet of the substreams - // being open. + // substream (either the legacy substream or the one special-cased as providing the + // handshake) is open but the user isn't aware yet of the substreams being open. // When that is the case, neither the legacy substream nor the incoming notifications // substreams should be polled, otherwise there is a risk of receiving messages from them. if self.pending_handshake.is_none() { @@ -741,7 +726,7 @@ impl ProtocolsHandler for NotifsHandler { // Opened substream on the handshake-bearing notification protocol. ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { handshake }) - if handler_num == self.handshake_protocol_index => + if handler_num == 0 => { if self.notifications_sink_rx.is_none() && self.pending_handshake.is_none() { self.pending_handshake = Some(handshake); diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index c9547022d836f..7c1fed1b447b3 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm, Swarm) { let behaviour = CustomProtoWithAddr { inner: GenericProto::new( local_peer_id, "test", &[1], vec![], peerset, - iter::once(("/foo".into(), Vec::new())), 0 + iter::once(("/foo".into(), Vec::new())) ), addrs: addrs .iter()