Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Stop sending messages on legacy substream altogether (#6975)
Browse files Browse the repository at this point in the history
* Stop sending messages on legacy substream altogether

* Ensure that handshake is sent back even in case of back-pressure

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Also process OpenRequest and Closed

* Also process OpenRequest and Closed

* Fix bad merge

* God I'm so lost with all these merges

* Immediately return Closed

* Add warning for sending on non-registered protocol

* Register GrandPa protocol in tests

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
tomaka and mxinden authored Sep 2, 2020
1 parent 2e2b6fd commit eb52e43
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 433 deletions.
11 changes: 10 additions & 1 deletion client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use assert_matches::assert_matches;
use environment::HasVoted;
use sc_network_test::{
Block, BlockImportAdapter, Hash, PassThroughVerifier, Peer, PeersClient, PeersFullClient,
TestClient, TestNetFactory,
TestClient, TestNetFactory, FullPeerConfig,
};
use sc_network::config::{ProtocolConfig, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
Expand Down Expand Up @@ -94,6 +94,15 @@ impl TestNetFactory for GrandpaTestNet {
ProtocolConfig::default()
}

fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
(communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into())
],
..Default::default()
})
}

fn make_verifier(
&self,
_client: PeersClient,
Expand Down
176 changes: 16 additions & 160 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ use sp_consensus::{
use codec::{Decode, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, Roles};
use prometheus_endpoint::{
Registry, Gauge, Counter, CounterVec, GaugeVec,
Registry, Gauge, Counter, GaugeVec,
PrometheusError, Opts, register, U64
};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;

Expand Down Expand Up @@ -86,11 +86,6 @@ pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;

// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Maximum total bytes allowed for block bodies in `BlockResponse`
const MAX_BODIES_BYTES: usize = 8 * 1024 * 1024;

/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful
/// and disconnect to free connection slot.
Expand Down Expand Up @@ -119,8 +114,6 @@ mod rep {
pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected transaction packet.
pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// We received an unexpected light node request.
pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet");
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer is on unsupported protocol version.
Expand All @@ -139,7 +132,6 @@ struct Metrics {
finality_proofs: GaugeVec<U64>,
justifications: GaugeVec<U64>,
propagated_transactions: Counter<U64>,
legacy_requests_received: CounterVec<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -185,13 +177,6 @@ impl Metrics {
"sync_propagated_transactions",
"Number of transactions propagated to at least one peer",
)?, r)?,
legacy_requests_received: register(CounterVec::new(
Opts::new(
"sync_legacy_requests_received",
"Number of block/finality/light-client requests received on the legacy substream",
),
&["kind"]
)?, r)?,
})
}
}
Expand Down Expand Up @@ -604,19 +589,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
match message {
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
let outcome = self.on_block_response(who.clone(), r);
self.update_peer_info(&who);
return outcome
},
GenericMessage::BlockAnnounce(announce) => {
let outcome = self.on_block_announce(who.clone(), announce);
self.update_peer_info(&who);
return outcome;
},
GenericMessage::Transactions(m) =>
self.on_transactions(who, m),
GenericMessage::BlockResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected BlockResponse"),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadResponse(_) =>
Expand All @@ -627,6 +608,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::FinalityProofResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected FinalityProofResponse"),
GenericMessage::BlockRequest(_) |
GenericMessage::FinalityProofRequest(_) |
GenericMessage::RemoteReadChildRequest(_) |
GenericMessage::RemoteCallRequest(_) |
Expand Down Expand Up @@ -678,21 +660,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
CustomMessageOutcome::None
}

fn send_message(
&mut self,
who: &PeerId,
message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy: Message<B>,
) {
send_message::<B>(
&mut self.behaviour,
&mut self.context_data.stats,
who,
message,
legacy,
);
}

fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
}
Expand All @@ -718,92 +685,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
if let Some(metrics) = &self.metrics {
metrics.legacy_requests_received.with_label_values(&["block-request"]).inc();
}

trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
request.id,
peer,
request.from,
request.to,
request.max,
request.fields,
);

// sending block requests to the node that is unable to serve it is considered a bad behavior
if !self.config.roles.is_full() {
trace!(target: "sync", "Peer {} is trying to sync from the light node", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_REQUEST);
return;
}

let mut blocks = Vec::new();
let mut id = match request.from {
message::FromBlock::Hash(h) => BlockId::Hash(h),
message::FromBlock::Number(n) => BlockId::Number(n),
};
let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
let get_header = request.fields.contains(message::BlockAttributes::HEADER);
let get_body = request.fields.contains(message::BlockAttributes::BODY);
let get_justification = request
.fields
.contains(message::BlockAttributes::JUSTIFICATION);
let mut total_size = 0;
while let Some(header) = self.context_data.chain.header(id).unwrap_or(None) {
if blocks.len() >= max || (blocks.len() >= 1 && total_size > MAX_BODIES_BYTES) {
break;
}
let number = *header.number();
let hash = header.hash();
let parent_hash = *header.parent_hash();
let justification = if get_justification {
self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None)
} else {
None
};
let block_data = message::generic::BlockData {
hash,
header: if get_header { Some(header) } else { None },
body: if get_body {
self.context_data
.chain
.block_body(&BlockId::Hash(hash))
.unwrap_or(None)
} else {
None
},
receipt: None,
message_queue: None,
justification,
};
// Stop if we don't have requested block body
if get_body && block_data.body.is_none() {
trace!(target: "sync", "Missing data for block request.");
break;
}
total_size += block_data.body.as_ref().map_or(0, |b| b.len());
blocks.push(block_data);
match request.direction {
message::Direction::Ascending => id = BlockId::Number(number + One::one()),
message::Direction::Descending => {
if number.is_zero() {
break;
}
id = BlockId::Hash(parent_hash)
}
}
}
let response = message::generic::BlockResponse {
id: request.id,
blocks,
};
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
self.send_message(&peer, None, GenericMessage::BlockResponse(response))
}

/// Adjusts the reputation of a node.
pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) {
self.peerset_handle.report_peer(who, reputation)
Expand Down Expand Up @@ -1207,14 +1088,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
let encoded = to_send.encode();
send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
Some((self.transactions_protocol.clone(), encoded)),
GenericMessage::Transactions(to_send)
)
self.behaviour.write_notification(
who,
self.transactions_protocol.clone(),
to_send.encode()
);
}
}

Expand Down Expand Up @@ -1289,15 +1167,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
},
};

let encoded = message.encode();

send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
Some((self.block_announces_protocol.clone(), encoded)),
Message::<B>::BlockAnnounce(message),
)
self.behaviour.write_notification(
who,
self.block_announces_protocol.clone(),
message.encode()
);
}
}
}
Expand Down Expand Up @@ -1605,24 +1479,6 @@ fn update_peer_request<B: BlockT, H: ExHashT>(
}
}

fn send_message<B: BlockT>(
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId,
message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy_message: Message<B>,
) {
let encoded = legacy_message.encode();
let mut stats = stats.entry(legacy_message.id()).or_default();
stats.bytes_out += encoded.len() as u64;
stats.count_out += 1;
if let Some((proto, msg)) = message {
behaviour.write_notification(who, proto, msg, encoded);
} else {
behaviour.send_packet(who, encoded);
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;
Expand Down
24 changes: 0 additions & 24 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ impl GenericProto {
target: &PeerId,
protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>,
) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
Expand All @@ -574,33 +573,10 @@ impl GenericProto {
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_sync_notification(
protocol_name,
encoded_fallback_message,
message
);
}

/// Sends a message to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent packet to {:?} without an open channel.",
target);
return
}
Some(sink) => sink
};

trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_legacy(message);
}

/// Returns the state of the peerset manager, for debugging purposes.
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
self.peerset.debug_info()
Expand Down
Loading

0 comments on commit eb52e43

Please sign in to comment.