Skip to content

Commit

Permalink
add IntoResponse trait for method calls (#1057)
Browse files Browse the repository at this point in the history
* IntoResponse trait for rpc calls

* remove ErrorResponse

* cleanup, fix nits

* separate types from ser/deserialization

* fix uncommented code

* add Success type

* add missing dev-dep

* fix tests with issue link

* fix tests

* add missing file

* PartialResponse -> ResponsePayload

* chore(deps): bump actions/checkout from 3.4.0 to 3.5.0 (#1055)

Bumps [actions/checkout](https://github.com/actions/checkout) from 3.4.0 to 3.5.0.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v3.4.0...v3.5.0)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump baptiste0928/cargo-install from 1 to 2 (#1056)

Bumps [baptiste0928/cargo-install](https://github.com/baptiste0928/cargo-install) from 1 to 2.
- [Release notes](https://github.com/baptiste0928/cargo-install/releases)
- [Changelog](https://github.com/baptiste0928/cargo-install/blob/main/CHANGELOG.md)
- [Commits](baptiste0928/cargo-install@v1...v2)

---
updated-dependencies:
- dependency-name: baptiste0928/cargo-install
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix: tokio v1.27 (#1062)

* fix: tokio v1.27

* Update server/src/transport/ws.rs

* fix rustdoc

* Update server/src/transport/ws.rs

* Update server/src/transport/ws.rs

* no more futuredriver for incoming conns

* add comment for unclear code

* ResponsePayload Cow-like

* fix ugly code

* cleanup

* address grumbles

* ToOwned -> Clone

* compile-time tests to workaround rustc bug

* Update proc-macros/src/helpers.rs

* add missing impls for Vec and [T; N]

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
niklasad1 and dependabot[bot] authored Apr 11, 2023
1 parent 118acc3 commit 191167a
Show file tree
Hide file tree
Showing 39 changed files with 770 additions and 492 deletions.
16 changes: 5 additions & 11 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,17 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
let mut module = jsonrpsee::RpcModule::new(());

module.register_method(SYNC_FAST_CALL, |_, _| Ok("lo")).unwrap();
module
.register_async_method(ASYNC_FAST_CALL, |_, _| async { Result::<_, jsonrpsee::core::Error>::Ok("lo") })
.unwrap();
module.register_method(SYNC_FAST_CALL, |_, _| "lo").unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { "lo" }).unwrap();

module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(MIB))).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| "A".repeat(MIB)).unwrap();

module
.register_async_method(ASYNC_MEM_CALL, |_, _| async move {
Result::<_, jsonrpsee::core::Error>::Ok("A".repeat(MIB))
})
.unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { "A".repeat(MIB) }).unwrap();

module
.register_method(SYNC_SLOW_CALL, |_, _| {
std::thread::sleep(SLOW_CALL);
Ok("slow call")
"slow call"
})
.unwrap();

Expand Down
37 changes: 14 additions & 23 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::Arc;
use std::time::Duration;

use crate::transport::{self, Error as TransportError, HttpTransportClient};
use crate::types::{ErrorResponse, NotificationSer, RequestSer, Response};
use crate::types::{NotificationSer, RequestSer, Response};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::http::HeaderMap;
Expand All @@ -44,7 +44,7 @@ use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::{ErrorObject, TwoPointZero};
use jsonrpsee_types::{ErrorObject, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
use tower::{Layer, Service};
Expand Down Expand Up @@ -300,13 +300,8 @@ where

// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let response: Response<&JsonRawValue> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.into_error_object())));
}
};
let response = ResponseSuccess::try_from(serde_json::from_slice::<Response<&JsonRawValue>>(&body)?)
.map_err(|e| Error::Call(CallError::Custom(e)))?;

let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?;

Expand Down Expand Up @@ -345,7 +340,7 @@ where
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};

let json_rps: Vec<&JsonRawValue> = serde_json::from_slice(&body).map_err(Error::ParseError)?;
let json_rps: Vec<Response<&JsonRawValue>> = serde_json::from_slice(&body).map_err(Error::ParseError)?;

let mut responses = Vec::with_capacity(json_rps.len());
let mut successful_calls = 0;
Expand All @@ -356,22 +351,18 @@ where
}

for rp in json_rps {
let (id, res) = match serde_json::from_str::<Response<R>>(rp.get()).map_err(Error::ParseError) {
let id = rp.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
let id = r.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
let result = serde_json::from_str(r.result.get())?;
successful_calls += 1;
(id, Ok(r.result))
Ok(result)
}
Err(err) => {
failed_calls += 1;
Err(err)
}
Err(err) => match serde_json::from_str::<ErrorResponse>(rp.get()).map_err(Error::ParseError) {
Ok(err) => {
let id = err.id().try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
failed_calls += 1;
(id, Err(err.into_error_object()))
}
Err(_) => {
return Err(err);
}
},
};

let maybe_elem = id
Expand Down
2 changes: 1 addition & 1 deletion client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async fn batch_request_out_of_order_response() {
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}

async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + 'static>(
async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<BatchResponse<T>, Error> {
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ async fn is_connected_works() {
assert!(!client.is_connected())
}

async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + 'static>(
async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<BatchResponse<T>, Error> {
Expand Down
42 changes: 14 additions & 28 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::sync::{mpsc, oneshot};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorObject, ErrorResponse, Id, Notification, RequestSer, Response, SubscriptionId, SubscriptionResponse,
ErrorObject, Id, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;
Expand Down Expand Up @@ -175,28 +175,36 @@ pub(crate) fn process_single_response(
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
let response_id = response.id.into_owned();
let response_id = response.id.clone().into_owned();
let result = ResponseSuccess::try_from(response).map(|s| s.result).map_err(|e| Error::Call(CallError::Custom(e)));

match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
Some(Some(send)) => send,
Some(None) => return Ok(None),
None => return Err(Error::InvalidRequestId),
};
let _ = send_back_oneshot.send(Ok(response.result));

let _ = send_back_oneshot.send(result);
Ok(None)
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;

let sub_id: Result<SubscriptionId, _> = response.result.try_into();
let sub_id = result.map(|r| SubscriptionId::try_from(r).ok());

let sub_id = match sub_id {
Ok(sub_id) => sub_id,
Err(_) => {
Ok(Some(sub_id)) => sub_id,
Ok(None) => {
let _ = send_back_oneshot.send(Err(Error::InvalidSubscriptionId));
return Ok(None);
}
Err(e) => {
let _ = send_back_oneshot.send(Err(e));
return Ok(None);
}
};

let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
Expand Down Expand Up @@ -248,28 +256,6 @@ pub(crate) fn build_unsubscribe_message(
Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
}

/// Attempts to process an error response.
///
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id().clone().into_owned();

match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
let _ = send_back.map(|s| s.send(Err(Error::Call(CallError::Custom(err.into_error_object())))));
Ok(())
}
RequestStatus::PendingSubscription => {
let (_, send_back, _) = manager.complete_pending_subscription(id).expect("State checked above; qed");
let _ = send_back.send(Err(Error::Call(CallError::Custom(err.into_error_object()))));
Ok(())
}
_ => Err(Error::InvalidRequestId),
}
}

/// Wait for a stream to complete within the given timeout.
pub(crate) async fn call_with_timeout<T>(
timeout: std::time::Duration,
Expand Down
30 changes: 11 additions & 19 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::borrow::Cow as StdCow;

use core::time::Duration;
use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
process_single_response, process_subscription_response, stop_subscription,
};
use jsonrpsee_types::TwoPointZero;
use jsonrpsee_types::{ResponseSuccess, TwoPointZero};
use manager::RequestManager;

use async_lock::Mutex;
Expand All @@ -29,8 +29,8 @@ use futures_timer::Delay;
use futures_util::future::{self, Either, Fuse};
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{ErrorResponse, Notification, NotificationSer, RequestSer, Response, SubscriptionResponse};
use jsonrpsee_types::response::{ResponsePayload, SubscriptionError};
use jsonrpsee_types::{Notification, NotificationSer, RequestSer, Response, SubscriptionResponse};
use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;
Expand Down Expand Up @@ -339,7 +339,7 @@ impl ClientT for Client {
Err(_) => return Err(self.read_error_from_backend().await),
};

rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&json_value), id), self.max_log_length);

serde_json::from_value(json_value).map_err(Error::ParseError)
}
Expand Down Expand Up @@ -462,7 +462,7 @@ impl SubscriptionClientT for Client {
Err(_) => return Err(self.read_error_from_backend().await),
};

rx_log_from_json(&Response::new(&sub_id, id_unsub), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&sub_id), id_unsub), self.max_log_length);

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
Expand Down Expand Up @@ -542,10 +542,6 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<Notification<_>>(raw) {
let _ = process_notification(manager, notif);
}
// Error response
else if let Ok(err) = serde_json::from_slice::<ErrorResponse>(raw) {
process_error_response(manager, err)?;
} else {
return Err(unparse_error(raw));
}
Expand All @@ -558,18 +554,14 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
let mut range = None;

for r in raw_responses {
let id = if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
let id = response.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
batch.push(InnerBatchResponse { id, result: Ok(response.result) });
id
} else if let Ok(err) = serde_json::from_str::<ErrorResponse>(r.get()) {
let id = err.id().try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
batch.push(InnerBatchResponse { id, result: Err(err.into_error_object()) });
id
} else {
let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) else {
return Err(unparse_error(raw));
};

let id = response.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });

let r = range.get_or_insert(id..id);

if id < r.start {
Expand Down
Loading

0 comments on commit 191167a

Please sign in to comment.