Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ping-pong for WebSocket server #782

Merged
merged 8 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ws-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ documentation = "https://docs.rs/jsonrpsee-ws-server"
[dependencies]
futures-channel = "0.3.14"
futures-util = { version = "0.3.14", default-features = false, features = ["io", "async-await-macro"] }
futures-timer = { version = "3" }
jsonrpsee-types = { path = "../types", version = "0.13.1" }
jsonrpsee-core = { path = "../core", version = "0.13.1", features = ["server", "soketto"] }
tracing = "0.1"
Expand Down
83 changes: 74 additions & 9 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, FutureExt};
use futures_timer::Delay;
use futures_util::future::{join_all, Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
Expand All @@ -46,6 +48,7 @@ use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::Params;
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use soketto::handshake::{server::Response, Server as SokettoServer};
use soketto::Sender;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -275,6 +278,7 @@ where
stop_monitor.clone(),
middleware,
id_provider,
cfg.ping_interval,
))
.await;

Expand All @@ -298,6 +302,7 @@ async fn background_task(
stop_server: StopMonitor,
middleware: impl Middleware,
id_provider: Arc<dyn IdProvider>,
ping_interval: Duration,
) -> Result<(), Error> {
// And we can finally transition to a websocket background_task.
let mut builder = server.into_builder();
Expand All @@ -313,15 +318,32 @@ async fn background_task(

// Send results back to the client.
tokio::spawn(async move {
// Received messages from the WebSocket.
let mut rx_item = rx.next();

while !stop_server2.shutdown_requested() {
if let Some(response) = rx.next().await {
// If websocket message send fail then terminate the connection.
if let Err(err) = send_ws_message(&mut sender, response).await {
tracing::warn!("WS send error: {}; terminate connection", err);
break;
// Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet.
// Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing.
match futures_util::future::select(rx_item, Delay::new(ping_interval)).await {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
Either::Left((Some(response), _)) => {
// If websocket message send fail then terminate the connection.
if let Err(err) = send_ws_message(&mut sender, response).await {
tracing::warn!("WS send error: {}; terminate connection", err);
break;
}
rx_item = rx.next();
}
// Nothing else to receive.
Either::Left((None, _)) => break,

// Handle timer intervals.
Either::Right((_, next_rx)) => {
if let Err(err) = send_ws_ping(&mut sender).await {
tracing::warn!("WS send ping error: {}; terminate connection", err);
break;
}
rx_item = next_rx;
}
} else {
break;
}
}

Expand All @@ -342,7 +364,16 @@ async fn background_task(

{
// Need the extra scope to drop this pinned future and reclaim access to `data`
let receive = receiver.receive_data(&mut data);
let receive = async {
// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
loop {
match receiver.receive(&mut data).await? {
soketto::Incoming::Data(d) => break Ok(d),
soketto::Incoming::Pong(_) => tracing::debug!("recv pong"),
_ => continue,
}
}
};

tokio::pin!(receive);

Expand Down Expand Up @@ -668,6 +699,8 @@ struct Settings {
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
/// The interval at which `Ping` frames are submitted.
ping_interval: Duration,
}

impl Default for Settings {
Expand All @@ -681,6 +714,7 @@ impl Default for Settings {
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
ping_interval: Duration::from_secs(60),
}
}
}
Expand Down Expand Up @@ -863,6 +897,27 @@ impl<M> Builder<M> {
self
}

/// Configure the interval at which pings are submitted.
///
/// This option is used to keep the connection alive, and is just submitting `Ping` frames,
/// without making any assumptions about when a `Pong` frame should be received.
///
/// Default: 60 seconds.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use jsonrpsee_ws_server::WsServerBuilder;
///
/// // Set the ping interval to 10 seconds.
/// let builder = WsServerBuilder::default().ping_interval(Duration::from_secs(10));
/// ```
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.settings.ping_interval = interval;
self
}

/// Configure custom `subscription ID` provider for the server to use
/// to when getting new subscription calls.
///
Expand Down Expand Up @@ -928,3 +983,13 @@ async fn send_ws_message(
sender.send_text_owned(response).await?;
sender.flush().await.map_err(Into::into)
}

async fn send_ws_ping(sender: &mut Sender<BufReader<BufWriter<Compat<TcpStream>>>>) -> Result<(), Error> {
tracing::debug!("send ping");
// Submit empty slice as "optional" parameter.
let slice: &[u8] = &[];
// Byte slice fails if the provided slice is larger than 125 bytes.
let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125");
sender.send_ping(byte_slice).await?;
sender.flush().await.map_err(Into::into)
}