Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional cleanup #347

Merged
merged 7 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ env_logger = "0.8"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
log = "0.4"
tokio = { version = "1", features = ["full"] }
maplit = "1"
serde = "1"
serde_json = "1"
restson = "0.7"
Expand Down
43 changes: 21 additions & 22 deletions examples/weather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;
use std::sync::Mutex;
use restson::{Error as RestsonError, RestPath};
use serde::{Deserialize, Serialize};
//! Example of setting up a subscription that polls a remote API, in this case the api.openweathermap.org/weather, and
//! sends the data back to the subscriber whenever the weather in London changes. The openweathermap API client is
//! passed at registration as part of the "context" object. We only want to send data on the subscription when the
//! weather actually changes, so we store the current weather in the context, hence the need for a `Mutex` to allow
//! mutation.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
ws_server::RpcContextModule,
ws_server::WsServer,
};
use restson::{Error as RestsonError, RestPath};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Mutex;

// Set up the types to deserialize the weather data.
// See https://openweathermap.org/current for the details about the API used in this example.
#[derive(Deserialize, Serialize, Debug, Default, PartialEq)]
struct Weather {
name: String,
Expand All @@ -57,25 +65,14 @@ struct Wind {
deg: usize,
}


impl RestPath<&(String, String)> for Weather {
fn get_path(params: &(String, String)) -> Result<String, RestsonError> {
// Set up your own API key at https://openweathermap.org/current
const API_KEY: &'static str = "f6ba475df300d5f91135550da0f4a867";
Ok(
String::from(
format!("data/2.5/weather?q={}&units={}&appid={}",
params.0,
params.1,
API_KEY,
)
)
)
Ok(String::from(format!("data/2.5/weather?q={}&units={}&appid={}", params.0, params.1, API_KEY,)))
}
}

/// Example of setting up a subscription that polls a remote API, in this case the api.openweathermap.org/weather, and
/// sends the data back to the subscriber whenever the weather in London changes.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
Expand All @@ -84,7 +81,7 @@ async fn main() -> anyhow::Result<()> {

let client = WsClientBuilder::default().build(&url).await?;

// Subscription to the london weather
// Subscription to the London weather
let params = JsonRpcParams::Array(vec!["London,uk".into(), "metric".into()]);
let mut weather_sub = client.subscribe::<Weather>("weather_sub", params, "weather_unsub").await?;
while let Some(w) = weather_sub.next().await {
Expand All @@ -94,6 +91,8 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

/// The context passed on registration, used to store a REST client to query for the current weather and the current
/// "state".
struct WeatherApiCx {
api_client: restson::RestClient,
last_weather: Weather,
Expand All @@ -109,16 +108,16 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
module
.register_subscription_with_context("weather_sub", "weather_unsub", |params, sink, cx| {
let params: (String, String) = params.parse()?;
log::debug!(target: "server", "subscribed with params={:?}", params);
log::debug!(target: "server", "Subscribed with params={:?}", params);
std::thread::spawn(move || loop {
let mut cx = cx.lock().unwrap();
let current_weather: Weather = cx.api_client.get(&params).unwrap();
if current_weather != cx.last_weather {
log::debug!(target: "server", "Fetched London weather: {:?}, sending", current_weather);
sink.send(&current_weather).expect("Sending should work yes?");
sink.send(&current_weather).unwrap();
cx.last_weather = current_weather;
} else {
log::trace!(target: "server", "Same weather as before")
log::trace!(target: "server", "Same weather as before. Not sending.")
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Expand Down
2 changes: 1 addition & 1 deletion types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where
Ok(parsed) => return Some(parsed),
Err(e) => {
log::error!("Subscription response error: {:?}", e);
return None
return None;
}
},
None => return None,
Expand Down