diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index a04b92435ad..352d6f2529a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -7,9 +7,14 @@ - Deprecate `StreamMuxerExt::next_{inbound,outbound}`. See [PR 3002]. +- Add `StreamMuxerBox::active_inbound_streams` and `StreamMuxerBox::active_outbound_streams`. See [PR 2878]. + +- Make `SubstreamBox::new` constructor module private. See [PR 2878]. + [PR 2915]: https://github.com/libp2p/rust-libp2p/pull/2915 [PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918 [PR 3002]: https://github.com/libp2p/rust-libp2p/pull/3002 +[PR 2878]: https://github.com/libp2p/rust-libp2p/pull/2878 # 0.36.0 diff --git a/core/src/connection.rs b/core/src/connection.rs index 91008408fe2..7b16e4b5bdc 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::multiaddr::{Multiaddr, Protocol}; +use std::fmt; /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -35,6 +36,12 @@ impl ConnectionId { } } +impl fmt::Display for ConnectionId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + impl std::ops::Add for ConnectionId { type Output = Self; diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 99f7a87c6a5..3b0a70a7aa2 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -6,18 +6,27 @@ use std::fmt; use std::io; use std::io::{IoSlice, IoSliceMut}; use std::pin::Pin; +use std::sync::{Arc, Weak}; use std::task::{Context, Poll}; /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Pin + Send>>, + inner: Pin< + Box< + dyn private::InstrumentedStreamMuxer + + Send, + >, + >, } /// Abstract type for asynchronous reading and writing. /// /// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` /// and `AsyncWrite` capabilities. -pub struct SubstreamBox(Pin>); +pub struct SubstreamBox { + inner: Pin>, + _counter: Weak<()>, +} #[pin_project] struct Wrap @@ -26,49 +35,79 @@ where { #[pin] inner: T, + inbound_counter: Arc<()>, + outbound_counter: Arc<()>, } -impl StreamMuxer for Wrap -where - T: StreamMuxer, - T::Substream: Send + 'static, - T::Error: Send + Sync + 'static, -{ - type Substream = SubstreamBox; - type Error = io::Error; +mod private { + use super::*; - fn poll_inbound( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project() - .inner - .poll_inbound(cx) - .map_ok(SubstreamBox::new) - .map_err(into_io_error) + pub trait InstrumentedStreamMuxer: StreamMuxer { + fn active_inbound_streams(&self) -> usize; + fn active_outbound_streams(&self) -> usize; } - fn poll_outbound( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project() - .inner - .poll_outbound(cx) - .map_ok(SubstreamBox::new) - .map_err(into_io_error) - } + impl StreamMuxer for Wrap + where + T: StreamMuxer, + T::Substream: Send + 'static, + T::Error: Send + Sync + 'static, + { + type Substream = SubstreamBox; + type Error = io::Error; - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_close(cx).map_err(into_io_error) + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let counter = Arc::downgrade(&self.inbound_counter); + + self.project() + .inner + .poll_inbound(cx) + .map_ok(|stream| SubstreamBox::new(stream, counter)) + .map_err(into_io_error) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let counter = Arc::downgrade(&self.outbound_counter); + + self.project() + .inner + .poll_outbound(cx) + .map_ok(|stream| SubstreamBox::new(stream, counter)) + .map_err(into_io_error) + } + + #[inline] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx).map_err(into_io_error) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll(cx).map_err(into_io_error) + } } - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().inner.poll(cx).map_err(into_io_error) + impl InstrumentedStreamMuxer for Wrap + where + T: StreamMuxer, + T::Substream: Send + 'static, + T::Error: Send + Sync + 'static, + { + fn active_inbound_streams(&self) -> usize { + Arc::weak_count(&self.inbound_counter) + } + + fn active_outbound_streams(&self) -> usize { + Arc::weak_count(&self.outbound_counter) + } } } @@ -87,16 +126,35 @@ impl StreamMuxerBox { T::Substream: Send + 'static, T::Error: Send + Sync + 'static, { - let wrap = Wrap { inner: muxer }; + let wrap = Wrap { + inner: muxer, + inbound_counter: Arc::new(()), + outbound_counter: Arc::new(()), + }; StreamMuxerBox { inner: Box::pin(wrap), } } + /// Returns the number of active inbound streams i.e. streams that + /// have been returned from [`StreamMuxer::poll_inbound`] and have not been dropped since. + pub fn active_inbound_streams(&self) -> usize { + self.inner.active_inbound_streams() + } + + /// Returns the number of active outbound streams i.e. streams that + /// have been returned from [`StreamMuxer::poll_outbound`] and have not been dropped since. + pub fn active_outbound_streams(&self) -> usize { + self.inner.active_outbound_streams() + } + fn project( self: Pin<&mut Self>, - ) -> Pin<&mut (dyn StreamMuxer + Send)> { + ) -> Pin< + &mut (dyn private::InstrumentedStreamMuxer + + Send), + > { self.get_mut().inner.as_mut() } } @@ -133,15 +191,17 @@ impl StreamMuxer for StreamMuxerBox { } impl SubstreamBox { - /// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. - pub fn new(stream: S) -> Self { - Self(Box::pin(stream)) + fn new(stream: S, counter: Weak<()>) -> Self { + Self { + inner: Box::pin(stream), + _counter: counter, + } } } impl fmt::Debug for SubstreamBox { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SubstreamBox({})", self.0.type_name()) + write!(f, "SubstreamBox({})", self.inner.type_name()) } } @@ -168,7 +228,7 @@ impl AsyncRead for SubstreamBox { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - self.0.as_mut().poll_read(cx, buf) + self.inner.as_mut().poll_read(cx, buf) } fn poll_read_vectored( @@ -176,7 +236,7 @@ impl AsyncRead for SubstreamBox { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - self.0.as_mut().poll_read_vectored(cx, bufs) + self.inner.as_mut().poll_read_vectored(cx, bufs) } } @@ -186,7 +246,7 @@ impl AsyncWrite for SubstreamBox { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - self.0.as_mut().poll_write(cx, buf) + self.inner.as_mut().poll_write(cx, buf) } fn poll_write_vectored( @@ -194,14 +254,129 @@ impl AsyncWrite for SubstreamBox { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - self.0.as_mut().poll_write_vectored(cx, bufs) + self.inner.as_mut().poll_write_vectored(cx, bufs) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.0.as_mut().poll_flush(cx) + self.inner.as_mut().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.0.as_mut().poll_close(cx) + self.inner.as_mut().poll_close(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::muxing::StreamMuxerExt; + use futures::future::poll_fn; + + #[async_std::test] + async fn stream_muxer_box_tracks_alive_inbound_streams() { + let mut muxer = StreamMuxerBox::new(DummyStreamMuxer); + + let _stream1 = poll_fn(|cx| muxer.poll_inbound_unpin(cx)).await.unwrap(); + let _stream2 = poll_fn(|cx| muxer.poll_inbound_unpin(cx)).await.unwrap(); + + assert_eq!(muxer.active_inbound_streams(), 2); + + drop(_stream1); + + assert_eq!(muxer.active_inbound_streams(), 1); + } + + #[async_std::test] + async fn stream_muxer_box_tracks_alive_outbound_streams() { + let mut muxer = StreamMuxerBox::new(DummyStreamMuxer); + + let _stream1 = poll_fn(|cx| muxer.poll_outbound_unpin(cx)).await.unwrap(); + let _stream2 = poll_fn(|cx| muxer.poll_outbound_unpin(cx)).await.unwrap(); + + assert_eq!(muxer.active_outbound_streams(), 2); + + drop(_stream1); + + assert_eq!(muxer.active_outbound_streams(), 1); + } + + #[test] + fn stream_muxer_box_starts_with_zero_active_inbound_streams() { + let muxer = StreamMuxerBox::new(DummyStreamMuxer); + + let num_active_inbound_streams = muxer.active_inbound_streams(); + + assert_eq!(num_active_inbound_streams, 0); + } + + #[test] + fn stream_muxer_box_starts_with_zero_active_outbound_streams() { + let muxer = StreamMuxerBox::new(DummyStreamMuxer); + + let num_active_outbound_streams = muxer.active_outbound_streams(); + + assert_eq!(num_active_outbound_streams, 0); + } + + struct DummyStreamMuxer; + + impl StreamMuxer for DummyStreamMuxer { + type Substream = PendingSubstream; + type Error = void::Void; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(PendingSubstream)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(PendingSubstream)) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + + struct PendingSubstream; + + impl AsyncRead for PendingSubstream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll> { + Poll::Pending + } + } + + impl AsyncWrite for PendingSubstream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 9ea2e6ea49f..9ceab441b29 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -195,6 +195,12 @@ impl ConnectionHandler for GossipsubHandler { self.listen_protocol.clone() } + // Gossipsub is supposed to have a single long-lived inbound substream. + // Don't allow the remote to create more than one at a time. + fn max_inbound_streams(&self) -> usize { + 1 + } + fn inject_fully_negotiated_inbound( &mut self, protocol: >::Output, diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 5be9ae17737..db4469099f1 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -523,6 +523,10 @@ where } } + fn max_inbound_streams(&self) -> usize { + MAX_NUM_INBOUND_SUBSTREAMS + } + fn inject_fully_negotiated_outbound( &mut self, protocol: >::Output, @@ -559,30 +563,6 @@ where self.protocol_status = ProtocolStatus::Confirmed; } - if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS { - if let Some(position) = self.inbound_substreams.iter().position(|s| { - matches!( - s, - // An inbound substream waiting to be reused. - InboundSubstreamState::WaitingMessage { first: false, .. } - ) - }) { - self.inbound_substreams.remove(position); - log::warn!( - "New inbound substream to {:?} exceeds inbound substream limit. \ - Removed older substream waiting to be reused.", - self.remote_peer_id, - ) - } else { - log::warn!( - "New inbound substream to {:?} exceeds inbound substream limit. \ - No older substream waiting to be reused. Dropping new substream.", - self.remote_peer_id, - ); - return; - } - } - debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index af1ef898981..0cac23fb064 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -239,6 +239,10 @@ impl ConnectionHandler for Handler { SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) } + fn max_inbound_streams(&self) -> usize { + 1 // There is only ever one active inbound stream. + } + fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { self.inbound = Some(protocol::recv_ping(stream).boxed()); } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index fd843b7d473..42148ccd175 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -62,6 +62,8 @@ where substream_timeout: Duration, /// The current connection keep-alive. keep_alive: KeepAlive, + /// The maximum number of allowed inbound streams. + max_inbound_streams: usize, /// A pending fatal error that results in the connection being closed. pending_error: Option>, /// Queue of events to emit in `poll()`. @@ -93,6 +95,7 @@ where codec: TCodec, keep_alive_timeout: Duration, substream_timeout: Duration, + max_inbound_streams: usize, inbound_request_id: Arc, ) -> Self { Self { @@ -101,6 +104,7 @@ where keep_alive: KeepAlive::Yes, keep_alive_timeout, substream_timeout, + max_inbound_streams, outbound: VecDeque::new(), inbound: FuturesUnordered::new(), pending_events: VecDeque::new(), @@ -199,8 +203,8 @@ where type Error = ConnectionHandlerUpgrErr; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; - type OutboundOpenInfo = RequestId; type InboundOpenInfo = RequestId; + type OutboundOpenInfo = RequestId; fn listen_protocol(&self) -> SubstreamProtocol { // A channel for notifying the handler when the inbound @@ -236,6 +240,10 @@ where SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout) } + fn max_inbound_streams(&self) -> usize { + self.max_inbound_streams + } + fn inject_fully_negotiated_inbound(&mut self, sent: bool, request_id: RequestId) { if sent { self.pending_events diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index c4e18d894fb..7bd3e2201be 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -267,6 +267,7 @@ impl fmt::Display for RequestId { pub struct RequestResponseConfig { request_timeout: Duration, connection_keep_alive: Duration, + max_pending_inbound_requests: usize, } impl Default for RequestResponseConfig { @@ -274,6 +275,7 @@ impl Default for RequestResponseConfig { Self { connection_keep_alive: Duration::from_secs(10), request_timeout: Duration::from_secs(10), + max_pending_inbound_requests: libp2p_swarm::handler::DEFAULT_MAX_INBOUND_STREAMS, } } } @@ -290,6 +292,16 @@ impl RequestResponseConfig { self.request_timeout = v; self } + + /// Sets the maximum number of pending inbound requests. + /// + /// Once this limit is hit, we will stop accepting requests from the remote until an + /// inbound substream is closed. Either because you send a response, a timeout is hit or + /// the remote closed the stream. + pub fn set_max_pending_inbound_requests(&mut self, n: usize) -> &mut Self { + self.max_pending_inbound_requests = n; + self + } } /// A request/response protocol for some message codec. @@ -573,6 +585,7 @@ where self.codec.clone(), self.config.connection_keep_alive, self.config.request_timeout, + self.config.max_pending_inbound_requests, self.next_inbound_id.clone(), ) } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index ec7b8e83d4f..7c90ce650ce 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -8,13 +8,19 @@ `libp2p_swarm::dummy::ConnectionHandler`. `dummy::ConnectionHandler` now literally does not do anything. In the same spirit, introduce `libp2p_swarm::keep_alive::Behaviour` and `libp2p_swarm::dummy::Behaviour`. See [PR 2859]. -[PR 2857]: https://github.com/libp2p/rust-libp2p/pull/2857 -[PR 2859]: https://github.com/libp2p/rust-libp2p/pull/2859/ - - Pass actual `PeerId` of dial to `NetworkBehaviour::inject_dial_failure` on `DialError::ConnectionLimit`. See [PR 2928]. -[PR 2928]: https://github.com/libp2p/rust-libp2p/pull/2928 +- Deprecate `SwarmBuilder::max_negotiating_inbound_streams` and introduce `ConnectionHandler::max_inbound_streams`. + The limit returned from `ConnectionHandler::max_inbound_streams` defines, how many inbound substreams are allowed to + be _active_ (i.e. "floating around") at any single time on this connection. This differs from what + `max_negotiating_inbound_streams` used to do in that we previously only limited the number of concurrent _upgrades_, + i.e. the `InboundUpgrade` phase of a substream but did otherwise not enforce a limit on how many substreams a remote + can open. The default value for `ConnectionHandler::max_inbound_streams` is 128. See [PR 2878]. +[PR 2857]: https://github.com/libp2p/rust-libp2p/pull/2857 +[PR 2859]: https://github.com/libp2p/rust-libp2p/pull/2859/ +[PR 2928]: https://github.com/libp2p/rust-libp2p/pull/2928 +[PR 2878]: https://github.com/libp2p/rust-libp2p/pull/2878 # 0.39.0 diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 8f91c236148..a3a03e84349 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -277,8 +277,8 @@ where type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = TInner::OutboundProtocol; - type OutboundOpenInfo = TInner::OutboundOpenInfo; type InboundOpenInfo = Either; + type OutboundOpenInfo = TInner::OutboundOpenInfo; fn listen_protocol(&self) -> SubstreamProtocol { if let Some(inner) = self.inner.as_ref() { @@ -294,6 +294,13 @@ where } } + fn max_inbound_streams(&self) -> usize { + self.inner + .as_ref() + .map(|h| h.max_inbound_streams()) + .unwrap_or(0) + } + fn inject_fully_negotiated_inbound( &mut self, out: ::Output, diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 304f2b5c759..9090c04e32d 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -58,12 +58,14 @@ pub struct Connected { } /// Event generated by a [`Connection`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum Event { /// Event generated by the [`ConnectionHandler`]. Handler(T), /// Address of the remote has changed. AddressChange(Multiaddr), + /// This connection is operating at the maximum number of allowed substreams. + SubstreamLimitHit { num_streams: usize }, } /// A multiplexed connection to a peer with an associated [`ConnectionHandler`]. @@ -93,16 +95,10 @@ where shutdown: Shutdown, /// The substream upgrade protocol override, if any. substream_upgrade_protocol_override: Option, - /// The maximum number of inbound streams concurrently negotiating on a - /// connection. New inbound streams exceeding the limit are dropped and thus - /// reset. + /// The maximum number of inbound streams a connection. /// - /// Note: This only enforces a limit on the number of concurrently - /// negotiating inbound streams. The total number of inbound streams on a - /// connection is the sum of negotiating and negotiated streams. A limit on - /// the total number of streams can be enforced at the - /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. - max_negotiating_inbound_streams: usize, + /// New inbound streams exceeding the limit are dropped and thus reset. + legacy_max_negotiating_inbound_streams: Option, /// Contains all upgrades that are waiting for a new outbound substream. /// /// The upgrade timeout is already ticking here so this may fail in case the remote is not quick @@ -136,7 +132,7 @@ where muxer: StreamMuxerBox, handler: THandler, substream_upgrade_protocol_override: Option, - max_negotiating_inbound_streams: usize, + legacy_max_negotiating_inbound_streams: Option, ) -> Self { Connection { muxing: muxer, @@ -145,7 +141,7 @@ where negotiating_out: Default::default(), shutdown: Shutdown::None, substream_upgrade_protocol_override, - max_negotiating_inbound_streams, + legacy_max_negotiating_inbound_streams, requested_substreams: Default::default(), } } @@ -174,7 +170,7 @@ where negotiating_out, negotiating_in, shutdown, - max_negotiating_inbound_streams, + legacy_max_negotiating_inbound_streams, substream_upgrade_protocol_override, } = self.get_mut(); @@ -299,16 +295,24 @@ where } } - if negotiating_in.len() < *max_negotiating_inbound_streams { - match muxing.poll_inbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - let protocol = handler.listen_protocol(); + // Compute effective limit whilst we are backwards compatible with the old config. + let effective_substream_limit = + legacy_max_negotiating_inbound_streams.unwrap_or(handler.max_inbound_streams()); - negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol)); + if muxing.active_inbound_streams() >= effective_substream_limit { + return Poll::Ready(Ok(Event::SubstreamLimitHit { + num_streams: muxing.active_inbound_streams(), + })); + } - continue; // Go back to the top, handler can potentially make progress again. - } + match muxing.poll_inbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + let protocol = handler.listen_protocol(); + + negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol)); + + continue; // Go back to the top, handler can potentially make progress again. } } @@ -552,13 +556,11 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::keep_alive; - use futures::AsyncRead; use futures::AsyncWrite; + use futures::{future, AsyncRead}; use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::StreamMuxer; use quickcheck::*; - use std::sync::{Arc, Weak}; use void::Void; #[test] @@ -566,25 +568,26 @@ mod tests { fn prop(max_negotiating_inbound_streams: u8) { let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); - let alive_substream_counter = Arc::new(()); - let mut connection = Connection::new( - StreamMuxerBox::new(DummyStreamMuxer { - counter: alive_substream_counter.clone(), - }), - keep_alive::ConnectionHandler, + StreamMuxerBox::new(InfiniteStreamMuxer), + MockConnectionHandler::new( + Duration::from_secs(10), + max_negotiating_inbound_streams, + ), + None, None, - max_negotiating_inbound_streams, ); - let result = Pin::new(&mut connection) - .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + let event = future::poll_fn(|cx| Pin::new(&mut connection).poll(cx)) + .now_or_never() + .unwrap() + .unwrap(); - assert!(result.is_pending()); assert_eq!( - Arc::weak_count(&alive_substream_counter), - max_negotiating_inbound_streams, - "Expect no more than the maximum number of allowed streams" + event, + Event::SubstreamLimitHit { + num_streams: max_negotiating_inbound_streams + } ); } @@ -596,9 +599,9 @@ mod tests { let upgrade_timeout = Duration::from_secs(1); let mut connection = Connection::new( StreamMuxerBox::new(PendingStreamMuxer), - MockConnectionHandler::new(upgrade_timeout), + MockConnectionHandler::new(upgrade_timeout, 2), + None, None, - 2, ); connection.handler.open_new_outbound(); @@ -616,11 +619,10 @@ mod tests { )) } - struct DummyStreamMuxer { - counter: Arc<()>, - } + /// A [`StreamMuxer`] which produces an infinite number of streams. + struct InfiniteStreamMuxer; - impl StreamMuxer for DummyStreamMuxer { + impl StreamMuxer for InfiniteStreamMuxer { type Substream = PendingSubstream; type Error = Void; @@ -628,14 +630,14 @@ mod tests { self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { - Poll::Ready(Ok(PendingSubstream(Arc::downgrade(&self.counter)))) + Poll::Ready(Ok(PendingSubstream)) } fn poll_outbound( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll> { - Poll::Pending + Poll::Ready(Ok(PendingSubstream)) } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { @@ -683,7 +685,7 @@ mod tests { } } - struct PendingSubstream(Weak<()>); + struct PendingSubstream; impl AsyncRead for PendingSubstream { fn poll_read( @@ -717,14 +719,16 @@ mod tests { outbound_requested: bool, error: Option>, upgrade_timeout: Duration, + max_streams: usize, } impl MockConnectionHandler { - fn new(upgrade_timeout: Duration) -> Self { + fn new(upgrade_timeout: Duration, max_streams: usize) -> Self { Self { outbound_requested: false, error: None, upgrade_timeout, + max_streams, } } @@ -748,6 +752,10 @@ mod tests { SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout) } + fn max_inbound_streams(&self) -> usize { + self.max_streams + } + fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index c43d5efb61e..526fa2ebce2 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -89,9 +89,7 @@ where substream_upgrade_protocol_override: Option, /// The maximum number of inbound streams concurrently negotiating on a connection. - /// - /// See [`Connection::max_negotiating_inbound_streams`]. - max_negotiating_inbound_streams: usize, + legacy_max_negotiating_inbound_streams: Option, /// The executor to use for running the background tasks. If `None`, /// the tasks are kept in `local_spawns` instead and polled on the @@ -269,7 +267,7 @@ where task_command_buffer_size: config.task_command_buffer_size, dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, - max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, + legacy_max_negotiating_inbound_streams: config.legacy_max_negotiating_inbound_streams, executor: config.executor, local_spawns: FuturesUnordered::new(), pending_connection_events_tx, @@ -751,7 +749,7 @@ where muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, - self.max_negotiating_inbound_streams, + self.legacy_max_negotiating_inbound_streams, ); self.spawn( task::new_for_established_connection( @@ -1163,9 +1161,7 @@ pub struct PoolConfig { substream_upgrade_protocol_override: Option, /// The maximum number of inbound streams concurrently negotiating on a connection. - /// - /// See [`Connection::max_negotiating_inbound_streams`]. - max_negotiating_inbound_streams: usize, + legacy_max_negotiating_inbound_streams: Option, } impl Default for PoolConfig { @@ -1177,7 +1173,7 @@ impl Default for PoolConfig { // Set to a default of 8 based on frequency of dialer connections dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), substream_upgrade_protocol_override: None, - max_negotiating_inbound_streams: 128, + legacy_max_negotiating_inbound_streams: None, } } } @@ -1238,10 +1234,8 @@ impl PoolConfig { } /// The maximum number of inbound streams concurrently negotiating on a connection. - /// - /// See [`Connection::max_negotiating_inbound_streams`]. pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self { - self.max_negotiating_inbound_streams = v; + self.legacy_max_negotiating_inbound_streams = Some(v); self } } diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 866049e50da..c1283d2ca4c 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -22,6 +22,7 @@ //! Async functions driving pending and established connections in the form of a task. use super::concurrent_dial::ConcurrentDial; +use crate::connection::Event; use crate::{ connection::{ self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, @@ -248,6 +249,14 @@ pub async fn new_for_established_connection( .await; return; } + Ok(Event::SubstreamLimitHit { num_streams }) => { + // A handler might completely deny inbound streams at which point this would spam the logs. + // Very likely, this log is only useful if we configured a limit > 0. + if num_streams > 0 { + // This is not necessarily bad but an indication that we have a remote peer that is keeping us very busy. + log::info!("Connection {connection_id} to peer {peer_id} is operating at the limit of allowed substeams ({num_streams})"); + } + } } } } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 5c60f2bf24a..596e090b18e 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -58,6 +58,8 @@ pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use pending::PendingConnectionHandler; pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect}; +pub const DEFAULT_MAX_INBOUND_STREAMS: usize = 128; + /// A handler for a set of protocols used on a connection with a remote. /// /// This trait should be implemented for a type that maintains the state for @@ -115,6 +117,13 @@ pub trait ConnectionHandler: Send + 'static { /// > This allows a remote to put the list of supported protocols in a cache. fn listen_protocol(&self) -> SubstreamProtocol; + /// The maximum number of inbound substreams allowed on the underlying connection. + /// + /// Once this limit is hit, we will stop accepting new inbound streams from the remote. + fn max_inbound_streams(&self) -> usize { + DEFAULT_MAX_INBOUND_STREAMS + } + /// Injects the output of a successful upgrade on a new inbound substream. /// /// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index 21d386ec56a..30a2a0e57bb 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -94,33 +94,37 @@ where } } - fn inject_fully_negotiated_outbound( + fn max_inbound_streams(&self) -> usize { + either::for_both!(self, inner => inner.max_inbound_streams()) + } + + fn inject_fully_negotiated_inbound( &mut self, - output: ::Output, - info: Self::OutboundOpenInfo, + output: ::Output, + info: Self::InboundOpenInfo, ) { match (self, output, info) { (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => { - handler.inject_fully_negotiated_outbound(output, info) + handler.inject_fully_negotiated_inbound(output, info) } (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => { - handler.inject_fully_negotiated_outbound(output, info) + handler.inject_fully_negotiated_inbound(output, info) } _ => unreachable!(), } } - fn inject_fully_negotiated_inbound( + fn inject_fully_negotiated_outbound( &mut self, - output: ::Output, - info: Self::InboundOpenInfo, + output: ::Output, + info: Self::OutboundOpenInfo, ) { match (self, output, info) { (Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => { - handler.inject_fully_negotiated_inbound(output, info) + handler.inject_fully_negotiated_outbound(output, info) } (Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => { - handler.inject_fully_negotiated_inbound(output, info) + handler.inject_fully_negotiated_outbound(output, info) } _ => unreachable!(), } diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index a209225045e..39471d65d22 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -64,6 +64,10 @@ where self.inner.listen_protocol() } + fn max_inbound_streams(&self) -> usize { + self.inner.max_inbound_streams() + } + fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output, diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 2eb0c2f9bdc..a750e807956 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -59,6 +59,10 @@ where self.inner.listen_protocol() } + fn max_inbound_streams(&self) -> usize { + self.inner.max_inbound_streams() + } + fn inject_fully_negotiated_inbound( &mut self, protocol: ::Output, diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 07c1168b132..e996d3d3271 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -121,6 +121,14 @@ where SubstreamProtocol::new(upgrade, info).with_timeout(timeout) } + fn max_inbound_streams(&self) -> usize { + self.handlers + .values() + .map(|h| h.max_inbound_streams()) + .max() + .unwrap_or(0) // No handlers? No substreams. + } + fn inject_fully_negotiated_outbound( &mut self, protocol: ::Output, diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 70a6f3c26f6..36acf8b9850 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -29,6 +29,7 @@ use libp2p_core::{ upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, ConnectedPoint, Multiaddr, PeerId, }; +use std::cmp::max; use std::{cmp, task::Context, task::Poll}; /// Implementation of `IntoConnectionHandler` that combines two protocols into one. @@ -113,8 +114,8 @@ where SendWrapper, SendWrapper, >; - type OutboundOpenInfo = EitherOutput; type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); + type OutboundOpenInfo = EitherOutput; fn listen_protocol(&self) -> SubstreamProtocol { let proto1 = self.proto1.listen_protocol(); @@ -126,6 +127,28 @@ where SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) } + fn max_inbound_streams(&self) -> usize { + max( + self.proto1.max_inbound_streams(), + self.proto2.max_inbound_streams(), + ) + } + + fn inject_fully_negotiated_inbound( + &mut self, + protocol: ::Output, + (i1, i2): Self::InboundOpenInfo, + ) { + match protocol { + EitherOutput::First(protocol) => { + self.proto1.inject_fully_negotiated_inbound(protocol, i1) + } + EitherOutput::Second(protocol) => { + self.proto2.inject_fully_negotiated_inbound(protocol, i2) + } + } + } + fn inject_fully_negotiated_outbound( &mut self, protocol: ::Output, @@ -147,21 +170,6 @@ where } } - fn inject_fully_negotiated_inbound( - &mut self, - protocol: ::Output, - (i1, i2): Self::InboundOpenInfo, - ) { - match protocol { - EitherOutput::First(protocol) => { - self.proto1.inject_fully_negotiated_inbound(protocol, i1) - } - EitherOutput::Second(protocol) => { - self.proto2.inject_fully_negotiated_inbound(protocol, i2) - } - } - } - fn inject_event(&mut self, event: Self::InEvent) { match event { EitherOutput::First(event) => self.proto1.inject_event(event), diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 3979bcfecba..d33d6eb7629 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1379,6 +1379,10 @@ where /// connection is the sum of negotiating and negotiated streams. A limit on /// the total number of streams can be enforced at the /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. + #[deprecated( + since = "0.40.0", + note = "Override `ConnectionHandler::max_inbound_streams instead.`" + )] pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self { self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v); self