Skip to content

Commit

Permalink
ws-server: Submit ping regardless of WS messages (#788)
Browse files Browse the repository at this point in the history
* ws-server: Submit ping regardless of WS messages

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* use tokio_stream::IntervalStream for less boxing

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
lexnv and niklasad1 authored Jun 3, 2022
1 parent a0813cb commit 6421530
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
1 change: 1 addition & 0 deletions ws-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde_json = { version = "1", features = ["raw_value"] }
soketto = "0.7.1"
tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7", features = ["compat"] }
tokio-stream = "0.1.7"

[dev-dependencies]
anyhow = "1"
Expand Down
15 changes: 10 additions & 5 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use soketto::data::ByteSlice125;
use soketto::handshake::{server::Response, Server as SokettoServer};
use soketto::Sender;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

/// Default maximum connections allowed.
Expand Down Expand Up @@ -320,20 +321,23 @@ async fn background_task(
// Received messages from the WebSocket.
let mut rx_item = rx.next();

while !stop_server2.shutdown_requested() {
let submit_ping = tokio::time::sleep(ping_interval);
tokio::pin!(submit_ping);
// Interval to send out continuously `pings`.
let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval));
tokio::pin!(ping_interval);
let mut next_ping = ping_interval.next();

while !stop_server2.shutdown_requested() {
// Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet.
// Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing.
match futures_util::future::select(rx_item, submit_ping).await {
Either::Left((Some(response), _)) => {
match futures_util::future::select(rx_item, next_ping).await {
Either::Left((Some(response), ping)) => {
// If websocket message send fail then terminate the connection.
if let Err(err) = send_ws_message(&mut sender, response).await {
tracing::warn!("WS send error: {}; terminate connection", err);
break;
}
rx_item = rx.next();
next_ping = ping;
}
// Nothing else to receive.
Either::Left((None, _)) => break,
Expand All @@ -345,6 +349,7 @@ async fn background_task(
break;
}
rx_item = next_rx;
next_ping = ping_interval.next();
}
}
}
Expand Down

0 comments on commit 6421530

Please sign in to comment.