Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ws-server] Batch support #300

Merged
merged 15 commits into from
May 7, 2021
12 changes: 10 additions & 2 deletions http-server/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl RpcModule {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: RpcMethod<R, InvalidParams>,
F: RpcMethod<R, CallError>,
{
self.verify_method_name(method_name)?;

Expand All @@ -45,7 +45,15 @@ impl RpcModule {
Box::new(move |id, params, tx, _| {
match callback(params) {
Ok(res) => send_response(id, tx, res),
Err(InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
// TODO: this looks wonky...
Err(CallError::InvalidParams(InvalidParams)) => {
send_error(id, tx, JsonRpcErrorCode::InvalidParams.into())
}
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
Err(CallError::Failed(e)) => {
// TODO: do something smart(-er) with the error?
log::error!("Call failed with: {}", e);
send_error(id, tx, JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE).into())
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
}
};

Ok(())
Expand Down
69 changes: 24 additions & 45 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ use hyper::{
service::{make_service_fn, service_fn},
Error as HyperError,
};
use jsonrpsee_types::error::{Error, GenericTransportError, InvalidParams};
use jsonrpsee_types::error::{CallError, Error, GenericTransportError};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
use jsonrpsee_utils::{
hyper_helpers::read_response_to_body,
server::{send_error, RpcSender},
server::{collect_batch_responses, send_error, RpcSender},
};
use serde::Serialize;
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
cmp,
Expand Down Expand Up @@ -129,7 +128,7 @@ impl Server {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
F: Fn(RpcParams) -> Result<R, CallError> + Send + Sync + 'static,
{
self.root.register_method(method_name, callback)
}
Expand Down Expand Up @@ -162,23 +161,23 @@ impl Server {
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
// the params from the request. The result of the computation is sent back over the `tx` channel and
// the result(s) are collected into a `String` and sent back over the wire.
let execute =
move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| {
if let Some(method) = methods.get(method_name) {
let params = RpcParams::new(params.map(|params| params.get()));
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(id, params, &tx, 0) {
log::error!(
"execution of method call '{}' failed: {:?}, request id={:?}",
method_name,
err,
id
);
}
} else {
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
if let Some(method) = methods.get(&*req.method) {
let params = RpcParams::new(req.params.map(|params| params.get()));
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(req.id, params, &tx, 0) {
log::error!(
"execution of method call '{}' failed: {:?}, request id={:?}",
req.method,
err,
req.id
);
send_error(req.id, &tx, JsonRpcErrorCode::ServerError(-1).into());
}
};
} else {
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
}
};

// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
Expand All @@ -203,7 +202,7 @@ impl Server {
};

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded();
let (tx, mut rx) = mpsc::unbounded::<String>();
// Is this a single request or a batch (or error)?
let mut single = true;

Expand All @@ -213,15 +212,13 @@ impl Server {
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
// to [`serde_json::from_slice`] which is pretty annoying.
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) =
serde_json::from_slice::<JsonRpcRequest>(&body)
{
execute(id, &tx, &method_name, params);
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
execute(&tx, req);
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
single = false;
for JsonRpcRequest { id, method: method_name, params, .. } in batch {
execute(id, &tx, &method_name, params);
for req in batch {
execute(&tx, req);
}
} else {
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
Expand Down Expand Up @@ -257,24 +254,6 @@ impl Server {
}
}

// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in
// `[`/`]`.
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
let mut buf = String::with_capacity(2048);
buf.push('[');
let mut buf = rx
.fold(buf, |mut acc, response| async {
acc = [acc, response].concat();
acc.push(',');
acc
})
.await;
// Remove trailing comma
buf.pop();
buf.push(']');
buf
}

// Checks to that access control of the received request is the same as configured.
fn access_control_is_valid(
access_control: &AccessControl,
Expand Down
7 changes: 7 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ pub fn internal_error(id: Id) -> String {
)
}

pub fn server_error(id: Id) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32000,"message":"Server error"}},"id":{}}}"#,
serde_json::to_string(&id).unwrap()
)
}

/// Hardcoded server response when a client initiates a new subscription.
///
/// NOTE: works only for one subscription because the subscription ID is hardcoded.
Expand Down
12 changes: 12 additions & 0 deletions test-utils/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ impl WebSocketTestClient {
String::from_utf8(data).map_err(Into::into)
}

// TODO: this is completely wrong, the batch response should be sent back in a single response, clients should not
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// read multiple times like this.
pub async fn send_batch(&mut self, msg: impl AsRef<str>, batch_len: usize) -> Result<String, Error> {
self.tx.send_text(msg).await?;
self.tx.flush().await?;
let mut data = Vec::new();
for _ in 0..batch_len {
self.rx.receive_data(&mut data).await?;
}
String::from_utf8(data).map_err(Into::into)
}

pub async fn send_request_binary(&mut self, msg: &[u8]) -> Result<String, Error> {
self.tx.send_binary(msg).await?;
self.tx.flush().await?;
Expand Down
19 changes: 19 additions & 0 deletions utils/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Shared helpers for JSON-RPC Servers.

use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject};
use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero};
use jsonrpsee_types::v2::response::JsonRpcResponse;
Expand Down Expand Up @@ -50,3 +51,21 @@ pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) {
log::error!("Error sending response to the client: {:?}", err)
}
}

/// Read all the results of all method calls in a batch request from the ['Stream']. Format the result into a single
/// `String` appropriately wrapped in `[`/`]`.
pub async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
let mut buf = String::with_capacity(2048);
buf.push('[');
let mut buf = rx
.fold(buf, |mut acc, response| async {
acc = [acc, response].concat();
acc.push(',');
acc
})
.await;
// Remove trailing comma
buf.pop();
buf.push(']');
buf
}
78 changes: 57 additions & 21 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

use futures_channel::mpsc;
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
Expand All @@ -34,14 +35,14 @@ use soketto::handshake::{server::Response, Server as SokettoServer};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::compat::TokioAsyncReadCompatExt;

use jsonrpsee_types::error::{Error, InvalidParams};
use jsonrpsee_types::error::{CallError, Error};
use jsonrpsee_types::v2::error::JsonRpcErrorCode;
use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcNotification, JsonRpcRequest};
use jsonrpsee_utils::server::{send_error, ConnectionId, Methods};
use jsonrpsee_utils::server::{collect_batch_responses, send_error, ConnectionId, Methods, RpcSender};

mod module;

Expand Down Expand Up @@ -105,7 +106,7 @@ impl Server {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
F: Fn(RpcParams) -> Result<R, CallError> + Send + Sync + 'static,
{
self.root.register_method(method_name, callback)
}
Expand Down Expand Up @@ -149,7 +150,11 @@ impl Server {
}
}

async fn background_task(socket: tokio::net::TcpStream, methods: Arc<Methods>, id: ConnectionId) -> anyhow::Result<()> {
async fn background_task(
socket: tokio::net::TcpStream,
methods: Arc<Methods>,
conn_id: ConnectionId,
) -> anyhow::Result<()> {
// For each incoming background_task we perform a handshake.
let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat())));

Expand All @@ -166,38 +171,69 @@ async fn background_task(socket: tokio::net::TcpStream, methods: Arc<Methods>, i
let (mut sender, mut receiver) = server.into_builder().finish();
let (tx, mut rx) = mpsc::unbounded::<String>();

// Send results back to the client.
tokio::spawn(async move {
while let Some(response) = rx.next().await {
let _ = sender.send_binary_mut(response.into_bytes()).await;
let _ = sender.flush().await;
}
});

let mut data = Vec::new();
let mut data = Vec::with_capacity(100);

// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
// the params from the request. The result of the computation is sent back over the `tx` channel and
// the result(s) are collected into a `String` and sent back over the wire.
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
if let Some(method) = methods.get(&*req.method) {
let params = RpcParams::new(req.params.map(|params| params.get()));
if let Err(err) = (method)(req.id, params, &tx, conn_id) {
log::error!("execution of method call '{}' failed: {:?}, request id={:?}", req.method, err, req.id);
send_error(req.id, &tx, JsonRpcErrorCode::ServerError(-1).into());
}
} else {
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
}
};

loop {
data.clear();

receiver.receive_data(&mut data).await?;

match serde_json::from_slice::<JsonRpcRequest>(&data) {
Ok(req) => {
let params = RpcParams::new(req.params.map(|params| params.get()));

if let Some(method) = methods.get(&*req.method) {
(method)(req.id, params, &tx, id)?;
} else {
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be
// used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged
// enum here and have to try each case individually: first the single request case, then the
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
// to [`serde_json::from_slice`] which is pretty annoying.
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&data) {
execute(&tx, req);
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&data) {
if !batch.is_empty() {
// Batch responses must be sent back as a single message so we the results from each request in the
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// batch and read the results off of a new channel, `rx2`, and then send the complete batch response
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// back to the client over `tx`.
let (tx2, mut rx2) = mpsc::unbounded::<String>();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
for req in batch {
execute(&tx2, req);
}
// TODO: add a test with a slow method call to prove this is correct.
rx2.close();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
let results = collect_batch_responses(rx2).await;
if let Err(err) = tx.unbounded_send(results) {
log::error!("Error sending batch response to the client: {:?}", err)
}
} else {
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
Err(_) => {
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};
} else {
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};

send_error(id, &tx, code.into());
}
send_error(id, &tx, code.into());
}
}
}
12 changes: 10 additions & 2 deletions ws-server/src/server/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl RpcModule {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: RpcMethod<R, InvalidParams>,
F: RpcMethod<R, CallError>,
{
self.verify_method_name(method_name)?;

Expand All @@ -47,7 +47,15 @@ impl RpcModule {
Box::new(move |id, params, tx, _| {
match callback(params) {
Ok(res) => send_response(id, tx, res),
Err(InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
// TODO: this looks wonky...
Err(CallError::InvalidParams(InvalidParams)) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this InvalidParams type and just use to InvalidParams variant,

perhaps link to #299

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thus, if CallError would implement a trait or something JsonRpcErrorCode then you could just to err.to_error_code() or something similar

send_error(id, tx, JsonRpcErrorCode::InvalidParams.into())
}
Err(CallError::Failed(e)) => {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: do something smart(-er) with the error?
log::error!("Call failed with: {}", e);
send_error(id, tx, JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE).into())
}
};

Ok(())
Expand Down
Loading