Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Extra timeout handling in block_requests #5794

Merged
merged 1 commit into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
let ev = self.substrate.on_block_response(peer, original_request, response);
self.inject_event(ev);
}
block_requests::Event::RequestCancelled { .. } => {
// There doesn't exist any mechanism to report cancellations yet.
// We would normally disconnect the node, but this event happens as the result of
// a disconnect, so there's nothing more to do.
}
block_requests::Event::RequestTimeout { peer, .. } => {
// There doesn't exist any mechanism to report timeouts yet, so we process them by
// disconnecting the node.
self.substrate.disconnect_peer(&peer);
}
}
}
}
Expand Down
231 changes: 219 additions & 12 deletions client/network/src/protocol/block_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
protocol::{api, message::{self, BlockAttributes}}
};
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use libp2p::{
core::{
ConnectedPoint,
Expand All @@ -54,10 +55,11 @@ use prost::Message;
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
use std::{
cmp::min,
collections::VecDeque,
collections::{HashMap, VecDeque},
io,
iter,
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
task::{Context, Poll}
Expand All @@ -77,6 +79,19 @@ pub enum Event<B: Block> {
original_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
},
/// A request has been cancelled because the peer has disconnected.
/// Disconnects can also happen as a result of violating the network protocol.
RequestCancelled {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
},
/// A request has timed out.
RequestTimeout {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
}
}

/// Configuration options for `BlockRequests`.
Expand All @@ -86,6 +101,7 @@ pub struct Config {
max_request_len: usize,
max_response_len: usize,
inactivity_timeout: Duration,
request_timeout: Duration,
protocol: Bytes,
}

Expand All @@ -96,12 +112,14 @@ impl Config {
/// - max. request size = 1 MiB
/// - max. response size = 16 MiB
/// - inactivity timeout = 15s
/// - request timeout = 40s
pub fn new(id: &ProtocolId) -> Self {
let mut c = Config {
max_block_data_response: 128,
max_request_len: 1024 * 1024,
max_response_len: 16 * 1024 * 1024,
inactivity_timeout: Duration::from_secs(15),
request_timeout: Duration::from_secs(40),
protocol: Bytes::new(),
};
c.set_protocol(id);
Expand Down Expand Up @@ -149,12 +167,27 @@ pub struct BlockRequests<B: Block> {
config: Config,
/// Blockchain client.
chain: Arc<dyn Client<B>>,
/// List of all active connections and the requests we've sent.
peers: HashMap<PeerId, Vec<Connection<B>>>,
/// Futures sending back the block request response.
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
/// Events to return as soon as possible from `poll`.
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
}

/// Local tracking of a libp2p connection.
#[derive(Debug)]
struct Connection<B: Block> {
id: ConnectionId,
ongoing_request: Option<OngoingRequest<B>>,
}

#[derive(Debug)]
struct OngoingRequest<B: Block> {
request: message::BlockRequest<B>,
timeout: Delay,
}

impl<B> BlockRequests<B>
where
B: Block,
Expand All @@ -163,16 +196,38 @@ where
BlockRequests {
config: cfg,
chain,
peers: HashMap::new(),
outgoing: FuturesUnordered::new(),
pending_events: VecDeque::new(),
}
}

/// Issue a new block request.
///
/// Cancels any existing request targeting the same `PeerId`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it safe to invalidate requests to a peer if more requests are sent to the same peer?

Copy link
Contributor Author

@tomaka tomaka Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a matter of safety or correctness. It's a design decision by the sync code to only allow one request per node.

Copy link
Contributor Author

@tomaka tomaka Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From an API user point of view, previous requests are "cancelled", but internally we just forget about them. If a response comes and it doesn't match a known request, we discard that response.

///
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
/// will be disconnected.
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) {
// Determine which connection to send the request to.
let connection = if let Some(peer) = self.peers.get_mut(target) {
// We don't want to have multiple requests for any given node, so in priority try to
// find a connection with an existing request, to override it.
if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) {
entry
} else if let Some(entry) = peer.get_mut(0) {
entry
} else {
log::error!(
target: "sync",
"State inconsistency: empty list of peer connections"
);
return;
}
} else {
return;
};

let protobuf_rq = api::v1::BlockRequest {
fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]),
from_block: match req.from {
Expand All @@ -191,14 +246,31 @@ where

let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
if let Err(err) = protobuf_rq.encode(&mut buf) {
log::warn!("failed to encode block request {:?}: {:?}", protobuf_rq, err);
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
protobuf_rq,
err
);
return;
}

log::trace!("enqueueing block request to {:?}: {:?}", target, protobuf_rq);
if let Some(rq) = &connection.ongoing_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
}
connection.ongoing_request = Some(OngoingRequest {
request: req.clone(),
timeout: Delay::new(self.config.request_timeout),
});

log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq);
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::Any,
handler: NotifyHandler::One(connection.id),
event: OutboundProtocol {
request: buf,
original_request: req,
Expand All @@ -215,7 +287,9 @@ where
, request: &api::v1::BlockRequest
) -> Result<api::v1::BlockResponse, Error>
{
log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
log::trace!(
target: "sync",
"Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
peer,
request.from_block,
request.to_block,
Expand Down Expand Up @@ -332,6 +406,7 @@ where
};
let mut cfg = OneShotHandlerConfig::default();
cfg.inactive_timeout = self.config.inactivity_timeout;
cfg.substream_timeout = self.config.request_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
}

Expand All @@ -345,34 +420,138 @@ where
fn inject_disconnected(&mut self, _peer: &PeerId) {
}

fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
self.peers.entry(peer_id.clone())
.or_default()
.push(Connection {
id: *id,
ongoing_request: None,
});
}

fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
let mut needs_remove = false;
if let Some(entry) = self.peers.get_mut(peer_id) {
if let Some(pos) = entry.iter().position(|i| i.id == *id) {
let ongoing_request = entry.remove(pos).ongoing_request;
if let Some(ongoing_request) = ongoing_request {
log::debug!(
target: "sync",
"Connection {:?} with {} closed with ongoing sync request: {:?}",
id,
peer_id,
ongoing_request
);
let ev = Event::RequestCancelled {
peer: peer_id.clone(),
original_request: ongoing_request.request.clone(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
if entry.is_empty() {
needs_remove = true;
}
} else {
log::error!(
target: "sync",
"State inconsistency: connection id not found in list"
);
}
} else {
log::error!(
target: "sync",
"State inconsistency: peer_id not found in list of connections"
);
}
if needs_remove {
self.peers.remove(peer_id);
}
}

fn inject_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
connection_id: ConnectionId,
node_event: NodeEvent<B, NegotiatedSubstream>
) {
match node_event {
NodeEvent::Request(request, mut stream) => {
match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len());
log::trace!(
target: "sync",
"Enqueueing block response for peer {} with {} blocks",
peer, res.blocks.len()
);
let mut data = Vec::with_capacity(res.encoded_len());
if let Err(e) = res.encode(&mut data) {
log::debug!("error encoding block response for peer {}: {}", peer, e)
log::debug!(
target: "sync",
"Error encoding block response for peer {}: {}",
peer, e
)
} else {
let future = async move {
if let Err(e) = write_one(&mut stream, data).await {
log::debug!("error writing block response: {}", e)
log::debug!(
target: "sync",
"Error writing block response: {}",
e
);
}
};
self.outgoing.push(future.boxed())
}
}
Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e)
Err(e) => log::debug!(
target: "sync",
"Error handling block request from peer {}: {}", peer, e
)
}
}
NodeEvent::Response(original_request, response) => {
log::trace!("received block response from peer {} with {} blocks", peer, response.blocks.len());
log::trace!(
target: "sync",
"Received block response from peer {} with {} blocks",
peer, response.blocks.len()
);
if let Some(connections) = self.peers.get_mut(&peer) {
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
if let Some(ongoing_request) = &mut connection.ongoing_request {
if ongoing_request.request == original_request {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this compare IDs or all of the request data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It compares all the data, but that's basically just a bitfield of the requested fields, the source block hash or number, and the number of blocks.

connection.ongoing_request = None;
} else {
// We're no longer interested in that request.
log::debug!(
target: "sync",
"Received response from {} to obsolete block request {:?}",
peer,
original_request
);
return;
}
} else {
// We remove from `self.peers` requests we're no longer interested in,
// so this can legitimately happen.
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-existing connection {:?}",
connection_id
);
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-connected peer {}",
peer
);
return;
}

let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
Expand Down Expand Up @@ -419,7 +598,10 @@ where
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Err(err) => {
log::debug!("failed to decode block response from peer {}: {}", peer, err);
log::debug!(
target: "sync",
"Failed to decode block response from peer {}: {}", peer, err
);
}
}
}
Expand All @@ -433,6 +615,31 @@ where
return Poll::Ready(ev);
}

// Check the request timeouts.
for (peer, connections) in &mut self.peers {
for connection in connections {
let ongoing_request = match &mut connection.ongoing_request {
Some(rq) => rq,
None => continue,
};

if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
let original_request = ongoing_request.request.clone();
connection.ongoing_request = None;
log::debug!(
target: "sync",
"Request timeout for {}: {:?}",
peer, original_request
);
let ev = Event::RequestTimeout {
peer: peer.clone(),
original_request,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
}
}

while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
Poll::Pending
}
Expand Down