Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add Subscription::close_reason #1320

Merged
merged 12 commits into from
Mar 15, 2024
22 changes: 11 additions & 11 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
use crate::types::error::{ErrorCode, ErrorObject};
use crate::WsClientBuilder;

use jsonrpsee_core::client::{BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT, SubscriptionCloseReason,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
Expand Down Expand Up @@ -190,7 +192,7 @@ async fn notification_handler_works() {
}

#[tokio::test]
async fn notification_without_polling_doesnt_make_client_unuseable() {
async fn notification_lagging_works() {
init_logger();

let server = WebSocketTestServer::with_hardcoded_notification(
Expand All @@ -212,22 +214,20 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
let mut nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// don't poll the notification stream for 2 seconds, should be full now.
// Don't poll the notification stream for 2 seconds, should be full now.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Lagged
assert!(matches!(nh.close_reason(), Some(SubscriptionCloseReason::Lagged)));

// Drain the subscription.
for _ in 0..4 {
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
assert!(nh.next().with_default_timeout().await.unwrap().is_some());
}

// NOTE: this is now unuseable and unregistered.
// It should be dropped when lagging.
assert!(nh.next().with_default_timeout().await.unwrap().is_none());

// The same subscription should be possible to register again.
let mut other_nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// check that the new subscription works.
assert!(other_nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
assert!(client.is_connected());
}

Expand Down
39 changes: 21 additions & 18 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::client::async_client::LOG_TARGET;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT, Error};
use crate::client::async_client::LOG_TARGET;
use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;

use futures_timer::Delay;
use futures_util::future::{self, Either};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
Expand Down Expand Up @@ -90,33 +90,33 @@ pub(crate) fn process_batch_response(

/// Attempts to process a subscription response.
///
/// Returns `Ok()` if the response was successfully sent to the frontend.
/// Return `Err(None)` if the subscription was not found.
/// Returns `Err(Some(msg))` if the channel to the `Subscription` was full.
/// Returns `Some(sub_id)` if the subscription should be closed otherwise
/// `None` is returned.
pub(crate) fn process_subscription_response(
manager: &mut RequestManager,
response: SubscriptionResponse<JsonValue>,
) -> Result<(), Option<SubscriptionId<'static>>> {
) -> Option<SubscriptionId<'static>> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => {
tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
return Err(None);
return None;
}
};

match manager.as_subscription_mut(&request_id) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result) {
Ok(()) => Ok(()),
Err(err) => {
tracing::error!(target: LOG_TARGET, "Dropping subscription {:?} error: {:?}", sub_id, err);
Err(Some(sub_id))
Some(send_back_sink) => match send_back_sink.send(response.params.result) {
Ok(_) => None,
Err(TrySubscriptionSendError::Closed) => Some(sub_id),
Err(TrySubscriptionSendError::TooSlow(m)) => {
tracing::error!(target: LOG_TARGET, "Subscription {{method={}, sub_id={:?}}} couldn't keep up with server; failed to send {m}", response.method, sub_id);
Some(sub_id)
}
},
None => {
tracing::debug!(target: LOG_TARGET, "Subscription {:?} is not active", sub_id);
Err(None)
None
}
}
}
Expand Down Expand Up @@ -150,10 +150,13 @@ pub(crate) fn process_subscription_close_response(
/// It's possible that user close down the subscription before this notification is received.
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) {
match manager.as_notification_handler_mut(notif.method.to_string()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
Some(send_back_sink) => match send_back_sink.send(notif.params) {
Ok(()) => (),
Err(err) => {
tracing::warn!(target: LOG_TARGET, "Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
Err(TrySubscriptionSendError::Closed) => {
let _ = manager.remove_notification_handler(&notif.method);
}
Err(TrySubscriptionSendError::TooSlow(m)) => {
tracing::error!(target: LOG_TARGET, "Notification `{}` couldn't keep up with server; failed to send {m}", notif.method);
let _ = manager.remove_notification_handler(&notif.method);
}
},
Expand Down Expand Up @@ -206,7 +209,7 @@ pub(crate) fn process_single_response(
}
};

let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
let (subscribe_tx, subscribe_rx) = subscription_channel(max_capacity_per_subscription);
if manager
.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.is_ok()
Expand Down
50 changes: 26 additions & 24 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ use std::{
};

use crate::{
client::BatchEntry,
client::Error, error::RegisterMethodError,
client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
error::RegisterMethodError,
};
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
use serde_json::value::Value as JsonValue;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

#[derive(Debug)]
enum Kind {
Expand All @@ -68,8 +68,8 @@ pub(crate) enum RequestStatus {

type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
type PendingBatchOneshot = oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId<'static>), Error>>;
type SubscriptionSink = mpsc::Sender<JsonValue>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
type SubscriptionSink = SubscriptionSender;
type UnsubscribeMethod = String;
type RequestId = Id<'static>;

Expand Down Expand Up @@ -339,10 +339,12 @@ impl RequestManager {

#[cfg(test)]
mod tests {
use crate::client::subscription_channel;

use super::{Error, RequestManager};
use jsonrpsee_types::{Id, SubscriptionId};
use serde_json::Value as JsonValue;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

#[test]
fn insert_remove_pending_request_works() {
Expand All @@ -355,8 +357,8 @@ mod tests {

#[test]
fn insert_remove_subscription_works() {
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
Expand All @@ -382,10 +384,10 @@ mod tests {

#[test]
fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
let (tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx3, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx4, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx1, _) = oneshot::channel();
let (tx2, _) = oneshot::channel();
let (tx3, _) = oneshot::channel();
let (tx4, _) = oneshot::channel();
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
Expand Down Expand Up @@ -419,10 +421,10 @@ mod tests {

#[test]
fn pending_method_call_faulty() {
let (request_tx1, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (request_tx2, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (request_tx1, _) = oneshot::channel();
let (request_tx2, _) = oneshot::channel();
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);

let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
Expand All @@ -447,10 +449,10 @@ mod tests {

#[test]
fn pending_subscription_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (pending_sub_tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let (request_tx, _) = oneshot::channel();
let (pending_sub_tx1, _) = oneshot::channel();
let (pending_sub_tx2, _) = oneshot::channel();
let (sub_tx, _) = subscription_channel(1);

let mut manager = RequestManager::new();
assert!(manager
Expand Down Expand Up @@ -478,10 +480,10 @@ mod tests {

#[test]
fn active_subscriptions_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx1, _) = mpsc::channel::<JsonValue>(1);
let (sub_tx2, _) = mpsc::channel::<JsonValue>(1);
let (request_tx, _) = oneshot::channel();
let (pending_sub_tx, _) = oneshot::channel();
let (sub_tx1, _) = subscription_channel(1);
let (sub_tx2, _) = subscription_channel(1);

let mut manager = RequestManager::new();

Expand Down
10 changes: 5 additions & 5 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};

use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};

const LOG_TARGET: &str = "jsonrpsee-client";

Expand Down Expand Up @@ -707,13 +707,13 @@ impl SubscriptionClientT for Client {

let res = call_with_timeout(self.request_timeout, send_back_rx).await;

let (notifs_rx, method) = match res {
let (rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.disconnect_reason().await),
};

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Method(method)))
Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
}
}

Expand Down Expand Up @@ -746,7 +746,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
}
// Subscription response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
if let Err(Some(sub_id)) = process_subscription_response(&mut manager.lock(), response) {
if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
return Ok(Some(FrontToBack::SubscriptionClosed(sub_id)));
}
}
Expand Down Expand Up @@ -896,7 +896,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
// User called `register_notification` on the front-end.
FrontToBack::RegisterNotification(reg) => {
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_buffer_capacity_per_subscription);
let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);

if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
Expand Down
Loading
Loading