Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Implement request-responses protocols #6634

Merged
16 commits merged into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl NetworkParams {
listen_addresses,
public_addresses,
notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
node_key,
node_name: node_name.to_string(),
client_version: client_id.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.6.1"

[dependencies]
async-trait = "0.1"
async-std = { version = "1.6.2", features = ["unstable"] }
bitflags = "1.2.0"
bs58 = "0.3.1"
Expand Down Expand Up @@ -64,7 +65,7 @@ zeroize = "1.0.0"
[dependencies.libp2p]
version = "0.24.0"
default-features = false
features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"]
features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"]

[dev-dependencies]
assert_matches = "1.3"
Expand Down
94 changes: 76 additions & 18 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests,
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
peer_info, request_responses, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT,
};
Expand All @@ -39,6 +39,10 @@ use std::{
time::Duration,
};

pub use crate::request_responses::{
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError
};

/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
Expand All @@ -50,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling.
block_requests: block_requests::BlockRequests<B>,
/// Finality proof request handling.
Expand All @@ -76,22 +82,40 @@ pub enum BehaviourOut<B: BlockT> {
RandomKademliaStarted(ProtocolId),

/// We have received a request from a peer and answered it.
AnsweredRequest {
///
/// This event is generated for statistics purposes.
InboundRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: String,
/// Time it took to build the response.
build_time: Duration,
protocol: Cow<'static, str>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do protocol names have to be strings instead of bytes?

Copy link
Contributor Author

@tomaka tomaka Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While libp2p allows bytes, I went for restricting this to strings in Substrate because:

  • In practice they're always strings anyway.
  • These protocol names are reported to Prometheus, and Prometheus expects strings (which is the reason for maybe_utf8_bytes_to_string).

/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
result: Result<Duration, ResponseFailure>,
},

/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
RequestFinished {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RequestFinished {
OutboundRequestFinished {

Would be easier for me to understand without having to read the doc comment.

/// Request that has succeeded.
request_id: RequestId,
/// Response sent by the remote or reason for failure.
result: Result<Vec<u8>, RequestFailure>,
},

/// Started a new request with the given node.
RequestStarted {
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestStarted {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
OpaqueRequestStarted {
OutboundRequestStarted {

Would that not be more descriptive? Would as well match with InboundRequest above.

peer: PeerId,
/// Protocol name of the request.
protocol: String,
},
/// Finished, successfully or not, a previously-started request.
RequestFinished {
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestFinished {
/// Who we were requesting.
peer: PeerId,
/// Protocol name of the request.
Expand Down Expand Up @@ -161,17 +185,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests: finality_requests::FinalityProofRequests<B>,
light_client_handler: light_client_handler::LightClientHandler<B>,
disco_config: DiscoveryConfig,
) -> Self {
Behaviour {
request_response_protocols: Vec<request_responses::ProtocolConfig>,
) -> Result<Self, request_responses::RegisterError> {
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
block_requests,
finality_proof_requests,
light_client_handler,
events: VecDeque::new(),
role,
}
})
}

/// Returns the list of nodes that we know exist in the network.
Expand Down Expand Up @@ -208,6 +235,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self.peer_info.node(peer_id)
}

/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer of if the protocol doesn't
/// match one that has been registered.
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>)
-> Result<RequestId, SendRequestError>
{
self.request_responses.send_request(target, protocol, request)
}

/// Registers a new notifications protocol.
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
Expand Down Expand Up @@ -298,18 +335,18 @@ Behaviour<B, H> {
CustomMessageOutcome::BlockRequest { target, request } => {
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push_back(BehaviourOut::RequestStarted {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: target.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
self.events.push_back(BehaviourOut::RequestStarted {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});
Expand Down Expand Up @@ -358,18 +395,39 @@ Behaviour<B, H> {
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, H> {
fn inject_event(&mut self, event: request_responses::Event) {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } => {
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol,
result,
});
}

request_responses::Event::RequestFinished { request_id, result } => {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
result,
});
},
}
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) {
match event {
block_requests::Event::AnsweredRequest { peer, total_handling_time } => {
self.events.push_back(BehaviourOut::AnsweredRequest {
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol: self.block_requests.protocol_name().to_owned(),
build_time: total_handling_time,
protocol: self.block_requests.protocol_name().to_owned().into(),
result: Ok(total_handling_time),
});
},
block_requests::Event::Response { peer, original_request: _, response, request_duration } => {
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
Expand All @@ -381,7 +439,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations or timeouts yet, so
// we process them by disconnecting the node.
self.events.push_back(BehaviourOut::RequestFinished {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
Expand Down
13 changes: 8 additions & 5 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

pub use crate::chain::{Client, FinalityProofProvider};
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};

// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
Expand All @@ -34,9 +35,10 @@ use crate::ExHashT;

use core::{fmt, iter};
use futures::future;
use libp2p::identity::{ed25519, Keypair};
use libp2p::wasm_ext;
use libp2p::{multiaddr, Multiaddr, PeerId};
use libp2p::{
identity::{ed25519, Keypair},
multiaddr, wasm_ext, Multiaddr, PeerId,
};
use prometheus_endpoint::Registry;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
Expand Down Expand Up @@ -414,6 +416,8 @@ pub struct NetworkConfiguration {
/// List of notifications protocols that the node supports. Must also include a
/// `ConsensusEngineId` for backwards-compatibility.
pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Maximum allowed number of incoming connections.
pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain.
Expand Down Expand Up @@ -449,6 +453,7 @@ impl NetworkConfiguration {
boot_nodes: Vec::new(),
node_key,
notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
Expand All @@ -465,9 +470,7 @@ impl NetworkConfiguration {
allow_non_globals_in_dht: false,
}
}
}

impl NetworkConfiguration {
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new(
Expand Down
9 changes: 8 additions & 1 deletion client/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::config::TransportConfig;
use libp2p::{PeerId, Multiaddr};

use std::fmt;
use std::{borrow::Cow, fmt};

/// Result type alias for the network.
pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -61,6 +61,12 @@ pub enum Error {
/// The invalid addresses.
addresses: Vec<Multiaddr>,
},
/// The same request-response protocol has been registered multiple times.
#[display(fmt = "Request-response protocol registered multiple times: {}", protocol)]
DuplicateRequestResponseProtocol {
/// Name of the protocol registered multiple times.
protocol: Cow<'static, str>,
},
}

// Make `Debug` use the `Display` implementation.
Expand All @@ -78,6 +84,7 @@ impl std::error::Error for Error {
Error::DuplicateBootnode { .. } => None,
Error::Prometheus(ref err) => Some(err),
Error::AddressesForAnotherTransport { .. } => None,
Error::DuplicateRequestResponseProtocol { .. } => None,
}
}
}
10 changes: 4 additions & 6 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ mod finality_requests;
mod light_client_handler;
mod on_demand_layer;
mod protocol;
mod request_responses;
mod schema;
mod service;
mod transport;
Expand All @@ -263,13 +264,10 @@ pub mod error;
pub mod gossip;
pub mod network_state;

pub use service::{NetworkService, NetworkWorker};
pub use protocol::PeerInfo;
pub use protocol::event::{Event, DhtEvent, ObservedRole};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
pub use libp2p::multiaddr;
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use service::{NetworkService, NetworkWorker, RequestFailure, OutboundFailure};

pub use sc_peerset::ReputationChange;
use sp_runtime::traits::{Block as BlockT, NumberFor};
Expand Down
Loading