Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass OwnedRpcParams to async methods #410

Merged
merged 8 commits into from
Jul 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions proc-macros/src/new/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ impl RpcDescription {
if method.signature.sig.asyncness.is_some() {
quote! {
rpc.register_async_method(#rpc_method_name, |params, context| {
let owned_params = params.owned();
let fut = async move {
let params = owned_params.borrowed();
#parsing
Ok(context.as_ref().#rust_method_name(#params_seq).await)
};
Expand Down
208 changes: 107 additions & 101 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,71 @@ impl Serialize for TwoPointZero {
}
}

/// Parameters sent with the RPC request
#[derive(Clone, Copy, Debug)]
pub struct RpcParams<'a>(Option<&'a str>);
/// Parameters sent with the RPC request.
///
/// The data containing the params is a `Cow<&str>` and can either be a borrowed `&str` of JSON from an incoming
/// [`super::request::JsonRpcRequest`] (which in turn borrows it from the input buffer that is shared between requests);
/// or, it can be an owned `String`.
#[derive(Clone, Debug)]
pub struct RpcParams<'a>(Option<Cow<'a, str>>);

impl<'a> RpcParams<'a> {
/// Create params
pub fn new(raw: Option<&'a str>) -> Self {
Self(raw)
Self(raw.map(Into::into))
}

/// Obtain a sequence parser, [`RpcParamsSequence`].
///
/// This allows sequential parsing of the incoming params, using an `Iterator`-style API and is useful when the RPC
/// request has optional parameters at the tail that may or may not be present.
pub fn sequence(&self) -> RpcParamsSequence {
RpcParamsSequence(self.0.as_ref().map(AsRef::as_ref).unwrap_or(""))
}

/// Attempt to parse all parameters as an array or map into type `T`.
pub fn parse<T>(&'a self) -> Result<T, CallError>
where
T: Deserialize<'a>,
{
let params = self.0.as_ref().map(AsRef::as_ref).unwrap_or("null");
serde_json::from_str(params).map_err(|_| CallError::InvalidParams)
}

/// Attempt to parse parameters as an array of a single value of type `T`, and returns that value.
pub fn one<T>(&'a self) -> Result<T, CallError>
where
T: Deserialize<'a>,
{
self.parse::<[T; 1]>().map(|[res]| res)
}

/// Convert `RpcParams<'a>` to `RpcParams<'static>` so that it can be moved across threads.
///
/// This will cause an allocation if the params internally are using a borrowed JSON slice.
pub fn into_owned(self) -> RpcParams<'static> {
RpcParams(self.0.map(|s| Cow::owned(s.into_owned())))
}
}

/// An `Iterator`-like parser for a sequence of `RpcParams`.
///
/// This will parse the params one at a time, and allows for graceful handling of optional parameters at the tail; other
/// use cases are likely better served by [`RpcParams::parse`]. The reason this is not an actual [`Iterator`] is that
/// params parsing (often) yields values of different types.
#[derive(Debug)]
pub struct RpcParamsSequence<'a>(&'a str);

impl<'a> RpcParamsSequence<'a> {
fn next_inner<T>(&mut self) -> Option<Result<T, CallError>>
where
T: Deserialize<'a>,
{
let mut json = self.0?.trim_start();
let mut json = self.0.trim_start();

match json.as_bytes().get(0)? {
b']' => {
self.0 = None;
self.0 = "";

return None;
}
Expand All @@ -88,12 +134,12 @@ impl<'a> RpcParams<'a> {

match iter.next()? {
Ok(value) => {
self.0 = Some(&json[iter.byte_offset()..]);
self.0 = &json[iter.byte_offset()..];

Some(Ok(value))
}
Err(_) => {
self.0 = None;
self.0 = "";

Some(Err(CallError::InvalidParams))
}
Expand All @@ -104,11 +150,12 @@ impl<'a> RpcParams<'a> {
///
/// ```
/// # use jsonrpsee_types::v2::params::RpcParams;
/// let mut params = RpcParams::new(Some(r#"[true, 10, "foo"]"#));
/// let params = RpcParams::new(Some(r#"[true, 10, "foo"]"#));
/// let mut seq = params.sequence();
///
/// let a: bool = params.next().unwrap();
/// let b: i32 = params.next().unwrap();
/// let c: &str = params.next().unwrap();
/// let a: bool = seq.next().unwrap();
/// let b: i32 = seq.next().unwrap();
/// let c: &str = seq.next().unwrap();
///
/// assert_eq!(a, true);
/// assert_eq!(b, 10);
Expand All @@ -130,13 +177,14 @@ impl<'a> RpcParams<'a> {
///
/// ```
/// # use jsonrpsee_types::v2::params::RpcParams;
/// let mut params = RpcParams::new(Some(r#"[1, 2, null]"#));
/// let params = RpcParams::new(Some(r#"[1, 2, null]"#));
/// let mut seq = params.sequence();
///
/// let params: [Option<u32>; 4] = [
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// params.optional_next().unwrap(),
/// seq.optional_next().unwrap(),
/// seq.optional_next().unwrap(),
/// seq.optional_next().unwrap(),
/// seq.optional_next().unwrap(),
/// ];;
///
/// assert_eq!(params, [Some(1), Some(2), None, None]);
Expand All @@ -150,62 +198,21 @@ impl<'a> RpcParams<'a> {
None => Ok(None),
}
}

/// Attempt to parse all parameters as array or map into type `T`
pub fn parse<T>(self) -> Result<T, CallError>
where
T: Deserialize<'a>,
{
let params = self.0.unwrap_or("null");
serde_json::from_str(params).map_err(|_| CallError::InvalidParams)
}

/// Attempt to parse parameters as an array of a single value of type `T`, and returns that value.
pub fn one<T>(self) -> Result<T, CallError>
where
T: Deserialize<'a>,
{
self.parse::<[T; 1]>().map(|[res]| res)
}

/// Creates an owned version of parameters.
/// Required to simplify proc-macro implementation.
#[doc(hidden)]
pub fn owned(self) -> OwnedRpcParams {
self.into()
}
}

/// Owned version of [`RpcParams`].
#[derive(Clone, Debug)]
pub struct OwnedRpcParams(Option<String>);

impl OwnedRpcParams {
/// Converts `OwnedRpcParams` into borrowed [`RpcParams`].
pub fn borrowed(&self) -> RpcParams<'_> {
RpcParams(self.0.as_ref().map(|s| s.as_ref()))
}
}

impl<'a> From<RpcParams<'a>> for OwnedRpcParams {
fn from(borrowed: RpcParams<'a>) -> Self {
Self(borrowed.0.map(Into::into))
}
}

/// [Serializable JSON-RPC parameters](https://www.jsonrpc.org/specification#parameter_structures)
///
/// If your type implement `Into<JsonValue>` call that favor of `serde_json::to:value` to
/// construct the parameters. Because `serde_json::to_value` serializes the type which
/// allocates whereas `Into<JsonValue>` doesn't in most cases.
/// If your type implements `Into<JsonValue>`, call that in favor of `serde_json::to:value` to
/// construct the parameters. Because `serde_json::to_value` serializes the type which allocates
/// whereas `Into<JsonValue>` doesn't in most cases.
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
pub enum JsonRpcParams<'a> {
/// No params.
NoParams,
/// Positional params (heap allocated)
/// Positional params (heap allocated).
Array(Vec<JsonValue>),
/// Positional params (slice)
/// Positional params (slice).
ArrayRef(&'a [JsonValue]),
/// Params by name.
Map(BTreeMap<&'a str, JsonValue>),
Expand Down Expand Up @@ -287,38 +294,15 @@ impl<'a> Id<'a> {
_ => None,
}
}
}

/// Owned version of [`Id`] that allocates memory for the `Str` variant.
#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum OwnedId {
/// Null
Null,
/// Numeric id
Number(u64),
/// String id
Str(String),
}

impl OwnedId {
/// Converts `OwnedId` into borrowed `Id`.
pub fn borrowed(&self) -> Id<'_> {
/// Convert `Id<'a>` to `Id<'static>` so that it can be moved across threads.
///
/// This can cause an allocation if the id is a string.
pub fn into_owned(self) -> Id<'static> {
match self {
Self::Null => Id::Null,
Self::Number(num) => Id::Number(*num),
Self::Str(str) => Id::Str(Cow::borrowed(str)),
}
}
}

impl<'a> From<Id<'a>> for OwnedId {
fn from(borrowed: Id<'a>) -> Self {
match borrowed {
Id::Null => Self::Null,
Id::Number(num) => Self::Number(num),
Id::Str(num) => Self::Str(num.as_ref().to_owned()),
Id::Null => Id::Null,
Id::Number(num) => Id::Number(num),
Id::Str(s) => Id::Str(Cow::owned(s.into_owned())),
}
}
}
Expand All @@ -333,7 +317,13 @@ mod test {
fn id_deserialization() {
let s = r#""2""#;
let deserialized: Id = serde_json::from_str(s).unwrap();
assert_eq!(deserialized, Id::Str("2".into()));
match deserialized {
Id::Str(ref cow) => {
assert!(cow.is_borrowed());
assert_eq!(cow, "2");
}
_ => panic!("Expected Id::Str"),
}

let s = r#"2"#;
let deserialized: Id = serde_json::from_str(s).unwrap();
Expand Down Expand Up @@ -378,17 +368,19 @@ mod test {

#[test]
fn params_parse() {
let mut none = RpcParams::new(None);
assert!(none.next::<u64>().is_err());
let none = RpcParams::new(None);
assert!(none.sequence().next::<u64>().is_err());

let mut array_params = RpcParams::new(Some("[1, 2, 3]"));
let array_params = RpcParams::new(Some("[1, 2, 3]"));
let arr: Result<[u64; 3], _> = array_params.parse();
assert!(arr.is_ok());

assert_eq!(array_params.next::<u64>().unwrap(), 1);
assert_eq!(array_params.next::<u64>().unwrap(), 2);
assert_eq!(array_params.next::<u64>().unwrap(), 3);
assert!(array_params.next::<u64>().is_err());
let mut seq = array_params.sequence();

assert_eq!(seq.next::<u64>().unwrap(), 1);
assert_eq!(seq.next::<u64>().unwrap(), 2);
assert_eq!(seq.next::<u64>().unwrap(), 3);
assert!(seq.next::<u64>().is_err());

let array_one = RpcParams::new(Some("[1]"));
let one: Result<u64, _> = array_one.one();
Expand All @@ -399,6 +391,20 @@ mod test {
assert!(obj.is_ok());
}

#[test]
fn params_sequence_borrows() {
let params = RpcParams::new(Some(r#"["foo", "bar"]"#));
let mut seq = params.sequence();

assert_eq!(seq.next::<&str>().unwrap(), "foo");
assert_eq!(seq.next::<&str>().unwrap(), "bar");
assert!(seq.next::<&str>().is_err());
Comment on lines +396 to +401
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important test and the reason for adding a separate struct for handling sequence parsing: seq borrows the json from params, which allows it to produce slices with that lifetime, while &mut self borrows are transient.


// It's ok to parse the params again.
let params: (&str, &str) = params.parse().unwrap();
assert_eq!(params, ("foo", "bar"));
}

#[test]
fn two_point_zero_serde_works() {
let initial_ser = r#""2.0""#;
Expand Down
23 changes: 13 additions & 10 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use futures_util::{future::BoxFuture, FutureExt};
use jsonrpsee_types::error::{CallError, Error, SubscriptionClosedError};
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::v2::params::{
Id, JsonRpcSubscriptionParams, OwnedId, OwnedRpcParams, RpcParams, SubscriptionId as JsonRpcSubscriptionId,
TwoPointZero,
Id, JsonRpcSubscriptionParams, RpcParams, SubscriptionId as JsonRpcSubscriptionId, TwoPointZero,
};
use jsonrpsee_types::v2::request::{JsonRpcNotification, JsonRpcRequest};

Expand All @@ -22,7 +21,9 @@ use std::sync::Arc;
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, RpcParams, &MethodSink, ConnectionId) -> Result<(), Error>>;
/// Similar to [`SyncMethod`], but represents an asynchronous handler.
pub type AsyncMethod = Arc<
dyn Send + Sync + Fn(OwnedId, OwnedRpcParams, MethodSink, ConnectionId) -> BoxFuture<'static, Result<(), Error>>,
dyn Send
+ Sync
+ Fn(Id<'static>, RpcParams<'static>, MethodSink, ConnectionId) -> BoxFuture<'static, Result<(), Error>>,
>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
Expand Down Expand Up @@ -60,8 +61,8 @@ impl MethodCallback {
MethodCallback::Sync(callback) => (callback)(req.id.clone(), params, tx, conn_id),
MethodCallback::Async(callback) => {
let tx = tx.clone();
let params = OwnedRpcParams::from(params);
let id = OwnedId::from(req.id);
let params = params.into_owned();
let id = req.id.into_owned();

(callback)(id, params, tx, conn_id).await
}
Expand Down Expand Up @@ -215,7 +216,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
pub fn register_async_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize + Send + Sync + 'static,
F: Fn(RpcParams, Arc<Context>) -> BoxFuture<'static, Result<R, CallError>> + Copy + Send + Sync + 'static,
F: Fn(RpcParams<'static>, Arc<Context>) -> BoxFuture<'static, Result<R, CallError>>
+ Copy
+ Send
+ Sync
+ 'static,
{
self.methods.verify_method_name(method_name)?;

Expand All @@ -226,8 +231,6 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::Async(Arc::new(move |id, params, tx, _| {
let ctx = ctx.clone();
let future = async move {
let params = params.borrowed();
let id = id.borrowed();
match callback(params, ctx).await {
Ok(res) => send_response(id, &tx, res),
Err(CallError::InvalidParams) => send_error(id, &tx, JsonRpcErrorCode::InvalidParams.into()),
Expand Down Expand Up @@ -267,8 +270,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_utils::server::rpc_module::RpcModule;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "unsub", |mut params, mut sink, ctx| {
/// let x: usize = params.next()?;
/// ctx.register_subscription("sub", "unsub", |params, mut sink, ctx| {
/// let x: usize = params.one()?;
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// sink.send(&sum)
Expand Down
Loading