Skip to content

Commit

Permalink
Support for light clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Apr 18, 2020
1 parent b8159ef commit 4abcbeb
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Build
run: cargo build --verbose
run: cargo build --workspace --verbose
- name: Run tests
run: cargo test --verbose
run: cargo test --workspace --verbose
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[workspace]
members = [".", "light-client"]

[package]
name = "substrate-subxt"
version = "0.6.0"
Expand Down
17 changes: 17 additions & 0 deletions light-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "substrate-subxt-light-client"
version = "0.1.0"
authors = ["David Craven <david@craven.ch>"]
edition = "2018"

[dependencies]
async-std = "1.5.0"
futures = { version = "0.3.4", features = ["compat"] }
futures01 = { package = "futures", version = "0.1.29" }
jsonrpsee = "0.1.0"
log = "0.4.8"
sc-informant = "0.8.0-alpha.6"
sc-network = "0.8.0-alpha.6"
sc-service = "0.8.0-alpha.6"
serde_json = "1.0.51"
thiserror = "1.0.15"
226 changes: 226 additions & 0 deletions light-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
use async_std::task;
use futures::{
compat::{
Compat01As03,
Compat01As03Sink,
Sink01CompatExt,
Stream01CompatExt,
},
future::poll_fn,
sink::SinkExt,
stream::{
Stream,
StreamExt,
},
};
use futures01::sync::mpsc;
use jsonrpsee::{
common::{
Request,
Response,
},
transport::TransportClient,
};
use sc_network::config::TransportConfig;
use sc_service::{
config::{
DatabaseConfig,
KeystoreConfig,
NetworkConfiguration,
},
AbstractService,
ChainSpec,
Configuration,
Role,
RpcSession,
};
use std::{
future::Future,
path::PathBuf,
pin::Pin,
sync::Arc,
task::Poll,
};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum LightClientError {
#[error("{0}")]
Json(#[from] serde_json::Error),
#[error("{0}")]
Mpsc(#[from] mpsc::SendError<String>),
}

pub struct LightClientConfig<C: ChainSpec + 'static, S: AbstractService> {
pub impl_name: &'static str,
pub impl_version: &'static str,
pub author: &'static str,
pub copyright_start_year: i32,
pub db_path: PathBuf,
pub builder: fn(Configuration) -> Result<S, sc_service::Error>,
pub chain_spec: C,
}

pub struct LightClient {
to_back: Compat01As03Sink<mpsc::Sender<String>, String>,
from_back: Compat01As03<mpsc::Receiver<String>>,
}

impl LightClient {
pub fn new<C: ChainSpec + 'static, S: AbstractService>(
config: LightClientConfig<C, S>,
) -> Result<Self, sc_service::Error> {
let (to_back, from_front) = mpsc::channel(4);
let (to_front, from_back) = mpsc::channel(4);
start_light_client(config, from_front, to_front)?;
Ok(LightClient {
to_back: to_back.sink_compat(),
from_back: from_back.compat(),
})
}
}

impl TransportClient for LightClient {
type Error = LightClientError;

fn send_request<'a>(
&'a mut self,
request: Request,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'a>> {
Box::pin(async move {
let request = serde_json::to_string(&request)?;
self.to_back.send(request).await?;
Ok(())
})
}

fn next_response<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Response, Self::Error>> + Send + 'a>> {
Box::pin(async move {
let response = self
.from_back
.next()
.await
.expect("channel shouldn't close")
.unwrap();
Ok(serde_json::from_str(&response)?)
})
}
}

impl From<LightClient> for jsonrpsee::Client {
fn from(client: LightClient) -> Self {
let client = jsonrpsee::raw::RawClient::new(client);
jsonrpsee::Client::new(client)
}
}

fn start_light_client<C: ChainSpec + 'static, S: AbstractService>(
config: LightClientConfig<C, S>,
from_front: mpsc::Receiver<String>,
to_front: mpsc::Sender<String>,
) -> Result<(), sc_service::Error> {
let mut network = NetworkConfiguration::new(
format!("{} (light client)", config.chain_spec.name()),
"unknown",
Default::default(),
&PathBuf::new(),
);
network.boot_nodes = config.chain_spec.boot_nodes().to_vec();
network.transport = TransportConfig::Normal {
enable_mdns: true,
allow_private_ipv4: true,
wasm_external_transport: None,
use_yamux_flow_control: true,
};
let service_config = Configuration {
network,
impl_name: config.impl_name,
impl_version: config.impl_version,
chain_spec: Box::new(config.chain_spec),
role: Role::Light,
task_executor: Arc::new(move |fut| {
task::spawn(fut);
}),
database: DatabaseConfig::Path {
path: config.db_path,
cache_size: 64,
},
keystore: KeystoreConfig::InMemory,
max_runtime_instances: 8,
announce_block: true,

telemetry_endpoints: Default::default(),
telemetry_external_transport: Default::default(),
default_heap_pages: Default::default(),
dev_key_seed: Default::default(),
disable_grandpa: Default::default(),
execution_strategies: Default::default(),
force_authoring: Default::default(),
offchain_worker: Default::default(),
prometheus_config: Default::default(),
pruning: Default::default(),
rpc_cors: Default::default(),
rpc_http: Default::default(),
rpc_ws: Default::default(),
rpc_ws_max_connections: Default::default(),
state_cache_child_ratio: Default::default(),
state_cache_size: Default::default(),
tracing_receiver: Default::default(),
tracing_targets: Default::default(),
transaction_pool: Default::default(),
wasm_method: Default::default(),
};

log::info!("{}", service_config.impl_name);
log::info!("✌️ version {}", service_config.impl_version);
log::info!("❤️ by {}, {}", config.author, config.copyright_start_year);
log::info!(
"📋 Chain specification: {}",
service_config.chain_spec.name()
);
log::info!("🏷 Node name: {}", service_config.network.node_name);
log::info!("👤 Role: {:?}", service_config.role);

// Create the service. This is the most heavy initialization step.
let mut service = (config.builder)(service_config)?;

// Spawn informant.
task::spawn(sc_informant::build(
&service,
sc_informant::OutputFormat::Plain,
));

// Spawn background task.
let session = RpcSession::new(to_front.clone());
let mut from_front = from_front.compat();
task::spawn(poll_fn(move |cx| {
loop {
match Pin::new(&mut from_front).poll_next(cx) {
Poll::Ready(Some(message)) => {
let mut to_front = to_front.clone().sink_compat();
let message = message.unwrap();
let fut = service.rpc_query(&session, &message);
task::spawn(async move {
if let Some(response) = fut.await {
to_front.send(response).await.ok();
}
});
}
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
}
}

loop {
match Pin::new(&mut service).poll(cx) {
Poll::Ready(Ok(())) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => log::error!("{}", e),
}
}
}));

Ok(())
}
29 changes: 25 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ use self::{
pub struct ClientBuilder<T: System, S = MultiSignature, E = DefaultExtra<T>> {
_marker: std::marker::PhantomData<(T, S, E)>,
url: Option<String>,
client: Option<jsonrpsee::Client>,
}

impl<T: System, S, E> ClientBuilder<T, S, E> {
Expand All @@ -125,19 +126,39 @@ impl<T: System, S, E> ClientBuilder<T, S, E> {
Self {
_marker: std::marker::PhantomData,
url: None,
client: None,
}
}

/// Sets the jsonrpsee client.
pub fn set_client<P: Into<jsonrpsee::Client>>(mut self, client: P) -> Self {
self.client = Some(client.into());
self
}

/// Set the substrate rpc address.
pub fn set_url(mut self, url: &str) -> Self {
self.url = Some(url.to_string());
pub fn set_url<P: Into<String>>(mut self, url: P) -> Self {
self.url = Some(url.into());
self
}

/// Creates a new Client.
pub async fn build(self) -> Result<Client<T, S, E>, Error> {
let url = self.url.unwrap_or("ws://127.0.0.1:9944".to_string());
let rpc = Rpc::connect_ws(&url).await?;
let client = if let Some(client) = self.client {
client
} else {
let url = self
.url
.as_ref()
.map(|s| &**s)
.unwrap_or("ws://127.0.0.1:9944");
if url.starts_with("ws://") || url.starts_with("wss://") {
jsonrpsee::ws_client(url).await?
} else {
jsonrpsee::http_client(url)
}
};
let rpc = Rpc::new(client).await?;

let (metadata, genesis_hash, runtime_version) = future::join3(
rpc.metadata(),
Expand Down
5 changes: 2 additions & 3 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ impl<T> Rpc<T>
where
T: System,
{
pub async fn connect_ws(url: &str) -> Result<Self, Error> {
let client = jsonrpsee::ws_client(&url).await?;
pub async fn new(client: Client) -> Result<Self, Error> {
Ok(Rpc {
client: client.into(),
client,
marker: PhantomData,
})
}
Expand Down

0 comments on commit 4abcbeb

Please sign in to comment.