From 64215300ddb31265d6d6f12748263ab7623d4bc7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 3 Jun 2022 11:16:45 +0300 Subject: [PATCH] ws-server: Submit ping regardless of WS messages (#788) * ws-server: Submit ping regardless of WS messages Signed-off-by: Alexandru Vasile * use tokio_stream::IntervalStream for less boxing Co-authored-by: Niklas Adolfsson --- ws-server/Cargo.toml | 1 + ws-server/src/server.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 29368c5b94..091801f7d5 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -19,6 +19,7 @@ serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } +tokio-stream = "0.1.7" [dev-dependencies] anyhow = "1" diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 961a86f828..ba935b6735 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -51,6 +51,7 @@ use soketto::data::ByteSlice125; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio_stream::wrappers::IntervalStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; /// Default maximum connections allowed. @@ -320,20 +321,23 @@ async fn background_task( // Received messages from the WebSocket. let mut rx_item = rx.next(); - while !stop_server2.shutdown_requested() { - let submit_ping = tokio::time::sleep(ping_interval); - tokio::pin!(submit_ping); + // Interval to send out continuously `pings`. + let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval)); + tokio::pin!(ping_interval); + let mut next_ping = ping_interval.next(); + while !stop_server2.shutdown_requested() { // Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet. // Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing. - match futures_util::future::select(rx_item, submit_ping).await { - Either::Left((Some(response), _)) => { + match futures_util::future::select(rx_item, next_ping).await { + Either::Left((Some(response), ping)) => { // If websocket message send fail then terminate the connection. if let Err(err) = send_ws_message(&mut sender, response).await { tracing::warn!("WS send error: {}; terminate connection", err); break; } rx_item = rx.next(); + next_ping = ping; } // Nothing else to receive. Either::Left((None, _)) => break, @@ -345,6 +349,7 @@ async fn background_task( break; } rx_item = next_rx; + next_ping = ping_interval.next(); } } }