Skip to content

Commit

Permalink
feat(swarm): don't have ConnectionHandlers close connections
Browse files Browse the repository at this point in the history
This PR implements the long-awaited design of disallowing `ConnectionHandler`s to close entire connections. Instead, users should close connections via `ToSwarm::CloseConnection` from a `NetworkBehaviour` or - even better - from the `Swarm` via `close_connection`. A `NetworkBehaviour` also does not have a "full" view onto how a connection is used but at least it can correlate whether it created the connection via the `ConnectionId`. In general, the more modular and friendly approach is to stop "using" a connection if a particular protocol no longer needs it. As a result of the keep-alive algorithm, such a connection is then closed automatically.

Depends-on: #4745.
Depends-on: #4718.
Depends-on: #4749.
Related: #3353.
Related: #4714.
Resolves: #3591.

Pull-Request: #4755.
  • Loading branch information
thomaseizinger authored Nov 2, 2023
1 parent e6905fe commit 0ef6feb
Show file tree
Hide file tree
Showing 33 changed files with 85 additions and 303 deletions.
7 changes: 1 addition & 6 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use async_std::io;
use either::Either;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;

Expand Down Expand Up @@ -208,10 +206,7 @@ impl EventLoop {
}
}

async fn handle_event(
&mut self,
event: SwarmEvent<BehaviourEvent, Either<void::Void, io::Error>>,
) {
async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
match event {
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(
kad::Event::OutboundQueryProgressed {
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ impl Recorder<libp2p_relay::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
Expand Down
10 changes: 4 additions & 6 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &SwarmEvent<TBvEv>) {
match event {
SwarmEvent::Behaviour(_) => {}
SwarmEvent::ConnectionEstablished {
Expand Down Expand Up @@ -359,15 +359,13 @@ struct ConnectionClosedLabels {
enum ConnectionError {
Io,
KeepAliveTimeout,
Handler,
}

impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
impl From<&libp2p_swarm::ConnectionError> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
}
Expand Down
10 changes: 1 addition & 9 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use std::collections::VecDeque;
use std::io;
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;

#[derive(Debug)]
pub enum Command {
Expand All @@ -63,7 +62,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -182,7 +180,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Command;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -229,12 +226,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Return queued events.
if let Some(event) = self.queued_events.pop_front() {
Expand Down
10 changes: 1 addition & 9 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use void::Void;

/// The event emitted by the Handler. This informs the behaviour of various events created
/// by the handler.
Expand Down Expand Up @@ -220,7 +219,6 @@ impl EnabledHandler {
<Handler as ConnectionHandler>::OutboundProtocol,
<Handler as ConnectionHandler>::OutboundOpenInfo,
<Handler as ConnectionHandler>::ToBehaviour,
<Handler as ConnectionHandler>::Error,
>,
> {
if !self.peer_kind_sent {
Expand Down Expand Up @@ -391,7 +389,6 @@ impl EnabledHandler {
impl ConnectionHandler for Handler {
type FromBehaviour = HandlerIn;
type ToBehaviour = HandlerEvent;
type Error = Void;
type InboundOpenInfo = ();
type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -434,12 +431,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
match self {
Handler::Enabled(handler) => handler.poll(cx),
Expand Down
8 changes: 2 additions & 6 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p_swarm::{
};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration};
use std::{task::Context, task::Poll, time::Duration};
use tracing::Level;

const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
Expand All @@ -57,7 +57,6 @@ pub struct Handler {
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
io::Error,
>; 4],
>,

Expand Down Expand Up @@ -282,7 +281,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = InEvent;
type ToBehaviour = Event;
type Error = io::Error;
type InboundProtocol =
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
Expand Down Expand Up @@ -320,9 +318,7 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event>> {
if let Some(event) = self.events.pop() {
return Poll::Ready(event);
}
Expand Down
12 changes: 3 additions & 9 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = HandlerIn;
type ToBehaviour = HandlerEvent;
type Error = io::Error; // TODO: better error type?
type InboundProtocol = Either<ProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -711,12 +710,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
match &mut self.protocol_status {
Some(status) if !status.reported => {
Expand Down Expand Up @@ -846,7 +840,7 @@ impl Handler {
}

impl futures::Stream for OutboundSubstreamState {
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand Down Expand Up @@ -978,7 +972,7 @@ impl futures::Stream for OutboundSubstreamState {
}

impl futures::Stream for InboundSubstreamState {
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent, io::Error>;
type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand Down
10 changes: 1 addition & 9 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use libp2p_swarm::{
},
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol,
};
use void::Void;

use crate::client::{RunError, RunId};
use crate::{RunParams, RunUpdate};
Expand All @@ -59,7 +58,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -87,7 +85,6 @@ impl Default for Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Command;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -159,12 +156,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
Expand Down
8 changes: 1 addition & 7 deletions protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl Default for Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
Expand Down Expand Up @@ -121,12 +120,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
loop {
match self.inbound.poll_unpin(cx) {
Expand Down
11 changes: 2 additions & 9 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Result<Duration, Failure>;
type Error = Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundOpenInfo = ();
Expand All @@ -225,14 +224,8 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
ReadyUpgrade<StreamProtocol>,
(),
Result<Duration, Failure>,
Self::Error,
>,
> {
) -> Poll<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), Result<Duration, Failure>>>
{
match self.state {
State::Inactive { reported: true } => {
return Poll::Pending; // nothing to do on this connection
Expand Down
9 changes: 1 addition & 8 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::ToBehaviour,
<Self as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -482,7 +481,6 @@ type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
impl ConnectionHandler for Handler {
type FromBehaviour = In;
type ToBehaviour = Event;
type Error = void::Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type InboundOpenInfo = ();
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
Expand Down Expand Up @@ -593,12 +591,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Return queued events.
if let Some(event) = self.queued_events.pop_front() {
Expand Down
9 changes: 1 addition & 8 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ pub struct Handler {
<Handler as ConnectionHandler>::OutboundProtocol,
<Handler as ConnectionHandler>::OutboundOpenInfo,
<Handler as ConnectionHandler>::ToBehaviour,
<Handler as ConnectionHandler>::Error,
>,
>,

Expand Down Expand Up @@ -230,7 +229,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = In;
type ToBehaviour = Event;
type Error = void::Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type InboundOpenInfo = ();
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
Expand Down Expand Up @@ -275,12 +273,7 @@ impl ConnectionHandler for Handler {
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
loop {
debug_assert_eq!(
Expand Down
4 changes: 1 addition & 3 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ where
{
type FromBehaviour = OutboundMessage<TCodec>;
type ToBehaviour = Event<TCodec>;
type Error = void::Void;
type InboundProtocol = Protocol<TCodec::Protocol>;
type OutboundProtocol = Protocol<TCodec::Protocol>;
type OutboundOpenInfo = ();
Expand All @@ -390,8 +389,7 @@ where
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour, Self::Error>>
{
) -> Poll<ConnectionHandlerEvent<Protocol<TCodec::Protocol>, (), Self::ToBehaviour>> {
match self.worker_streams.poll_unpin(cx) {
Poll::Ready((_, Ok(Ok(event)))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
Expand Down
Loading

0 comments on commit 0ef6feb

Please sign in to comment.