Skip to content

Commit

Permalink
[ws server]: support for subscriptions with params (#336)
Browse files Browse the repository at this point in the history
* [ws server]: draft SubscriptionSinkWithParams

* rexport types

* PoC design2

* improve example

* Update ws-server/src/server.rs

Co-authored-by: David <dvdplm@gmail.com>

* Subscription example (#324)

* Add a test for calling methods with multiple params of multiple types (#308)

* Add a test for calling methods with multiple params of multiple types

* cargo fmt

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* [ws client] RegisterNotification support (#303)

* Rename NotifResponse to SubscriptionResponse to make room for new impl

* Add support for on_notification Subscription<T> types

* Fix handling of NotificationHandler in manager

* cleanup

* Implement NotificationHandler to replace Subscription<T> and clean up plumbing

* More cleanup

* impl Drop for NotificationHandler

* Address pr feedback #1

* ws client register_notification pr feedback 2

* Fix doc

* fix typo

* Add tests, get NH working

* More cleanup of String/&str

* fix doc

* Drop notification handler on send_back_sink error

* ws client notification auto unsubscribe when channel full test

* Change order of type params to register_method (#312)

* Change order of type params to register_method

* Cleanup and fmt

* Update ws-server/src/tests.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* CI: optimize caching (#317)

* Bump actions/checkout from 2 to 2.3.4 (#315)

Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 2.3.4.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v2...v2.3.4)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions-rs/cargo from 1 to 1.0.3 (#314)

Bumps [actions-rs/cargo](https://github.com/actions-rs/cargo) from 1 to 1.0.3.
- [Release notes](https://github.com/actions-rs/cargo/releases)
- [Changelog](https://github.com/actions-rs/cargo/blob/master/CHANGELOG.md)
- [Commits](actions-rs/cargo@v1...v1.0.3)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions-rs/toolchain from 1 to 1.0.7 (#313)

Bumps [actions-rs/toolchain](https://github.com/actions-rs/toolchain) from 1 to 1.0.7.
- [Release notes](https://github.com/actions-rs/toolchain/releases)
- [Changelog](https://github.com/actions-rs/toolchain/blob/master/CHANGELOG.md)
- [Commits](actions-rs/toolchain@v1...v1.0.7)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [ws server]: add logs (#319)

* WIP - hangs

* fix example

* cleanup

* Add certificate_store() to WsClientBuilder (#321)

* Add custom_certificate to WsClientBuilder

* Use system certs instead of specified file

* Cache client_config

* Move client_config logic to fn build

* Default use_system_certificates to true

* Move out connector

* Add CertificateStore type

* cargo fmt

* cargo clippy

* Resolve comment: Rename variable

* Resolved comments

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: Billy Lindeman <billylindeman@gmail.com>
Co-authored-by: Denis Pisarev <denis.pisarev@parity.io>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Albin Hedman <albin9604@gmail.com>

* grumbles: impl maciej proposal

* fix test build

* add test for subscription with param

* cargo fmt

* Update examples/ws_subscription.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* grumbles

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update utils/src/server/rpc_module.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* fix more grumbles

* [subscriptionSink]: introduce into_sinks

* use replace

* fix more nits

* maciej design 2

* fix tests

* remove log

* [rpc context mod]: register_subscription with ctx

* nits

* nits again

* move subscribers mutex

* clippy

* [ws subscribe]: avoid send message on unsubscribed

* revert unintentional changes

* Subscription with context example (#345)

* Add weather example to show how to use subscriptions with context

* Add note

* Cleanup

* Additional cleanup (#347)

* Add weather example to show how to use subscriptions with context

* Add note

* Cleanup

* fmt

* Cleanup and docs

* fmt

* ignore error on subscription

Co-authored-by: David Palm <dvdplm@gmail.com>
Co-authored-by: Billy Lindeman <billylindeman@gmail.com>
Co-authored-by: Denis Pisarev <denis.pisarev@parity.io>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Albin Hedman <albin9604@gmail.com>
Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>
  • Loading branch information
7 people authored May 28, 2021
1 parent 13c3a88 commit 4a29388
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 82 deletions.
11 changes: 11 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ env_logger = "0.8"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
log = "0.4"
tokio = { version = "1", features = ["full"] }
serde = "1"
serde_json = "1"
restson = "0.7"

[[example]]
name = "http"
Expand All @@ -25,6 +28,14 @@ path = "ws.rs"
name = "ws_subscription"
path = "ws_subscription.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"

[[example]]
name = "proc_macro"
path = "proc_macro.rs"

[[example]]
name = "weather"
path = "weather.rs"
133 changes: 133 additions & 0 deletions examples/weather.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example of setting up a subscription that polls a remote API, in this case the api.openweathermap.org/weather, and
//! sends the data back to the subscriber whenever the weather in London changes. The openweathermap API client is
//! passed at registration as part of the "context" object. We only want to send data on the subscription when the
//! weather actually changes, so we store the current weather in the context, hence the need for a `Mutex` to allow
//! mutation.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::RpcContextModule,
ws_server::WsServer,
};
use restson::{Error as RestsonError, RestPath};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Mutex;

// Set up the types to deserialize the weather data.
// See https://openweathermap.org/current for the details about the API used in this example.
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Weather {
name: String,
wind: Wind,
clouds: Clouds,
main: Main,
}
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Clouds {
all: usize,
}
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Main {
temp: f64,
pressure: usize,
humidity: usize,
}
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Wind {
speed: f64,
deg: usize,
}

impl RestPath<&(String, String)> for Weather {
fn get_path(params: &(String, String)) -> Result<String, RestsonError> {
// Set up your own API key at https://openweathermap.org/current
const API_KEY: &'static str = "f6ba475df300d5f91135550da0f4a867";
Ok(String::from(format!("data/2.5/weather?q={}&units={}&appid={}", params.0, params.1, API_KEY,)))
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;

// Subscription to the London weather
let params = JsonRpcParams::Array(vec!["London,uk".into(), "metric".into()]);
let mut weather_sub = client.subscribe::<Weather>("weather_sub", params, "weather_unsub").await?;
while let Some(w) = weather_sub.next().await {
println!("[client] London weather: {:?}", w);
}

Ok(())
}

/// The context passed on registration, used to store a REST client to query for the current weather and the current
/// "state".
struct WeatherApiCx {
api_client: restson::RestClient,
last_weather: Weather,
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServer::new("127.0.0.1:0").await?;

let api_client = restson::RestClient::new("http://api.openweathermap.org").unwrap();
let last_weather = Weather::default();
let cx = Mutex::new(WeatherApiCx { api_client, last_weather });
let mut module = RpcContextModule::new(cx);
module
.register_subscription_with_context("weather_sub", "weather_unsub", |params, sink, cx| {
let params: (String, String) = params.parse()?;
log::debug!(target: "server", "Subscribed with params={:?}", params);
std::thread::spawn(move || loop {
let mut cx = cx.lock().unwrap();
let current_weather: Weather = cx.api_client.get(&params).unwrap();
if current_weather != cx.last_weather {
log::debug!(target: "server", "Fetched London weather: {:?}, sending", current_weather);
sink.send(&current_weather).unwrap();
cx.last_weather = current_weather;
} else {
log::trace!(target: "server", "Same weather as before. Not sending.")
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();

server.register_module(module.into_module()).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
Ok(addr)
}
81 changes: 81 additions & 0 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;

// Subscription with a single parameter
let params = JsonRpcParams::Array(vec![3.into()]);
let mut sub_params_one = client.subscribe::<Option<char>>("sub_one_param", params, "unsub_one_param").await?;
println!("subscription with one param: {:?}", sub_params_one.next().await);

// Subscription with multiple parameters
let params = JsonRpcParams::Array(vec![2.into(), 5.into()]);
let mut sub_params_two = client.subscribe::<String>("sub_params_two", params, "unsub_params_two").await?;
println!("subscription with two params: {:?}", sub_params_two.next().await);

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz";
let mut server = WsServer::new("127.0.0.1:0").await?;
server
.register_subscription("sub_one_param", "unsub_one_param", |params, sink| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
std::thread::sleep(std::time::Duration::from_millis(50));
});
Ok(())
})
.unwrap();
server
.register_subscription("sub_params_two", "unsub_params_two", |params, sink| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
std::thread::sleep(std::time::Duration::from_millis(100));
});
Ok(())
})
.unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
Ok(addr)
}
13 changes: 7 additions & 6 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServer::new("127.0.0.1:0").await?;
let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();

std::thread::spawn(move || loop {
subscription.send(&"hello my friend").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
});
server.register_subscription("subscribe_hello", "unsubscribe_hello", |_, sink| {
std::thread::spawn(move || loop {
sink.send(&"hello my friend").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
});
Ok(())
})?;

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
Expand Down
43 changes: 33 additions & 10 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,45 @@ pub async fn websocket_server_with_subscription() -> SocketAddr {
let rt = tokio::runtime::Runtime::new().unwrap();

let mut server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap();
let mut sub_hello = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();
let mut sub_foo = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap();

server.register_method("say_hello", |_| Ok("hello")).unwrap();

server_started_tx.send(server.local_addr().unwrap()).unwrap();

rt.spawn(server.start());
server
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, sink| {
std::thread::spawn(move || loop {
let _ = sink.send(&"hello from subscription");
std::thread::sleep(Duration::from_millis(50));
});
Ok(())
})
.unwrap();

server
.register_subscription("subscribe_foo", "unsubscribe_foo", |_, sink| {
std::thread::spawn(move || loop {
let _ = sink.send(&1337);
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
.unwrap();

server
.register_subscription("subscribe_add_one", "unsubscribe_add_one", |params, sink| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
let _ = sink.send(&count);
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
.unwrap();

rt.block_on(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
server_started_tx.send(server.local_addr().unwrap()).unwrap();

sub_hello.send(&"hello from subscription").unwrap();
sub_foo.send(&1337_u64).unwrap();
}
server.start().await
});
});

Expand Down
14 changes: 14 additions & 0 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ async fn ws_subscription_works() {
}
}

#[tokio::test]
async fn ws_subscription_with_input_works() {
let server_addr = websocket_server_with_subscription().await;
let server_url = format!("ws://{}", server_addr);
let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let mut add_one: Subscription<u64> =
client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap();

for i in 2..4 {
let next = add_one.next().await.unwrap();
assert_eq!(next, i);
}
}

#[tokio::test]
async fn ws_method_call_works() {
let server_addr = websocket_server().await;
Expand Down
4 changes: 3 additions & 1 deletion types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ where
match self.notifs_rx.next().await {
Some(n) => match serde_json::from_value(n) {
Ok(parsed) => return Some(parsed),
Err(e) => log::debug!("Subscription response error: {:?}", e),
Err(e) => {
log::error!("Subscription response error: {:?}", e);
}
},
None => return None,
}
Expand Down
2 changes: 1 addition & 1 deletion types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {
Request(String),
/// Frontend/backend channel error.
#[error("Frontend/backend channel error: {0}")]
Internal(#[source] futures_channel::mpsc::SendError),
Internal(#[from] futures_channel::mpsc::SendError),
/// Invalid response,
#[error("Invalid response: {0}")]
InvalidResponse(Mismatch<String>),
Expand Down
6 changes: 2 additions & 4 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ impl<'a> RpcParams<'a> {
where
T: Deserialize<'a>,
{
match self.0 {
None => Err(CallError::InvalidParams),
Some(params) => serde_json::from_str(params).map_err(|_| CallError::InvalidParams),
}
let params = self.0.unwrap_or("null");
serde_json::from_str(params).map_err(|_| CallError::InvalidParams)
}

/// Attempt to parse only the first parameter from an array into type T
Expand Down
Loading

0 comments on commit 4a29388

Please sign in to comment.