Skip to content

Commit

Permalink
Implement ping-pong for WebSocket server (#782)
Browse files Browse the repository at this point in the history
* ws-server: Implement `ping-ping`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Set builder's ping_interval

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Handle just `ping` frames

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Simplify `select`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Use `futures_util::select` instead of `select!` macro

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Avoid pinning the delay

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Log when a `Pong` frame is received

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ws-server: Use tokio for submitting pings

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored May 27, 2022
1 parent 47d36b9 commit 9fe25b1
Showing 1 changed file with 76 additions and 9 deletions.
85 changes: 76 additions & 9 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, FutureExt};
use futures_util::future::{join_all, Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
Expand All @@ -46,6 +47,7 @@ use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::Params;
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use soketto::handshake::{server::Response, Server as SokettoServer};
use soketto::Sender;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -275,6 +277,7 @@ where
stop_monitor.clone(),
middleware,
id_provider,
cfg.ping_interval,
))
.await;

Expand All @@ -298,6 +301,7 @@ async fn background_task(
stop_server: StopMonitor,
middleware: impl Middleware,
id_provider: Arc<dyn IdProvider>,
ping_interval: Duration,
) -> Result<(), Error> {
// And we can finally transition to a websocket background_task.
let mut builder = server.into_builder();
Expand All @@ -313,15 +317,35 @@ async fn background_task(

// Send results back to the client.
tokio::spawn(async move {
// Received messages from the WebSocket.
let mut rx_item = rx.next();

while !stop_server2.shutdown_requested() {
if let Some(response) = rx.next().await {
// If websocket message send fail then terminate the connection.
if let Err(err) = send_ws_message(&mut sender, response).await {
tracing::warn!("WS send error: {}; terminate connection", err);
break;
let submit_ping = tokio::time::sleep(ping_interval);
tokio::pin!(submit_ping);

// Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet.
// Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing.
match futures_util::future::select(rx_item, submit_ping).await {
Either::Left((Some(response), _)) => {
// If websocket message send fail then terminate the connection.
if let Err(err) = send_ws_message(&mut sender, response).await {
tracing::warn!("WS send error: {}; terminate connection", err);
break;
}
rx_item = rx.next();
}
// Nothing else to receive.
Either::Left((None, _)) => break,

// Handle timer intervals.
Either::Right((_, next_rx)) => {
if let Err(err) = send_ws_ping(&mut sender).await {
tracing::warn!("WS send ping error: {}; terminate connection", err);
break;
}
rx_item = next_rx;
}
} else {
break;
}
}

Expand All @@ -342,7 +366,16 @@ async fn background_task(

{
// Need the extra scope to drop this pinned future and reclaim access to `data`
let receive = receiver.receive_data(&mut data);
let receive = async {
// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
loop {
match receiver.receive(&mut data).await? {
soketto::Incoming::Data(d) => break Ok(d),
soketto::Incoming::Pong(_) => tracing::debug!("recv pong"),
_ => continue,
}
}
};

tokio::pin!(receive);

Expand Down Expand Up @@ -670,6 +703,8 @@ struct Settings {
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
/// The interval at which `Ping` frames are submitted.
ping_interval: Duration,
}

impl Default for Settings {
Expand All @@ -683,6 +718,7 @@ impl Default for Settings {
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
ping_interval: Duration::from_secs(60),
}
}
}
Expand Down Expand Up @@ -865,6 +901,27 @@ impl<M> Builder<M> {
self
}

/// Configure the interval at which pings are submitted.
///
/// This option is used to keep the connection alive, and is just submitting `Ping` frames,
/// without making any assumptions about when a `Pong` frame should be received.
///
/// Default: 60 seconds.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use jsonrpsee_ws_server::WsServerBuilder;
///
/// // Set the ping interval to 10 seconds.
/// let builder = WsServerBuilder::default().ping_interval(Duration::from_secs(10));
/// ```
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.settings.ping_interval = interval;
self
}

/// Configure custom `subscription ID` provider for the server to use
/// to when getting new subscription calls.
///
Expand Down Expand Up @@ -930,3 +987,13 @@ async fn send_ws_message(
sender.send_text_owned(response).await?;
sender.flush().await.map_err(Into::into)
}

async fn send_ws_ping(sender: &mut Sender<BufReader<BufWriter<Compat<TcpStream>>>>) -> Result<(), Error> {
tracing::debug!("send ping");
// Submit empty slice as "optional" parameter.
let slice: &[u8] = &[];
// Byte slice fails if the provided slice is larger than 125 bytes.
let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125");
sender.send_ping(byte_slice).await?;
sender.flush().await.map_err(Into::into)
}

0 comments on commit 9fe25b1

Please sign in to comment.