Skip to content

Commit

Permalink
Additional cleanup (#347)
Browse files Browse the repository at this point in the history
* Add weather example to show how to use subscriptions with context

* Add note

* Cleanup

* fmt

* Cleanup and docs

* fmt
  • Loading branch information
dvdplm authored May 28, 2021
1 parent 06d165e commit 4bd1959
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 24 deletions.
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ env_logger = "0.8"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
log = "0.4"
tokio = { version = "1", features = ["full"] }
maplit = "1"
serde = "1"
serde_json = "1"
restson = "0.7"
Expand Down
43 changes: 21 additions & 22 deletions examples/weather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::sync::Mutex;
use restson::{Error as RestsonError, RestPath};
use serde::{Deserialize, Serialize};
//! Example of setting up a subscription that polls a remote API, in this case the api.openweathermap.org/weather, and
//! sends the data back to the subscriber whenever the weather in London changes. The openweathermap API client is
//! passed at registration as part of the "context" object. We only want to send data on the subscription when the
//! weather actually changes, so we store the current weather in the context, hence the need for a `Mutex` to allow
//! mutation.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
ws_server::RpcContextModule,
ws_server::WsServer,
};
use restson::{Error as RestsonError, RestPath};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Mutex;

// Set up the types to deserialize the weather data.
// See https://openweathermap.org/current for the details about the API used in this example.
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Weather {
name: String,
Expand All @@ -57,25 +65,14 @@ struct Wind {
deg: usize,
}


impl RestPath<&(String, String)> for Weather {
fn get_path(params: &(String, String)) -> Result<String, RestsonError> {
// Set up your own API key at https://openweathermap.org/current
const API_KEY: &'static str = "f6ba475df300d5f91135550da0f4a867";
Ok(
String::from(
format!("data/2.5/weather?q={}&units={}&appid={}",
params.0,
params.1,
API_KEY,
)
)
)
Ok(String::from(format!("data/2.5/weather?q={}&units={}&appid={}", params.0, params.1, API_KEY,)))
}
}

/// Example of setting up a subscription that polls a remote API, in this case the api.openweathermap.org/weather, and
/// sends the data back to the subscriber whenever the weather in London changes.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
Expand All @@ -84,7 +81,7 @@ async fn main() -> anyhow::Result<()> {

let client = WsClientBuilder::default().build(&url).await?;

// Subscription to the london weather
// Subscription to the London weather
let params = JsonRpcParams::Array(vec!["London,uk".into(), "metric".into()]);
let mut weather_sub = client.subscribe::<Weather>("weather_sub", params, "weather_unsub").await?;
while let Some(w) = weather_sub.next().await {
Expand All @@ -94,6 +91,8 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

/// The context passed on registration, used to store a REST client to query for the current weather and the current
/// "state".
struct WeatherApiCx {
api_client: restson::RestClient,
last_weather: Weather,
Expand All @@ -109,16 +108,16 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
module
.register_subscription_with_context("weather_sub", "weather_unsub", |params, sink, cx| {
let params: (String, String) = params.parse()?;
log::debug!(target: "server", "subscribed with params={:?}", params);
log::debug!(target: "server", "Subscribed with params={:?}", params);
std::thread::spawn(move || loop {
let mut cx = cx.lock().unwrap();
let current_weather: Weather = cx.api_client.get(&params).unwrap();
if current_weather != cx.last_weather {
log::debug!(target: "server", "Fetched London weather: {:?}, sending", current_weather);
sink.send(&current_weather).expect("Sending should work yes?");
sink.send(&current_weather).unwrap();
cx.last_weather = current_weather;
} else {
log::trace!(target: "server", "Same weather as before")
log::trace!(target: "server", "Same weather as before. Not sending.")
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Expand Down
2 changes: 1 addition & 1 deletion types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
Ok(parsed) => return Some(parsed),
Err(e) => {
log::error!("Subscription response error: {:?}", e);
return None
return None;
}
},
None => return None,
Expand Down

0 comments on commit 4bd1959

Please sign in to comment.