From 003bb0b4267a482d11f62892eb413f77b5d68ca6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 5 Jul 2024 15:26:40 +0300 Subject: [PATCH 1/8] websocket/framed: Introduce connection sender state to avoid polling after errors Signed-off-by: Alexandru Vasile --- transports/websocket/src/framed.rs | 85 +++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index f6f99d18580..01a8c6afcf0 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -571,10 +571,22 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { receiver: BoxStream<'static, Result>, - sender: Pin + Send>>, + sender: ConnectionSender, _marker: std::marker::PhantomData, } +/// Keep track of the connection state from the sender's perspective. +enum ConnectionSender { + /// The sender must not be polled again after it returns `Poll::Ready(Err)`. + /// + /// When the sender enters the error state + Alive { + sender: Pin + Send>>, + }, + /// The sender has returned an error and must not be polled again. + Dead, +} + /// Data or control information received over the websocket connection. #[derive(Debug, Clone)] pub enum Incoming { @@ -703,7 +715,9 @@ where }); Connection { receiver: stream.boxed(), - sender: Box::pin(sink), + sender: ConnectionSender::Alive { + sender: Box::pin(sink), + }, _marker: std::marker::PhantomData, } } @@ -744,26 +758,69 @@ where type Error = io::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sender) - .poll_ready(cx) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + match &mut self.sender { + ConnectionSender::Alive { sender } => match Pin::new(sender).poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => { + self.sender = ConnectionSender::Dead; + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) + } + Poll::Pending => Poll::Pending, + }, + ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "Connection sender is dead, poll_ready called after an error", + ))), + } } fn start_send(mut self: Pin<&mut Self>, item: OutgoingData) -> io::Result<()> { - Pin::new(&mut self.sender) - .start_send(item) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + match &mut self.sender { + ConnectionSender::Alive { sender } => match Pin::new(sender).start_send(item) { + Ok(()) => Ok(()), + Err(e) => { + self.sender = ConnectionSender::Dead; + Err(io::Error::new(io::ErrorKind::Other, e)) + } + }, + ConnectionSender::Dead => Err(io::Error::new( + io::ErrorKind::Other, + "Connection sender is dead, start_send called after an error", + )), + } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sender) - .poll_flush(cx) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + match &mut self.sender { + ConnectionSender::Alive { sender } => match Pin::new(sender).poll_flush(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => { + self.sender = ConnectionSender::Dead; + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) + } + Poll::Pending => Poll::Pending, + }, + ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "Connection sender is dead, poll_flush called after an error", + ))), + } } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.sender) - .poll_close(cx) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + match &mut self.sender { + ConnectionSender::Alive { sender } => match Pin::new(sender).poll_close(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => { + self.sender = ConnectionSender::Dead; + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) + } + Poll::Pending => Poll::Pending, + }, + ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "Connection sender is dead, poll_close called after an error", + ))), + } } } From aaabdfa20e11d838f57a29193266cfea5f78c4ac Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 13:25:52 +0300 Subject: [PATCH 2/8] websocket: Update changelog Signed-off-by: Alexandru Vasile --- transports/websocket/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index d206cbac6a1..419ff41c6fc 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.43.2 + +- fix: Avoid websocket panic on polling after errors. See [PR 5482]. + +[PR 5482]: https://github.com/libp2p/rust-libp2p/pull/5482 + ## 0.43.1 ## 0.43.0 From c57f795a486de3e609702a289a4234a5b36a6903 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 13:26:24 +0300 Subject: [PATCH 3/8] cargo: Bump libp2p-websocket to 0.43.2 Signed-off-by: Alexandru Vasile --- Cargo.lock | 2 +- transports/websocket/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d45d51ac4b..df8b1092d46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3489,7 +3489,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.43.1" +version = "0.43.2" dependencies = [ "async-std", "either", diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index b022d95ca47..82b9ada9236 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-websocket" edition = "2021" rust-version = { workspace = true } description = "WebSocket transport for libp2p" -version = "0.43.1" +version = "0.43.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From a43bde9954492348c36bf098b62c4570fafc9d76 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 13:28:10 +0300 Subject: [PATCH 4/8] cargo: Update libp2p-websocket in cargo.toml Signed-off-by: Alexandru Vasile --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ab660cc90e9..1b9ca2c6f09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,7 +112,7 @@ libp2p-upnp = { version = "0.2.2", path = "protocols/upnp" } libp2p-webrtc = { version = "0.7.1-alpha", path = "transports/webrtc" } libp2p-webrtc-utils = { version = "0.2.1", path = "misc/webrtc-utils" } libp2p-webrtc-websys = { version = "0.3.0-alpha", path = "transports/webrtc-websys" } -libp2p-websocket = { version = "0.43.1", path = "transports/websocket" } +libp2p-websocket = { version = "0.43.2", path = "transports/websocket" } libp2p-websocket-websys = { version = "0.3.2", path = "transports/websocket-websys" } libp2p-webtransport-websys = { version = "0.3.0", path = "transports/webtransport-websys" } libp2p-yamux = { version = "0.45.2", path = "muxers/yamux" } From d50efd017891d59aece8fc60ed535bb65b5b5073 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 13:37:16 +0300 Subject: [PATCH 5/8] websocket: Polish comment Signed-off-by: Alexandru Vasile --- transports/websocket/src/framed.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 01a8c6afcf0..06a4ae9ae30 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -577,9 +577,7 @@ pub struct Connection { /// Keep track of the connection state from the sender's perspective. enum ConnectionSender { - /// The sender must not be polled again after it returns `Poll::Ready(Err)`. - /// - /// When the sender enters the error state + /// The sender is alive and can be polled. Alive { sender: Pin + Send>>, }, From f49ad416274e2779c8873c6ccf24c0bb7c556f3f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 9 Jul 2024 20:07:54 +0300 Subject: [PATCH 6/8] Revert "websocket/framed: Introduce connection sender state to avoid polling" This reverts commit 003bb0b4267a482d11f62892eb413f77b5d68ca6. Signed-off-by: Alexandru Vasile --- transports/websocket/src/framed.rs | 83 +++++------------------------- 1 file changed, 14 insertions(+), 69 deletions(-) diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 06a4ae9ae30..f6f99d18580 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -571,20 +571,10 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { receiver: BoxStream<'static, Result>, - sender: ConnectionSender, + sender: Pin + Send>>, _marker: std::marker::PhantomData, } -/// Keep track of the connection state from the sender's perspective. -enum ConnectionSender { - /// The sender is alive and can be polled. - Alive { - sender: Pin + Send>>, - }, - /// The sender has returned an error and must not be polled again. - Dead, -} - /// Data or control information received over the websocket connection. #[derive(Debug, Clone)] pub enum Incoming { @@ -713,9 +703,7 @@ where }); Connection { receiver: stream.boxed(), - sender: ConnectionSender::Alive { - sender: Box::pin(sink), - }, + sender: Box::pin(sink), _marker: std::marker::PhantomData, } } @@ -756,69 +744,26 @@ where type Error = io::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut self.sender { - ConnectionSender::Alive { sender } => match Pin::new(sender).poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => { - self.sender = ConnectionSender::Dead; - Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) - } - Poll::Pending => Poll::Pending, - }, - ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Connection sender is dead, poll_ready called after an error", - ))), - } + Pin::new(&mut self.sender) + .poll_ready(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn start_send(mut self: Pin<&mut Self>, item: OutgoingData) -> io::Result<()> { - match &mut self.sender { - ConnectionSender::Alive { sender } => match Pin::new(sender).start_send(item) { - Ok(()) => Ok(()), - Err(e) => { - self.sender = ConnectionSender::Dead; - Err(io::Error::new(io::ErrorKind::Other, e)) - } - }, - ConnectionSender::Dead => Err(io::Error::new( - io::ErrorKind::Other, - "Connection sender is dead, start_send called after an error", - )), - } + Pin::new(&mut self.sender) + .start_send(item) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut self.sender { - ConnectionSender::Alive { sender } => match Pin::new(sender).poll_flush(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => { - self.sender = ConnectionSender::Dead; - Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) - } - Poll::Pending => Poll::Pending, - }, - ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Connection sender is dead, poll_flush called after an error", - ))), - } + Pin::new(&mut self.sender) + .poll_flush(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut self.sender { - ConnectionSender::Alive { sender } => match Pin::new(sender).poll_close(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => { - self.sender = ConnectionSender::Dead; - Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) - } - Poll::Pending => Poll::Pending, - }, - ConnectionSender::Dead => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Connection sender is dead, poll_close called after an error", - ))), - } + Pin::new(&mut self.sender) + .poll_close(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } } From b248e52ca66a4cfe1057b3b350fdc3a364d0cac6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 9 Jul 2024 20:14:33 +0300 Subject: [PATCH 7/8] Apply suggestion patch Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + transports/websocket/Cargo.toml | 1 + transports/websocket/src/framed.rs | 2 +- transports/websocket/src/quicksink.rs | 45 +++++++++++++-------------- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df8b1092d46..3a2381000c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3504,6 +3504,7 @@ dependencies = [ "rcgen", "rw-stream-sink", "soketto", + "thiserror", "tracing", "url", "webpki-roots 0.25.2", diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 82b9ada9236..f1b0a413115 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -21,6 +21,7 @@ pin-project-lite = "0.2.14" rw-stream-sink = { workspace = true } soketto = "0.8.0" tracing = { workspace = true } +thiserror = "1.0.61" url = "2.5" webpki-roots = "0.25" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index f6f99d18580..69a01fdbd46 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -571,7 +571,7 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { receiver: BoxStream<'static, Result>, - sender: Pin + Send>>, + sender: Pin> + Send>>, _marker: std::marker::PhantomData, } diff --git a/transports/websocket/src/quicksink.rs b/transports/websocket/src/quicksink.rs index cb2c98b078f..8c674a9ddf7 100644 --- a/transports/websocket/src/quicksink.rs +++ b/transports/websocket/src/quicksink.rs @@ -30,14 +30,6 @@ // Ok::<_, io::Error>(stdout) // }); // ``` -// -// # Panics -// -// - If any of the [`Sink`] methods produce an error, the sink transitions -// to a failure state and none of its methods must be called afterwards or -// else a panic will occur. -// - If [`Sink::poll_close`] has been called, no other sink method must be -// called afterwards or else a panic will be caused. use futures::{ready, sink::Sink}; use pin_project_lite::pin_project; @@ -102,6 +94,15 @@ enum State { Failed, } +/// Errors the `Sink` may return. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error("Error while sending over the sink, {0}")] + Send(E), + #[error("The Sink has closed")] + Closed, +} + pin_project! { /// `SinkImpl` implements the `Sink` trait. #[derive(Debug)] @@ -119,7 +120,7 @@ where F: FnMut(S, Action) -> T, T: Future>, { - type Error = E; + type Error = Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); @@ -135,7 +136,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - Poll::Ready(Err(e)) + Poll::Ready(Err(Error::Send(e))) } } } @@ -143,20 +144,19 @@ where Ok(_) => { this.future.set(None); *this.state = State::Closed; - panic!("SinkImpl::poll_ready called on a closing sink.") + Poll::Ready(Err(Error::Closed)) } Err(e) => { this.future.set(None); *this.state = State::Failed; - Poll::Ready(Err(e)) + Poll::Ready(Err(Error::Send(e))) } }, State::Empty => { assert!(this.param.is_some()); Poll::Ready(Ok(())) } - State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."), - State::Failed => panic!("SinkImpl::poll_ready called after error."), + State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)), } } @@ -193,7 +193,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Flushing => { @@ -207,7 +207,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } } } @@ -221,11 +221,10 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, - State::Closed => return Poll::Ready(Ok(())), - State::Failed => panic!("SinkImpl::poll_flush called after error."), + State::Closed | State::Failed => return Poll::Ready(Err(Error::Closed)), } } } @@ -253,7 +252,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Flushing => { @@ -266,7 +265,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } } } @@ -280,11 +279,11 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Closed => return Poll::Ready(Ok(())), - State::Failed => panic!("SinkImpl::poll_closed called after error."), + State::Failed => return Poll::Ready(Err(Error::Closed)), } } } From 30893a9db61d4e836b5058b4c77cb5f7690fb793 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 9 Jul 2024 20:28:06 +0300 Subject: [PATCH 8/8] quicksink/tests: Add test to guard against panics Signed-off-by: Alexandru Vasile --- transports/websocket/src/quicksink.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/transports/websocket/src/quicksink.rs b/transports/websocket/src/quicksink.rs index 8c674a9ddf7..4f620536ea1 100644 --- a/transports/websocket/src/quicksink.rs +++ b/transports/websocket/src/quicksink.rs @@ -346,4 +346,31 @@ mod tests { assert_eq!(&expected[..], &actual[..]) }); } + + #[test] + fn error_does_not_panic() { + task::block_on(async { + let sink = make_sink(io::stdout(), |mut _stdout, _action| async move { + Err(io::Error::new(io::ErrorKind::Other, "oh no")) + }); + + futures::pin_mut!(sink); + + let result = sink.send("hello").await; + match result { + Err(crate::quicksink::Error::Send(e)) => { + assert_eq!(e.kind(), io::ErrorKind::Other); + assert_eq!(e.to_string(), "oh no") + } + _ => panic!("unexpected result: {:?}", result), + }; + + // Call send again, expect not to panic. + let result = sink.send("hello").await; + match result { + Err(crate::quicksink::Error::Closed) => {} + _ => panic!("unexpected result: {:?}", result), + }; + }) + } }