Skip to content

Commit

Permalink
feat(request-response): don't close connection on stream errors
Browse files Browse the repository at this point in the history
Related: libp2p#3591.

Pull-Request: libp2p#3913.
  • Loading branch information
thomaseizinger authored and alindima committed Jan 9, 2024
1 parent 2d9ae38 commit 85e3025
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 36 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion examples/file-sharing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ either = "1.8"
env_logger = "0.10"
futures = "0.3.28"
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "kad", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.1" }
multiaddr = { version = "0.17.1" }
void = "1.0.2"
2 changes: 2 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ libp2p-swarm = { version = "0.42.1", path = "../../swarm" }
libp2p-identity = { version = "0.1.0", path = "../../identity" }
rand = "0.8"
smallvec = "1.6.1"
void = "1.0.2"
log = "0.4.17"

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
43 changes: 8 additions & 35 deletions protocols/request-response/src/handler_priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_swarm::{
use smallvec::SmallVec;
use std::{
collections::VecDeque,
fmt, io,
fmt,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -71,8 +71,6 @@ where
substream_timeout: Duration,
/// The current connection keep-alive.
keep_alive: KeepAlive,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<ConnectionHandlerUpgrErr<io::Error>>,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
Expand Down Expand Up @@ -113,7 +111,6 @@ where
outbound: VecDeque::new(),
inbound: FuturesUnordered::new(),
pending_events: VecDeque::new(),
pending_error: None,
inbound_request_id,
}
}
Expand Down Expand Up @@ -156,40 +153,22 @@ where
// the remote peer does not support the requested protocol(s).
self.pending_events
.push_back(Event::OutboundUnsupportedProtocols(info));
log::debug!("outbound stream {info} failed: Failed negotiation");
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
log::debug!("outbound stream {info} failed: {e}");
}
_ => {}
}
}
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { info, error }: ListenUpgradeError<
ListenUpgradeError { error, info }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events.push_back(Event::InboundTimeout(info))
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events
.push_back(Event::InboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
log::debug!("inbound stream {info} failed: {error}");
}
}

Expand Down Expand Up @@ -284,7 +263,7 @@ where
{
type InEvent = RequestProtocol<TCodec>;
type OutEvent = Event<TCodec>;
type Error = ConnectionHandlerUpgrErr<io::Error>;
type Error = void::Void;
type InboundProtocol = ResponseProtocol<TCodec>;
type OutboundProtocol = RequestProtocol<TCodec>;
type OutboundOpenInfo = RequestId;
Expand Down Expand Up @@ -338,12 +317,6 @@ where
cx: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::OutEvent, Self::Error>>
{
// Check for a pending (fatal) error.
if let Some(err) = self.pending_error.take() {
// The handler will not be polled again by the `Swarm`.
return Poll::Ready(ConnectionHandlerEvent::Close(err));
}

// Drain pending events.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::Custom(event));
Expand Down

0 comments on commit 85e3025

Please sign in to comment.