Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): JSON-RPC specific middleware #1215

Merged
merged 33 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a2858ff
refactor new jsonrpc middleware
niklasad1 Sep 11, 2023
0bdbfd9
add jsonrpsee specific service trait
niklasad1 Oct 13, 2023
679543c
use tower::ServiceBuilder for composable middleware
niklasad1 Oct 13, 2023
1c2a8a6
revert changelog
niklasad1 Oct 13, 2023
264944c
fix nits
niklasad1 Oct 13, 2023
ed2ea57
types: impl Clone
niklasad1 Oct 16, 2023
461fed9
clarify examples
niklasad1 Oct 16, 2023
013f045
Update server/src/middleware/mod.rs
niklasad1 Oct 16, 2023
a8eb284
Update server/src/transport/http.rs
niklasad1 Oct 16, 2023
5c003a0
remove some boiler plate
niklasad1 Oct 16, 2023
7432768
add back logging
niklasad1 Oct 16, 2023
cb627ae
remove needless Arc
niklasad1 Oct 17, 2023
0fc11a3
remove clone bounds for Middleware
niklasad1 Oct 17, 2023
5b94aa8
add wrapper for tower::ServiceBuilder
niklasad1 Oct 17, 2023
6e9e308
fix docs
niklasad1 Oct 17, 2023
1ac49a1
add modify request example
niklasad1 Oct 17, 2023
d1dbce3
add rate limit example
niklasad1 Oct 17, 2023
2b10662
fix some nits in rate limiting middleware example
niklasad1 Oct 18, 2023
be0103d
Meta -> Context
niklasad1 Oct 18, 2023
4376602
restruct middleware module
niklasad1 Oct 19, 2023
9462220
Merge remote-tracking branch 'origin/master' into na-new-jsonrpc-midd…
niklasad1 Oct 19, 2023
c182c1f
fix broken links
niklasad1 Oct 20, 2023
cad5ffc
Merge remote-tracking branch 'origin/master' into na-new-jsonrpc-midd…
niklasad1 Oct 20, 2023
84c2787
Merge remote-tracking branch 'origin/master' into na-new-jsonrpc-midd…
niklasad1 Nov 2, 2023
938ff4f
grumbles: remove `Context`
niklasad1 Nov 2, 2023
1f47102
fix tests
niklasad1 Nov 2, 2023
6455325
Update examples/examples/rpc_middleware_modify_request.rs
niklasad1 Nov 2, 2023
a5251df
fix grumbles
niklasad1 Nov 2, 2023
1c0eb53
Merge remote-tracking branch 'origin/na-new-jsonrpc-middleware' into …
niklasad1 Nov 2, 2023
5bd5cc5
grumbles: adjust docs for method response
niklasad1 Nov 3, 2023
c207d61
grumbles: MethodKind::Unknown -> NotFound
niklasad1 Nov 3, 2023
c1122b7
Update examples/examples/http_middleware.rs
niklasad1 Nov 3, 2023
abb104b
fix some nits in examples
niklasad1 Nov 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<L> HttpClientBuilder<L> {
}

/// Set custom tower middleware.
pub fn set_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
pub fn set_http_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
HttpClientBuilder {
certificate_store: self.certificate_store,
id_kind: self.id_kind,
Expand Down
42 changes: 35 additions & 7 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub struct MethodResponse {
pub result: String,
/// Indicates whether the call was successful or not.
pub success_or_error: MethodResponseResult,
/// Indicates whether the call was a subscription response.
pub is_subscription: bool,
}

impl MethodResponse {
Expand All @@ -192,6 +194,11 @@ impl MethodResponse {
pub fn is_error(&self) -> bool {
self.success_or_error.is_success()
}

/// Returns whether the call is a subscription.
pub fn is_subscription(&self) -> bool {
self.is_subscription
}
}

/// Represent the outcome of a method call success or failed.
Expand Down Expand Up @@ -226,6 +233,16 @@ impl MethodResponseResult {
}

impl MethodResponse {
/// Create a subscription response.
pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut rp = Self::response(id, result, max_response_size);
rp.is_subscription = true;
rp
}

/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub fn response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I wonder whether it's worth renaming this to method_response to try and make it feel like "not the default one to use", and/or maybe just a tweak to docs like "If the response is from a subscription, use subscription_response", orr ditching subscription_response and instead adding another arg to response (ie is_subscription: bool) so that every call has to be explicit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it makes sense.

I just added subscription_response and never thought about it. I was just happy that it worked :P

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too fussed, having looked at the rest of the code (maybe just add the doc comment?), so whatever you prefer :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, a subscription response is basically just a method response but in some scenarios in the server it may be useful to know and in middleware doing grafana metrics etc.

I think I want to keep response but lemme adjust the docs a little bit better.

Expand All @@ -245,7 +262,7 @@ impl MethodResponse {
// Safety - serde_json does not emit invalid UTF-8.
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };

Self { result, success_or_error }
Self { result, success_or_error, is_subscription: false }
}
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);
Expand All @@ -262,24 +279,35 @@ impl MethodResponse {
let result =
serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");

Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
} else {
let err_code = ErrorCode::InternalError;
let result = serde_json::to_string(&Response::new(err_code.into(), id))
.expect("JSON serialization infallible; qed");
Self { result, success_or_error: MethodResponseResult::Failed(err_code.code()) }
Self {
result,
success_or_error: MethodResponseResult::Failed(err_code.code()),
is_subscription: false,
}
}
}
}
}

/// Create a subscription response error.
pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let mut rp = Self::error(id, err);
rp.is_subscription = true;
rp
}

/// Create a `MethodResponse` from an error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = ResponsePayload::error_borrowed(err);
let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
}
}

Expand All @@ -305,13 +333,13 @@ impl BatchResponseBuilder {
///
/// Fails if the max limit is exceeded and returns to error response to
/// return early in order to not process method call responses which are thrown away anyway.
pub fn append(&mut self, response: &MethodResponse) -> Result<(), String> {
pub fn append(&mut self, response: &MethodResponse) -> Result<(), MethodResponse> {
// `,` will occupy one extra byte for each entry
// on the last item the `,` is replaced by `]`.
let len = response.result.len() + self.result.len() + 1;

if len > self.max_response_size {
Err(batch_response_error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
} else {
self.result.push_str(&response.result);
self.result.push(',');
Expand Down Expand Up @@ -409,6 +437,6 @@ mod tests {
let batch = BatchResponseBuilder::new_with_limit(63).append(&method).unwrap_err();

let exp_err = r#"{"jsonrpc":"2.0","error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"},"id":null}"#;
assert_eq!(batch, exp_err);
assert_eq!(batch.result, exp_err);
}
}
38 changes: 37 additions & 1 deletion core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,32 @@ pub enum MethodCallback {
Unsubscription(UnsubscriptionMethod),
}

/// The kind of the JSON-RPC method call, it can be a subscription, method call or unknown.
#[derive(Debug, Copy, Clone)]
pub enum MethodKind {
/// Subscription Call.
Subscription,
/// Unsubscription Call.
Unsubscription,
/// Method call.
MethodCall,
/// Unknown method.
Unknown,
}

impl std::fmt::Display for MethodKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Subscription => "subscription",
Self::MethodCall => "method call",
Self::Unknown => "unknown",
Self::Unsubscription => "unsubscription",
};

write!(f, "{s}")
}
}

/// Result of a method, either direct value or a future of one.
pub enum MethodResult<T> {
/// Result by value
Expand Down Expand Up @@ -219,6 +245,16 @@ impl Methods {
self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
}

/// Returns the kind of the method callback,
pub fn method_kind(&self, method_name: &str) -> MethodKind {
match self.method(method_name) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of MethodKind::Unknown, wouyld it make more sense to return Option<MethodKind> here? (or alternately, rename it to somehting clearer eg MethodKind::NotFound?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, this code is not used so I removed it but I guess your comment still applies.

Lemme fix it NotFound is better

None => MethodKind::Unknown,
Some(MethodCallback::Async(_)) | Some(MethodCallback::Sync(_)) => MethodKind::MethodCall,
Some(MethodCallback::Subscription(_)) => MethodKind::Subscription,
Some(MethodCallback::Unsubscription(_)) => MethodKind::Unsubscription,
}
}

/// Helper to call a method on the `RPC module` without having to spin up a server.
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
Expand Down Expand Up @@ -307,7 +343,7 @@ impl Methods {
) -> RawRpcResponse {
let (tx, mut rx) = mpsc::channel(buf_size);
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
let params = Params::new(req.params.as_ref().map(|params| params.as_ref().get()));

let response = match self.method(&req.method) {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Expand Down
7 changes: 5 additions & 2 deletions core/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl PendingSubscriptionSink {
/// the return value is simply ignored because no further notification are propagated
/// once reject has been called.
pub async fn reject(self, err: impl Into<ErrorObjectOwned>) {
let err = MethodResponse::error(self.id, err.into());
let err = MethodResponse::subscription_error(self.id, err.into());
_ = self.inner.send(err.result.clone()).await;
_ = self.subscribe.send(err);
}
Expand All @@ -269,7 +269,7 @@ impl PendingSubscriptionSink {
///
/// Panics if the subscription response exceeded the `max_response_size`.
pub async fn accept(self) -> Result<SubscriptionSink, PendingSubscriptionAcceptError> {
let response = MethodResponse::response(
let response = MethodResponse::subscription_response(
self.id,
ResponsePayload::result_borrowed(&self.uniq_sub.sub_id),
self.inner.max_response_size() as usize,
Expand Down Expand Up @@ -438,6 +438,9 @@ impl Subscription {
let raw = self.rx.recv().await?;

tracing::debug!("[Subscription::next]: rx {}", raw);

// clippy complains about this but it doesn't compile without the extra res binding.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh weird; what's the error?!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lifetime doesn't live long enough, it may be some issue with my own serde impl I did a long time ago.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange though that nothing changes at all whether the variable is there or not! I wouldn't expect any difference since you haven't given any extra type info on the variable or whatever :)

#[allow(clippy::let_and_return)]
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/cors_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
// modifying requests / responses. These features are independent of one another
// and can also be used separately.
// In this example, we use both features.
let server = Server::builder().set_middleware(middleware).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server = Server::builder().set_http_middleware(middleware).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| {
Expand Down
5 changes: 3 additions & 2 deletions examples/examples/host_filter_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::net::SocketAddr;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;
use jsonrpsee::server::middleware::HostFilterLayer;
use jsonrpsee::server::middleware::http::HostFilterLayer;
use jsonrpsee::server::{RpcModule, Server};

#[tokio::main]
Expand Down Expand Up @@ -65,7 +65,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
// `HostFilerLayer::new` only fails on invalid URIs..
.layer(HostFilterLayer::new(["example.com"]).unwrap());

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
.on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
);

let client = HttpClientBuilder::default().set_middleware(middleware).build(url)?;
let client = HttpClientBuilder::default().set_http_middleware(middleware).build(url)?;
let params = rpc_params![1_u64, 2, 3];
let response: Result<String, _> = client.request("say_hello", params).await;
tracing::info!("r: {:?}", response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! This example sets a custom tower service middleware to the RPC implementation.
//! jsonrpsee supports two kinds of middlewares `http_middleware` and `rpc_middleware`.
//!
//! This example demonstrates how to use the `http_middleware` which applies for each
//! HTTP request.
//!
//! A typical use-case for this to apply a specific CORS policy which applies both
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
//! for HTTP and WebSocket.
//!
//! It works with both `WebSocket` and `HTTP` which is done in the example.

use hyper::body::Bytes;
use hyper::http::HeaderValue;
Expand Down Expand Up @@ -104,7 +109,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
.layer(cors)
.timeout(Duration::from_secs(2));

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
5 changes: 3 additions & 2 deletions examples/examples/http_proxy_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::time::Duration;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;
use jsonrpsee::server::middleware::ProxyGetRequestLayer;
use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
use jsonrpsee::server::{RpcModule, Server};

#[tokio::main]
Expand Down Expand Up @@ -87,7 +87,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.timeout(Duration::from_secs(2));

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
Loading
Loading