From 4abcbebf5dfb4a7cb614c4e31a0534b5607e68b3 Mon Sep 17 00:00:00 2001 From: David Craven Date: Thu, 16 Apr 2020 18:23:59 +0200 Subject: [PATCH] Support for light clients. --- .github/workflows/rust.yml | 4 +- Cargo.toml | 3 + light-client/Cargo.toml | 17 +++ light-client/src/lib.rs | 226 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 29 ++++- src/rpc.rs | 5 +- 6 files changed, 275 insertions(+), 9 deletions(-) create mode 100644 light-client/Cargo.toml create mode 100644 light-client/src/lib.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6738b0b2929..301487405a6 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -14,6 +14,6 @@ jobs: steps: - uses: actions/checkout@v2 - name: Build - run: cargo build --verbose + run: cargo build --workspace --verbose - name: Run tests - run: cargo test --verbose + run: cargo test --workspace --verbose diff --git a/Cargo.toml b/Cargo.toml index 8448c19840b..bb0faf78a9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,6 @@ +[workspace] +members = [".", "light-client"] + [package] name = "substrate-subxt" version = "0.6.0" diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml new file mode 100644 index 00000000000..ff938895949 --- /dev/null +++ b/light-client/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "substrate-subxt-light-client" +version = "0.1.0" +authors = ["David Craven "] +edition = "2018" + +[dependencies] +async-std = "1.5.0" +futures = { version = "0.3.4", features = ["compat"] } +futures01 = { package = "futures", version = "0.1.29" } +jsonrpsee = "0.1.0" +log = "0.4.8" +sc-informant = "0.8.0-alpha.6" +sc-network = "0.8.0-alpha.6" +sc-service = "0.8.0-alpha.6" +serde_json = "1.0.51" +thiserror = "1.0.15" diff --git a/light-client/src/lib.rs b/light-client/src/lib.rs new file mode 100644 index 00000000000..c0f9380a3c9 --- /dev/null +++ b/light-client/src/lib.rs @@ -0,0 +1,226 @@ +use async_std::task; +use futures::{ + compat::{ + Compat01As03, + Compat01As03Sink, + Sink01CompatExt, + Stream01CompatExt, + }, + future::poll_fn, + sink::SinkExt, + stream::{ + Stream, + StreamExt, + }, +}; +use futures01::sync::mpsc; +use jsonrpsee::{ + common::{ + Request, + Response, + }, + transport::TransportClient, +}; +use sc_network::config::TransportConfig; +use sc_service::{ + config::{ + DatabaseConfig, + KeystoreConfig, + NetworkConfiguration, + }, + AbstractService, + ChainSpec, + Configuration, + Role, + RpcSession, +}; +use std::{ + future::Future, + path::PathBuf, + pin::Pin, + sync::Arc, + task::Poll, +}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum LightClientError { + #[error("{0}")] + Json(#[from] serde_json::Error), + #[error("{0}")] + Mpsc(#[from] mpsc::SendError), +} + +pub struct LightClientConfig { + pub impl_name: &'static str, + pub impl_version: &'static str, + pub author: &'static str, + pub copyright_start_year: i32, + pub db_path: PathBuf, + pub builder: fn(Configuration) -> Result, + pub chain_spec: C, +} + +pub struct LightClient { + to_back: Compat01As03Sink, String>, + from_back: Compat01As03>, +} + +impl LightClient { + pub fn new( + config: LightClientConfig, + ) -> Result { + let (to_back, from_front) = mpsc::channel(4); + let (to_front, from_back) = mpsc::channel(4); + start_light_client(config, from_front, to_front)?; + Ok(LightClient { + to_back: to_back.sink_compat(), + from_back: from_back.compat(), + }) + } +} + +impl TransportClient for LightClient { + type Error = LightClientError; + + fn send_request<'a>( + &'a mut self, + request: Request, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let request = serde_json::to_string(&request)?; + self.to_back.send(request).await?; + Ok(()) + }) + } + + fn next_response<'a>( + &'a mut self, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let response = self + .from_back + .next() + .await + .expect("channel shouldn't close") + .unwrap(); + Ok(serde_json::from_str(&response)?) + }) + } +} + +impl From for jsonrpsee::Client { + fn from(client: LightClient) -> Self { + let client = jsonrpsee::raw::RawClient::new(client); + jsonrpsee::Client::new(client) + } +} + +fn start_light_client( + config: LightClientConfig, + from_front: mpsc::Receiver, + to_front: mpsc::Sender, +) -> Result<(), sc_service::Error> { + let mut network = NetworkConfiguration::new( + format!("{} (light client)", config.chain_spec.name()), + "unknown", + Default::default(), + &PathBuf::new(), + ); + network.boot_nodes = config.chain_spec.boot_nodes().to_vec(); + network.transport = TransportConfig::Normal { + enable_mdns: true, + allow_private_ipv4: true, + wasm_external_transport: None, + use_yamux_flow_control: true, + }; + let service_config = Configuration { + network, + impl_name: config.impl_name, + impl_version: config.impl_version, + chain_spec: Box::new(config.chain_spec), + role: Role::Light, + task_executor: Arc::new(move |fut| { + task::spawn(fut); + }), + database: DatabaseConfig::Path { + path: config.db_path, + cache_size: 64, + }, + keystore: KeystoreConfig::InMemory, + max_runtime_instances: 8, + announce_block: true, + + telemetry_endpoints: Default::default(), + telemetry_external_transport: Default::default(), + default_heap_pages: Default::default(), + dev_key_seed: Default::default(), + disable_grandpa: Default::default(), + execution_strategies: Default::default(), + force_authoring: Default::default(), + offchain_worker: Default::default(), + prometheus_config: Default::default(), + pruning: Default::default(), + rpc_cors: Default::default(), + rpc_http: Default::default(), + rpc_ws: Default::default(), + rpc_ws_max_connections: Default::default(), + state_cache_child_ratio: Default::default(), + state_cache_size: Default::default(), + tracing_receiver: Default::default(), + tracing_targets: Default::default(), + transaction_pool: Default::default(), + wasm_method: Default::default(), + }; + + log::info!("{}", service_config.impl_name); + log::info!("✌️ version {}", service_config.impl_version); + log::info!("❤️ by {}, {}", config.author, config.copyright_start_year); + log::info!( + "📋 Chain specification: {}", + service_config.chain_spec.name() + ); + log::info!("🏷 Node name: {}", service_config.network.node_name); + log::info!("👤 Role: {:?}", service_config.role); + + // Create the service. This is the most heavy initialization step. + let mut service = (config.builder)(service_config)?; + + // Spawn informant. + task::spawn(sc_informant::build( + &service, + sc_informant::OutputFormat::Plain, + )); + + // Spawn background task. + let session = RpcSession::new(to_front.clone()); + let mut from_front = from_front.compat(); + task::spawn(poll_fn(move |cx| { + loop { + match Pin::new(&mut from_front).poll_next(cx) { + Poll::Ready(Some(message)) => { + let mut to_front = to_front.clone().sink_compat(); + let message = message.unwrap(); + let fut = service.rpc_query(&session, &message); + task::spawn(async move { + if let Some(response) = fut.await { + to_front.send(response).await.ok(); + } + }); + } + Poll::Pending => break, + Poll::Ready(None) => return Poll::Ready(()), + } + } + + loop { + match Pin::new(&mut service).poll(cx) { + Poll::Ready(Ok(())) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => log::error!("{}", e), + } + } + })); + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index dc4b46285db..70203430f37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,6 +117,7 @@ use self::{ pub struct ClientBuilder> { _marker: std::marker::PhantomData<(T, S, E)>, url: Option, + client: Option, } impl ClientBuilder { @@ -125,19 +126,39 @@ impl ClientBuilder { Self { _marker: std::marker::PhantomData, url: None, + client: None, } } + /// Sets the jsonrpsee client. + pub fn set_client>(mut self, client: P) -> Self { + self.client = Some(client.into()); + self + } + /// Set the substrate rpc address. - pub fn set_url(mut self, url: &str) -> Self { - self.url = Some(url.to_string()); + pub fn set_url>(mut self, url: P) -> Self { + self.url = Some(url.into()); self } /// Creates a new Client. pub async fn build(self) -> Result, Error> { - let url = self.url.unwrap_or("ws://127.0.0.1:9944".to_string()); - let rpc = Rpc::connect_ws(&url).await?; + let client = if let Some(client) = self.client { + client + } else { + let url = self + .url + .as_ref() + .map(|s| &**s) + .unwrap_or("ws://127.0.0.1:9944"); + if url.starts_with("ws://") || url.starts_with("wss://") { + jsonrpsee::ws_client(url).await? + } else { + jsonrpsee::http_client(url) + } + }; + let rpc = Rpc::new(client).await?; let (metadata, genesis_hash, runtime_version) = future::join3( rpc.metadata(), diff --git a/src/rpc.rs b/src/rpc.rs index 04acb4669ab..6351b134656 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -119,10 +119,9 @@ impl Rpc where T: System, { - pub async fn connect_ws(url: &str) -> Result { - let client = jsonrpsee::ws_client(&url).await?; + pub async fn new(client: Client) -> Result { Ok(Rpc { - client: client.into(), + client, marker: PhantomData, }) }