diff --git a/proc-macros/src/new/render_server.rs b/proc-macros/src/new/render_server.rs index 0851035000..a622c04fcb 100644 --- a/proc-macros/src/new/render_server.rs +++ b/proc-macros/src/new/render_server.rs @@ -61,9 +61,7 @@ impl RpcDescription { if method.signature.sig.asyncness.is_some() { quote! { rpc.register_async_method(#rpc_method_name, |params, context| { - let owned_params = params.owned(); let fut = async move { - let params = owned_params.borrowed(); #parsing Ok(context.as_ref().#rust_method_name(#params_seq).await) }; diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index d93a63f2a8..843304855a 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -58,25 +58,71 @@ impl Serialize for TwoPointZero { } } -/// Parameters sent with the RPC request -#[derive(Clone, Copy, Debug)] -pub struct RpcParams<'a>(Option<&'a str>); +/// Parameters sent with the RPC request. +/// +/// The data containing the params is a `Cow<&str>` and can either be a borrowed `&str` of JSON from an incoming +/// [`super::request::JsonRpcRequest`] (which in turn borrows it from the input buffer that is shared between requests); +/// or, it can be an owned `String`. +#[derive(Clone, Debug)] +pub struct RpcParams<'a>(Option>); impl<'a> RpcParams<'a> { /// Create params pub fn new(raw: Option<&'a str>) -> Self { - Self(raw) + Self(raw.map(Into::into)) + } + + /// Obtain a sequence parser, [`RpcParamsSequence`]. + /// + /// This allows sequential parsing of the incoming params, using an `Iterator`-style API and is useful when the RPC + /// request has optional parameters at the tail that may or may not be present. + pub fn sequence(&self) -> RpcParamsSequence { + RpcParamsSequence(self.0.as_ref().map(AsRef::as_ref).unwrap_or("")) + } + + /// Attempt to parse all parameters as an array or map into type `T`. + pub fn parse(&'a self) -> Result + where + T: Deserialize<'a>, + { + let params = self.0.as_ref().map(AsRef::as_ref).unwrap_or("null"); + serde_json::from_str(params).map_err(|_| CallError::InvalidParams) + } + + /// Attempt to parse parameters as an array of a single value of type `T`, and returns that value. + pub fn one(&'a self) -> Result + where + T: Deserialize<'a>, + { + self.parse::<[T; 1]>().map(|[res]| res) } + /// Convert `RpcParams<'a>` to `RpcParams<'static>` so that it can be moved across threads. + /// + /// This will cause an allocation if the params internally are using a borrowed JSON slice. + pub fn into_owned(self) -> RpcParams<'static> { + RpcParams(self.0.map(|s| Cow::owned(s.into_owned()))) + } +} + +/// An `Iterator`-like parser for a sequence of `RpcParams`. +/// +/// This will parse the params one at a time, and allows for graceful handling of optional parameters at the tail; other +/// use cases are likely better served by [`RpcParams::parse`]. The reason this is not an actual [`Iterator`] is that +/// params parsing (often) yields values of different types. +#[derive(Debug)] +pub struct RpcParamsSequence<'a>(&'a str); + +impl<'a> RpcParamsSequence<'a> { fn next_inner(&mut self) -> Option> where T: Deserialize<'a>, { - let mut json = self.0?.trim_start(); + let mut json = self.0.trim_start(); match json.as_bytes().get(0)? { b']' => { - self.0 = None; + self.0 = ""; return None; } @@ -88,12 +134,12 @@ impl<'a> RpcParams<'a> { match iter.next()? { Ok(value) => { - self.0 = Some(&json[iter.byte_offset()..]); + self.0 = &json[iter.byte_offset()..]; Some(Ok(value)) } Err(_) => { - self.0 = None; + self.0 = ""; Some(Err(CallError::InvalidParams)) } @@ -104,11 +150,12 @@ impl<'a> RpcParams<'a> { /// /// ``` /// # use jsonrpsee_types::v2::params::RpcParams; - /// let mut params = RpcParams::new(Some(r#"[true, 10, "foo"]"#)); + /// let params = RpcParams::new(Some(r#"[true, 10, "foo"]"#)); + /// let mut seq = params.sequence(); /// - /// let a: bool = params.next().unwrap(); - /// let b: i32 = params.next().unwrap(); - /// let c: &str = params.next().unwrap(); + /// let a: bool = seq.next().unwrap(); + /// let b: i32 = seq.next().unwrap(); + /// let c: &str = seq.next().unwrap(); /// /// assert_eq!(a, true); /// assert_eq!(b, 10); @@ -130,13 +177,14 @@ impl<'a> RpcParams<'a> { /// /// ``` /// # use jsonrpsee_types::v2::params::RpcParams; - /// let mut params = RpcParams::new(Some(r#"[1, 2, null]"#)); + /// let params = RpcParams::new(Some(r#"[1, 2, null]"#)); + /// let mut seq = params.sequence(); /// /// let params: [Option; 4] = [ - /// params.optional_next().unwrap(), - /// params.optional_next().unwrap(), - /// params.optional_next().unwrap(), - /// params.optional_next().unwrap(), + /// seq.optional_next().unwrap(), + /// seq.optional_next().unwrap(), + /// seq.optional_next().unwrap(), + /// seq.optional_next().unwrap(), /// ];; /// /// assert_eq!(params, [Some(1), Some(2), None, None]); @@ -150,62 +198,21 @@ impl<'a> RpcParams<'a> { None => Ok(None), } } - - /// Attempt to parse all parameters as array or map into type `T` - pub fn parse(self) -> Result - where - T: Deserialize<'a>, - { - let params = self.0.unwrap_or("null"); - serde_json::from_str(params).map_err(|_| CallError::InvalidParams) - } - - /// Attempt to parse parameters as an array of a single value of type `T`, and returns that value. - pub fn one(self) -> Result - where - T: Deserialize<'a>, - { - self.parse::<[T; 1]>().map(|[res]| res) - } - - /// Creates an owned version of parameters. - /// Required to simplify proc-macro implementation. - #[doc(hidden)] - pub fn owned(self) -> OwnedRpcParams { - self.into() - } -} - -/// Owned version of [`RpcParams`]. -#[derive(Clone, Debug)] -pub struct OwnedRpcParams(Option); - -impl OwnedRpcParams { - /// Converts `OwnedRpcParams` into borrowed [`RpcParams`]. - pub fn borrowed(&self) -> RpcParams<'_> { - RpcParams(self.0.as_ref().map(|s| s.as_ref())) - } -} - -impl<'a> From> for OwnedRpcParams { - fn from(borrowed: RpcParams<'a>) -> Self { - Self(borrowed.0.map(Into::into)) - } } /// [Serializable JSON-RPC parameters](https://www.jsonrpc.org/specification#parameter_structures) /// -/// If your type implement `Into` call that favor of `serde_json::to:value` to -/// construct the parameters. Because `serde_json::to_value` serializes the type which -/// allocates whereas `Into` doesn't in most cases. +/// If your type implements `Into`, call that in favor of `serde_json::to:value` to +/// construct the parameters. Because `serde_json::to_value` serializes the type which allocates +/// whereas `Into` doesn't in most cases. #[derive(Serialize, Debug, Clone)] #[serde(untagged)] pub enum JsonRpcParams<'a> { /// No params. NoParams, - /// Positional params (heap allocated) + /// Positional params (heap allocated). Array(Vec), - /// Positional params (slice) + /// Positional params (slice). ArrayRef(&'a [JsonValue]), /// Params by name. Map(BTreeMap<&'a str, JsonValue>), @@ -287,38 +294,15 @@ impl<'a> Id<'a> { _ => None, } } -} - -/// Owned version of [`Id`] that allocates memory for the `Str` variant. -#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -#[serde(untagged)] -pub enum OwnedId { - /// Null - Null, - /// Numeric id - Number(u64), - /// String id - Str(String), -} -impl OwnedId { - /// Converts `OwnedId` into borrowed `Id`. - pub fn borrowed(&self) -> Id<'_> { + /// Convert `Id<'a>` to `Id<'static>` so that it can be moved across threads. + /// + /// This can cause an allocation if the id is a string. + pub fn into_owned(self) -> Id<'static> { match self { - Self::Null => Id::Null, - Self::Number(num) => Id::Number(*num), - Self::Str(str) => Id::Str(Cow::borrowed(str)), - } - } -} - -impl<'a> From> for OwnedId { - fn from(borrowed: Id<'a>) -> Self { - match borrowed { - Id::Null => Self::Null, - Id::Number(num) => Self::Number(num), - Id::Str(num) => Self::Str(num.as_ref().to_owned()), + Id::Null => Id::Null, + Id::Number(num) => Id::Number(num), + Id::Str(s) => Id::Str(Cow::owned(s.into_owned())), } } } @@ -333,7 +317,13 @@ mod test { fn id_deserialization() { let s = r#""2""#; let deserialized: Id = serde_json::from_str(s).unwrap(); - assert_eq!(deserialized, Id::Str("2".into())); + match deserialized { + Id::Str(ref cow) => { + assert!(cow.is_borrowed()); + assert_eq!(cow, "2"); + } + _ => panic!("Expected Id::Str"), + } let s = r#"2"#; let deserialized: Id = serde_json::from_str(s).unwrap(); @@ -378,17 +368,19 @@ mod test { #[test] fn params_parse() { - let mut none = RpcParams::new(None); - assert!(none.next::().is_err()); + let none = RpcParams::new(None); + assert!(none.sequence().next::().is_err()); - let mut array_params = RpcParams::new(Some("[1, 2, 3]")); + let array_params = RpcParams::new(Some("[1, 2, 3]")); let arr: Result<[u64; 3], _> = array_params.parse(); assert!(arr.is_ok()); - assert_eq!(array_params.next::().unwrap(), 1); - assert_eq!(array_params.next::().unwrap(), 2); - assert_eq!(array_params.next::().unwrap(), 3); - assert!(array_params.next::().is_err()); + let mut seq = array_params.sequence(); + + assert_eq!(seq.next::().unwrap(), 1); + assert_eq!(seq.next::().unwrap(), 2); + assert_eq!(seq.next::().unwrap(), 3); + assert!(seq.next::().is_err()); let array_one = RpcParams::new(Some("[1]")); let one: Result = array_one.one(); @@ -399,6 +391,20 @@ mod test { assert!(obj.is_ok()); } + #[test] + fn params_sequence_borrows() { + let params = RpcParams::new(Some(r#"["foo", "bar"]"#)); + let mut seq = params.sequence(); + + assert_eq!(seq.next::<&str>().unwrap(), "foo"); + assert_eq!(seq.next::<&str>().unwrap(), "bar"); + assert!(seq.next::<&str>().is_err()); + + // It's ok to parse the params again. + let params: (&str, &str) = params.parse().unwrap(); + assert_eq!(params, ("foo", "bar")); + } + #[test] fn two_point_zero_serde_works() { let initial_ser = r#""2.0""#; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index ee09d1b4ff..89fedbad3c 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -4,8 +4,7 @@ use futures_util::{future::BoxFuture, FutureExt}; use jsonrpsee_types::error::{CallError, Error, SubscriptionClosedError}; use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; use jsonrpsee_types::v2::params::{ - Id, JsonRpcSubscriptionParams, OwnedId, OwnedRpcParams, RpcParams, SubscriptionId as JsonRpcSubscriptionId, - TwoPointZero, + Id, JsonRpcSubscriptionParams, RpcParams, SubscriptionId as JsonRpcSubscriptionId, TwoPointZero, }; use jsonrpsee_types::v2::request::{JsonRpcNotification, JsonRpcRequest}; @@ -22,7 +21,9 @@ use std::sync::Arc; pub type SyncMethod = Arc Result<(), Error>>; /// Similar to [`SyncMethod`], but represents an asynchronous handler. pub type AsyncMethod = Arc< - dyn Send + Sync + Fn(OwnedId, OwnedRpcParams, MethodSink, ConnectionId) -> BoxFuture<'static, Result<(), Error>>, + dyn Send + + Sync + + Fn(Id<'static>, RpcParams<'static>, MethodSink, ConnectionId) -> BoxFuture<'static, Result<(), Error>>, >; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. @@ -60,8 +61,8 @@ impl MethodCallback { MethodCallback::Sync(callback) => (callback)(req.id.clone(), params, tx, conn_id), MethodCallback::Async(callback) => { let tx = tx.clone(); - let params = OwnedRpcParams::from(params); - let id = OwnedId::from(req.id); + let params = params.into_owned(); + let id = req.id.into_owned(); (callback)(id, params, tx, conn_id).await } @@ -215,7 +216,11 @@ impl RpcModule { pub fn register_async_method(&mut self, method_name: &'static str, callback: F) -> Result<(), Error> where R: Serialize + Send + Sync + 'static, - F: Fn(RpcParams, Arc) -> BoxFuture<'static, Result> + Copy + Send + Sync + 'static, + F: Fn(RpcParams<'static>, Arc) -> BoxFuture<'static, Result> + + Copy + + Send + + Sync + + 'static, { self.methods.verify_method_name(method_name)?; @@ -226,8 +231,6 @@ impl RpcModule { MethodCallback::Async(Arc::new(move |id, params, tx, _| { let ctx = ctx.clone(); let future = async move { - let params = params.borrowed(); - let id = id.borrowed(); match callback(params, ctx).await { Ok(res) => send_response(id, &tx, res), Err(CallError::InvalidParams) => send_error(id, &tx, JsonRpcErrorCode::InvalidParams.into()), @@ -267,8 +270,8 @@ impl RpcModule { /// use jsonrpsee_utils::server::rpc_module::RpcModule; /// /// let mut ctx = RpcModule::new(99_usize); - /// ctx.register_subscription("sub", "unsub", |mut params, mut sink, ctx| { - /// let x: usize = params.next()?; + /// ctx.register_subscription("sub", "unsub", |params, mut sink, ctx| { + /// let x: usize = params.one()?; /// std::thread::spawn(move || { /// let sum = x + (*ctx); /// sink.send(&sum) diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index bf3b286bb4..cffb79d902 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -5,10 +5,7 @@ use futures_util::FutureExt; use jsonrpsee_test_utils::helpers::*; use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient}; use jsonrpsee_test_utils::TimeoutFutureExt; -use jsonrpsee_types::{ - error::{CallError, Error}, - v2::params::RpcParams, -}; +use jsonrpsee_types::error::{CallError, Error}; use serde_json::Value as JsonValue; use std::fmt; use std::net::SocketAddr; @@ -25,13 +22,15 @@ impl fmt::Display for MyAppError { impl std::error::Error for MyAppError {} /// Spawns a dummy `JSONRPC v2 WebSocket` -/// It has two hardcoded methods: "say_hello" and "add" async fn server() -> SocketAddr { server_with_handles().await.0 } /// Spawns a dummy `JSONRPC v2 WebSocket` -/// It has two hardcoded methods: "say_hello" and "add" +/// It has the following methods: +/// sync methods: `say_hello` and `add` +/// async: `say_hello_async` and `add_sync` +/// other: `invalid_params` (always returns `CallError::InvalidParams`), `call_fail` (always returns `CallError::Failed`), `sleep_for` /// Returns the address together with handles for server future and server stop. async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) { let mut server = WsServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap(); @@ -43,7 +42,14 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) { }) .unwrap(); module - .register_async_method("say_hello_async", |_: RpcParams, _| { + .register_method("add", |params, _| { + let params: Vec = params.parse()?; + let sum: u64 = params.into_iter().sum(); + Ok(sum) + }) + .unwrap(); + module + .register_async_method("say_hello_async", |_, _| { async move { log::debug!("server respond to hello"); // Call some async function inside. @@ -54,10 +60,13 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) { }) .unwrap(); module - .register_method("add", |params, _| { - let params: Vec = params.parse()?; - let sum: u64 = params.into_iter().sum(); - Ok(sum) + .register_async_method("add_async", |params, _| { + async move { + let params: Vec = params.parse()?; + let sum: u64 = params.into_iter().sum(); + Ok(sum) + } + .boxed() }) .unwrap(); module.register_method("invalid_params", |_params, _| Err::<(), _>(CallError::InvalidParams)).unwrap(); @@ -310,6 +319,16 @@ async fn async_method_call_with_ok_context() { assert_eq!(response, ok_response("ok!".into(), Id::Num(1))); } +#[tokio::test] +async fn async_method_call_with_params() { + let addr = server().await; + let mut client = WebSocketTestClient::new(addr).await.unwrap(); + + let req = r#"{"jsonrpc":"2.0","method":"add_async", "params":[1, 2],"id":1}"#; + let response = client.send_request_text(req).await.unwrap(); + assert_eq!(response, ok_response(JsonValue::Number(3.into()), Id::Num(1))); +} + #[tokio::test] async fn async_method_call_that_fails() { let addr = server_with_context().await;