From bc5dcc64548ba2db113e7edb8f88d42604f6943b Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Tue, 28 May 2019 10:25:41 +0300 Subject: [PATCH 1/5] Refactored jsonrpc to use futures 0.3 + async/await --- Cargo.lock | 118 +++++++++++++++++++++++ chain/chain/src/types.rs | 4 +- chain/client/src/client.rs | 4 +- chain/client/src/view_client.rs | 4 +- chain/jsonrpc/Cargo.toml | 4 +- chain/jsonrpc/src/lib.rs | 164 +++++++++++++++++--------------- 6 files changed, 213 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3eb315f7082..afd1d8b451e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1269,6 +1269,19 @@ name = "futures" version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-channel-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-core-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-cpupool" version = "0.1.8" @@ -1278,6 +1291,79 @@ dependencies = [ "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-executor-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-io-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-select-macro-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-sink-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-util-preview" +version = "0.3.0-alpha.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-select-macro-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "gcc" version = "0.3.55" @@ -1963,6 +2049,7 @@ dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "near-chain 0.1.0", "near-client 0.1.0", @@ -2455,6 +2542,11 @@ dependencies = [ "unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "pin-utils" +version = "0.1.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "pkg-config" version = "0.3.14" @@ -2473,11 +2565,26 @@ dependencies = [ "proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "proc-macro-hack-impl" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "proc-macro-nested" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "proc-macro2" version = "0.4.30" @@ -4115,7 +4222,15 @@ dependencies = [ "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)" = "a2037ec1c6c1c4f79557762eab1f7eae1f64f6cb418ace90fae88f0942b60139" +"checksum futures-channel-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4cd523712fc272e9b714669165a2832debee5a5b7e409bfccdc7c0d5cd0cf07a" +"checksum futures-core-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "719770f328642b657b849856bb5a607db9538dd5bb3000122e5ead55d0a58c36" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum futures-executor-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "315dc58c908535d059576a329b86cd185933433382cfcd394fb2fa353330de03" +"checksum futures-io-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "cca0bf7a1f39c9d32b797b0def93d5932aa71796236aad6b549bac6f7df159a3" +"checksum futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fcfeac5f016a4b5835bb93eb7961f50a64f0e001207562703d9ddf4109d7b263" +"checksum futures-select-macro-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "afee2644abc7c8a6529530bb20e044ace4b7dfc4df1c114614d1b458fc29f0b0" +"checksum futures-sink-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "49dcfdacd6b5974ca0b9b78bc38ffd1071da0206179735c3df82e279f5b784e4" +"checksum futures-util-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "f7a0451b9c5047c2b9ab93425ffd0793165511e93c04b977cd45fbd41c6e34b2" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum generic-array 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3c0f28c2f5bfb5960175af447a2da7c18900693738343dc896ffbcabd9839592" "checksum generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "fceb69994e330afed50c93524be68c42fa898c2d9fd4ee8da03bd7363acd26f2" @@ -4214,10 +4329,13 @@ dependencies = [ "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" "checksum phf_generator 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662" "checksum phf_shared 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0" +"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum podio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "780fb4b6698bbf9cf2444ea5d22411cef2953f0824b98f33cf454ec5615645bd" "checksum proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2c725b36c99df7af7bf9324e9c999b9e37d92c8f8caf106d82e1d7953218d2d8" +"checksum proc-macro-hack 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)" = "0c1dd4172a1e1f96f709341418f49b11ea6c2d95d53dca08c0f74cbd332d9cf3" "checksum proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2b753ad9ed99dd8efeaa7d2fb8453c8f6bc3e54b97966d35f1bc77ca6865254a" +"checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum protobuf 2.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3959be8d6250192f80ef056c0a4aaaeaff8a25e904e6e7a0f5285cb1a061835f" "checksum protobuf-codegen 2.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0561a1c7ae9e965ce4d6f18e7049242a1cc7f79bd237cc4780f14eb1ea7505b7" diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 000e710d43c..72bcf073a80 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -305,7 +305,7 @@ pub type ReceiptResult = HashMap>; /// Bridge between the chain and the runtime. /// Main function is to update state given transactions. /// Additionally handles authorities and block weight computation. -pub trait RuntimeAdapter { +pub trait RuntimeAdapter : Send + Sync { /// Initialize state to genesis state and returns StoreUpdate and state root. /// StoreUpdate can be discarded if the chain past the genesis. fn genesis_state(&self, shard_id: ShardId) -> (StoreUpdate, MerkleHash); @@ -453,4 +453,4 @@ mod tests { assert!(signer.verify(b2.hash().as_ref(), &b2.header.signature)); assert_eq!(b2.header.total_weight.to_num(), 3); } -} \ No newline at end of file +} diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index f9dc21423a4..1b34269ebda 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -47,7 +47,7 @@ pub struct ClientActor { config: ClientConfig, sync_status: SyncStatus, chain: Chain, - runtime_adapter: Arc, + runtime_adapter: Arc, tx_pool: TransactionPool, network_actor: Recipient, block_producer: Option, @@ -72,7 +72,7 @@ impl ClientActor { config: ClientConfig, store: Arc, genesis_time: DateTime, - runtime_adapter: Arc, + runtime_adapter: Arc, network_actor: Recipient, block_producer: Option, ) -> Result { diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs index 30599186d40..99ebf336a84 100644 --- a/chain/client/src/view_client.rs +++ b/chain/client/src/view_client.rs @@ -21,14 +21,14 @@ use crate::TxDetails; /// View client provides currently committed (to the storage) view of the current chain and state. pub struct ViewClientActor { chain: Chain, - runtime_adapter: Arc, + runtime_adapter: Arc, } impl ViewClientActor { pub fn new( store: Arc, genesis_time: DateTime, - runtime_adapter: Arc, + runtime_adapter: Arc, ) -> Result { // TODO: should we create shared ChainStore that is passed to both Client and ViewClient? let chain = Chain::new(store, runtime_adapter.clone(), genesis_time)?; diff --git a/chain/jsonrpc/Cargo.toml b/chain/jsonrpc/Cargo.toml index 47c5e1ef853..df171619bcb 100644 --- a/chain/jsonrpc/Cargo.toml +++ b/chain/jsonrpc/Cargo.toml @@ -7,9 +7,11 @@ edition = "2018" [dependencies] ansi_term = "0.11.0" actix = "0.8.1" -actix-web = "1.0.0-beta.3" +actix-web = "1.0.0-rc" +base64 = "0.10.0" bytes = "0.4.11" futures = "0.1" +futures03 = { package = "futures-preview", version = "0.3.0-alpha.16", features = ["compat", "async-await", "nightly"] } chrono = { version = "0.4.4", features = ["serde"] } log = "0.4" serde_derive = "1.0" diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index 781e0522bab..e8d691f9f93 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -1,20 +1,19 @@ +#![feature(await_macro, async_await)] + use std::convert::TryFrom; use std::convert::TryInto; -use std::sync::Arc; use std::time::Duration; use actix::{Addr, MailboxError}; use actix_web::{middleware, web, App, Error as HttpError, HttpResponse, HttpServer}; -use futures::future; +use base64; use futures::future::Future; -use futures::stream::Stream; +use futures03::{compat::Future01CompatExt as _, FutureExt as _, TryFutureExt as _}; use log::error; use protobuf::parse_from_bytes; use serde::de::DeserializeOwned; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; -use tokio::timer::Interval; -use tokio::util::FutureExt; use message::Message; use near_client::{ClientActor, GetBlock, Query, Status, TxDetails, TxStatus, ViewClientActor}; @@ -30,6 +29,19 @@ use crate::message::{Request, RpcError}; pub mod client; mod message; +macro_rules! select { // replace `::futures_util` with `::futures03` as the crate path + ($($tokens:tt)*) => { + futures03::inner_select::select! { + futures_crate_path ( ::futures03 ) + $( $tokens )* + } + } +} + +async fn delay(duration: Duration) -> Result<(), tokio::timer::Error> { + tokio::timer::Delay::new(std::time::Instant::now() + duration).compat().await +} + #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct RpcPollingConfig { pub polling_interval: Duration, @@ -68,20 +80,12 @@ impl RpcConfig { } } -macro_rules! ok_or_rpc_error(($obj: expr) => (match $obj { - Ok(value) => value, - Err(err) => { - error!(target: "rpc", "RPC error: {:?}", err); - return Box::new(future::err(err)) - } -})); - fn parse_params(value: Option) -> Result { if let Some(value) = value { serde_json::from_value(value) .map_err(|err| RpcError::invalid_params(Some(format!("Failed parsing args: {}", err)))) } else { - Err(RpcError::invalid_params(Some("Require at least one parameter".to_string()))) + Err(RpcError::invalid_params(Some("Require at least one parameter".to_owned()))) } } @@ -116,37 +120,37 @@ fn parse_hash(params: Option) -> Result { struct JsonRpcHandler { client_addr: Addr, - view_client_addr: Arc>, + view_client_addr: Addr, polling_config: RpcPollingConfig, } impl JsonRpcHandler { - pub fn process(&self, message: Message) -> Box> { + pub async fn process(&self, message: Message) -> Result { let id = message.id(); match message { - Message::Request(request) => Box::new( - self.process_request(request).then(|result| Ok(Message::response(id, result))), - ), - _ => Box::new(future::ok::<_, HttpError>(Message::error(RpcError::invalid_request()))), + Message::Request(request) => { + Ok(Message::response(id, self.process_request(request).await)) + } + _ => Ok(Message::error(RpcError::invalid_request())), } } - fn process_request(&self, request: Request) -> Box> { + async fn process_request(&self, request: Request) -> Result { match request.method.as_ref() { - "broadcast_tx_async" => self.send_tx_async(request.params), - "broadcast_tx_commit" => self.send_tx_commit(request.params), - "query" => self.query(request.params), - "health" => self.health(), - "status" => self.status(), - "tx" => self.tx_status(request.params), - "tx_details" => self.tx_details(request.params), - "block" => self.block(request.params), - _ => Box::new(future::err(RpcError::method_not_found(request.method))), + "broadcast_tx_async" => self.send_tx_async(request.params).await, + "broadcast_tx_commit" => self.send_tx_commit(request.params).await, + "query" => self.query(request.params).await, + "health" => self.health().await, + "status" => self.status().await, + "tx" => self.tx_status(request.params).await, + "tx_details" => self.tx_details(request.params).await, + "block" => self.block(request.params).await, + _ => Err(RpcError::method_not_found(request.method)), } } - fn send_tx_async(&self, params: Option) -> Box> { - let tx = ok_or_rpc_error!(parse_tx(params)); + async fn send_tx_async(&self, params: Option) -> Result { + let tx = parse_tx(params)?; let hash = (&tx.get_hash()).to_base(); actix::spawn( self.client_addr @@ -154,66 +158,66 @@ impl JsonRpcHandler { .map(|_| ()) .map_err(|_| ()), ); - Box::new(future::ok(Value::String(hash))) + Ok(Value::String(hash)) } - fn send_tx_commit(&self, params: Option) -> Box> { - let tx = ok_or_rpc_error!(parse_tx(params)); + async fn send_tx_commit(&self, params: Option) -> Result { + let tx = parse_tx(params)?; let tx_hash = tx.get_hash(); - let view_client_addr = self.view_client_addr.clone(); let RpcPollingConfig { polling_interval, polling_timeout, .. } = self.polling_config; - Box::new( - self.client_addr - .send(NetworkClientMessages::Transaction(tx)) - .map_err(|err| RpcError::server_error(Some(err.to_string()))) - .and_then(move |_result| { - Interval::new_interval(polling_interval) - .map(move |_| view_client_addr.send(TxStatus { tx_hash }).wait()) - .filter(|tx_status| match tx_status { - Ok(Ok(tx_status)) => match tx_status.status { - FinalTransactionStatus::Started - | FinalTransactionStatus::Unknown => false, - _ => true, - }, - _ => false, - }) - .map_err(|err| RpcError::server_error(Some(err.to_string()))) - .take(1) - .fold(Value::Null, |_, tx_status| jsonify(tx_status)) - }) - .timeout(polling_timeout) - .map_err(|_| { - RpcError::server_error(Some("send_tx_commit has timed out.".to_owned())) - }), - ) + self.client_addr + .send(NetworkClientMessages::Transaction(tx)) + .map_err(|err| RpcError::server_error(Some(err.to_string()))) + .compat() + .await?; + let final_tx_async = async { + loop { + let final_tx = self.view_client_addr.send(TxStatus { tx_hash }).compat().await; + if let Ok(Ok(ref tx)) = final_tx { + match tx.status { + FinalTransactionStatus::Started | FinalTransactionStatus::Unknown => {} + _ => { + break jsonify(final_tx); + } + } + } + let _ = delay(polling_interval).await; + } + }; + select! { + final_tx = final_tx_async.boxed().fuse() => final_tx, + _ = delay(polling_timeout).boxed().fuse() => { + Err(RpcError::server_error(Some("send_tx_commit has timed out.".to_owned()))) + } + } } - fn health(&self) -> Box> { - Box::new(future::ok(Value::Null)) + async fn health(&self) -> Result { + Ok(Value::Null) } - fn status(&self) -> Box> { - Box::new(self.client_addr.send(Status {}).then(jsonify)) + async fn status(&self) -> Result { + jsonify(self.client_addr.send(Status {}).compat().await) } - fn query(&self, params: Option) -> Box> { - let (path, data) = ok_or_rpc_error!(parse_params::<(String, Vec)>(params)); - Box::new(self.view_client_addr.send(Query { path, data }).then(jsonify)) + async fn query(&self, params: Option) -> Result { + let (path, data) = parse_params::<(String, Vec)>(params)?; + jsonify(self.view_client_addr.send(Query { path, data }).compat().await) } - fn tx_status(&self, params: Option) -> Box> { - let tx_hash = ok_or_rpc_error!(parse_hash(params)); - Box::new(self.view_client_addr.send(TxStatus { tx_hash }).then(jsonify)) + async fn tx_status(&self, params: Option) -> Result { + let tx_hash = parse_hash(params)?; + jsonify(self.view_client_addr.send(TxStatus { tx_hash }).compat().await) } - fn tx_details(&self, params: Option) -> Box> { - let tx_hash = ok_or_rpc_error!(parse_hash(params)); - Box::new(self.view_client_addr.send(TxDetails { tx_hash }).then(jsonify)) + async fn tx_details(&self, params: Option) -> Result { + let tx_hash = parse_hash(params)?; + jsonify(self.view_client_addr.send(TxDetails { tx_hash }).compat().await) } - fn block(&self, params: Option) -> Box> { - let (height,) = ok_or_rpc_error!(parse_params::<(BlockIndex,)>(params)); - Box::new(self.view_client_addr.send(GetBlock::Height(height)).then(jsonify)) + async fn block(&self, params: Option) -> Result { + let (height,) = parse_params::<(BlockIndex,)>(params)?; + jsonify(self.view_client_addr.send(GetBlock::Height(height)).compat().await) } } @@ -221,7 +225,11 @@ fn rpc_handler( message: web::Json, handler: web::Data, ) -> impl Future { - handler.process(message.0).and_then(|message| Ok(HttpResponse::Ok().json(message))) + let response = async move { + let message = handler.process(message.0).await?; + Ok(HttpResponse::Ok().json(message)) + }; + response.boxed().compat() } pub fn start_http( @@ -234,7 +242,7 @@ pub fn start_http( App::new() .data(JsonRpcHandler { client_addr: client_addr.clone(), - view_client_addr: Arc::new(view_client_addr.clone()), + view_client_addr: view_client_addr.clone(), polling_config, }) .wrap(middleware::Logger::default()) From 988f3c3f6e3a1143ab9db8e8f71f8df141b6746b Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Wed, 29 May 2019 00:47:01 +0300 Subject: [PATCH 2/5] Upgraded Rust compiler for async/await support --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1122981f1c9..ee84cdce3b0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: parity/rust:nightly-20190315 +image: parity/rust:a811bb14-20190522 variables: CI_SERVER_NAME: "GitLab CI" From 625d0fbc491ecda2265e290ac0a0e91d2c8d46db Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Wed, 29 May 2019 01:32:43 +0300 Subject: [PATCH 3/5] [CI] Replace wget with curl to avoid unnecessary dependency --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ee84cdce3b0..959e36b649c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -26,7 +26,7 @@ variables: .install_protos: &install_protos sudo apt-get install -y unzip && - wget -O /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protoc-3.6.1-linux-x86_64.zip && + curl -Lo /tmp/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protoc-3.6.1-linux-x86_64.zip && sudo unzip /tmp/protoc.zip -d protoc && sudo mv protoc/bin/* /usr/local/bin/ && sudo chmod 755 /usr/local/bin/protoc From eb1adc5f864607983db7f5d48c1056a226791802 Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Wed, 29 May 2019 02:02:30 +0300 Subject: [PATCH 4/5] Introduce timeout helper for jsonrpc async/await --- chain/jsonrpc/src/lib.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index e8d691f9f93..e1ca8266445 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -42,6 +42,18 @@ async fn delay(duration: Duration) -> Result<(), tokio::timer::Error> { tokio::timer::Delay::new(std::time::Instant::now() + duration).compat().await } +async fn timeout(timeout: Duration, f: Fut) -> Result +where + Fut: futures03::Future> + Send, +{ + select! { + result = f.boxed().fuse() => result, + _ = delay(timeout).boxed().fuse() => { + Err(RpcError::server_error(Some("send_tx_commit has timed out.".to_owned()))) + } + } +} + #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct RpcPollingConfig { pub polling_interval: Duration, @@ -164,13 +176,12 @@ impl JsonRpcHandler { async fn send_tx_commit(&self, params: Option) -> Result { let tx = parse_tx(params)?; let tx_hash = tx.get_hash(); - let RpcPollingConfig { polling_interval, polling_timeout, .. } = self.polling_config; self.client_addr .send(NetworkClientMessages::Transaction(tx)) .map_err(|err| RpcError::server_error(Some(err.to_string()))) .compat() .await?; - let final_tx_async = async { + timeout(self.polling_config.polling_timeout, async { loop { let final_tx = self.view_client_addr.send(TxStatus { tx_hash }).compat().await; if let Ok(Ok(ref tx)) = final_tx { @@ -181,15 +192,10 @@ impl JsonRpcHandler { } } } - let _ = delay(polling_interval).await; + let _ = delay(self.polling_config.polling_interval).await; } - }; - select! { - final_tx = final_tx_async.boxed().fuse() => final_tx, - _ = delay(polling_timeout).boxed().fuse() => { - Err(RpcError::server_error(Some("send_tx_commit has timed out.".to_owned()))) - } - } + }) + .await } async fn health(&self) -> Result { From 02b57a5785b7fe802712d96eaf85dd50d0914f2c Mon Sep 17 00:00:00 2001 From: Vlad Frolov Date: Wed, 29 May 2019 23:13:31 +0300 Subject: [PATCH 5/5] Extracted the async/await helpers into async-utils crate --- Cargo.lock | 10 ++++++++ Cargo.toml | 1 + async-utils/Cargo.toml | 9 ++++++++ async-utils/src/lib.rs | 49 ++++++++++++++++++++++++++++++++++++++++ chain/jsonrpc/Cargo.toml | 1 + chain/jsonrpc/src/lib.rs | 29 ++---------------------- 6 files changed, 72 insertions(+), 27 deletions(-) create mode 100644 async-utils/Cargo.toml create mode 100644 async-utils/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index afd1d8b451e..a73d27dd71e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,6 +325,14 @@ name = "assert_matches" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "async-utils" +version = "0.1.0" +dependencies = [ + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "atty" version = "0.2.11" @@ -2046,6 +2054,8 @@ dependencies = [ "actix 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 1.0.0-rc (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "async-utils 0.1.0", + "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 9182eab9717..8c0f4719727 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [workspace] members = [ + "async-utils/", "protos/builder", "core/primitives", "core/store", diff --git a/async-utils/Cargo.toml b/async-utils/Cargo.toml new file mode 100644 index 00000000000..9831f6f6587 --- /dev/null +++ b/async-utils/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "async-utils" +version = "0.1.0" +authors = ["Near Inc "] +edition = "2018" + +[dependencies] +futures03 = { package = "futures-preview", version = "0.3.0-alpha.16", features = ["compat", "async-await", "nightly"] } +tokio = "0.1.15" diff --git a/async-utils/src/lib.rs b/async-utils/src/lib.rs new file mode 100644 index 00000000000..5b94ecd97bc --- /dev/null +++ b/async-utils/src/lib.rs @@ -0,0 +1,49 @@ +#![feature(await_macro, async_await)] + +use std::time::{Duration, Instant}; + +use futures03::{compat::Future01CompatExt as _, FutureExt as _}; + +/// This macro is extracted from +/// https://github.com/rust-lang-nursery/futures-rs/blob/c30adf513b9eea35ab385c0797210c77986fc82f/futures/src/lib.rs#L503-L510 +/// +/// It is useful when the `futures-preview` is imported as `futures03`. +macro_rules! select { // replace `::futures_util` with `::futures03` as the crate path + ($($tokens:tt)*) => { + futures03::inner_select::select! { + futures_crate_path ( ::futures03 ) + $( $tokens )* + } + } +} + +/// An async/await helper to delay the execution: +/// +/// ```ignore +/// let _ = delay(std::time::Duration::from_secs(1)).await; +/// ``` +pub async fn delay(duration: Duration) -> Result<(), tokio::timer::Error> { + tokio::timer::Delay::new(Instant::now() + duration).compat().await +} + +pub struct TimeoutError; + +/// An async/await helper to timeout a given async context: +/// +/// ```ignore +/// timeout( +/// std::time::Duration::from_secs(1), +/// async { +/// let _ = delay(std::time::Duration::from_secs(2)).await; +/// } +/// ).await +/// ``` +pub async fn timeout(timeout: Duration, f: Fut) -> Result +where + Fut: futures03::Future + Send, +{ + select! { + result = f.boxed().fuse() => Ok(result), + _ = delay(timeout).boxed().fuse() => Err(TimeoutError {}) + } +} diff --git a/chain/jsonrpc/Cargo.toml b/chain/jsonrpc/Cargo.toml index df171619bcb..a7802cec829 100644 --- a/chain/jsonrpc/Cargo.toml +++ b/chain/jsonrpc/Cargo.toml @@ -21,6 +21,7 @@ tokio = "0.1.15" uuid = { version = "~0.6", features = ["v4"] } protobuf = "2.4" +async-utils = { path = "../../async-utils" } near-primitives = { path = "../../core/primitives" } near-protos = { path = "../../core/protos" } near-store = { path = "../../core/store" } diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index e1ca8266445..166b6726f40 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -6,15 +6,14 @@ use std::time::Duration; use actix::{Addr, MailboxError}; use actix_web::{middleware, web, App, Error as HttpError, HttpResponse, HttpServer}; -use base64; use futures::future::Future; use futures03::{compat::Future01CompatExt as _, FutureExt as _, TryFutureExt as _}; -use log::error; use protobuf::parse_from_bytes; use serde::de::DeserializeOwned; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; +use async_utils::{delay, timeout}; use message::Message; use near_client::{ClientActor, GetBlock, Query, Status, TxDetails, TxStatus, ViewClientActor}; use near_network::NetworkClientMessages; @@ -29,31 +28,6 @@ use crate::message::{Request, RpcError}; pub mod client; mod message; -macro_rules! select { // replace `::futures_util` with `::futures03` as the crate path - ($($tokens:tt)*) => { - futures03::inner_select::select! { - futures_crate_path ( ::futures03 ) - $( $tokens )* - } - } -} - -async fn delay(duration: Duration) -> Result<(), tokio::timer::Error> { - tokio::timer::Delay::new(std::time::Instant::now() + duration).compat().await -} - -async fn timeout(timeout: Duration, f: Fut) -> Result -where - Fut: futures03::Future> + Send, -{ - select! { - result = f.boxed().fuse() => result, - _ = delay(timeout).boxed().fuse() => { - Err(RpcError::server_error(Some("send_tx_commit has timed out.".to_owned()))) - } - } -} - #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct RpcPollingConfig { pub polling_interval: Duration, @@ -196,6 +170,7 @@ impl JsonRpcHandler { } }) .await + .map_err(|_| RpcError::server_error(Some("send_tx_commit has timed out.".to_owned())))? } async fn health(&self) -> Result {