Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/network: Report reputation changes via response (#7958)
Browse files Browse the repository at this point in the history
* client/network: Report reputation changes via response

When handling a request by a remote peer in a request response handler,
one might want to in- or de-crease the reputation of the peer. E.g. one
might want to decrease the reputation slightly for each request, given
that it forces the local node to do work, or one might want to issue a
larger reputation change due to a malformed request by the remote peer.

Instead of having to pass a peerset handle to each request response
handler, this commit suggests to allow handlers to isssue reputation
changes via the provided `pending_response` `oneshot` channel.

A reputation change issued by a request response handler via the
`pending_response` channel is received by the
`RequestResponsesBehaviour` which passes the reputation change up as an
event to eventually be send to a peerset via a peerset handle.

* client/network/req-resp: Use Vec::new instead of None::<Vec<_>>

* client/network: Rename Response to OutgoingResponse

Given that a request-response request is not called `Request` but
`InomingRequest`, rename a request-response response to
`OutgoingResponse`.

* client/finality-grandpa-warp: Send empty rep change via response
  • Loading branch information
mxinden authored Jan 26, 2021
1 parent 13cdf1c commit addf203
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 30 deletions.
10 changes: 6 additions & 4 deletions client/finality-grandpa-warp-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`crate::request_responses::RequestResponsesBehaviour`].
use codec::Decode;
use sc_network::config::{ProtocolId, IncomingRequest, RequestResponseConfig};
use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig};
use sc_client_api::Backend;
use sp_runtime::traits::NumberFor;
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -113,7 +113,7 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
fn handle_request(
&self,
payload: Vec<u8>,
pending_response: oneshot::Sender<Vec<u8>>
pending_response: oneshot::Sender<OutgoingResponse>
) -> Result<(), HandleRequestError>
where NumberFor<TBlock>: sc_finality_grandpa::BlockNumberOps,
{
Expand All @@ -124,8 +124,10 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
self.backend.blockchain(), request.begin, Some(WARP_SYNC_FRAGMENTS_LIMIT), Some(&mut cache)
)?;

pending_response.send(response)
.map_err(|_| HandleRequestError::SendResponse)
pending_response.send(OutgoingResponse {
result: Ok(response),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}

/// Run [`GrandpaWarpSyncRequestHandler`].
Expand Down
5 changes: 5 additions & 0 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Even
peer, protocol, duration, result,
});
},
request_responses::Event::ReputationChanges { peer, changes } => {
for change in changes {
self.substrate.report_peer(peer, change);
}
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions client/network/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use codec::{Encode, Decode};
use crate::chain::Client;
use crate::config::ProtocolId;
use crate::protocol::{message::BlockAttributes};
use crate::request_responses::{IncomingRequest, ProtocolConfig};
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use crate::schema::v1::block_request::FromBlock;
use crate::schema::v1::{BlockResponse, Direction};
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -85,7 +85,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
fn handle_request(
&self,
payload: Vec<u8>,
pending_response: oneshot::Sender<Vec<u8>>
pending_response: oneshot::Sender<OutgoingResponse>
) -> Result<(), HandleRequestError> {
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;

Expand Down Expand Up @@ -181,8 +181,10 @@ impl <B: BlockT> BlockRequestHandler<B> {
let mut data = Vec::with_capacity(res.encoded_len());
res.encode(&mut data)?;

pending_response.send(data)
.map_err(|_| HandleRequestError::SendResponse)
pending_response.send(OutgoingResponse {
result: Ok(data),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}

/// Run [`BlockRequestHandler`].
Expand Down
6 changes: 5 additions & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
pub use crate::chain::Client;
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
pub use crate::request_responses::{
IncomingRequest,
OutgoingResponse,
ProtocolConfig as RequestResponseConfig,
};
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};

// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
Expand Down
96 changes: 75 additions & 21 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use std::{
pin::Pin, task::{Context, Poll}, time::Duration,
};
use wasm_timer::Instant;
use crate::ReputationChange;

pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};

Expand Down Expand Up @@ -114,8 +115,27 @@ pub struct IncomingRequest {
/// [`ProtocolConfig::max_request_size`].
pub payload: Vec<u8>,

/// Channel to send back the response to.
pub pending_response: oneshot::Sender<Vec<u8>>,
/// Channel to send back the response.
///
/// There are two ways to indicate that handling the request failed:
///
/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
///
/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
/// the given peer.
pub pending_response: oneshot::Sender<OutgoingResponse>,
}

/// Response for an incoming request to be send by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
/// The payload of the response.
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Vec<u8>, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,
}

/// Event generated by the [`RequestResponsesBehaviour`].
Expand Down Expand Up @@ -150,6 +170,12 @@ pub enum Event {
/// Result of the request.
result: Result<(), RequestFailure>
},

/// A request protocol handler issued reputation changes for the given peer.
ReputationChanges {
peer: PeerId,
changes: Vec<ReputationChange>,
}
}

/// Combination of a protocol name and a request id.
Expand Down Expand Up @@ -198,10 +224,11 @@ pub struct RequestResponsesBehaviour {

/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
peer: PeerId,
request_id: RequestId,
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
response: OutgoingResponse,
}

impl RequestResponsesBehaviour {
Expand Down Expand Up @@ -406,30 +433,45 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
peer,
request_id,
protocol: protocol_name,
inner_channel,
response
response: OutgoingResponse {
result,
reputation_changes,
},
} = match outcome {
Some(outcome) => outcome,
// The response builder was too busy and thus the request was dropped. This is
// The response builder was too busy or handling the request failed. This is
// later on reported as a `InboundFailure::Omission`.
None => continue,
};

if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
// Note: Failure is handled further below when receiving `InboundFailure`
// event from `RequestResponse` behaviour.
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
if let Ok(payload) = result {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(payload)) {
// Note: Failure is handled further below when receiving
// `InboundFailure` event from `RequestResponse` behaviour.
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
}
}
}

if !reputation_changes.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
Event::ReputationChanges{
peer,
changes: reputation_changes,
},
));
}
}

// Poll request-responses protocols.
Expand Down Expand Up @@ -505,7 +547,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
Some(RequestProcessingOutcome {
request_id, protocol, inner_channel: channel, response
peer, request_id, protocol, inner_channel: channel, response
})
} else {
None
Expand Down Expand Up @@ -851,7 +893,10 @@ mod tests {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this is a response".to_vec());
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
});
}
}.boxed().into()).unwrap();

Expand Down Expand Up @@ -934,7 +979,10 @@ mod tests {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec());
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this response exceeds the limit".to_vec()),
reputation_changes: Vec::new(),
});
}
}.boxed().into()).unwrap();

Expand Down Expand Up @@ -1100,11 +1148,17 @@ mod tests {

protocol_1_request.unwrap()
.pending_response
.send(b"this is a response".to_vec())
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
})
.unwrap();
protocol_2_request.unwrap()
.pending_response
.send(b"this is a response".to_vec())
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
})
.unwrap();
}.boxed().into()).unwrap();

Expand Down

0 comments on commit addf203

Please sign in to comment.