diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index ab636bcb386..9901be942be 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -28,7 +28,7 @@ jobs:
echo "" > target/doc/index.html
cp -r target/doc/* ./host-docs
- name: Upload documentation
- uses: actions/upload-pages-artifact@v1.0.6
+ uses: actions/upload-pages-artifact@v1.0.7
with:
path: "host-docs/"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8df2d37c210..511120d1a57 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -47,8 +47,11 @@
# 0.51.0 [unreleased]
+- Count bandwidth at the application level. Previously, `BandwidthLogging` implemented `Transport`; it now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)).
+ - `BandwidthLogging::new` now requires a 2nd argument: `Arc<BandwidthSinks>`
+ - Remove `BandwidthFuture`
+ - Rename `BandwidthConnecLogging` to `InstrumentedStream`
- Remove `SimpleProtocol` due to being unused. See [`libp2p::core::upgrade`](https://docs.rs/libp2p/0.50.0/libp2p/core/upgrade/index.html) for alternatives. See [PR 3191].
-
- Update individual crates.
- Update to [`libp2p-dcutr` `v0.9.0`](protocols/dcutr/CHANGELOG.md#090).
diff --git a/Cargo.toml b/Cargo.toml
index eaed637a2d8..976d9fa0002 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -136,6 +136,10 @@ env_logger = "0.10.0"
clap = { version = "4.0.13", features = ["derive"] }
tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] }
+libp2p-mplex = { path = "muxers/mplex" }
+libp2p-noise = { path = "transports/noise" }
+libp2p-tcp = { path = "transports/tcp", features = ["tokio"] }
+
[workspace]
members = [
"core",
diff --git a/README.md b/README.md
index 1ce2acc06a0..23dfbf75a14 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
[![dependency status](https://deps.rs/repo/github/libp2p/rust-libp2p/status.svg?style=flat-square)](https://deps.rs/repo/github/libp2p/rust-libp2p)
[![Crates.io](https://img.shields.io/crates/v/libp2p.svg)](https://crates.io/crates/libp2p)
[![docs.rs](https://img.shields.io/badge/api-rustdoc-blue.svg)](https://docs.rs/libp2p)
+[![docs.rs master](https://img.shields.io/badge/docs-master-blueviolet)](https://libp2p.github.io/rust-libp2p/libp2p/)
This repository is the central place for Rust development of the [libp2p](https://libp2p.io) spec.
diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs
index b90e784f9b7..9e6f06bb497 100644
--- a/misc/metrics/src/dcutr.rs
+++ b/misc/metrics/src/dcutr.rs
@@ -55,21 +55,21 @@ enum EventType {
DirectConnectionUpgradeFailed,
}
-impl From<&libp2p_dcutr::behaviour::Event> for EventType {
- fn from(event: &libp2p_dcutr::behaviour::Event) -> Self {
+impl From<&libp2p_dcutr::Event> for EventType {
+ fn from(event: &libp2p_dcutr::Event) -> Self {
match event {
- libp2p_dcutr::behaviour::Event::InitiatedDirectConnectionUpgrade {
+ libp2p_dcutr::Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: _,
local_relayed_addr: _,
} => EventType::InitiateDirectConnectionUpgrade,
- libp2p_dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade {
+ libp2p_dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id: _,
remote_relayed_addr: _,
} => EventType::RemoteInitiatedDirectConnectionUpgrade,
- libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded {
- remote_peer_id: _,
- } => EventType::DirectConnectionUpgradeSucceeded,
- libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeFailed {
+ libp2p_dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id: _ } => {
+ EventType::DirectConnectionUpgradeSucceeded
+ }
+ libp2p_dcutr::Event::DirectConnectionUpgradeFailed {
remote_peer_id: _,
error: _,
} => EventType::DirectConnectionUpgradeFailed,
@@ -77,8 +77,8 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType {
}
}
-impl super::Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
- fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
+impl super::Recorder<libp2p_dcutr::Event> for Metrics {
+ fn record(&self, event: &libp2p_dcutr::Event) {
self.events
.get_or_create(&EventLabels {
event: event.into(),
diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs
index 3a4bf4bcbc3..aa9f3d924e7 100644
--- a/misc/metrics/src/lib.rs
+++ b/misc/metrics/src/lib.rs
@@ -100,8 +100,8 @@ pub trait Recorder<Event> {
}
#[cfg(feature = "dcutr")]
-impl Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
- fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
+impl Recorder<libp2p_dcutr::Event> for Metrics {
+ fn record(&self, event: &libp2p_dcutr::Event) {
self.dcutr.record(event)
}
}
diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs
index 065be8ba259..a003ab56570 100644
--- a/misc/metrics/src/swarm.rs
+++ b/misc/metrics/src/swarm.rs
@@ -396,5 +396,5 @@ impl From<&libp2p_swarm::PendingInboundConnectionError>
}
fn create_connection_establishment_duration_histogram() -> Histogram {
- Histogram::new(exponential_buckets(1e-3, 2., 10))
+ Histogram::new(exponential_buckets(0.01, 1.5, 20))
}
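The new buckets trade resolution below 10 ms for coverage of much slower connection attempts. A quick back-of-the-envelope comparison, assuming the usual `exponential_buckets(start, factor, length)` semantics of producing `start * factor^i` for `i` in `0..length`:

```rust
fn main() {
    // Upper bucket boundary of the old layout: exponential_buckets(1e-3, 2.0, 10)
    let old_max = 1e-3_f64 * 2.0_f64.powi(9); // ≈ 0.512 s
    // Upper bucket boundary of the new layout: exponential_buckets(0.01, 1.5, 20)
    let new_max = 0.01_f64 * 1.5_f64.powi(19); // ≈ 22.2 s
    println!("old upper bound ≈ {old_max:.3} s, new upper bound ≈ {new_max:.1} s");
}
```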
diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md
index 23c9b616541..39f92d2772d 100644
--- a/protocols/dcutr/CHANGELOG.md
+++ b/protocols/dcutr/CHANGELOG.md
@@ -7,8 +7,12 @@
- Require the node's local `PeerId` to be passed into the constructor of `libp2p_dcutr::Behaviour`. See [PR 3153].
+- Rename types in public API to follow naming conventions defined in [issue 2217]. See [PR 3214].
+
[PR 3213]: https://github.com/libp2p/rust-libp2p/pull/3213
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
+[issue 2217]: https://github.com/libp2p/rust-libp2p/issues/2217
+[PR 3214]: https://github.com/libp2p/rust-libp2p/pull/3214
# 0.8.0
diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs
index 553dffc4864..c574b0e9469 100644
--- a/protocols/dcutr/examples/dcutr.rs
+++ b/protocols/dcutr/examples/dcutr.rs
@@ -114,7 +114,7 @@ fn main() -> Result<(), Box<dyn Error>> {
relay_client: client::Behaviour,
ping: ping::Behaviour,
identify: identify::Behaviour,
- dcutr: dcutr::behaviour::Behaviour,
+ dcutr: dcutr::Behaviour,
}
#[derive(Debug)]
@@ -123,7 +123,7 @@ fn main() -> Result<(), Box<dyn Error>> {
Ping(ping::Event),
Identify(identify::Event),
Relay(client::Event),
- Dcutr(dcutr::behaviour::Event),
+ Dcutr(dcutr::Event),
}
impl From<ping::Event> for Event {
@@ -144,8 +144,8 @@ fn main() -> Result<(), Box<dyn Error>> {
}
}
- impl From<dcutr::behaviour::Event> for Event {
- fn from(e: dcutr::behaviour::Event) -> Self {
+ impl From<dcutr::Event> for Event {
+ fn from(e: dcutr::Event) -> Self {
Event::Dcutr(e)
}
}
@@ -157,7 +157,7 @@ fn main() -> Result<(), Box<dyn Error>> {
"/TODO/0.0.1".to_string(),
local_key.public(),
)),
- dcutr: dcutr::behaviour::Behaviour::new(local_peer_id),
+ dcutr: dcutr::Behaviour::new(local_peer_id),
};
let mut swarm = match ThreadPool::new() {
diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour_impl.rs
similarity index 98%
rename from protocols/dcutr/src/behaviour.rs
rename to protocols/dcutr/src/behaviour_impl.rs
index 15dfe078bfe..2d65197c5f8 100644
--- a/protocols/dcutr/src/behaviour.rs
+++ b/protocols/dcutr/src/behaviour_impl.rs
@@ -54,12 +54,12 @@ pub enum Event {
},
DirectConnectionUpgradeFailed {
remote_peer_id: PeerId,
- error: UpgradeError,
+ error: Error,
},
}
#[derive(Debug, Error)]
-pub enum UpgradeError {
+pub enum Error {
#[error("Failed to dial peer.")]
Dial,
#[error("Failed to establish substream: {0}.")]
@@ -164,7 +164,7 @@ impl Behaviour {
.into(),
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
- error: UpgradeError::Dial,
+ error: Error::Dial,
})
.into(),
]);
@@ -236,7 +236,7 @@ impl NetworkBehaviour for Behaviour {
self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: event_source,
- error: UpgradeError::Handler(error),
+ error: Error::Handler(error),
})
.into(),
);
@@ -260,7 +260,7 @@ impl NetworkBehaviour for Behaviour {
self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: event_source,
- error: UpgradeError::Handler(error),
+ error: Error::Handler(error),
})
.into(),
);
diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs
index 525b2a08d20..d32ad011b5b 100644
--- a/protocols/dcutr/src/lib.rs
+++ b/protocols/dcutr/src/lib.rs
@@ -23,7 +23,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
-pub mod behaviour;
+mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed.
mod handler;
mod protocol;
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -31,6 +31,35 @@ mod message_proto {
include!(concat!(env!("OUT_DIR"), "/holepunch.pb.rs"));
}
+pub use behaviour_impl::Behaviour;
+pub use behaviour_impl::Error;
+pub use behaviour_impl::Event;
pub use protocol::PROTOCOL_NAME;
-pub type InboundUpgradeError = protocol::inbound::UpgradeError;
-pub type OutboundUpgradeError = protocol::outbound::UpgradeError;
+pub mod inbound {
+ pub use crate::protocol::inbound::UpgradeError;
+}
+pub mod outbound {
+ pub use crate::protocol::outbound::UpgradeError;
+}
+
+#[deprecated(
+ since = "0.9.0",
+ note = "Use `libp2p_dcutr::inbound::UpgradeError` instead.`"
+)]
+pub type InboundUpgradeError = inbound::UpgradeError;
+
+#[deprecated(
+ since = "0.9.0",
+ note = "Use `libp2p_dcutr::outbound::UpgradeError` instead.`"
+)]
+pub type OutboundUpgradeError = outbound::UpgradeError;
+pub mod behaviour {
+ #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Behaviour` instead.")]
+ pub type Behaviour = crate::Behaviour;
+
+ #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Event` instead.")]
+ pub type Event = crate::Event;
+
+ #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Error` instead.")]
+ pub type UpgradeError = crate::Error;
+}
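For downstream code the rename is mechanical; a minimal sketch of the migration, assuming an existing `local_peer_id` (the deprecated paths above keep compiling behind warnings):

```rust
use libp2p_core::PeerId;

fn build_dcutr(local_peer_id: PeerId) -> libp2p_dcutr::Behaviour {
    // Previously: libp2p_dcutr::behaviour::Behaviour::new(local_peer_id).
    // `behaviour::Event` is now `libp2p_dcutr::Event`, `behaviour::UpgradeError` is
    // `libp2p_dcutr::Error`, and the protocol-level errors live in
    // `libp2p_dcutr::inbound::UpgradeError` / `libp2p_dcutr::outbound::UpgradeError`.
    libp2p_dcutr::Behaviour::new(local_peer_id)
}
```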
diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs
index 15f37bdb6a3..7119b0899c2 100644
--- a/protocols/dcutr/tests/lib.rs
+++ b/protocols/dcutr/tests/lib.rs
@@ -78,7 +78,7 @@ fn connect() {
pool.run_until(wait_for_connection_established(&mut src, &dst_relayed_addr));
match pool.run_until(wait_for_dcutr_event(&mut src)) {
- dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade {
+ dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id,
remote_relayed_addr,
} if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr => {}
@@ -125,7 +125,7 @@ fn build_client() -> Swarm<Client> {
transport,
Client {
relay: behaviour,
- dcutr: dcutr::behaviour::Behaviour::new(local_peer_id),
+ dcutr: dcutr::Behaviour::new(local_peer_id),
},
local_peer_id,
)
@@ -153,13 +153,13 @@ where
)]
struct Client {
relay: relay::client::Behaviour,
- dcutr: dcutr::behaviour::Behaviour,
+ dcutr: dcutr::Behaviour,
}
#[derive(Debug)]
enum ClientEvent {
Relay(relay::client::Event),
- Dcutr(dcutr::behaviour::Event),
+ Dcutr(dcutr::Event),
}
impl From<relay::client::Event> for ClientEvent {
@@ -168,8 +168,8 @@ impl From<relay::client::Event> for ClientEvent {
}
}
-impl From<dcutr::behaviour::Event> for ClientEvent {
- fn from(event: dcutr::behaviour::Event) -> Self {
+impl From<dcutr::Event> for ClientEvent {
+ fn from(event: dcutr::Event) -> Self {
ClientEvent::Dcutr(event)
}
}
@@ -242,7 +242,7 @@ async fn wait_for_new_listen_addr(client: &mut Swarm<Client>, new_addr: &Multiaddr) {
}
}
-async fn wait_for_dcutr_event(client: &mut Swarm<Client>) -> dcutr::behaviour::Event {
+async fn wait_for_dcutr_event(client: &mut Swarm<Client>) -> dcutr::Event {
loop {
match client.select_next_some().await {
SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e,
diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md
index 564476d987a..7b7293e6390 100644
--- a/protocols/floodsub/CHANGELOG.md
+++ b/protocols/floodsub/CHANGELOG.md
@@ -2,6 +2,10 @@
- Update to `libp2p-swarm` `v0.42.0`.
+- Read and write protocol messages via `prost-codec`. See [PR 3224].
+
+[PR 3224]: https://github.com/libp2p/rust-libp2p/pull/3224
+
# 0.41.0
- Update to `libp2p-core` `v0.38.0`.
diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml
index 5716da7d14d..778f949b655 100644
--- a/protocols/floodsub/Cargo.toml
+++ b/protocols/floodsub/Cargo.toml
@@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
+asynchronous-codec = "0.6"
cuckoofilter = "0.5.0"
fnv = "1.0"
futures = "0.3.1"
@@ -18,6 +19,7 @@ libp2p-core = { version = "0.38.0", path = "../../core" }
libp2p-swarm = { version = "0.42.0", path = "../../swarm" }
log = "0.4"
prost = "0.11"
+prost-codec = { version = "0.3", path = "../../misc/prost-codec" }
rand = "0.8"
smallvec = "1.6.1"
thiserror = "1.0.37"
diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs
index 104e92e49b8..fe3f2859437 100644
--- a/protocols/floodsub/src/protocol.rs
+++ b/protocols/floodsub/src/protocol.rs
@@ -20,14 +20,19 @@
use crate::rpc_proto;
use crate::topic::Topic;
+use asynchronous_codec::Framed;
use futures::{
io::{AsyncRead, AsyncWrite},
- AsyncWriteExt, Future,
+ Future,
};
-use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
-use prost::Message;
+use futures::{SinkExt, StreamExt};
+use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use std::{io, iter, pin::Pin};
+const MAX_MESSAGE_LEN_BYTES: usize = 2048;
+
+const PROTOCOL_NAME: &[u8] = b"/floodsub/1.0.0";
+
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)]
pub struct FloodsubProtocol {}
@@ -44,7 +49,7 @@ impl UpgradeInfo for FloodsubProtocol {
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
- iter::once(b"/floodsub/1.0.0")
+ iter::once(PROTOCOL_NAME)
}
}
@@ -53,19 +58,27 @@ where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = FloodsubRpc;
- type Error = FloodsubDecodeError;
+ type Error = FloodsubError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
- fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
+ fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
Box::pin(async move {
- let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?;
- let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(DecodeError)?;
+ let mut framed = Framed::new(
+ socket,
+ prost_codec::Codec::<rpc_proto::Rpc>::new(MAX_MESSAGE_LEN_BYTES),
+ );
+
+ let rpc = framed
+ .next()
+ .await
+ .ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))?
+ .map_err(CodecError)?;
let mut messages = Vec::with_capacity(rpc.publish.len());
for publish in rpc.publish.into_iter() {
messages.push(FloodsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
- .map_err(|_| FloodsubDecodeError::InvalidPeerId)?,
+ .map_err(|_| FloodsubError::InvalidPeerId)?,
data: publish.data.unwrap_or_default(),
sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
@@ -93,21 +106,21 @@ where
/// Reach attempt interrupt errors.
#[derive(thiserror::Error, Debug)]
-pub enum FloodsubDecodeError {
- /// Error when reading the packet from the socket.
- #[error("Failed to read from socket")]
- ReadError(#[from] io::Error),
- /// Error when decoding the raw buffer into a protobuf.
- #[error("Failed to decode protobuf")]
- ProtobufError(#[from] DecodeError),
+pub enum FloodsubError {
/// Error when parsing the `PeerId` in the message.
#[error("Failed to decode PeerId from message")]
InvalidPeerId,
+ /// Error when decoding the raw buffer into a protobuf.
+ #[error("Failed to decode protobuf")]
+ ProtobufError(#[from] CodecError),
+ /// Error when reading the packet from the socket.
+ #[error("Failed to read from socket")]
+ ReadError(#[from] io::Error),
}
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
-pub struct DecodeError(prost::DecodeError);
+pub struct CodecError(#[from] prost_codec::Error);
/// An RPC received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -123,7 +136,7 @@ impl UpgradeInfo for FloodsubRpc {
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
- iter::once(b"/floodsub/1.0.0")
+ iter::once(PROTOCOL_NAME)
}
}
@@ -132,16 +145,17 @@ where
TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
{
type Output = ();
- type Error = io::Error;
+ type Error = CodecError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
- fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
+ fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
Box::pin(async move {
- let bytes = self.into_bytes();
-
- upgrade::write_length_prefixed(&mut socket, bytes).await?;
- socket.close().await?;
-
+ let mut framed = Framed::new(
+ socket,
+ prost_codec::Codec::<rpc_proto::Rpc>::new(MAX_MESSAGE_LEN_BYTES),
+ );
+ framed.send(self.into_rpc()).await?;
+ framed.close().await?;
Ok(())
})
}
@@ -149,8 +163,8 @@ where
impl FloodsubRpc {
/// Turns this `FloodsubRpc` into a message that can be sent to a substream.
- fn into_bytes(self) -> Vec<u8> {
- let rpc = rpc_proto::Rpc {
+ fn into_rpc(self) -> rpc_proto::Rpc {
+ rpc_proto::Rpc {
publish: self
.messages
.into_iter()
@@ -170,12 +184,7 @@ impl FloodsubRpc {
topic_id: Some(topic.topic.into()),
})
.collect(),
- };
-
- let mut buf = Vec::with_capacity(rpc.encoded_len());
- rpc.encode(&mut buf)
- .expect("Vec provides capacity as needed");
- buf
+ }
}
}
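The `Framed` plus `prost_codec::Codec` pattern used above, pulled out as a stand-alone sketch. Assumptions: `rpc_proto::Rpc` is this crate's prost-generated message, and the dependency versions are those added to `Cargo.toml` earlier in this diff.

```rust
use asynchronous_codec::Framed;
use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};

/// Sends one length-prefixed protobuf `Rpc` and then reads one back, if any.
async fn roundtrip<T>(
    socket: T,
    rpc: rpc_proto::Rpc,
) -> Result<Option<rpc_proto::Rpc>, prost_codec::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    let mut framed = Framed::new(socket, prost_codec::Codec::<rpc_proto::Rpc>::new(2048));
    framed.send(rpc).await?;        // encode, length-prefix, flush
    framed.next().await.transpose() // decode the next frame, if the peer sent one
}
```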
diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs
index 5d1b69ec0fb..0803137323a 100644
--- a/protocols/gossipsub/src/behaviour.rs
+++ b/protocols/gossipsub/src/behaviour.rs
@@ -25,7 +25,6 @@ use std::{
collections::{BTreeSet, HashMap},
fmt,
net::IpAddr,
- sync::Arc,
task::{Context, Poll},
time::Duration,
};
@@ -201,9 +200,6 @@ impl From<MessageAuthenticity> for PublishConfig {
}
}
-type GossipsubNetworkBehaviourAction =
- NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;
-
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If
@@ -223,7 +219,7 @@ pub struct Gossipsub<
config: GossipsubConfig,
/// Events that need to be yielded to the outside when polling.
- events: VecDeque<GossipsubNetworkBehaviourAction>,
+ events: VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
@@ -2903,7 +2899,7 @@ where
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
- event: Arc::new(GossipsubHandlerIn::Message(message)),
+ event: GossipsubHandlerIn::Message(message),
handler: NotifyHandler::Any,
})
}
@@ -3163,7 +3159,7 @@ where
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
- event: Arc::new(GossipsubHandlerIn::JoinedMesh),
+ event: GossipsubHandlerIn::JoinedMesh,
handler: NotifyHandler::One(connections.connections[0]),
});
break;
@@ -3449,10 +3445,7 @@ where
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
- return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| {
- // clone send event reference if others references are present
- Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone())
- }));
+ return Poll::Ready(event);
}
// update scores
@@ -3499,7 +3492,7 @@ fn peer_added_to_mesh(
new_topics: Vec<&TopicHash>,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
- events: &mut VecDeque<GossipsubNetworkBehaviourAction>,
+ events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
@@ -3527,7 +3520,7 @@ fn peer_added_to_mesh(
// This is the first mesh the peer has joined, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
- event: Arc::new(GossipsubHandlerIn::JoinedMesh),
+ event: GossipsubHandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id),
});
}
@@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh(
old_topic: &TopicHash,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
- events: &mut VecDeque<GossipsubNetworkBehaviourAction>,
+ events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
@@ -3566,7 +3559,7 @@ fn peer_removed_from_mesh(
// The peer is not in any other mesh, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
- event: Arc::new(GossipsubHandlerIn::LeftMesh),
+ event: GossipsubHandlerIn::LeftMesh,
handler: NotifyHandler::One(*connection_id),
});
}
diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs
index 42c28061e88..f5fd8a3dc40 100644
--- a/protocols/gossipsub/src/behaviour/tests.rs
+++ b/protocols/gossipsub/src/behaviour/tests.rs
@@ -375,17 +375,17 @@ fn test_subscribe() {
.events
.iter()
.fold(vec![], |mut collected_subscriptions, e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => match **event {
- GossipsubHandlerIn::Message(ref message) => {
- for s in &message.subscriptions {
- if let Some(true) = s.subscribe {
- collected_subscriptions.push(s.clone())
- };
- }
- collected_subscriptions
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref message),
+ ..
+ } => {
+ for s in &message.subscriptions {
+ if let Some(true) = s.subscribe {
+ collected_subscriptions.push(s.clone())
+ };
}
- _ => collected_subscriptions,
- },
+ collected_subscriptions
+ }
_ => collected_subscriptions,
});
@@ -443,17 +443,17 @@ fn test_unsubscribe() {
.events
.iter()
.fold(vec![], |mut collected_subscriptions, e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => match **event {
- GossipsubHandlerIn::Message(ref message) => {
- for s in &message.subscriptions {
- if let Some(true) = s.subscribe {
- collected_subscriptions.push(s.clone())
- };
- }
- collected_subscriptions
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref message),
+ ..
+ } => {
+ for s in &message.subscriptions {
+ if let Some(true) = s.subscribe {
+ collected_subscriptions.push(s.clone())
+ };
}
- _ => collected_subscriptions,
- },
+ collected_subscriptions
+ }
_ => collected_subscriptions,
});
@@ -630,16 +630,16 @@ fn test_publish_without_flood_publishing() {
.events
.iter()
.fold(vec![], |mut collected_publish, e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => match **event {
- GossipsubHandlerIn::Message(ref message) => {
- let event = proto_to_message(message);
- for s in &event.messages {
- collected_publish.push(s.clone());
- }
- collected_publish
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref message),
+ ..
+ } => {
+ let event = proto_to_message(message);
+ for s in &event.messages {
+ collected_publish.push(s.clone());
}
- _ => collected_publish,
- },
+ collected_publish
+ }
_ => collected_publish,
});
@@ -720,16 +720,16 @@ fn test_fanout() {
.events
.iter()
.fold(vec![], |mut collected_publish, e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => match **event {
- GossipsubHandlerIn::Message(ref message) => {
- let event = proto_to_message(message);
- for s in &event.messages {
- collected_publish.push(s.clone());
- }
- collected_publish
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref message),
+ ..
+ } => {
+ let event = proto_to_message(message);
+ for s in &event.messages {
+ collected_publish.push(s.clone());
}
- _ => collected_publish,
- },
+ collected_publish
+ }
_ => collected_publish,
});
@@ -773,26 +773,25 @@ fn test_inject_connected() {
.events
.iter()
.filter(|e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- !m.subscriptions.is_empty()
- } else {
- false
- }
- }
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } => !m.subscriptions.is_empty(),
_ => false,
})
.collect();
// check that there are two subscriptions sent to each peer
for sevent in send_events.clone() {
- if let NetworkBehaviourAction::NotifyHandler { event, .. } = sevent {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- assert!(
- m.subscriptions.len() == 2,
- "There should be two subscriptions sent to each peer (1 for each topic)."
- );
- }
+ if let NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } = sevent
+ {
+ assert!(
+ m.subscriptions.len() == 2,
+ "There should be two subscriptions sent to each peer (1 for each topic)."
+ );
};
}
@@ -1018,7 +1017,7 @@ fn test_handle_iwant_msg_cached() {
.iter()
.fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for c in &event.messages {
collected_messages.push(c.clone())
@@ -1075,17 +1074,16 @@ fn test_handle_iwant_msg_cached_shifted() {
// is the message is being sent?
let message_exists = gs.events.iter().any(|e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- let event = proto_to_message(m);
- event
- .messages
- .iter()
- .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap())
- .any(|msg| gs.config.message_id(&msg) == msg_id)
- } else {
- false
- }
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } => {
+ let event = proto_to_message(m);
+ event
+ .messages
+ .iter()
+ .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap())
+ .any(|msg| gs.config.message_id(&msg) == msg_id)
}
_ => false,
});
@@ -1317,17 +1315,17 @@ fn count_control_msgs(
+ gs.events
.iter()
.map(|e| match e {
- NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- let event = proto_to_message(m);
- event
- .control_msgs
- .iter()
- .filter(|m| filter(peer_id, m))
- .count()
- } else {
- 0
- }
+ NetworkBehaviourAction::NotifyHandler {
+ peer_id,
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } => {
+ let event = proto_to_message(m);
+ event
+ .control_msgs
+ .iter()
+ .filter(|m| filter(peer_id, m))
+ .count()
}
_ => 0,
})
@@ -1540,19 +1538,19 @@ fn do_forward_messages_to_explicit_peers() {
gs.events
.iter()
.filter(|e| match e {
- NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- let event = proto_to_message(m);
- peer_id == &peers[0]
- && event
- .messages
- .iter()
- .filter(|m| m.data == message.data)
- .count()
- > 0
- } else {
- false
- }
+ NetworkBehaviourAction::NotifyHandler {
+ peer_id,
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } => {
+ let event = proto_to_message(m);
+ peer_id == &peers[0]
+ && event
+ .messages
+ .iter()
+ .filter(|m| m.data == message.data)
+ .count()
+ > 0
}
_ => false,
})
@@ -2107,7 +2105,7 @@ fn test_flood_publish() {
.iter()
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for s in &event.messages {
collected_publish.push(s.clone());
@@ -2668,7 +2666,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
.iter()
.fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for c in &event.messages {
collected_messages.push((*peer_id, c.clone()))
@@ -2816,7 +2814,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() {
.iter()
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for s in &event.messages {
collected_publish.push((*peer_id, s.clone()));
@@ -2873,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() {
.iter()
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for s in &event.messages {
collected_publish.push((*peer_id, s.clone()));
@@ -4407,13 +4405,12 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() {
gs.events
.iter()
.map(|e| match e {
- NetworkBehaviourAction::NotifyHandler { event, .. } => {
- if let GossipsubHandlerIn::Message(ref m) = **event {
- let event = proto_to_message(m);
- event.messages.len()
- } else {
- 0
- }
+ NetworkBehaviourAction::NotifyHandler {
+ event: GossipsubHandlerIn::Message(ref m),
+ ..
+ } => {
+ let event = proto_to_message(m);
+ event.messages.len()
}
_ => 0,
})
@@ -4816,7 +4813,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() {
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for s in &event.messages {
collected_publish.push(s.clone());
@@ -4873,7 +4870,7 @@ fn test_do_not_use_floodsub_in_fanout() {
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 {
- if let GossipsubHandlerIn::Message(ref m) = **event {
+ if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m);
for s in &event.messages {
collected_publish.push(s.clone());
@@ -5187,7 +5184,7 @@ fn test_subscribe_and_graft_with_negative_score() {
let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == p1 {
- if let GossipsubHandlerIn::Message(m) = Arc::try_unwrap(event).unwrap() {
+ if let GossipsubHandlerIn::Message(m) = event {
Some(m)
} else {
None
diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs
index 68bcf912975..8fd563c37a6 100644
--- a/protocols/gossipsub/src/handler.rs
+++ b/protocols/gossipsub/src/handler.rs
@@ -64,7 +64,7 @@ pub enum HandlerEvent {
}
/// A message sent from the behaviour to the handler.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub enum GossipsubHandlerIn {
/// A gossipsub message to send.
Message(crate::rpc_proto::Rpc),
diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs
index d012c94d2e0..37f41a8cd8a 100644
--- a/protocols/kad/src/handler.rs
+++ b/protocols/kad/src/handler.rs
@@ -580,7 +580,7 @@ where
)
}) {
*s = InboundSubstreamState::Cancelled;
- log::warn!(
+ log::debug!(
"New inbound substream to {:?} exceeds inbound substream limit. \
Removed older substream waiting to be reused.",
self.remote_peer_id,
diff --git a/src/bandwidth.rs b/src/bandwidth.rs
index a58eec95ddb..dc696ce07e2 100644
--- a/src/bandwidth.rs
+++ b/src/bandwidth.rs
@@ -18,20 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
-use crate::{
- core::{
- transport::{TransportError, TransportEvent},
- Transport,
- },
- Multiaddr,
-};
+use crate::core::muxing::{StreamMuxer, StreamMuxerEvent};
use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
ready,
};
-use libp2p_core::transport::ListenerId;
use std::{
convert::TryFrom as _,
io,
@@ -43,121 +36,86 @@ use std::{
task::{Context, Poll},
};
-/// Wraps around a `Transport` and counts the number of bytes that go through all the opened
-/// connections.
+/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened
+/// streams.
#[derive(Clone)]
#[pin_project::pin_project]
-pub struct BandwidthLogging<TInner> {
+pub(crate) struct BandwidthLogging<SMInner> {
#[pin]
- inner: TInner,
+ inner: SMInner,
sinks: Arc<BandwidthSinks>,
}
-impl<TInner> BandwidthLogging<TInner> {
- /// Creates a new [`BandwidthLogging`] around the transport.
- pub fn new(inner: TInner) -> (Self, Arc<BandwidthSinks>) {
- let sink = Arc::new(BandwidthSinks {
- inbound: AtomicU64::new(0),
- outbound: AtomicU64::new(0),
- });
-
- let trans = BandwidthLogging {
- inner,
- sinks: sink.clone(),
- };
-
- (trans, sink)
+impl<SMInner> BandwidthLogging<SMInner> {
+ /// Creates a new [`BandwidthLogging`] around the stream muxer.
+ pub(crate) fn new(inner: SMInner, sinks: Arc<BandwidthSinks>) -> Self {
+ Self { inner, sinks }
}
}
-impl<TInner> Transport for BandwidthLogging<TInner>
+impl<SMInner> StreamMuxer for BandwidthLogging<SMInner>
where
- TInner: Transport,
+ SMInner: StreamMuxer,
{
- type Output = BandwidthConnecLogging<TInner::Output>;
- type Error = TInner::Error;
- type ListenerUpgrade = BandwidthFuture<TInner::ListenerUpgrade>;
- type Dial = BandwidthFuture<TInner::Dial>;
+ type Substream = InstrumentedStream<SMInner::Substream>;
+ type Error = SMInner::Error;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
+ ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.project();
- match this.inner.poll(cx) {
- Poll::Ready(event) => {
- let event = event.map_upgrade({
- let sinks = this.sinks.clone();
- |inner| BandwidthFuture { inner, sinks }
- });
- Poll::Ready(event)
- }
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
- self.inner.listen_on(addr)
+ this.inner.poll(cx)
}
- fn remove_listener(&mut self, id: ListenerId) -> bool {
- self.inner.remove_listener(id)
- }
-
- fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
- let sinks = self.sinks.clone();
- self.inner
- .dial(addr)
- .map(move |fut| BandwidthFuture { inner: fut, sinks })
- }
-
- fn dial_as_listener(
- &mut self,
- addr: Multiaddr,
- ) -> Result<Self::Dial, TransportError<Self::Error>> {
- let sinks = self.sinks.clone();
- self.inner
- .dial_as_listener(addr)
- .map(move |fut| BandwidthFuture { inner: fut, sinks })
- }
-
- fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
- self.inner.address_translation(server, observed)
+ fn poll_inbound(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<Self::Substream, Self::Error>> {
+ let this = self.project();
+ let inner = ready!(this.inner.poll_inbound(cx)?);
+ let logged = InstrumentedStream {
+ inner,
+ sinks: this.sinks.clone(),
+ };
+ Poll::Ready(Ok(logged))
}
-}
-
-/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
-/// counter.
-#[pin_project::pin_project]
-pub struct BandwidthFuture<TInner> {
- #[pin]
- inner: TInner,
- sinks: Arc,
-}
-
-impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
- type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ fn poll_outbound(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
- let inner = ready!(this.inner.try_poll(cx)?);
- let logged = BandwidthConnecLogging {
+ let inner = ready!(this.inner.poll_outbound(cx)?);
+ let logged = InstrumentedStream {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let this = self.project();
+ this.inner.poll_close(cx)
+ }
}
-/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`].
+/// Allows obtaining the average bandwidth of the streams.
pub struct BandwidthSinks {
inbound: AtomicU64,
outbound: AtomicU64,
}
impl BandwidthSinks {
- /// Returns the total number of bytes that have been downloaded on all the connections spawned
- /// through the [`BandwidthLogging`].
+ /// Returns a new [`BandwidthSinks`].
+ pub(crate) fn new() -> Arc<Self> {
+ Arc::new(Self {
+ inbound: AtomicU64::new(0),
+ outbound: AtomicU64::new(0),
+ })
+ }
+
+ /// Returns the total number of bytes that have been downloaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
@@ -165,8 +123,7 @@ impl BandwidthSinks {
self.inbound.load(Ordering::Relaxed)
}
- /// Returns the total number of bytes that have been uploaded on all the connections spawned
- /// through the [`BandwidthLogging`].
+ /// Returns the total number of bytes that have been uploaded on all the streams.
///
/// > **Note**: This method is by design subject to race conditions. The returned value should
/// > only ever be used for statistics purposes.
@@ -175,15 +132,15 @@ impl BandwidthSinks {
}
}
-/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it.
+/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it.
#[pin_project::pin_project]
-pub struct BandwidthConnecLogging<TInner> {
+pub(crate) struct InstrumentedStream<SMInner> {
#[pin]
- inner: TInner,
+ inner: SMInner,
sinks: Arc<BandwidthSinks>,
}
-impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
+impl<SMInner: AsyncRead> AsyncRead for InstrumentedStream<SMInner> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -213,7 +170,7 @@ impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
}
}
-impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
+impl<SMInner: AsyncWrite> AsyncWrite for InstrumentedStream<SMInner> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
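The byte counting itself is unchanged by the move from `Transport` to `StreamMuxer`; a minimal stand-alone sketch of the pattern `InstrumentedStream::poll_read` follows. The `CountingReader` type here is hypothetical, only the idea is taken from this file:

```rust
use futures::{io::AsyncRead, ready};
use std::{
    io,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll},
};

/// Counts every byte read through the wrapped `AsyncRead`.
struct CountingReader<R> {
    inner: R,
    inbound: Arc<AtomicU64>,
}

impl<R: AsyncRead + Unpin> AsyncRead for CountingReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = &mut *self;
        let num_bytes = ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
        // Saturate on conversion, mirroring the `u64::try_from(..).unwrap_or(u64::MAX)`
        // used by the real implementation.
        this.inbound
            .fetch_add(u64::try_from(num_bytes).unwrap_or(u64::MAX), Ordering::Relaxed);
        Poll::Ready(Ok(num_bytes))
    }
}
```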
diff --git a/src/transport_ext.rs b/src/transport_ext.rs
index fa8926c8380..2a4c30f17e3 100644
--- a/src/transport_ext.rs
+++ b/src/transport_ext.rs
@@ -20,22 +20,74 @@
//! Provides the `TransportExt` trait.
-use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport};
+use crate::core::{
+ muxing::{StreamMuxer, StreamMuxerBox},
+ transport::Boxed,
+ PeerId,
+};
+use crate::{
+ bandwidth::{BandwidthLogging, BandwidthSinks},
+ Transport,
+};
use std::sync::Arc;
/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
pub trait TransportExt: Transport {
- /// Adds a layer on the `Transport` that logs all trafic that passes through the sockets
+ /// Adds a layer on the `Transport` that logs all traffic that passes through the streams
/// created by it.
///
- /// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the total number
- /// of bytes transferred through the sockets.
- fn with_bandwidth_logging(self) -> (BandwidthLogging<Self>, Arc<BandwidthSinks>)
+ /// This method returns an `Arc<BandwidthSinks>` that can be used to retrieve the total number
+ /// of bytes transferred through the streams.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use libp2p_mplex as mplex;
+ /// use libp2p_noise as noise;
+ /// use libp2p_tcp as tcp;
+ /// use libp2p::{
+ /// core::upgrade,
+ /// identity,
+ /// TransportExt,
+ /// Transport,
+ /// };
+ ///
+ /// let id_keys = identity::Keypair::generate_ed25519();
+ ///
+ /// let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
+ /// .upgrade(upgrade::Version::V1)
+ /// .authenticate(
+ /// noise::NoiseAuthenticated::xx(&id_keys)
+ /// .expect("Signing libp2p-noise static DH keypair failed."),
+ /// )
+ /// .multiplex(mplex::MplexConfig::new())
+ /// .boxed();
+ ///
+ /// let (transport, sinks) = transport.with_bandwidth_logging();
+ /// ```
+ fn with_bandwidth_logging<S>(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>)
where
- Self: Sized,
+ Self: Sized + Send + Unpin + 'static,
+ Self::Dial: Send + 'static,
+ Self::ListenerUpgrade: Send + 'static,
+ Self::Error: Send + Sync,
+ Self::Output: Into<(PeerId, S)>,
+ S: StreamMuxer + Send + 'static,
+ S::Substream: Send + 'static,
+ S::Error: Send + Sync + 'static,
{
- BandwidthLogging::new(self)
+ let sinks = BandwidthSinks::new();
+ let sinks_copy = sinks.clone();
+ let transport = Transport::map(self, |output, _| {
+ let (peer_id, stream_muxer_box) = output.into();
+ (
+ peer_id,
+ StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
+ )
+ })
+ .boxed();
+ (transport, sinks)
}
}
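As a follow-up to the doc example above, the returned handle can be read at any time; a short sketch continuing from that example's `sinks` binding (the `total_inbound`/`total_outbound` getters are the existing `BandwidthSinks` accessors documented in `src/bandwidth.rs`):

```rust
// `sinks` is the `Arc<BandwidthSinks>` returned by `with_bandwidth_logging`;
// both counters are monotonically increasing totals over all streams.
let downloaded_bytes = sinks.total_inbound();
let uploaded_bytes = sinks.total_outbound();
println!("{downloaded_bytes} B down / {uploaded_bytes} B up");
```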
diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md
index 7bbab360faa..4deec2a4baa 100644
--- a/transports/noise/CHANGELOG.md
+++ b/transports/noise/CHANGELOG.md
@@ -1,3 +1,9 @@
+# 0.41.1 [unreleased]
+
+- Deprecate non-compliant noise implementation. We intend to remove it in a future release without replacement. See [PR 3227].
+
+[PR 3227]: https://github.com/libp2p/rust-libp2p/pull/3227
+
# 0.41.0
- Remove `prost::Error` from public API. See [PR 3058].
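Migrating off the deprecated handshake is a one-type change; a hedged sketch, reusing the key-generation style of the other examples in this diff:

```rust
use libp2p_core::identity;
use libp2p_noise as noise;

fn main() -> Result<(), noise::NoiseError> {
    let id_keys = identity::Keypair::generate_ed25519();

    // Previously: noise::Keypair::<noise::X25519>::new(), which is deprecated here
    // because the resulting handshake is not compliant with the libp2p noise spec.
    let dh_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?;
    let _noise = noise::NoiseConfig::xx(dh_keys).into_authenticated();

    Ok(())
}
```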
diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml
index f0bf2864297..942618d2c57 100644
--- a/transports/noise/Cargo.toml
+++ b/transports/noise/Cargo.toml
@@ -3,7 +3,7 @@ name = "libp2p-noise"
edition = "2021"
rust-version = "1.60.0"
description = "Cryptographic handshake protocol using the noise framework."
-version = "0.41.0"
+version = "0.41.1"
authors = ["Parity Technologies "]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@@ -31,7 +31,7 @@ snow = { version = "0.9.0", features = ["default-resolver"], default-features =
[dev-dependencies]
async-io = "1.2.0"
-ed25519-compact = "1.0.11"
+ed25519-compact = "2.0.4"
env_logger = "0.10.0"
libp2p-tcp = { path = "../tcp", features = ["async-io"] }
libsodium-sys-stable = { version = "1.19.22", features = ["fetch-latest"] }
diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs
index 45e5540df43..52f13ccac94 100644
--- a/transports/noise/src/lib.rs
+++ b/transports/noise/src/lib.rs
@@ -60,7 +60,9 @@ mod protocol;
pub use io::handshake::RemoteIdentity;
pub use io::NoiseOutput;
-pub use protocol::{x25519::X25519, x25519_spec::X25519Spec};
+#[allow(deprecated)]
+pub use protocol::x25519::X25519;
+pub use protocol::x25519_spec::X25519Spec;
pub use protocol::{AuthenticKeypair, Keypair, KeypairIdentity, PublicKey, SecretKey};
pub use protocol::{Protocol, ProtocolParams, IK, IX, XX};
diff --git a/transports/noise/src/protocol.rs b/transports/noise/src/protocol.rs
index 5a4ea04f518..7ec9746d5bc 100644
--- a/transports/noise/src/protocol.rs
+++ b/transports/noise/src/protocol.rs
@@ -245,7 +245,7 @@ impl snow::resolvers::CryptoResolver for Resolver {
fn resolve_dh(&self, choice: &snow::params::DHChoice) -> Option<Box<dyn snow::types::Dh>> {
if let snow::params::DHChoice::Curve25519 = choice {
- Some(Box::new(Keypair::<X25519>::default()))
+ Some(Box::new(Keypair::<X25519Spec>::default()))
} else {
None
}
@@ -308,7 +308,7 @@ impl snow::types::Random for Rng {}
#[cfg(test)]
mod tests {
use super::*;
- use crate::X25519;
+ use crate::X25519Spec;
use once_cell::sync::Lazy;
#[test]
@@ -334,9 +334,9 @@ mod tests {
}
fn xx_builder(prologue: &'static [u8]) -> snow::Builder<'static> {
- X25519::params_xx().into_builder(prologue, TEST_KEY.secret(), None)
+ X25519Spec::params_xx().into_builder(prologue, TEST_KEY.secret(), None)
}
// Hack to work around borrow-checker.
- static TEST_KEY: Lazy<Keypair<X25519>> = Lazy::new(Keypair::<X25519>::new);
+ static TEST_KEY: Lazy<Keypair<X25519Spec>> = Lazy::new(Keypair::<X25519Spec>::new);
}
diff --git a/transports/noise/src/protocol/x25519.rs b/transports/noise/src/protocol/x25519.rs
index 067f67ca2c1..182d9905b39 100644
--- a/transports/noise/src/protocol/x25519.rs
+++ b/transports/noise/src/protocol/x25519.rs
@@ -23,6 +23,8 @@
//! **Note**: This set of protocols is not interoperable with other
//! libp2p implementations.
+#![allow(deprecated)]
+
use crate::{NoiseConfig, NoiseError, Protocol, ProtocolParams};
use curve25519_dalek::edwards::CompressedEdwardsY;
use libp2p_core::UpgradeInfo;
@@ -56,6 +58,10 @@ static PARAMS_XX: Lazy<ProtocolParams> = Lazy::new(|| {
/// A X25519 key.
#[derive(Clone)]
+#[deprecated(
+ since = "0.41.1",
+ note = "Will be removed because it is not compliant with the official libp2p specification. Use `X25519Spec` instead."
+)]
pub struct X25519([u8; 32]);
impl AsRef<[u8]> for X25519 {
@@ -135,15 +141,6 @@ impl Protocol<X25519> for X25519 {
}
impl Keypair<X25519> {
- /// An "empty" keypair as a starting state for DH computations in `snow`,
- /// which get manipulated through the `snow::types::Dh` interface.
- pub(super) fn default() -> Self {
- Keypair {
- secret: SecretKey(X25519([0u8; 32])),
- public: PublicKey(X25519([0u8; 32])),
- }
- }
-
/// Create a new X25519 keypair.
pub fn new() -> Keypair<X25519> {
let mut sk_bytes = [0u8; 32];
diff --git a/transports/noise/src/protocol/x25519_spec.rs b/transports/noise/src/protocol/x25519_spec.rs
index e114f10747c..87973463521 100644
--- a/transports/noise/src/protocol/x25519_spec.rs
+++ b/transports/noise/src/protocol/x25519_spec.rs
@@ -29,7 +29,7 @@ use rand::Rng;
use x25519_dalek::{x25519, X25519_BASEPOINT_BYTES};
use zeroize::Zeroize;
-use super::{x25519::X25519, *};
+use super::*;
/// Prefix of static key signatures for domain separation.
const STATIC_KEY_DOMAIN: &str = "noise-libp2p-static-key:";
@@ -51,6 +51,15 @@ impl Zeroize for X25519Spec {
}
impl Keypair<X25519Spec> {
+ /// An "empty" keypair as a starting state for DH computations in `snow`,
+ /// which get manipulated through the `snow::types::Dh` interface.
+ pub(super) fn default() -> Self {
+ Keypair {
+ secret: SecretKey(X25519Spec([0u8; 32])),
+ public: PublicKey(X25519Spec([0u8; 32])),
+ }
+ }
+
/// Create a new X25519 keypair.
pub fn new() -> Keypair<X25519Spec> {
let mut sk_bytes = [0u8; 32];
@@ -110,15 +119,18 @@ impl UpgradeInfo for NoiseConfig {
/// interoperable with other libp2p implementations.
impl Protocol<X25519Spec> for X25519Spec {
fn params_ik() -> ProtocolParams {
- X25519::params_ik()
+ #[allow(deprecated)]
+ x25519::X25519::params_ik()
}
fn params_ix() -> ProtocolParams {
- X25519::params_ix()
+ #[allow(deprecated)]
+ x25519::X25519::params_ix()
}
fn params_xx() -> ProtocolParams {
- X25519::params_xx()
+ #[allow(deprecated)]
+ x25519::X25519::params_xx()
}
fn public_from_bytes(bytes: &[u8]) -> Result<PublicKey<X25519Spec>, NoiseError> {
diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs
index 56b5e139651..c92435e4c3e 100644
--- a/transports/noise/tests/smoke.rs
+++ b/transports/noise/tests/smoke.rs
@@ -28,7 +28,6 @@ use libp2p_core::upgrade::{apply_inbound, apply_outbound, Negotiated};
use libp2p_core::{identity, transport, upgrade};
use libp2p_noise::{
Keypair, NoiseAuthenticated, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec,
- X25519,
};
use libp2p_tcp as tcp;
use log::info;
@@ -47,7 +46,7 @@ fn core_upgrade_compat() {
}
#[test]
-fn xx_spec() {
+fn xx() {
let _ = env_logger::try_init();
fn prop(mut messages: Vec<Message>) -> bool {
messages.truncate(5);
@@ -95,51 +94,6 @@ fn xx_spec() {
.quickcheck(prop as fn(Vec<Message>) -> bool)
}
-#[test]
-fn xx() {
- let _ = env_logger::try_init();
- fn prop(mut messages: Vec<Message>) -> bool {
- messages.truncate(5);
- let server_id = identity::Keypair::generate_ed25519();
- let client_id = identity::Keypair::generate_ed25519();
-
- let server_id_public = server_id.public();
- let client_id_public = client_id.public();
-
- let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
- let server_transport = tcp::async_io::Transport::default()
- .and_then(move |output, endpoint| {
- upgrade::apply(
- output,
- NoiseConfig::xx(server_dh),
- endpoint,
- upgrade::Version::V1,
- )
- })
- .and_then(move |out, _| expect_identity(out, &client_id_public))
- .boxed();
-
- let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
- let client_transport = tcp::async_io::Transport::default()
- .and_then(move |output, endpoint| {
- upgrade::apply(
- output,
- NoiseConfig::xx(client_dh),
- endpoint,
- upgrade::Version::V1,
- )
- })
- .and_then(move |out, _| expect_identity(out, &server_id_public))
- .boxed();
-
- run(server_transport, client_transport, messages);
- true
- }
- QuickCheck::new()
- .max_tests(30)
- .quickcheck(prop as fn(Vec<Message>) -> bool)
-}
-
#[test]
fn ix() {
let _ = env_logger::try_init();
@@ -151,7 +105,9 @@ fn ix() {
let server_id_public = server_id.public();
let client_id_public = client_id.public();
- let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
+ let server_dh = Keypair::<X25519Spec>::new()
+ .into_authentic(&server_id)
+ .unwrap();
let server_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {
upgrade::apply(
@@ -164,7 +120,9 @@ fn ix() {
.and_then(move |out, _| expect_identity(out, &client_id_public))
.boxed();
- let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
+ let client_dh = Keypair::<X25519Spec>::new()
+ .into_authentic(&client_id)
+ .unwrap();
let client_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {
upgrade::apply(
@@ -196,7 +154,9 @@ fn ik_xx() {
let client_id = identity::Keypair::generate_ed25519();
let client_id_public = client_id.public();
- let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
+ let server_dh = Keypair::<X25519Spec>::new()
+ .into_authentic(&server_id)
+ .unwrap();
let server_dh_public = server_dh.public_dh_key().clone();
let server_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {
@@ -213,7 +173,9 @@ fn ik_xx() {
.and_then(move |out, _| expect_identity(out, &client_id_public))
.boxed();
- let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
+ let client_dh = Keypair::<X25519Spec>::new()
+ .into_authentic(&client_id)
+ .unwrap();
let server_id_public2 = server_id_public.clone();
let client_transport = tcp::async_io::Transport::default()
.and_then(move |output, endpoint| {