Skip to content

Commit

Permalink
client: add Subscription::close_reason (#1320)
Browse files Browse the repository at this point in the history
* client subscription: don't close subscription lagg

* refactor(client subscription): don't drop on lagg.

* add client subscription example

* Update core/src/client/mod.rs

* Update examples/examples/client_subscription_drop_oldest_item.rs

* address grumbles

* Update tests/tests/integration_tests.rs

* remove async-broadcast dep

* revert subscription tests

* Update core/src/client/mod.rs
  • Loading branch information
niklasad1 authored Mar 15, 2024
1 parent 8a4b665 commit d40521b
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 99 deletions.
16 changes: 11 additions & 5 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
use crate::types::error::{ErrorCode, ErrorObject};
use crate::WsClientBuilder;

use jsonrpsee_core::client::{BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT, SubscriptionCloseReason,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
Expand Down Expand Up @@ -190,7 +192,7 @@ async fn notification_handler_works() {
}

#[tokio::test]
async fn notification_without_polling_doesnt_make_client_unuseable() {
async fn notification_close_on_lagging() {
init_logger();

let server = WebSocketTestServer::with_hardcoded_notification(
Expand All @@ -212,14 +214,18 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
let mut nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// don't poll the notification stream for 2 seconds, should be full now.
// Don't poll the notification stream for 2 seconds, should be full now.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Lagged
assert!(matches!(nh.close_reason(), Some(SubscriptionCloseReason::Lagged)));

// Drain the subscription.
for _ in 0..4 {
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
assert!(nh.next().with_default_timeout().await.unwrap().is_some());
}

// NOTE: this is now unuseable and unregistered.
// It should be dropped when lagging.
assert!(nh.next().with_default_timeout().await.unwrap().is_none());

// The same subscription should be possible to register again.
Expand Down
39 changes: 21 additions & 18 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::client::async_client::LOG_TARGET;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT, Error};
use crate::client::async_client::LOG_TARGET;
use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;

use futures_timer::Delay;
use futures_util::future::{self, Either};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
Expand Down Expand Up @@ -90,33 +90,33 @@ pub(crate) fn process_batch_response(

/// Attempts to process a subscription response.
///
/// Returns `Ok()` if the response was successfully sent to the frontend.
/// Return `Err(None)` if the subscription was not found.
/// Returns `Err(Some(msg))` if the channel to the `Subscription` was full.
/// Returns `Some(sub_id)` if the subscription should be closed otherwise
/// `None` is returned.
pub(crate) fn process_subscription_response(
manager: &mut RequestManager,
response: SubscriptionResponse<JsonValue>,
) -> Result<(), Option<SubscriptionId<'static>>> {
) -> Option<SubscriptionId<'static>> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => {
tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
return Err(None);
return None;
}
};

match manager.as_subscription_mut(&request_id) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result) {
Ok(()) => Ok(()),
Err(err) => {
tracing::error!(target: LOG_TARGET, "Dropping subscription {:?} error: {:?}", sub_id, err);
Err(Some(sub_id))
Some(send_back_sink) => match send_back_sink.send(response.params.result) {
Ok(_) => None,
Err(TrySubscriptionSendError::Closed) => Some(sub_id),
Err(TrySubscriptionSendError::TooSlow(m)) => {
tracing::error!(target: LOG_TARGET, "Subscription {{method={}, sub_id={:?}}} couldn't keep up with server; failed to send {m}", response.method, sub_id);
Some(sub_id)
}
},
None => {
tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
Err(None)
None
}
}
}
Expand Down Expand Up @@ -150,10 +150,13 @@ pub(crate) fn process_subscription_close_response(
/// It's possible that user close down the subscription before this notification is received.
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) {
match manager.as_notification_handler_mut(notif.method.to_string()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
Some(send_back_sink) => match send_back_sink.send(notif.params) {
Ok(()) => (),
Err(err) => {
tracing::warn!(target: LOG_TARGET, "Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
Err(TrySubscriptionSendError::Closed) => {
let _ = manager.remove_notification_handler(&notif.method);
}
Err(TrySubscriptionSendError::TooSlow(m)) => {
tracing::error!(target: LOG_TARGET, "Notification `{}` couldn't keep up with server; failed to send {m}", notif.method);
let _ = manager.remove_notification_handler(&notif.method);
}
},
Expand Down Expand Up @@ -206,7 +209,7 @@ pub(crate) fn process_single_response(
}
};

let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
let (subscribe_tx, subscribe_rx) = subscription_channel(max_capacity_per_subscription);
if manager
.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.is_ok()
Expand Down
50 changes: 26 additions & 24 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ use std::{
};

use crate::{
client::BatchEntry,
client::Error, error::RegisterMethodError,
client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
error::RegisterMethodError,
};
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
use serde_json::value::Value as JsonValue;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

#[derive(Debug)]
enum Kind {
Expand All @@ -68,8 +68,8 @@ pub(crate) enum RequestStatus {

type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
type PendingBatchOneshot = oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId<'static>), Error>>;
type SubscriptionSink = mpsc::Sender<JsonValue>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
type SubscriptionSink = SubscriptionSender;
type UnsubscribeMethod = String;
type RequestId = Id<'static>;

Expand Down Expand Up @@ -339,10 +339,12 @@ impl RequestManager {

#[cfg(test)]
mod tests {
use crate::client::subscription_channel;

use super::{Error, RequestManager};
use jsonrpsee_types::{Id, SubscriptionId};
use serde_json::Value as JsonValue;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

#[test]
fn insert_remove_pending_request_works() {
Expand All @@ -355,8 +357,8 @@ mod tests {

#[test]
fn insert_remove_subscription_works() {
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
Expand All @@ -382,10 +384,10 @@ mod tests {

#[test]
fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
let (tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx3, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx4, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx1, _) = oneshot::channel();
let (tx2, _) = oneshot::channel();
let (tx3, _) = oneshot::channel();
let (tx4, _) = oneshot::channel();
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
Expand Down Expand Up @@ -419,10 +421,10 @@ mod tests {

#[test]
fn pending_method_call_faulty() {
let (request_tx1, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (request_tx2, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (request_tx1, _) = oneshot::channel();
let (request_tx2, _) = oneshot::channel();
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);

let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
Expand All @@ -447,10 +449,10 @@ mod tests {

#[test]
fn pending_subscription_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (pending_sub_tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (request_tx, _) = oneshot::channel();
let (pending_sub_tx1, _) = oneshot::channel();
let (pending_sub_tx2, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);

let mut manager = RequestManager::new();
assert!(manager
Expand Down Expand Up @@ -478,10 +480,10 @@ mod tests {

#[test]
fn active_subscriptions_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx1, _) = mpsc::channel::<JsonValue>(1);
let (sub_tx2, _) = mpsc::channel::<JsonValue>(1);
let (request_tx, _) = oneshot::channel();
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx1, _) = subscription_channel(1);
let (sub_tx2, _) = subscription_channel(1);

let mut manager = RequestManager::new();

Expand Down
10 changes: 5 additions & 5 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};

use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};

const LOG_TARGET: &str = "jsonrpsee-client";

Expand Down Expand Up @@ -707,13 +707,13 @@ impl SubscriptionClientT for Client {

let res = call_with_timeout(self.request_timeout, send_back_rx).await;

let (notifs_rx, method) = match res {
let (rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.disconnect_reason().await),
};

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Method(method)))
Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
}
}

Expand Down Expand Up @@ -746,7 +746,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
}
// Subscription response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
if let Err(Some(sub_id)) = process_subscription_response(&mut manager.lock(), response) {
if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
return Ok(Some(FrontToBack::SubscriptionClosed(sub_id)));
}
}
Expand Down Expand Up @@ -896,7 +896,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
// User called `register_notification` on the front-end.
FrontToBack::RegisterNotification(reg) => {
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_buffer_capacity_per_subscription);
let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);

if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
Expand Down
Loading

0 comments on commit d40521b

Please sign in to comment.