From 35a9c289ea7c0d29c0d0fe50b9ead59dab9c134b Mon Sep 17 00:00:00 2001 From: Hernando Castano Date: Tue, 23 Jun 2020 16:55:51 -0400 Subject: [PATCH] Switch to new RPC interface (#131) * Move EthereumRpc implementation to Eth client * Move SubstrateRpc implementation to SubstrateClient * Update deploy_contract to use new RPC interface * Fix some types in the Substrate client * Swap out method bodies in Eth sync loop * Swap out method bodies in Substrate sync loop * Remove Client from SourceClient trait return types * Remove Client from TargetClient trait return types * Remove client from Source select! arms * Remove client from Target select! arms * Add missing mutable refs in Substrate client * Use mutable references in Source/Target Client traits * Try and use mutable references in Source/Client trait implementations * Handle errors more gracefully * Remove unused imports * Remove dead_code and unused_variables lints * Remove usage of `jsonrpsee::RawClient` By using a `jsonrpsee::Client` we are able to remove all the shared mutable references required when interacting with the RPC server. This is convenient as trying to sharing mutable references in code that uses async/await is a bit of a pain. However, using a `Client` instead of a `RawClient` is not yet supported by the `jsonrpsee::rpc_api` macro, so a fork must be used for the moment. * Clean up dead code and warnings * Clean up higher level RPCs Some of the RPCs that were "high level" didn't necessarily belong as part of the trait, so they were removed. * Use positional parameters for RPCs Both Substrate and Ethereum's RPCs use positional (array) parameters, so in order to be compatible with both we need to make sure that our API is defined with positional paramters in mind. * Rename argument for eth_getBlockByNumber * Remove some unecessary Ok-wraps * Process client requests synchonously Before the refactoring the sync loop would wait until a client finished handling a request before issuing another one. This behaviour was inadvertently changed during the refactoring leading to race conditions. This commit makes sure that the previous behaviour is respected. * Reduce the errors that are considered a connection error * Only decode bridge contract once * Set genesis_config at RPC client startup * Fetch genesis hash in SubstrateRpcClient::new() * Move Decode error into SubstrateNodeError * Suppress warnings caused by `rpc_api!` * Implement From RpcError for String * Handle Substrate client initalization errors more gracefully * Remove match in favour of ? Co-authored-by: Svyatoslav Nikolsky --- bridges/relays/ethereum/Cargo.toml | 7 +- .../relays/ethereum/src/ethereum_client.rs | 595 ++++++++---------- .../ethereum/src/ethereum_deploy_contract.rs | 83 ++- .../relays/ethereum/src/ethereum_sync_loop.rs | 170 +++-- bridges/relays/ethereum/src/main.rs | 20 +- bridges/relays/ethereum/src/rpc.rs | 253 ++------ bridges/relays/ethereum/src/rpc_errors.rs | 71 ++- .../relays/ethereum/src/substrate_client.rs | 443 +++++-------- .../ethereum/src/substrate_sync_loop.rs | 183 +++--- bridges/relays/ethereum/src/sync.rs | 4 +- bridges/relays/ethereum/src/sync_loop.rs | 243 +++---- 11 files changed, 840 insertions(+), 1232 deletions(-) diff --git a/bridges/relays/ethereum/Cargo.toml b/bridges/relays/ethereum/Cargo.toml index 4bdcf930c1c4..0505ff8bc185 100644 --- a/bridges/relays/ethereum/Cargo.toml +++ b/bridges/relays/ethereum/Cargo.toml @@ -18,7 +18,6 @@ ethabi-derive = "12.0" ethereum-tx-sign = "3.0" futures = "0.3.5" hex = "0.4" -jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee.git", default-features = false, features = ["http"] } linked-hash-map = "0.5.3" log = "0.4.8" num-traits = "0.2" @@ -31,6 +30,12 @@ sp-bridge-eth-poa = { path = "../../primitives/ethereum-poa" } time = "0.2" web3 = { version = "0.12.0", default-features = false } +[dependencies.jsonrpsee] +git = "https://github.com/svyatonik/jsonrpsee.git" +branch = "shared-client-in-rpc-api" +default-features = false +features = ["http"] + # Substrate Based Dependencies [dependencies.frame-system] version = "2.0.0-rc3" diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index 5dba31344b0a..fcd627549a07 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -15,33 +15,30 @@ // along with Parity Bridges Common. If not, see . use crate::ethereum_types::{ - Address, Bytes, CallRequest, EthereumHeaderId, Header, Receipt, TransactionHash, H256, U256, U64, + Address, Bytes, CallRequest, EthereumHeaderId, Header, Receipt, SignedRawTx, TransactionHash, H256, U256, }; +use crate::rpc::{Ethereum, EthereumRpc}; +use crate::rpc_errors::{EthereumNodeError, RpcError}; use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId}; -use crate::sync_types::{HeaderId, MaybeConnectionError}; -use crate::{bail_on_arg_error, bail_on_error}; +use crate::sync_types::HeaderId; + +use async_trait::async_trait; use codec::{Decode, Encode}; use ethabi::FunctionOutputDecoder; -use jsonrpsee::common::Params; -use jsonrpsee::raw::{RawClient, RawClientError}; -use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; +use jsonrpsee::raw::RawClient; +use jsonrpsee::transport::http::HttpTransportClient; +use jsonrpsee::Client; use parity_crypto::publickey::KeyPair; -use serde::de::DeserializeOwned; -use serde_json::{from_value, to_value}; + use std::collections::HashSet; // to encode/decode contract calls ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json"); -/// Proof of hash serialization success. -const HASH_SERIALIZATION_PROOF: &'static str = "hash serialization never fails; qed"; -/// Proof of integer serialization success. -const INT_SERIALIZATION_PROOF: &'static str = "integer serialization never fails; qed"; -/// Proof of bool serialization success. -const BOOL_SERIALIZATION_PROOF: &'static str = "bool serialization never fails; qed"; +type Result = std::result::Result; /// Ethereum connection params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthereumConnectionParams { /// Ethereum RPC host. pub host: String, @@ -86,381 +83,283 @@ impl Default for EthereumSigningParams { } } -/// Ethereum client type. -pub type Client = RawClient; - -/// All possible errors that can occur during interacting with Ethereum node. -#[derive(Debug)] -pub enum Error { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Request not found (should never occur?). - RequestNotFound, - /// Failed to receive response. - ResponseRetrievalFailed(RawClientError), - /// Failed to parse response. - ResponseParseFailed(String), - /// We have received header with missing number and hash fields. - IncompleteHeader, - /// We have received receipt with missing gas_used field. - IncompleteReceipt, - /// Invalid Substrate block number received from Ethereum node. - InvalidSubstrateBlockNumber, +/// The client used to interact with an Ethereum node through RPC. +pub struct EthereumRpcClient { + client: Client, } -impl MaybeConnectionError for Error { - fn is_connection_error(&self) -> bool { - match *self { - Error::StartRequestFailed(_) | Error::ResponseRetrievalFailed(_) => true, - _ => false, - } +impl EthereumRpcClient { + /// Create a new Ethereum RPC Client. + pub fn new(params: EthereumConnectionParams) -> Self { + let uri = format!("http://{}:{}", params.host, params.port); + let transport = HttpTransportClient::new(&uri); + let raw_client = RawClient::new(transport); + let client: Client = raw_client.into(); + + Self { client } } } -/// Returns client that is able to call RPCs on Ethereum node. -pub fn client(params: EthereumConnectionParams) -> Client { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - RawClient::new(transport) -} +#[async_trait] +impl EthereumRpc for EthereumRpcClient { + async fn estimate_gas(&self, call_request: CallRequest) -> Result { + Ok(Ethereum::estimate_gas(&self.client, call_request).await?) + } -/// Retrieve best known block number from Ethereum node. -pub async fn best_block_number(client: Client) -> (Client, Result) { - let (client, result) = call_rpc::(client, "eth_blockNumber", Params::None).await; - (client, result.map(|x| x.as_u64())) -} + async fn best_block_number(&self) -> Result { + Ok(Ethereum::block_number(&self.client).await?.as_u64()) + } -/// Retrieve block header by its number from Ethereum node. -pub async fn header_by_number(client: Client, number: u64) -> (Client, Result) { - let (client, header) = call_rpc( - client, - "eth_getBlockByNumber", - Params::Array(vec![ - to_value(U64::from(number)).expect(INT_SERIALIZATION_PROOF), - to_value(false).expect(BOOL_SERIALIZATION_PROOF), - ]), - ) - .await; - ( - client, - header.and_then(|header: Header| { - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } - }), - ) -} + async fn header_by_number(&self, block_number: u64) -> Result
{ + let get_full_tx_objects = false; + let header = Ethereum::get_block_by_number(&self.client, block_number, get_full_tx_objects).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), + } + } -/// Retrieve block header by its hash from Ethereum node. -pub async fn header_by_hash(client: Client, hash: H256) -> (Client, Result) { - let (client, header) = call_rpc( - client, - "eth_getBlockByHash", - Params::Array(vec![ - to_value(hash).expect(HASH_SERIALIZATION_PROOF), - to_value(false).expect(BOOL_SERIALIZATION_PROOF), - ]), - ) - .await; - ( - client, - header.and_then(|header: Header| { - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(Error::IncompleteHeader), - } - }), - ) -} + async fn header_by_hash(&self, hash: H256) -> Result
{ + let header = Ethereum::get_block_by_hash(&self.client, hash).await?; + match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { + true => Ok(header), + false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), + } + } + + async fn transaction_receipt(&self, transaction_hash: H256) -> Result { + let receipt = Ethereum::get_transaction_receipt(&self.client, transaction_hash).await?; -/// Retrieve transactions receipts for given block. -pub async fn transactions_receipts( - mut client: Client, - id: EthereumHeaderId, - transactions: Vec, -) -> (Client, Result<(EthereumHeaderId, Vec), Error>) { - let mut transactions_receipts = Vec::with_capacity(transactions.len()); - for transaction in transactions { - let (next_client, transaction_receipt) = bail_on_error!(transaction_receipt(client, transaction).await); - transactions_receipts.push(transaction_receipt); - client = next_client; + match receipt.gas_used { + Some(_) => Ok(receipt), + None => Err(RpcError::Ethereum(EthereumNodeError::IncompleteReceipt)), + } + } + + async fn account_nonce(&self, address: Address) -> Result { + Ok(Ethereum::get_transaction_count(&self.client, address).await?) + } + + async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result { + let transaction = Bytes(signed_raw_tx); + Ok(Ethereum::submit_transaction(&self.client, transaction).await?) + } + + async fn eth_call(&self, call_transaction: CallRequest) -> Result { + Ok(Ethereum::call(&self.client, call_transaction).await?) } - (client, Ok((id, transactions_receipts))) } -/// Retrieve transaction receipt by transaction hash. -async fn transaction_receipt(client: Client, hash: H256) -> (Client, Result) { - let (client, receipt) = call_rpc::( - client, - "eth_getTransactionReceipt", - Params::Array(vec![to_value(hash).expect(HASH_SERIALIZATION_PROOF)]), - ) - .await; - ( - client, - receipt.and_then(|receipt| match receipt.gas_used.is_some() { - true => Ok(receipt), - false => Err(Error::IncompleteReceipt), - }), - ) +/// A trait which contains methods that work by using multiple low-level RPCs, or more complicated +/// interactions involving, for example, an Ethereum contract. +#[async_trait] +pub trait EthereumHighLevelRpc: EthereumRpc { + /// Returns best Substrate block that PoA chain knows of. + async fn best_substrate_block(&self, contract_address: Address) -> Result; + + /// Returns true if Substrate header is known to Ethereum node. + async fn substrate_header_known( + &self, + contract_address: Address, + id: SubstrateHeaderId, + ) -> Result<(SubstrateHeaderId, bool)>; + + /// Submits Substrate headers to Ethereum contract. + async fn submit_substrate_headers( + &self, + params: EthereumSigningParams, + contract_address: Address, + headers: Vec, + ) -> Result>; + + /// Returns ids of incomplete Substrate headers. + async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result>; + + /// Complete Substrate header. + async fn complete_substrate_header( + &self, + params: EthereumSigningParams, + contract_address: Address, + id: SubstrateHeaderId, + justification: GrandpaJustification, + ) -> Result; + + /// Submit ethereum transaction. + async fn submit_ethereum_transaction( + &self, + params: &EthereumSigningParams, + contract_address: Option
, + nonce: Option, + double_gas: bool, + encoded_call: Vec, + ) -> Result<()>; + + /// Retrieve transactions receipts for given block. + async fn transaction_receipts( + &self, + id: EthereumHeaderId, + transactions: Vec, + ) -> Result<(EthereumHeaderId, Vec)>; } -/// Returns best Substrate block that PoA chain knows of. -pub async fn best_substrate_block( - client: Client, - contract_address: Address, -) -> (Client, Result) { - let (encoded_call, call_decoder) = bridge_contract::functions::best_known_header::call(); - let call_request = bail_on_arg_error!( - to_value(CallRequest { +#[async_trait] +impl EthereumHighLevelRpc for EthereumRpcClient { + async fn best_substrate_block(&self, contract_address: Address) -> Result { + let (encoded_call, call_decoder) = bridge_contract::functions::best_known_header::call(); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - let (number, raw_hash) = match call_decoder.decode(&call_result.0) { - Ok((raw_number, raw_hash)) => (raw_number, raw_hash), - Err(error) => return (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - }; - let hash = match SubstrateHash::decode(&mut &raw_hash[..]) { - Ok(hash) => hash, - Err(error) => return (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - }; - - if number != number.low_u32().into() { - return (client, Err(Error::InvalidSubstrateBlockNumber)); - } + }; - (client, Ok(HeaderId(number.low_u32(), hash))) -} + let call_result = self.eth_call(call_request).await?; + let (number, raw_hash) = call_decoder.decode(&call_result.0)?; + let hash = SubstrateHash::decode(&mut &raw_hash[..])?; -/// Returns true if Substrate header is known to Ethereum node. -pub async fn substrate_header_known( - client: Client, - contract_address: Address, - id: SubstrateHeaderId, -) -> (Client, Result<(SubstrateHeaderId, bool), Error>) { - let (encoded_call, call_decoder) = bridge_contract::functions::is_known_header::call(id.1); - let call_request = bail_on_arg_error!( - to_value(CallRequest { + if number != number.low_u32().into() { + return Err(RpcError::Ethereum(EthereumNodeError::InvalidSubstrateBlockNumber)); + } + + Ok(HeaderId(number.low_u32(), hash)) + } + + async fn substrate_header_known( + &self, + contract_address: Address, + id: SubstrateHeaderId, + ) -> Result<(SubstrateHeaderId, bool)> { + let (encoded_call, call_decoder) = bridge_contract::functions::is_known_header::call(id.1); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - match call_decoder.decode(&call_result.0) { - Ok(is_known_block) => (client, Ok((id, is_known_block))), - Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))), + }; + + let call_result = self.eth_call(call_request).await?; + let is_known_block = call_decoder.decode(&call_result.0)?; + + Ok((id, is_known_block)) } -} -/// Submits Substrate headers to Ethereum contract. -pub async fn submit_substrate_headers( - client: Client, - params: EthereumSigningParams, - contract_address: Address, - headers: Vec, -) -> (Client, Result, Error>) { - let (mut client, mut nonce) = - bail_on_error!(account_nonce(client, params.signer.address().as_fixed_bytes().into()).await); - - let ids = headers.iter().map(|header| header.id()).collect(); - for header in headers { - client = bail_on_error!( - submit_ethereum_transaction( - client, + async fn submit_substrate_headers( + &self, + params: EthereumSigningParams, + contract_address: Address, + headers: Vec, + ) -> Result> { + let address: Address = params.signer.address().as_fixed_bytes().into(); + let mut nonce = self.account_nonce(address).await?; + + let ids = headers.iter().map(|header| header.id()).collect(); + for header in headers { + self.submit_ethereum_transaction( ¶ms, Some(contract_address), Some(nonce), false, - bridge_contract::functions::import_header::encode_input(header.header().encode(),), + bridge_contract::functions::import_header::encode_input(header.header().encode()), ) - .await - ) - .0; + .await?; - nonce += 1.into(); - } + nonce += 1.into(); + } - (client, Ok(ids)) -} + Ok(ids) + } -/// Returns ids of incomplete Substrate headers. -pub async fn incomplete_substrate_headers( - client: Client, - contract_address: Address, -) -> (Client, Result, Error>) { - let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call(); - let call_request = bail_on_arg_error!( - to_value(CallRequest { + async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result> { + let (encoded_call, call_decoder) = bridge_contract::functions::incomplete_headers::call(); + let call_request = CallRequest { to: Some(contract_address), data: Some(encoded_call.into()), ..Default::default() - }) - .map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, call_result) = - bail_on_error!(call_rpc::(client, "eth_call", Params::Array(vec![call_request]),).await); - match call_decoder.decode(&call_result.0) { - Ok((incomplete_headers_numbers, incomplete_headers_hashes)) => ( - client, - Ok(incomplete_headers_numbers - .into_iter() - .zip(incomplete_headers_hashes) - .filter_map(|(number, hash)| { - if number != number.low_u32().into() { - return None; - } - - Some(HeaderId(number.low_u32(), hash)) - }) - .collect()), - ), - Err(error) => (client, Err(Error::ResponseParseFailed(format!("{}", error)))), - } -} + }; -/// Complete Substrate header. -pub async fn complete_substrate_header( - client: Client, - params: EthereumSigningParams, - contract_address: Address, - id: SubstrateHeaderId, - justification: GrandpaJustification, -) -> (Client, Result) { - let (client, _) = bail_on_error!( - submit_ethereum_transaction( - client, - ¶ms, - Some(contract_address), - None, - false, - bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification,), - ) - .await - ); - - (client, Ok(id)) -} + let call_result = self.eth_call(call_request).await?; -/// Deploy bridge contract. -pub async fn deploy_bridge_contract( - client: Client, - params: &EthereumSigningParams, - contract_code: Vec, - initial_header: Vec, - initial_set_id: u64, - initial_authorities: Vec, -) -> (Client, Result<(), Error>) { - submit_ethereum_transaction( - client, - params, - None, - None, - false, - bridge_contract::constructor(contract_code, initial_header, initial_set_id, initial_authorities), - ) - .await -} + // Q: Is is correct to call these "incomplete_ids"? + let (incomplete_headers_numbers, incomplete_headers_hashes) = call_decoder.decode(&call_result.0)?; + let incomplete_ids = incomplete_headers_numbers + .into_iter() + .zip(incomplete_headers_hashes) + .filter_map(|(number, hash)| { + if number != number.low_u32().into() { + return None; + } -/// Submit ethereum transaction. -async fn submit_ethereum_transaction( - client: Client, - params: &EthereumSigningParams, - contract_address: Option
, - nonce: Option, - double_gas: bool, - encoded_call: Vec, -) -> (Client, Result<(), Error>) { - let (client, nonce) = match nonce { - Some(nonce) => (client, nonce), - None => bail_on_error!(account_nonce(client, params.signer.address().as_fixed_bytes().into()).await), - }; - let (client, gas) = bail_on_error!( - estimate_gas( - client, - CallRequest { - to: contract_address, - data: Some(encoded_call.clone().into()), - ..Default::default() - } - ) - .await - ); - let raw_transaction = ethereum_tx_sign::RawTransaction { - nonce, - to: contract_address, - value: U256::zero(), - gas: if double_gas { gas.saturating_mul(2.into()) } else { gas }, - gas_price: params.gas_price, - data: encoded_call, + Some(HeaderId(number.low_u32(), hash)) + }) + .collect(); + + Ok(incomplete_ids) } - .sign(¶ms.signer.secret().as_fixed_bytes().into(), ¶ms.chain_id); - let transaction = bail_on_arg_error!( - to_value(Bytes(raw_transaction)).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, _) = bail_on_error!( - call_rpc::(client, "eth_submitTransaction", Params::Array(vec![transaction])).await - ); - (client, Ok(())) -} -/// Get account nonce. -async fn account_nonce(client: Client, caller_address: Address) -> (Client, Result) { - let caller_address = bail_on_arg_error!( - to_value(caller_address).map_err(|e| Error::RequestSerialization(e)), - client - ); - call_rpc(client, "eth_getTransactionCount", Params::Array(vec![caller_address])).await -} + async fn complete_substrate_header( + &self, + params: EthereumSigningParams, + contract_address: Address, + id: SubstrateHeaderId, + justification: GrandpaJustification, + ) -> Result { + let _ = self + .submit_ethereum_transaction( + ¶ms, + Some(contract_address), + None, + false, + bridge_contract::functions::import_finality_proof::encode_input(id.0, id.1, justification), + ) + .await?; -/// Estimate gas usage for call. -async fn estimate_gas(client: Client, call_request: CallRequest) -> (Client, Result) { - let call_request = bail_on_arg_error!( - to_value(call_request).map_err(|e| Error::RequestSerialization(e)), - client - ); - call_rpc(client, "eth_estimateGas", Params::Array(vec![call_request])).await -} + Ok(id) + } -/// Calls RPC on Ethereum node. -async fn call_rpc( - mut client: Client, - method: &'static str, - params: Params, -) -> (Client, Result) { - async fn do_call_rpc( - client: &mut Client, - method: &'static str, - params: Params, - ) -> Result { - let request_id = client - .start_request(method, params) - .await - .map_err(Error::StartRequestFailed)?; - // WARN: if there'll be need for executing >1 request at a time, we should avoid - // calling request_by_id - let response = client - .request_by_id(request_id) - .ok_or(Error::RequestNotFound)? - .await - .map_err(Error::ResponseRetrievalFailed)?; - from_value(response).map_err(|e| Error::ResponseParseFailed(format!("{}", e))) + async fn submit_ethereum_transaction( + &self, + params: &EthereumSigningParams, + contract_address: Option
, + nonce: Option, + double_gas: bool, + encoded_call: Vec, + ) -> Result<()> { + let nonce = if let Some(n) = nonce { + n + } else { + let address: Address = params.signer.address().as_fixed_bytes().into(); + self.account_nonce(address).await? + }; + + let call_request = CallRequest { + to: contract_address, + data: Some(encoded_call.clone().into()), + ..Default::default() + }; + let gas = self.estimate_gas(call_request).await?; + + let raw_transaction = ethereum_tx_sign::RawTransaction { + nonce, + to: contract_address, + value: U256::zero(), + gas: if double_gas { gas.saturating_mul(2.into()) } else { gas }, + gas_price: params.gas_price, + data: encoded_call, + } + .sign(¶ms.signer.secret().as_fixed_bytes().into(), ¶ms.chain_id); + + let _ = self.submit_transaction(raw_transaction).await?; + Ok(()) } - let result = do_call_rpc(&mut client, method, params).await; - (client, result) + async fn transaction_receipts( + &self, + id: EthereumHeaderId, + transactions: Vec, + ) -> Result<(EthereumHeaderId, Vec)> { + let mut transaction_receipts = Vec::with_capacity(transactions.len()); + for transaction in transactions { + let transaction_receipt = self.transaction_receipt(transaction).await?; + transaction_receipts.push(transaction_receipt); + } + Ok((id, transaction_receipts)) + } } diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index eb9f66be924b..0e573bda0634 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -14,9 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams}; -use crate::substrate_client::{self, SubstrateConnectionParams}; +use crate::ethereum_client::{ + bridge_contract, EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient, EthereumSigningParams, +}; +use crate::rpc::SubstrateRpc; +use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient}; use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader}; + use codec::{Decode, Encode}; use num_traits::Zero; @@ -59,18 +63,16 @@ pub fn run(params: EthereumDeployContractParams) { let mut local_pool = futures::executor::LocalPool::new(); let result = local_pool.run_until(async move { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = SubstrateRpcClient::new(params.sub).await?; - let (sub_client, initial_header) = prepare_initial_header(sub_client, params.sub_initial_header).await; - let (initial_header_hash, initial_header) = initial_header?; + let (initial_header_hash, initial_header) = prepare_initial_header(&sub_client, params.sub_initial_header).await?; let initial_set_id = params.sub_initial_authorities_set_id.unwrap_or(0); - let (_, initial_set) = prepare_initial_authorities_set( - sub_client, + let initial_set = prepare_initial_authorities_set( + &sub_client, initial_header_hash, params.sub_initial_authorities_set, - ).await; - let initial_set = initial_set?; + ).await?; log::info!( target: "bridge", @@ -81,14 +83,14 @@ pub fn run(params: EthereumDeployContractParams) { hex::encode(&initial_set), ); - ethereum_client::deploy_bridge_contract( - eth_client, + deploy_bridge_contract( + ð_client, ¶ms.eth_sign, params.eth_contract_code, initial_header, initial_set_id, initial_set, - ).await.1.map_err(|error| format!("Error deploying contract: {:?}", error)) + ).await }); if let Err(error) = result { @@ -98,39 +100,54 @@ pub fn run(params: EthereumDeployContractParams) { /// Prepare initial header. async fn prepare_initial_header( - sub_client: substrate_client::Client, + sub_client: &SubstrateRpcClient, sub_initial_header: Option>, -) -> (substrate_client::Client, Result<(SubstrateHash, Vec), String>) { +) -> Result<(SubstrateHash, Vec), String> { match sub_initial_header { Some(raw_initial_header) => match SubstrateHeader::decode(&mut &raw_initial_header[..]) { - Ok(initial_header) => (sub_client, Ok((initial_header.hash(), raw_initial_header))), - Err(error) => (sub_client, Err(format!("Error decoding initial header: {}", error))), + Ok(initial_header) => Ok((initial_header.hash(), raw_initial_header)), + Err(error) => Err(format!("Error decoding initial header: {}", error)), }, None => { - let (sub_client, initial_header) = substrate_client::header_by_number(sub_client, Zero::zero()).await; - ( - sub_client, - initial_header - .map(|header| (header.hash(), header.encode())) - .map_err(|error| format!("Error reading Substrate genesis header: {:?}", error)), - ) + let initial_header = sub_client.header_by_number(Zero::zero()).await; + initial_header + .map(|header| (header.hash(), header.encode())) + .map_err(|error| format!("Error reading Substrate genesis header: {:?}", error)) } } } /// Prepare initial GRANDPA authorities set. async fn prepare_initial_authorities_set( - sub_client: substrate_client::Client, + sub_client: &SubstrateRpcClient, sub_initial_header_hash: SubstrateHash, sub_initial_authorities_set: Option>, -) -> (substrate_client::Client, Result, String>) { - let (sub_client, initial_authorities_set) = match sub_initial_authorities_set { - Some(initial_authorities_set) => (sub_client, Ok(initial_authorities_set)), - None => substrate_client::grandpa_authorities_set(sub_client, sub_initial_header_hash).await, +) -> Result, String> { + let initial_authorities_set = match sub_initial_authorities_set { + Some(initial_authorities_set) => Ok(initial_authorities_set), + None => sub_client.grandpa_authorities_set(sub_initial_header_hash).await, }; - ( - sub_client, - initial_authorities_set.map_err(|error| format!("Error reading GRANDPA authorities set: {:?}", error)), - ) + initial_authorities_set.map_err(|error| format!("Error reading GRANDPA authorities set: {:?}", error)) +} + +/// Deploy bridge contract to Ethereum chain. +async fn deploy_bridge_contract( + eth_client: &EthereumRpcClient, + params: &EthereumSigningParams, + contract_code: Vec, + initial_header: Vec, + initial_set_id: u64, + initial_authorities: Vec, +) -> Result<(), String> { + eth_client + .submit_ethereum_transaction( + params, + None, + None, + false, + bridge_contract::constructor(contract_code, initial_header, initial_set_id, initial_authorities), + ) + .await + .map_err(|error| format!("Error deploying contract: {:?}", error)) } diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index 95f415a75322..26804d4de0cb 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -16,17 +16,23 @@ //! Ethereum PoA -> Substrate synchronization. -use crate::ethereum_client::{self, EthereumConnectionParams}; +use crate::ethereum_client::{EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient}; use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt}; -use crate::substrate_client::{self, SubstrateConnectionParams, SubstrateSigningParams}; +use crate::rpc::{EthereumRpc, SubstrateRpc}; +use crate::rpc_errors::RpcError; +use crate::substrate_client::{ + SubmitEthereumHeaders, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams, +}; +use crate::substrate_types::into_substrate_ethereum_header; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; +use crate::sync_loop::{SourceClient, TargetClient}; +use crate::sync_types::SourceHeader; use async_trait::async_trait; -use futures::future::FutureExt; -use std::{collections::HashSet, time::Duration}; use web3::types::H256; +use std::{collections::HashSet, time::Duration}; + /// Interval at which we check new Ethereum headers when we are synced/almost synced. const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(10); /// Interval at which we check new Substrate blocks. @@ -45,6 +51,7 @@ const MAX_SUBMITTED_HEADERS: usize = 128; const PRUNE_DEPTH: u32 = 4096; /// Ethereum synchronization parameters. +#[derive(Clone)] pub struct EthereumSyncParams { /// Ethereum connection params. pub eth: EthereumConnectionParams, @@ -77,159 +84,124 @@ impl Default for EthereumSyncParams { /// Ethereum client as headers source. struct EthereumHeadersSource { /// Ethereum node client. - client: ethereum_client::Client, + client: EthereumRpcClient, } -type EthereumFutureOutput = OwnedSourceFutureOutput; +impl EthereumHeadersSource { + fn new(client: EthereumRpcClient) -> Self { + Self { client } + } +} #[async_trait] impl SourceClient for EthereumHeadersSource { - type Error = ethereum_client::Error; + type Error = RpcError; - async fn best_block_number(self) -> EthereumFutureOutput { - ethereum_client::best_block_number(self.client) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn best_block_number(&self) -> Result { + self.client.best_block_number().await } - async fn header_by_hash(self, hash: H256) -> EthereumFutureOutput
{ - ethereum_client::header_by_hash(self.client, hash) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn header_by_hash(&self, hash: H256) -> Result { + self.client.header_by_hash(hash).await } - async fn header_by_number(self, number: u64) -> EthereumFutureOutput
{ - ethereum_client::header_by_number(self.client, number) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) - .await + async fn header_by_number(&self, number: u64) -> Result { + self.client.header_by_number(number).await } - async fn header_completion(self, id: EthereumHeaderId) -> EthereumFutureOutput<(EthereumHeaderId, Option<()>)> { - (self, Ok((id, None))) + async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> { + Ok((id, None)) } async fn header_extra( - self, + &self, id: EthereumHeaderId, header: QueuedEthereumHeader, - ) -> EthereumFutureOutput<(EthereumHeaderId, Vec)> { - ethereum_client::transactions_receipts(self.client, id, header.header().transactions.clone()) - .map(|(client, result)| (EthereumHeadersSource { client }, result)) + ) -> Result<(EthereumHeaderId, Vec), Self::Error> { + self.client + .transaction_receipts(id, header.header().transactions.clone()) .await } } -/// Substrate client as Ethereum headers target. struct SubstrateHeadersTarget { /// Substrate node client. - client: substrate_client::Client, + client: SubstrateRpcClient, /// Whether we want to submit signed (true), or unsigned (false) transactions. sign_transactions: bool, /// Substrate signing params. sign_params: SubstrateSigningParams, } -type SubstrateFutureOutput = OwnedTargetFutureOutput; +impl SubstrateHeadersTarget { + fn new(client: SubstrateRpcClient, sign_transactions: bool, sign_params: SubstrateSigningParams) -> Self { + Self { + client, + sign_transactions, + sign_params, + } + } +} #[async_trait] impl TargetClient for SubstrateHeadersTarget { - type Error = substrate_client::Error; - - async fn best_header_id(self) -> SubstrateFutureOutput { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::best_ethereum_block(self.client) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + type Error = RpcError; + + async fn best_header_id(&self) -> Result { + self.client.best_ethereum_block().await } - async fn is_known_header(self, id: EthereumHeaderId) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::ethereum_header_known(self.client, id) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), Self::Error> { + Ok((id, self.client.ethereum_header_known(id).await?)) } - async fn submit_headers(self, headers: Vec) -> SubstrateFutureOutput> { - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::submit_ethereum_headers(self.client, sign_params.clone(), headers, sign_transactions) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) + async fn submit_headers(&self, headers: Vec) -> Result, Self::Error> { + let (sign_params, sign_transactions) = (self.sign_params.clone(), self.sign_transactions.clone()); + self.client + .submit_ethereum_headers(sign_params, headers, sign_transactions) .await } - async fn incomplete_headers_ids(self) -> SubstrateFutureOutput> { - (self, Ok(HashSet::new())) + async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + Ok(HashSet::new()) } - async fn complete_header(self, id: EthereumHeaderId, _completion: ()) -> SubstrateFutureOutput { - (self, Ok(id)) + async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result { + Ok(id) } - async fn requires_extra(self, header: QueuedEthereumHeader) -> SubstrateFutureOutput<(EthereumHeaderId, bool)> { + async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), Self::Error> { // we can minimize number of receipts_check calls by checking header // logs bloom here, but it may give us false positives (when authorities // source is contract, we never need any logs) - let (sign_transactions, sign_params) = (self.sign_transactions, self.sign_params); - substrate_client::ethereum_receipts_required(self.client, header) - .map(move |(client, result)| { - ( - SubstrateHeadersTarget { - client, - sign_transactions, - sign_params, - }, - result, - ) - }) - .await + let id = header.header().id(); + let sub_eth_header = into_substrate_ethereum_header(header.header()); + Ok((id, self.client.ethereum_receipts_required(sub_eth_header).await?)) } } /// Run Ethereum headers synchronization. -pub fn run(params: EthereumSyncParams) { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); +pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { + let sub_params = params.clone(); + + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = async_std::task::block_on(async { SubstrateRpcClient::new(sub_params.sub).await })?; let sign_sub_transactions = match params.sync_params.target_tx_mode { TargetTransactionMode::Signed | TargetTransactionMode::Backup => true, TargetTransactionMode::Unsigned => false, }; + let source = EthereumHeadersSource::new(eth_client); + let target = SubstrateHeadersTarget::new(sub_client, sign_sub_transactions, params.sub_sign); + crate::sync_loop::run( - EthereumHeadersSource { client: eth_client }, + source, ETHEREUM_TICK_INTERVAL, - SubstrateHeadersTarget { - client: sub_client, - sign_transactions: sign_sub_transactions, - sign_params: params.sub_sign, - }, + target, SUBSTRATE_TICK_INTERVAL, params.sync_params, ); + + Ok(()) } diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index c17582e23cb0..5786ff23dbfc 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -46,28 +46,38 @@ fn main() { let matches = clap::App::from_yaml(yaml).get_matches(); match matches.subcommand() { ("eth-to-sub", Some(eth_to_sub_matches)) => { - ethereum_sync_loop::run(match ethereum_sync_params(ð_to_sub_matches) { + if ethereum_sync_loop::run(match ethereum_sync_params(ð_to_sub_matches) { Ok(ethereum_sync_params) => ethereum_sync_params, Err(err) => { log::error!(target: "bridge", "Error parsing parameters: {}", err); return; } - }); + }) + .is_err() + { + log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync."); + return; + }; } ("sub-to-eth", Some(sub_to_eth_matches)) => { - substrate_sync_loop::run(match substrate_sync_params(&sub_to_eth_matches) { + if substrate_sync_loop::run(match substrate_sync_params(&sub_to_eth_matches) { Ok(substrate_sync_params) => substrate_sync_params, Err(err) => { log::error!(target: "bridge", "Error parsing parameters: {}", err); return; } - }); + }) + .is_err() + { + log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync."); + return; + }; } ("eth-deploy-contract", Some(eth_deploy_matches)) => { ethereum_deploy_contract::run(match ethereum_deploy_contract_params(ð_deploy_matches) { Ok(ethereum_deploy_matches) => ethereum_deploy_matches, Err(err) => { - log::error!(target: "bridge", "Error parsing parameters: {}", err); + log::error!(target: "bridge", "Error during contract deployment: {}", err); return; } }); diff --git a/bridges/relays/ethereum/src/rpc.rs b/bridges/relays/ethereum/src/rpc.rs index 70c775a268fb..1dc253406c08 100644 --- a/bridges/relays/ethereum/src/rpc.rs +++ b/bridges/relays/ethereum/src/rpc.rs @@ -16,69 +16,61 @@ //! RPC Module +#![warn(missing_docs)] + +// The compiler doesn't think we're using the +// code from rpc_api! #![allow(dead_code)] #![allow(unused_variables)] -#[warn(missing_docs)] use std::result; -use crate::ethereum_client::EthereumConnectionParams; use crate::ethereum_types::{ Address as EthAddress, Bytes, CallRequest, EthereumHeaderId, Header as EthereumHeader, Receipt, SignedRawTx, TransactionHash as EthereumTxHash, H256, U256, U64, }; -use crate::rpc_errors::{EthereumNodeError, RpcError}; -use crate::substrate_client::SubstrateConnectionParams; +use crate::rpc_errors::RpcError; use crate::substrate_types::{ Hash as SubstrateHash, Header as SubstrateHeader, Number as SubBlockNumber, SignedBlock as SubstrateBlock, }; -use crate::sync_types::HeaderId; use async_trait::async_trait; -use codec::{Decode, Encode}; -use jsonrpsee::raw::client::RawClient; -use jsonrpsee::transport::http::HttpTransportClient; use sp_bridge_eth_poa::Header as SubstrateEthereumHeader; -const ETH_API_BEST_BLOCK: &str = "EthereumHeadersApi_best_block"; -const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_requires_receipts"; -const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block"; -const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; - type Result = result::Result; type GrandpaAuthorityList = Vec; jsonrpsee::rpc_api! { - Ethereum { - #[rpc(method = "eth_estimateGas")] + pub(crate) Ethereum { + #[rpc(method = "eth_estimateGas", positional_params)] fn estimate_gas(call_request: CallRequest) -> U256; - #[rpc(method = "eth_blockNumber")] + #[rpc(method = "eth_blockNumber", positional_params)] fn block_number() -> U64; - #[rpc(method = "eth_getBlockByNumber")] - fn get_block_by_number(block_number: u64) -> EthereumHeader; - #[rpc(method = "eth_getBlockByHash")] + #[rpc(method = "eth_getBlockByNumber", positional_params)] + fn get_block_by_number(block_number: U64, full_tx_objs: bool) -> EthereumHeader; + #[rpc(method = "eth_getBlockByHash", positional_params)] fn get_block_by_hash(hash: H256) -> EthereumHeader; - #[rpc(method = "eth_getTransactionReceipt")] + #[rpc(method = "eth_getTransactionReceipt", positional_params)] fn get_transaction_receipt(transaction_hash: H256) -> Receipt; - #[rpc(method = "eth_getTransactionCount")] + #[rpc(method = "eth_getTransactionCount", positional_params)] fn get_transaction_count(address: EthAddress) -> U256; - #[rpc(method = "eth_submitTransaction")] + #[rpc(method = "eth_submitTransaction", positional_params)] fn submit_transaction(transaction: Bytes) -> EthereumTxHash; - #[rpc(method = "eth_call")] + #[rpc(method = "eth_call", positional_params)] fn call(transaction_call: CallRequest) -> Bytes; } - Substrate { - #[rpc(method = "chain_getHeader")] + pub(crate) Substrate { + #[rpc(method = "chain_getHeader", positional_params)] fn chain_get_header(block_hash: Option) -> SubstrateHeader; - #[rpc(method = "chain_getBlock")] + #[rpc(method = "chain_getBlock", positional_params)] fn chain_get_block(block_hash: Option) -> SubstrateBlock; - #[rpc(method = "chain_getBlockHash")] + #[rpc(method = "chain_getBlockHash", positional_params)] fn chain_get_block_hash(block_number: Option) -> SubstrateHash; - #[rpc(method = "system_accountNextIndex")] + #[rpc(method = "system_accountNextIndex", positional_params)] fn system_account_next_index(account_id: node_primitives::AccountId) -> node_primitives::Index; - #[rpc(method = "author_submitExtrinsic")] + #[rpc(method = "author_submitExtrinsic", positional_params)] fn author_submit_extrinsic(extrinsic: Bytes) -> SubstrateHash; - #[rpc(method = "state_call")] + #[rpc(method = "state_call", positional_params)] fn state_call(method: String, data: Bytes, at_block: Option) -> Bytes; } } @@ -87,215 +79,52 @@ jsonrpsee::rpc_api! { #[async_trait] pub trait EthereumRpc { /// Estimate gas usage for the given call. - async fn estimate_gas(&mut self, call_request: CallRequest) -> Result; + async fn estimate_gas(&self, call_request: CallRequest) -> Result; /// Retrieve number of the best known block from the Ethereum node. - async fn best_block_number(&mut self) -> Result; + async fn best_block_number(&self) -> Result; /// Retrieve block header by its number from Ethereum node. - async fn header_by_number(&mut self, block_number: u64) -> Result; + async fn header_by_number(&self, block_number: u64) -> Result; /// Retrieve block header by its hash from Ethereum node. - async fn header_by_hash(&mut self, hash: H256) -> Result; + async fn header_by_hash(&self, hash: H256) -> Result; /// Retrieve transaction receipt by transaction hash. - async fn transaction_receipt(&mut self, transaction_hash: H256) -> Result; + async fn transaction_receipt(&self, transaction_hash: H256) -> Result; /// Get the nonce of the given account. - async fn account_nonce(&mut self, address: EthAddress) -> Result; + async fn account_nonce(&self, address: EthAddress) -> Result; /// Submit an Ethereum transaction. /// /// The transaction must already be signed before sending it through this method. - async fn submit_transaction(&mut self, signed_raw_tx: SignedRawTx) -> Result; + async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result; /// Submit a call to an Ethereum smart contract. - async fn eth_call(&mut self, call_transaction: CallRequest) -> Result; -} - -/// The client used to interact with an Ethereum node through RPC. -pub struct EthereumRpcClient { - client: RawClient, -} - -impl EthereumRpcClient { - /// Create a new Ethereum RPC Client. - pub fn new(params: EthereumConnectionParams) -> Self { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - let client = RawClient::new(transport); - - Self { client } - } -} - -#[async_trait] -impl EthereumRpc for EthereumRpcClient { - async fn estimate_gas(&mut self, call_request: CallRequest) -> Result { - Ok(Ethereum::estimate_gas(&mut self.client, call_request).await?) - } - - async fn best_block_number(&mut self) -> Result { - Ok(Ethereum::block_number(&mut self.client).await?.as_u64()) - } - - async fn header_by_number(&mut self, block_number: u64) -> Result { - let header = Ethereum::get_block_by_number(&mut self.client, block_number).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), - } - } - - async fn header_by_hash(&mut self, hash: H256) -> Result { - let header = Ethereum::get_block_by_hash(&mut self.client, hash).await?; - match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { - true => Ok(header), - false => Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)), - } - } - - async fn transaction_receipt(&mut self, transaction_hash: H256) -> Result { - let receipt = Ethereum::get_transaction_receipt(&mut self.client, transaction_hash).await?; - - match receipt.gas_used { - Some(_) => Ok(receipt), - None => Err(RpcError::Ethereum(EthereumNodeError::IncompleteReceipt)), - } - } - - async fn account_nonce(&mut self, address: EthAddress) -> Result { - Ok(Ethereum::get_transaction_count(&mut self.client, address).await?) - } - - async fn submit_transaction(&mut self, signed_raw_tx: SignedRawTx) -> Result { - let transaction = Bytes(signed_raw_tx); - Ok(Ethereum::submit_transaction(&mut self.client, transaction).await?) - } - - async fn eth_call(&mut self, call_transaction: CallRequest) -> Result { - Ok(Ethereum::call(&mut self.client, call_transaction).await?) - } + async fn eth_call(&self, call_transaction: CallRequest) -> Result; } /// The API for the supported Substrate RPC methods. #[async_trait] pub trait SubstrateRpc { /// Returns the best Substrate header. - async fn best_header(&mut self) -> Result; + async fn best_header(&self) -> Result; /// Get a Substrate block from its hash. - async fn get_block(&mut self, block_hash: Option) -> Result; + async fn get_block(&self, block_hash: Option) -> Result; /// Get a Substrate header by its hash. - async fn header_by_hash(&mut self, hash: SubstrateHash) -> Result; + async fn header_by_hash(&self, hash: SubstrateHash) -> Result; /// Get a Substrate block hash by its number. - async fn block_hash_by_number(&mut self, number: SubBlockNumber) -> Result; + async fn block_hash_by_number(&self, number: SubBlockNumber) -> Result; /// Get a Substrate header by its number. - async fn header_by_number(&mut self, block_number: SubBlockNumber) -> Result; + async fn header_by_number(&self, block_number: SubBlockNumber) -> Result; /// Get the nonce of the given Substrate account. /// /// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address. - async fn next_account_index(&mut self, account: node_primitives::AccountId) -> Result; + async fn next_account_index(&self, account: node_primitives::AccountId) -> Result; /// Returns best Ethereum block that Substrate runtime knows of. - async fn best_ethereum_block(&mut self) -> Result; + async fn best_ethereum_block(&self) -> Result; /// Returns whether or not transactions receipts are required for Ethereum header submission. - async fn ethereum_receipts_required(&mut self, header: SubstrateEthereumHeader) -> Result; + async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> Result; /// Returns whether or not the given Ethereum header is known to the Substrate runtime. - async fn ethereum_header_known(&mut self, header_id: EthereumHeaderId) -> Result; + async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> Result; /// Submit an extrinsic for inclusion in a block. /// /// Note: The given transaction does not need be SCALE encoded beforehand. - async fn submit_extrinsic(&mut self, transaction: Bytes) -> Result; + async fn submit_extrinsic(&self, transaction: Bytes) -> Result; /// Get the GRANDPA authority set at given block. - async fn grandpa_authorities_set(&mut self, block: SubstrateHash) -> Result; -} - -/// The client used to interact with a Substrate node through RPC. -pub struct SubstrateRpcClient { - client: RawClient, -} - -impl SubstrateRpcClient { - /// Create a new Substrate RPC Client. - pub fn new(params: SubstrateConnectionParams) -> Self { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - let client = RawClient::new(transport); - - Self { client } - } -} - -#[async_trait] -impl SubstrateRpc for SubstrateRpcClient { - async fn best_header(&mut self) -> Result { - Ok(Substrate::chain_get_header(&mut self.client, None).await?) - } - - async fn get_block(&mut self, block_hash: Option) -> Result { - Ok(Substrate::chain_get_block(&mut self.client, block_hash).await?) - } - - async fn header_by_hash(&mut self, block_hash: SubstrateHash) -> Result { - Ok(Substrate::chain_get_header(&mut self.client, block_hash).await?) - } - - async fn block_hash_by_number(&mut self, number: SubBlockNumber) -> Result { - Ok(Substrate::chain_get_block_hash(&mut self.client, number).await?) - } - - async fn header_by_number(&mut self, block_number: SubBlockNumber) -> Result { - let block_hash = Self::block_hash_by_number(self, block_number).await?; - Ok(Self::header_by_hash(self, block_hash).await?) - } - - async fn next_account_index(&mut self, account: node_primitives::AccountId) -> Result { - Ok(Substrate::system_account_next_index(&mut self.client, account).await?) - } - - async fn best_ethereum_block(&mut self) -> Result { - let call = ETH_API_BEST_BLOCK.to_string(); - let data = Bytes("0x".into()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let decoded_response: (u64, sp_bridge_eth_poa::H256) = Decode::decode(&mut &encoded_response.0[..])?; - - let best_header_id = HeaderId(decoded_response.0, decoded_response.1); - Ok(best_header_id) - } - - async fn ethereum_receipts_required(&mut self, header: SubstrateEthereumHeader) -> Result { - let call = ETH_API_IMPORT_REQUIRES_RECEIPTS.to_string(); - let data = Bytes(header.encode()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let receipts_required: bool = Decode::decode(&mut &encoded_response.0[..])?; - - // Gonna make it the responsibility of the caller to return (receipts_required, id) - Ok(receipts_required) - } - - // The Substrate module could prune old headers. So this function could return false even - // if header is synced. And we'll mark corresponding Ethereum header as Orphan. - // - // But when we read the best header from Substrate next time, we will know that - // there's a better header. This Orphan will either be marked as synced, or - // eventually pruned. - async fn ethereum_header_known(&mut self, header_id: EthereumHeaderId) -> Result { - let call = ETH_API_IS_KNOWN_BLOCK.to_string(); - let data = Bytes(header_id.1.encode()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let is_known_block: bool = Decode::decode(&mut &encoded_response.0[..])?; - - // Gonna make it the responsibility of the caller to return (is_known_block, id) - Ok(is_known_block) - } - - async fn submit_extrinsic(&mut self, transaction: Bytes) -> Result { - let encoded_transaction = Bytes(transaction.0.encode()); - Ok(Substrate::author_submit_extrinsic(&mut self.client, encoded_transaction).await?) - } - - async fn grandpa_authorities_set(&mut self, block: SubstrateHash) -> Result { - let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); - let data = Bytes(block.as_bytes().to_vec()); - - let encoded_response = Substrate::state_call(&mut self.client, call, data, None).await?; - let authority_list = encoded_response.0; - - Ok(authority_list) - } + async fn grandpa_authorities_set(&self, block: SubstrateHash) -> Result; } diff --git a/bridges/relays/ethereum/src/rpc_errors.rs b/bridges/relays/ethereum/src/rpc_errors.rs index 86758b559920..34295df756d6 100644 --- a/bridges/relays/ethereum/src/rpc_errors.rs +++ b/bridges/relays/ethereum/src/rpc_errors.rs @@ -14,14 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -#![allow(dead_code)] +use crate::sync_types::MaybeConnectionError; -use jsonrpsee::raw::client::RawClientError; -use jsonrpsee::transport::http::RequestError; +use jsonrpsee::client::RequestError; use serde_json; -type RpcHttpError = RawClientError; - /// Contains common errors that can occur when /// interacting with a Substrate or Ethereum node /// through RPC. @@ -35,9 +32,18 @@ pub enum RpcError { Substrate(SubstrateNodeError), /// An error that can occur when making an HTTP request to /// an JSON-RPC client. - Request(RpcHttpError), - /// The response from the client could not be SCALE decoded. - Decoding(codec::Error), + Request(RequestError), +} + +impl From for String { + fn from(err: RpcError) -> Self { + match err { + RpcError::Serialization(e) => e.to_string(), + RpcError::Ethereum(e) => e.to_string(), + RpcError::Substrate(e) => e.to_string(), + RpcError::Request(e) => e.to_string(), + } + } } impl From for RpcError { @@ -58,15 +64,30 @@ impl From for RpcError { } } -impl From for RpcError { - fn from(err: RpcHttpError) -> Self { +impl From for RpcError { + fn from(err: RequestError) -> Self { Self::Request(err) } } +impl From for RpcError { + fn from(err: ethabi::Error) -> Self { + Self::Ethereum(EthereumNodeError::ResponseParseFailed(format!("{}", err))) + } +} + +impl MaybeConnectionError for RpcError { + fn is_connection_error(&self) -> bool { + match *self { + RpcError::Request(RequestError::TransportError(_)) => true, + _ => false, + } + } +} + impl From for RpcError { fn from(err: codec::Error) -> Self { - Self::Decoding(err) + Self::Substrate(SubstrateNodeError::Decoding(err)) } } @@ -85,14 +106,30 @@ pub enum EthereumNodeError { InvalidSubstrateBlockNumber, } +impl ToString for EthereumNodeError { + fn to_string(&self) -> String { + match self { + Self::ResponseParseFailed(e) => e, + Self::IncompleteHeader => "Incomplete Ethereum Header Received", + Self::IncompleteReceipt => "Incomplete Ethereum Receipt Recieved", + Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node", + } + .to_string() + } +} + /// Errors that can occur only when interacting with /// a Substrate node through RPC. #[derive(Debug)] pub enum SubstrateNodeError { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Failed to parse response. - ResponseParseFailed, + /// The response from the client could not be SCALE decoded. + Decoding(codec::Error), +} + +impl ToString for SubstrateNodeError { + fn to_string(&self) -> String { + match self { + Self::Decoding(e) => e.what().to_string(), + } + } } diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index e7067cbda3a0..cd77545b266b 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -15,24 +15,34 @@ // along with Parity Bridges Common. If not, see . use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256}; +use crate::rpc::{Substrate, SubstrateRpc}; +use crate::rpc_errors::RpcError; use crate::substrate_types::{ - into_substrate_ethereum_header, into_substrate_ethereum_receipts, GrandpaJustification, Hash, - Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock, SubstrateHeaderId, + into_substrate_ethereum_header, into_substrate_ethereum_receipts, Hash, Header as SubstrateHeader, Number, + SignedBlock as SignedSubstrateBlock, }; -use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader}; -use crate::{bail_on_arg_error, bail_on_error}; +use crate::sync_types::HeaderId; + +use async_trait::async_trait; use codec::{Decode, Encode}; -use jsonrpsee::common::Params; -use jsonrpsee::raw::{RawClient, RawClientError}; -use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; +use jsonrpsee::raw::RawClient; +use jsonrpsee::transport::http::HttpTransportClient; +use jsonrpsee::Client; use num_traits::Zero; -use serde::de::DeserializeOwned; -use serde_json::{from_value, to_value, Value}; +use sp_bridge_eth_poa::Header as SubstrateEthereumHeader; use sp_core::crypto::Pair; use sp_runtime::traits::IdentifyAccount; +const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_requires_receipts"; +const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block"; +const ETH_API_BEST_BLOCK: &str = "EthereumHeadersApi_best_block"; +const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; + +type Result = std::result::Result; +type GrandpaAuthorityList = Vec; + /// Substrate connection params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SubstrateConnectionParams { /// Substrate RPC host. pub host: String, @@ -71,301 +81,177 @@ impl Default for SubstrateSigningParams { } /// Substrate client type. -pub struct Client { +pub struct SubstrateRpcClient { /// Substrate RPC client. - rpc_client: RawClient, + client: Client, /// Genesis block hash. - genesis_hash: Option, + genesis_hash: H256, } -/// All possible errors that can occur during interacting with Ethereum node. -#[derive(Debug)] -pub enum Error { - /// Request start failed. - StartRequestFailed(RequestError), - /// Error serializing request. - RequestSerialization(serde_json::Error), - /// Request not found (should never occur?). - RequestNotFound, - /// Failed to receive response. - ResponseRetrievalFailed(RawClientError), - /// Failed to parse response. - ResponseParseFailed, -} +impl SubstrateRpcClient { + /// Returns client that is able to call RPCs on Substrate node. + pub async fn new(params: SubstrateConnectionParams) -> Result { + let uri = format!("http://{}:{}", params.host, params.port); + let transport = HttpTransportClient::new(&uri); + let raw_client = RawClient::new(transport); + let client: Client = raw_client.into(); -impl MaybeConnectionError for Error { - fn is_connection_error(&self) -> bool { - match *self { - Error::StartRequestFailed(_) | Error::ResponseRetrievalFailed(_) => true, - _ => false, - } + let number: Number = Zero::zero(); + let genesis_hash = Substrate::chain_get_block_hash(&client, number).await?; + + Ok(Self { client, genesis_hash }) } } -/// Returns client that is able to call RPCs on Substrate node. -pub fn client(params: SubstrateConnectionParams) -> Client { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - Client { - rpc_client: RawClient::new(transport), - genesis_hash: None, +#[async_trait] +impl SubstrateRpc for SubstrateRpcClient { + async fn best_header(&self) -> Result { + Ok(Substrate::chain_get_header(&self.client, None).await?) } -} -/// Returns best Substrate header. -pub async fn best_header(client: Client) -> (Client, Result) { - call_rpc(client, "chain_getHeader", Params::None, rpc_returns_value).await -} + async fn get_block(&self, block_hash: Option) -> Result { + Ok(Substrate::chain_get_block(&self.client, block_hash).await?) + } -/// Returns Substrate header by hash. -pub async fn header_by_hash(client: Client, hash: Hash) -> (Client, Result) { - let hash = bail_on_arg_error!(to_value(hash).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc(client, "chain_getHeader", Params::Array(vec![hash]), rpc_returns_value).await -} + async fn header_by_hash(&self, block_hash: Hash) -> Result { + Ok(Substrate::chain_get_header(&self.client, block_hash).await?) + } -/// Returns Substrate header by number. -pub async fn header_by_number(client: Client, number: Number) -> (Client, Result) { - let (client, hash) = bail_on_error!(block_hash_by_number(client, number).await); - header_by_hash(client, hash).await -} + async fn block_hash_by_number(&self, number: Number) -> Result { + Ok(Substrate::chain_get_block_hash(&self.client, number).await?) + } -/// Returns best Ethereum block that Substrate runtime knows of. -pub async fn best_ethereum_block(client: Client) -> (Client, Result) { - let (client, result) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_best_block".into()), - serde_json::Value::String("0x".into()), - ]), - rpc_returns_encoded_value, - ) - .await; - (client, result.map(|(num, hash)| HeaderId(num, hash))) -} + async fn header_by_number(&self, block_number: Number) -> Result { + let block_hash = Self::block_hash_by_number(self, block_number).await?; + Ok(Self::header_by_hash(self, block_hash).await?) + } -/// Returns true if transactions receipts are required for Ethereum header submission. -pub async fn ethereum_receipts_required( - client: Client, - header: QueuedEthereumHeader, -) -> (Client, Result<(EthereumHeaderId, bool), Error>) { - let id = header.header().id(); - let header = into_substrate_ethereum_header(header.header()); - let encoded_header = bail_on_arg_error!( - to_value(Bytes(header.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, receipts_required) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_is_import_requires_receipts".into()), - encoded_header, - ]), - rpc_returns_encoded_value, - ) - .await; - ( - client, - receipts_required.map(|receipts_required| (id, receipts_required)), - ) -} + async fn next_account_index(&self, account: node_primitives::AccountId) -> Result { + Ok(Substrate::system_account_next_index(&self.client, account).await?) + } -/// Returns true if Ethereum header is known to Substrate runtime. -pub async fn ethereum_header_known( - client: Client, - id: EthereumHeaderId, -) -> (Client, Result<(EthereumHeaderId, bool), Error>) { - // Substrate module could prune old headers. So this fn could return false even + async fn best_ethereum_block(&self) -> Result { + let call = ETH_API_BEST_BLOCK.to_string(); + let data = Bytes("0x".into()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let decoded_response: (u64, sp_bridge_eth_poa::H256) = Decode::decode(&mut &encoded_response.0[..])?; + + let best_header_id = HeaderId(decoded_response.0, decoded_response.1); + Ok(best_header_id) + } + + async fn ethereum_receipts_required(&self, header: SubstrateEthereumHeader) -> Result { + let call = ETH_API_IMPORT_REQUIRES_RECEIPTS.to_string(); + let data = Bytes(header.encode()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let receipts_required: bool = Decode::decode(&mut &encoded_response.0[..])?; + + Ok(receipts_required) + } + + // The Substrate module could prune old headers. So this function could return false even // if header is synced. And we'll mark corresponding Ethereum header as Orphan. // - // But when we'll read best header from Substrate next time, we will know that - // there's a better header => this Orphan will either be marked as synced, or + // But when we read the best header from Substrate next time, we will know that + // there's a better header. This Orphan will either be marked as synced, or // eventually pruned. - let encoded_id = bail_on_arg_error!( - to_value(Bytes(id.1.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, is_known_block) = call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("EthereumHeadersApi_is_known_block".into()), - encoded_id, - ]), - rpc_returns_encoded_value, - ) - .await; - (client, is_known_block.map(|is_known_block| (id, is_known_block))) -} + async fn ethereum_header_known(&self, header_id: EthereumHeaderId) -> Result { + let call = ETH_API_IS_KNOWN_BLOCK.to_string(); + let data = Bytes(header_id.1.encode()); -/// Submits Ethereum header to Substrate runtime. -pub async fn submit_ethereum_headers( - client: Client, - params: SubstrateSigningParams, - headers: Vec, - sign_transactions: bool, -) -> (Client, Result, Error>) { - match sign_transactions { - true => submit_signed_ethereum_headers(client, params, headers).await, - false => submit_unsigned_ethereum_headers(client, headers).await, + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let is_known_block: bool = Decode::decode(&mut &encoded_response.0[..])?; + + Ok(is_known_block) } -} -/// Submits signed Ethereum header to Substrate runtime. -pub async fn submit_signed_ethereum_headers( - client: Client, - params: SubstrateSigningParams, - headers: Vec, -) -> (Client, Result, Error>) { - let ids = headers.iter().map(|header| header.id()).collect(); - let (client, genesis_hash) = match client.genesis_hash { - Some(genesis_hash) => (client, genesis_hash), - None => { - let (mut client, genesis_hash) = bail_on_error!(block_hash_by_number(client, Zero::zero()).await); - client.genesis_hash = Some(genesis_hash); - (client, genesis_hash) - } - }; - let account_id = params.signer.public().as_array_ref().clone().into(); - let (client, nonce) = bail_on_error!(next_account_index(client, account_id).await); + async fn submit_extrinsic(&self, transaction: Bytes) -> Result { + Ok(Substrate::author_submit_extrinsic(&self.client, transaction).await?) + } - let transaction = create_signed_submit_transaction(headers, ¶ms.signer, nonce, genesis_hash); - let encoded_transaction = bail_on_arg_error!( - to_value(Bytes(transaction.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, _) = bail_on_error!( - call_rpc( - client, - "author_submitExtrinsic", - Params::Array(vec![encoded_transaction]), - |_| Ok(()), - ) - .await - ); + async fn grandpa_authorities_set(&self, block: Hash) -> Result { + let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); + let data = Bytes(block.as_bytes().to_vec()); - (client, Ok(ids)) -} + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let authority_list = encoded_response.0; -/// Submits unsigned Ethereum header to Substrate runtime. -pub async fn submit_unsigned_ethereum_headers( - mut client: Client, - headers: Vec, -) -> (Client, Result, Error>) { - let ids = headers.iter().map(|header| header.id()).collect(); - for header in headers { - let transaction = create_unsigned_submit_transaction(header); - - let encoded_transaction = bail_on_arg_error!( - to_value(Bytes(transaction.encode())).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (used_client, _) = bail_on_error!( - call_rpc( - client, - "author_submitExtrinsic", - Params::Array(vec![encoded_transaction]), - |_| Ok(()), - ) - .await - ); - - client = used_client; + Ok(authority_list) } - - (client, Ok(ids)) } -/// Get GRANDPA justification for given block. -pub async fn grandpa_justification( - client: Client, - id: SubstrateHeaderId, -) -> (Client, Result<(SubstrateHeaderId, Option), Error>) { - let hash = bail_on_arg_error!(to_value(id.1).map_err(|e| Error::RequestSerialization(e)), client); - let (client, signed_block) = call_rpc(client, "chain_getBlock", Params::Array(vec![hash]), rpc_returns_value).await; - ( - client, - signed_block.map(|signed_block: SignedSubstrateBlock| (id, signed_block.justification)), - ) +/// A trait for RPC calls which are used to submit Ethereum headers to a Substrate +/// runtime. These are typically calls which use a combination of other low-level RPC +/// calls. +#[async_trait] +pub trait SubmitEthereumHeaders: SubstrateRpc { + /// Submits Ethereum header to Substrate runtime. + async fn submit_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + sign_transactions: bool, + ) -> Result>; + + /// Submits signed Ethereum header to Substrate runtime. + async fn submit_signed_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + ) -> Result>; + + /// Submits unsigned Ethereum header to Substrate runtime. + async fn submit_unsigned_ethereum_headers( + &self, + headers: Vec, + ) -> Result>; } -/// Get GRANDPA authorities set at given block. -pub async fn grandpa_authorities_set(client: Client, block: Hash) -> (Client, Result, Error>) { - let block = bail_on_arg_error!(to_value(block).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc( - client, - "state_call", - Params::Array(vec![ - serde_json::Value::String("GrandpaApi_grandpa_authorities".into()), - block, - ]), - rpc_returns_bytes, - ) - .await -} +#[async_trait] +impl SubmitEthereumHeaders for SubstrateRpcClient { + async fn submit_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + sign_transactions: bool, + ) -> Result> { + if sign_transactions { + self.submit_signed_ethereum_headers(params, headers).await + } else { + self.submit_unsigned_ethereum_headers(headers).await + } + } -/// Get Substrate block hash by its number. -async fn block_hash_by_number(client: Client, number: Number) -> (Client, Result) { - let number = bail_on_arg_error!(to_value(number).map_err(|e| Error::RequestSerialization(e)), client); - call_rpc( - client, - "chain_getBlockHash", - Params::Array(vec![number]), - rpc_returns_value, - ) - .await -} + async fn submit_signed_ethereum_headers( + &self, + params: SubstrateSigningParams, + headers: Vec, + ) -> Result> { + let ids = headers.iter().map(|header| header.id()).collect(); -/// Get substrate account nonce. -async fn next_account_index( - client: Client, - account: node_primitives::AccountId, -) -> (Client, Result) { - use sp_core::crypto::Ss58Codec; + let account_id = params.signer.public().as_array_ref().clone().into(); + let nonce = self.next_account_index(account_id).await?; - let account = bail_on_arg_error!( - to_value(account.to_ss58check()).map_err(|e| Error::RequestSerialization(e)), - client - ); - let (client, index) = call_rpc(client, "system_accountNextIndex", Params::Array(vec![account]), |v| { - rpc_returns_value::(v) - }) - .await; - (client, index.map(|index| index as _)) -} + let transaction = create_signed_submit_transaction(headers, ¶ms.signer, nonce, self.genesis_hash); + let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?; -/// Calls RPC on Substrate node that returns Bytes. -async fn call_rpc( - mut client: Client, - method: &'static str, - params: Params, - decode_value: impl Fn(Value) -> Result, -) -> (Client, Result) { - async fn do_call_rpc( - client: &mut Client, - method: &'static str, - params: Params, - decode_value: impl Fn(Value) -> Result, - ) -> Result { - let request_id = client - .rpc_client - .start_request(method, params) - .await - .map_err(Error::StartRequestFailed)?; - // WARN: if there'll be need for executing >1 request at a time, we should avoid - // calling request_by_id - let response = client - .rpc_client - .request_by_id(request_id) - .ok_or(Error::RequestNotFound)? - .await - .map_err(Error::ResponseRetrievalFailed)?; - decode_value(response) + Ok(ids) } - let result = do_call_rpc(&mut client, method, params, decode_value).await; - (client, result) + async fn submit_unsigned_ethereum_headers( + &self, + headers: Vec, + ) -> Result> { + let ids = headers.iter().map(|header| header.id()).collect(); + for header in headers { + let transaction = create_unsigned_submit_transaction(header); + let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?; + } + + Ok(ids) + } } /// Create signed Substrate transaction for submitting Ethereum headers. @@ -429,20 +315,3 @@ fn create_unsigned_submit_transaction(header: QueuedEthereumHeader) -> bridge_no bridge_node_runtime::UncheckedExtrinsic::new_unsigned(function) } - -/// When RPC method returns encoded value. -fn rpc_returns_encoded_value(value: Value) -> Result { - let encoded_response: Bytes = from_value(value).map_err(|_| Error::ResponseParseFailed)?; - Decode::decode(&mut &encoded_response.0[..]).map_err(|_| Error::ResponseParseFailed) -} - -/// When RPC method returns value. -fn rpc_returns_value(value: Value) -> Result { - from_value(value).map_err(|_| Error::ResponseParseFailed) -} - -/// When RPC method returns raw bytes. -fn rpc_returns_bytes(value: Value) -> Result, Error> { - let encoded_response: Bytes = from_value(value).map_err(|_| Error::ResponseParseFailed)?; - Ok(encoded_response.0) -} diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 7ca80f9672ad..ede547a9b5d5 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -16,18 +16,22 @@ //! Substrate -> Ethereum synchronization. -use crate::ethereum_client::{self, EthereumConnectionParams, EthereumSigningParams}; +use crate::ethereum_client::{ + EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient, EthereumSigningParams, +}; use crate::ethereum_types::Address; -use crate::substrate_client::{self, SubstrateConnectionParams}; +use crate::rpc::SubstrateRpc; +use crate::rpc_errors::RpcError; +use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient}; use crate::substrate_types::{ GrandpaJustification, Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline, }; use crate::sync::{HeadersSyncParams, TargetTransactionMode}; -use crate::sync_loop::{OwnedSourceFutureOutput, OwnedTargetFutureOutput, SourceClient, TargetClient}; +use crate::sync_loop::{SourceClient, TargetClient}; use crate::sync_types::SourceHeader; use async_trait::async_trait; -use futures::future::FutureExt; + use std::{collections::HashSet, time::Duration}; /// Interval at which we check new Substrate headers when we are synced/almost synced. @@ -42,7 +46,7 @@ const MAX_SUBMITTED_HEADERS: usize = 4; const PRUNE_DEPTH: u32 = 256; /// Substrate synchronization parameters. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SubstrateSyncParams { /// Ethereum connection params. pub eth: EthereumConnectionParams, @@ -84,170 +88,125 @@ impl Default for SubstrateSyncParams { /// Substrate client as headers source. struct SubstrateHeadersSource { /// Substrate node client. - client: substrate_client::Client, + client: SubstrateRpcClient, } -type SubstrateFutureOutput = OwnedSourceFutureOutput; +impl SubstrateHeadersSource { + fn new(client: SubstrateRpcClient) -> Self { + Self { client } + } +} #[async_trait] impl SourceClient for SubstrateHeadersSource { - type Error = substrate_client::Error; + type Error = RpcError; - async fn best_block_number(self) -> SubstrateFutureOutput { - substrate_client::best_header(self.client) - .map(|(client, result)| (SubstrateHeadersSource { client }, result.map(|header| header.number))) - .await + async fn best_block_number(&self) -> Result { + Ok(self.client.best_header().await?.number) } - async fn header_by_hash(self, hash: Hash) -> SubstrateFutureOutput
{ - substrate_client::header_by_hash(self.client, hash) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + async fn header_by_hash(&self, hash: Hash) -> Result { + self.client.header_by_hash(hash).await } - async fn header_by_number(self, number: Number) -> SubstrateFutureOutput
{ - substrate_client::header_by_number(self.client, number) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + async fn header_by_number(&self, number: Number) -> Result { + self.client.header_by_number(number).await } async fn header_completion( - self, + &self, id: SubstrateHeaderId, - ) -> SubstrateFutureOutput<(SubstrateHeaderId, Option)> { - substrate_client::grandpa_justification(self.client, id) - .map(|(client, result)| (SubstrateHeadersSource { client }, result)) - .await + ) -> Result<(SubstrateHeaderId, Option), Self::Error> { + let hash = id.1; + let signed_block = self.client.get_block(Some(hash)).await?; + let grandpa_justification = signed_block.justification; + + Ok((id, grandpa_justification)) } async fn header_extra( - self, + &self, id: SubstrateHeaderId, _header: QueuedSubstrateHeader, - ) -> SubstrateFutureOutput<(SubstrateHeaderId, ())> { - (self, Ok((id, ()))) + ) -> Result<(SubstrateHeaderId, ()), Self::Error> { + Ok((id, ())) } } /// Ethereum client as Substrate headers target. struct EthereumHeadersTarget { /// Ethereum node client. - client: ethereum_client::Client, + client: EthereumRpcClient, /// Bridge contract address. contract: Address, /// Ethereum signing params. sign_params: EthereumSigningParams, } -type EthereumFutureOutput = OwnedTargetFutureOutput; +impl EthereumHeadersTarget { + fn new(client: EthereumRpcClient, contract: Address, sign_params: EthereumSigningParams) -> Self { + Self { + client, + contract, + sign_params, + } + } +} #[async_trait] impl TargetClient for EthereumHeadersTarget { - type Error = ethereum_client::Error; - - async fn best_header_id(self) -> EthereumFutureOutput { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::best_substrate_block(self.client, contract) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await + type Error = RpcError; + + async fn best_header_id(&self) -> Result { + self.client.best_substrate_block(self.contract).await } - async fn is_known_header(self, id: SubstrateHeaderId) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::substrate_header_known(self.client, contract, id) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await + async fn is_known_header(&self, id: SubstrateHeaderId) -> Result<(SubstrateHeaderId, bool), Self::Error> { + self.client.substrate_header_known(self.contract, id).await } - async fn submit_headers(self, headers: Vec) -> EthereumFutureOutput> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::submit_substrate_headers(self.client, sign_params.clone(), contract, headers) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) + async fn submit_headers(&self, headers: Vec) -> Result, Self::Error> { + self.client + .submit_substrate_headers(self.sign_params.clone(), self.contract, headers) .await } - async fn incomplete_headers_ids(self) -> EthereumFutureOutput> { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::incomplete_substrate_headers(self.client, contract) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) - .await + async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + self.client.incomplete_substrate_headers(self.contract).await } async fn complete_header( - self, + &self, id: SubstrateHeaderId, completion: GrandpaJustification, - ) -> EthereumFutureOutput { - let (contract, sign_params) = (self.contract, self.sign_params); - ethereum_client::complete_substrate_header(self.client, sign_params.clone(), contract, id, completion) - .map(move |(client, result)| { - ( - EthereumHeadersTarget { - client, - contract, - sign_params, - }, - result, - ) - }) + ) -> Result { + self.client + .complete_substrate_header(self.sign_params.clone(), self.contract, id, completion) .await } - async fn requires_extra(self, header: QueuedSubstrateHeader) -> EthereumFutureOutput<(SubstrateHeaderId, bool)> { - (self, Ok((header.header().id(), false))) + async fn requires_extra(&self, header: QueuedSubstrateHeader) -> Result<(SubstrateHeaderId, bool), Self::Error> { + Ok((header.header().id(), false)) } } /// Run Substrate headers synchronization. -pub fn run(params: SubstrateSyncParams) { - let eth_client = ethereum_client::client(params.eth); - let sub_client = substrate_client::client(params.sub); +pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { + let sub_params = params.clone(); + + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = async_std::task::block_on(async { SubstrateRpcClient::new(sub_params.sub).await })?; + + let target = EthereumHeadersTarget::new(eth_client, params.eth_contract_address, params.eth_sign); + let source = SubstrateHeadersSource::new(sub_client); crate::sync_loop::run( - SubstrateHeadersSource { client: sub_client }, + source, SUBSTRATE_TICK_INTERVAL, - EthereumHeadersTarget { - client: eth_client, - contract: params.eth_contract_address, - sign_params: params.eth_sign, - }, + target, ETHEREUM_TICK_INTERVAL, params.sync_params, ); + + Ok(()) } diff --git a/bridges/relays/ethereum/src/sync.rs b/bridges/relays/ethereum/src/sync.rs index e1fc372e148e..7f15865cc504 100644 --- a/bridges/relays/ethereum/src/sync.rs +++ b/bridges/relays/ethereum/src/sync.rs @@ -19,7 +19,7 @@ use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeade use num_traits::{One, Saturating}; /// Common sync params. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HeadersSyncParams { /// Maximal number of ethereum headers to pre-download. pub max_future_headers_to_download: usize, @@ -37,7 +37,7 @@ pub struct HeadersSyncParams { } /// Target transaction mode. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum TargetTransactionMode { /// Submit new headers using signed transactions. Signed, diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index 978db603591d..28ee40650f32 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -43,11 +43,6 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); /// reconnection again. const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); -/// Type alias for all SourceClient futures. -pub type OwnedSourceFutureOutput = (Client, Result>::Error>); -/// Type alias for all TargetClient futures. -pub type OwnedTargetFutureOutput = (Client, Result>::Error>); - /// Source client trait. #[async_trait] pub trait SourceClient: Sized { @@ -55,26 +50,26 @@ pub trait SourceClient: Sized { type Error: std::fmt::Debug + MaybeConnectionError; /// Get best block number. - async fn best_block_number(self) -> OwnedSourceFutureOutput; + async fn best_block_number(&self) -> Result; /// Get header by hash. - async fn header_by_hash(self, hash: P::Hash) -> OwnedSourceFutureOutput; + async fn header_by_hash(&self, hash: P::Hash) -> Result; /// Get canonical header by number. - async fn header_by_number(self, number: P::Number) -> OwnedSourceFutureOutput; + async fn header_by_number(&self, number: P::Number) -> Result; /// Get completion data by header hash. async fn header_completion( - self, + &self, id: HeaderId, - ) -> OwnedSourceFutureOutput, Option)>; + ) -> Result<(HeaderId, Option), Self::Error>; /// Get extra data by header hash. async fn header_extra( - self, + &self, id: HeaderId, header: QueuedHeader

, - ) -> OwnedSourceFutureOutput, P::Extra)>; + ) -> Result<(HeaderId, P::Extra), Self::Error>; } /// Target client trait. @@ -84,35 +79,35 @@ pub trait TargetClient: Sized { type Error: std::fmt::Debug + MaybeConnectionError; /// Returns ID of best header known to the target node. - async fn best_header_id(self) -> OwnedTargetFutureOutput>; + async fn best_header_id(&self) -> Result, Self::Error>; /// Returns true if header is known to the target node. async fn is_known_header( - self, + &self, id: HeaderId, - ) -> OwnedTargetFutureOutput, bool)>; + ) -> Result<(HeaderId, bool), Self::Error>; /// Submit headers. async fn submit_headers( - self, + &self, headers: Vec>, - ) -> OwnedTargetFutureOutput>>; + ) -> Result>, Self::Error>; /// Returns ID of headers that require to be 'completed' before children can be submitted. - async fn incomplete_headers_ids(self) -> OwnedTargetFutureOutput>>; + async fn incomplete_headers_ids(&self) -> Result>, Self::Error>; /// Submit completion data for header. async fn complete_header( - self, + &self, id: HeaderId, completion: P::Completion, - ) -> OwnedTargetFutureOutput>; + ) -> Result, Self::Error>; /// Returns true if header requires extra data to be submitted. async fn requires_extra( - self, + &self, header: QueuedHeader

, - ) -> OwnedTargetFutureOutput, bool)>; + ) -> Result<(HeaderId, bool), Self::Error>; } /// Run headers synchronization. @@ -131,7 +126,7 @@ pub fn run( let mut stall_countdown = None; let mut last_update_time = Instant::now(); - let mut source_maybe_client = None; + let mut source_client_is_online = false; let mut source_best_block_number_required = false; let source_best_block_number_future = source_client.best_block_number().fuse(); let source_new_header_future = futures::future::Fuse::terminated(); @@ -141,7 +136,7 @@ pub fn run( let source_go_offline_future = futures::future::Fuse::terminated(); let source_tick_stream = interval(source_tick).fuse(); - let mut target_maybe_client = None; + let mut target_client_is_online = false; let mut target_best_block_required = false; let mut target_incomplete_headers_required = true; let target_best_block_future = target_client.best_header_id().fuse(); @@ -173,77 +168,65 @@ pub fn run( loop { futures::select! { - (source_client, source_best_block_number) = source_best_block_number_future => { + source_best_block_number = source_best_block_number_future => { source_best_block_number_required = false; - process_future_result( - &mut source_maybe_client, - source_client, + source_client_is_online = process_future_result( source_best_block_number, |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving best header number from {}", P::SOURCE_NAME), ); }, - (source_client, source_new_header) = source_new_header_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_new_header = source_new_header_future => { + source_client_is_online = process_future_result( source_new_header, |source_new_header| sync.headers_mut().header_response(source_new_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving header from {} node", P::SOURCE_NAME), ); }, - (source_client, source_orphan_header) = source_orphan_header_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_orphan_header = source_orphan_header_future => { + source_client_is_online = process_future_result( source_orphan_header, |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), ); }, - (source_client, source_extra) = source_extra_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_extra = source_extra_future => { + source_client_is_online = process_future_result( source_extra, |(header, extra)| sync.headers_mut().extra_response(&header, extra), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), ); }, - (source_client, source_completion) = source_completion_future => { - process_future_result( - &mut source_maybe_client, - source_client, + source_completion = source_completion_future => { + source_client_is_online = process_future_result( source_completion, |(header, completion)| sync.headers_mut().completion_response(&header, completion), &mut source_go_offline_future, - |source_client| delay(CONNECTION_ERROR_DELAY, source_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), ); }, source_client = source_go_offline_future => { - source_maybe_client = Some(source_client); + source_client_is_online = true; }, _ = source_tick_stream.next() => { if sync.is_almost_synced() { source_best_block_number_required = true; } }, - (target_client, target_best_block) = target_best_block_future => { + target_best_block = target_best_block_future => { target_best_block_required = false; - process_future_result( - &mut target_maybe_client, - target_client, + target_client_is_online = process_future_result( target_best_block, |target_best_block| { let head_updated = sync.target_best_header_response(target_best_block); @@ -279,73 +262,63 @@ pub fn run( } }, &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving best known header from {} node", P::TARGET_NAME), ); }, - (target_client, incomplete_headers_ids) = target_incomplete_headers_future => { + incomplete_headers_ids = target_incomplete_headers_future => { target_incomplete_headers_required = false; - process_future_result( - &mut target_maybe_client, - target_client, + target_client_is_online = process_future_result( incomplete_headers_ids, |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), ); }, - (target_client, target_existence_status) = target_existence_status_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_existence_status = target_existence_status_future => { + target_client_is_online = process_future_result( target_existence_status, |(target_header, target_existence_status)| sync .headers_mut() .maybe_orphan_response(&target_header, target_existence_status), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving existence status from {} node", P::TARGET_NAME), ); }, - (target_client, target_submit_header_result) = target_submit_header_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_submit_header_result = target_submit_header_future => { + target_client_is_online = process_future_result( target_submit_header_result, |submitted_headers| sync.headers_mut().headers_submitted(submitted_headers), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error submitting headers to {} node", P::TARGET_NAME), ); }, - (target_client, target_complete_header_result) = target_complete_header_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_complete_header_result = target_complete_header_future => { + target_client_is_online = process_future_result( target_complete_header_result, |completed_header| sync.headers_mut().header_completed(&completed_header), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error completing headers at {}", P::TARGET_NAME), ); }, - (target_client, target_extra_check_result) = target_extra_check_future => { - process_future_result( - &mut target_maybe_client, - target_client, + target_extra_check_result = target_extra_check_future => { + target_client_is_online = process_future_result( target_extra_check_result, |(header, extra_check_result)| sync .headers_mut() .maybe_extra_response(&header, extra_check_result), &mut target_go_offline_future, - |target_client| delay(CONNECTION_ERROR_DELAY, target_client), + || async_std::task::sleep(CONNECTION_ERROR_DELAY), || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), ); }, target_client = target_go_offline_future => { - target_maybe_client = Some(target_client); + target_client_is_online = true; }, _ = target_tick_stream.next() => { target_best_block_required = true; @@ -356,15 +329,35 @@ pub fn run( // print progress progress_context = print_sync_progress(progress_context, &sync); - // if target client is available: wait, or call required target methods - if let Some(target_client) = target_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely; - // 2) get incomplete headers - it stops us from submitting new blocks + we call it rarely; - // 3) complete headers - it stops us from submitting new blocks; - // 4) check if we need extra data from source - it stops us from downloading/submitting new blocks; - // 5) check existence - it stops us from submitting new blocks; - // 6) submit header + // If the target client is accepting requests we update the requests that + // we want it to run + if target_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + target_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Get incomplete headers + // - Stops us from submitting new blocks + // - Only called rarely + // + // 3. Get complete headers + // - Stops us from submitting new blocks + // + // 4. Check if we need extra data from source + // - Stops us from downloading or submitting new blocks + // + // 5. Check existence of header + // - Stops us from submitting new blocks + // + // 6. Submit header if target_best_block_required { log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); @@ -424,18 +417,35 @@ pub fn run( stall_countdown = Some(Instant::now()); } } else { - target_maybe_client = Some(target_client); + target_client_is_online = true; } } - // if source client is available: wait, or call required source methods - if let Some(source_client) = source_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading new blocks + we call it rarely; - // 2) download completion data - it stops us from submitting new blocks; - // 3) download extra data - it stops us from submitting new blocks; - // 4) download missing headers - it stops us from downloading/submitting new blocks; - // 5) downloading new headers + // If the source client is accepting requests we update the requests that + // we want it to run + if source_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + source_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Download completion data + // - Stops us from submitting new blocks + // + // 3. Download extra data + // - Stops us from submitting new blocks + // + // 4. Download missing headers + // - Stops us from downloading or submitting new blocks + // + // 5. Downloading new headers if source_best_block_number_required { log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); @@ -488,55 +498,56 @@ pub fn run( source_new_header_future.set(source_client.header_by_number(id).fuse()); } else { - source_maybe_client = Some(source_client); + source_client_is_online = true; } } } }); } -/// Future that resolves into given value after given timeout. -async fn delay(timeout: Duration, retval: T) -> T { - async_std::task::sleep(timeout).await; - retval -} - /// Stream that emits item every `timeout_ms` milliseconds. fn interval(timeout: Duration) -> impl futures::Stream { futures::stream::unfold((), move |_| async move { - delay(timeout, ()).await; + async_std::task::sleep(timeout).await; Some(((), ())) }) } -/// Process result of the future that may have been caused by connection failure. -fn process_future_result( - maybe_client: &mut Option, - client: TClient, +/// Process result of the future from a client. +/// +/// Returns whether or not the client we're interacting with is online. In this context +/// what online means is that the client is currently not handling any other requests +/// that we've previously sent. +fn process_future_result( result: Result, on_success: impl FnOnce(TResult), go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse>, - go_offline: impl FnOnce(TClient) -> TGoOfflineFuture, + go_offline: impl FnOnce() -> TGoOfflineFuture, error_pattern: impl FnOnce() -> String, -) where +) -> bool +where TError: std::fmt::Debug + MaybeConnectionError, TGoOfflineFuture: FutureExt, { + let mut client_is_online = false; + match result { Ok(result) => { - *maybe_client = Some(client); on_success(result); + client_is_online = true } Err(error) => { if error.is_connection_error() { - go_offline_future.set(go_offline(client).fuse()); + go_offline_future.set(go_offline().fuse()); } else { - *maybe_client = Some(client); + client_is_online = true } log::error!(target: "bridge", "{}: {:?}", error_pattern(), error); } } + + client_is_online } /// Print synchronization progress.