Skip to content

Commit

Permalink
Fix sub2eth synchronization (paritytech#172)
Browse files Browse the repository at this point in the history
* ease serde version requirements (to build OE with builtin)

* trace + fix completion notifications

* check incompletion on submit

* fix compilation

* do not ask for synced blocks when queue is empty

* cargo fmt --all

* Update relays/ethereum/src/ethereum_client.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* remove closure

* fn submit_substrate_header() -> Option<RpcError>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
  • Loading branch information
2 people authored and serban300 committed Apr 9, 2024
1 parent 7a5cce6 commit 052f1e2
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 66 deletions.
2 changes: 1 addition & 1 deletion bridges/relays/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ num-traits = "0.2"
parity-crypto = { version = "0.6", features = ["publickey"] }
parking_lot = "0.11.0"
rustc-hex = "2.0.1"
serde = { version = "1.0.114", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.55"
sp-bridge-eth-poa = { path = "../../primitives/ethereum-poa" }
time = "0.2"
Expand Down
19 changes: 19 additions & 0 deletions bridges/relays/ethereum/res/substrate-bridge-abi.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,25 @@
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes",
"name": "rawHeader",
"type": "bytes"
}
],
"name": "isIncompleteHeader",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/ethereum/res/substrate-bridge-bytecode.hex

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions bridges/relays/ethereum/res/substrate-bridge-metadata.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Last Change Date: 2020-05-01
Last Change Date: 2020-07-03
Solc version: 0.6.6+commit.6c089d02
Source hash (keccak256): 0x36403636ad41082ca6c937c60ab06446cd9ef7036c178fa2f04d7c8286544d39
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/8b54f5f648f8685fecd52b7af1deb277922b0fc3/substrate-bridge.sol
Source hash (keccak256): 0x3e6339beefe6786f4f26b408d4f727e03c6fd9630d692af9a7f6b46143fa308f
Source gist: https://github.com/svyatonik/substrate-bridge-sol/blob/1d0fa475a2ba3a70a47ed2dd870568c42ec16c8c/substrate-bridge.sol
Compiler flags used (command to produce the file): `docker run -i ethereum/solc:0.6.6 --optimize --bin - < substrate-bridge.sol`
251 changes: 231 additions & 20 deletions bridges/relays/ethereum/src/ethereum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::ethereum_types::{
use crate::rpc::{Ethereum, EthereumRpc};
use crate::rpc_errors::{EthereumNodeError, RpcError};
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::sync_types::HeaderId;
use crate::sync_types::{HeaderId, MaybeConnectionError, SubmittedHeaders};

use async_trait::async_trait;
use codec::{Decode, Encode};
Expand All @@ -30,7 +30,7 @@ use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use parity_crypto::publickey::KeyPair;

use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};

// to encode/decode contract calls
ethabi_contract::use_contract!(bridge_contract, "res/substrate-bridge-abi.json");
Expand Down Expand Up @@ -170,7 +170,7 @@ pub trait EthereumHighLevelRpc: EthereumRpc {
params: EthereumSigningParams,
contract_address: Address,
headers: Vec<QueuedSubstrateHeader>,
) -> Result<Vec<SubstrateHeaderId>>;
) -> SubmittedHeaders<SubstrateHeaderId, RpcError>;

/// Returns ids of incomplete Substrate headers.
async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>>;
Expand Down Expand Up @@ -246,25 +246,35 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
params: EthereumSigningParams,
contract_address: Address,
headers: Vec<QueuedSubstrateHeader>,
) -> Result<Vec<SubstrateHeaderId>> {
) -> SubmittedHeaders<SubstrateHeaderId, RpcError> {
// read nonce of signer
let address: Address = params.signer.address().as_fixed_bytes().into();
let mut nonce = self.account_nonce(address).await?;

let ids = headers.iter().map(|header| header.id()).collect();
for header in headers {
self.submit_ethereum_transaction(
&params,
Some(contract_address),
Some(nonce),
false,
bridge_contract::functions::import_header::encode_input(header.header().encode()),
)
.await?;

nonce += 1.into();
}
let nonce = match self.account_nonce(address).await {
Ok(nonce) => nonce,
Err(error) => {
return SubmittedHeaders {
submitted: Vec::new(),
incomplete: Vec::new(),
rejected: headers.iter().rev().map(|header| header.id()).collect(),
fatal_error: Some(error),
}
}
};

Ok(ids)
// submit headers. Note that we're cloning self here. It is ok, because
// cloning `jsonrpsee::Client` only clones reference to background threads
submit_substrate_headers(
EthereumHeadersSubmitter {
client: EthereumRpcClient {
client: self.client.clone(),
},
params,
contract_address,
nonce,
},
headers,
)
.await
}

async fn incomplete_substrate_headers(&self, contract_address: Address) -> Result<HashSet<SubstrateHeaderId>> {
Expand Down Expand Up @@ -363,3 +373,204 @@ impl EthereumHighLevelRpc for EthereumRpcClient {
Ok((id, transaction_receipts))
}
}

/// Substrate headers submitter API.
#[async_trait]
trait HeadersSubmitter {
/// Returns Ok(true) if not-yet-imported header is incomplete.
/// Returns Ok(false) if not-yet-imported header is complete.
///
/// Returns Err(()) if contract has rejected header. This probably means
/// that the header is already imported by the contract.
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool>;

/// Submit given header to Ethereum node.
async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()>;
}

/// Implementation of Substrate headers submitter that sends headers to running Ethereum node.
struct EthereumHeadersSubmitter {
client: EthereumRpcClient,
params: EthereumSigningParams,
contract_address: Address,
nonce: U256,
}

#[async_trait]
impl HeadersSubmitter for EthereumHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
let (encoded_call, call_decoder) =
bridge_contract::functions::is_incomplete_header::call(header.header().encode());
let call_request = CallRequest {
to: Some(self.contract_address),
data: Some(encoded_call.into()),
..Default::default()
};

let call_result = self.client.eth_call(call_request).await?;
let is_incomplete = call_decoder.decode(&call_result.0)?;

Ok(is_incomplete)
}

async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
let result = self
.client
.submit_ethereum_transaction(
&self.params,
Some(self.contract_address),
Some(self.nonce),
false,
bridge_contract::functions::import_header::encode_input(header.header().encode()),
)
.await;

if result.is_ok() {
self.nonce += U256::one();
}

result
}
}

/// Submit multiple Substrate headers.
async fn submit_substrate_headers(
mut header_submitter: impl HeadersSubmitter,
headers: Vec<QueuedSubstrateHeader>,
) -> SubmittedHeaders<SubstrateHeaderId, RpcError> {
let mut ids = headers.iter().map(|header| header.id()).collect::<VecDeque<_>>();
let mut submitted_headers = SubmittedHeaders::default();
for header in headers {
let id = ids.pop_front().expect("both collections have same size; qed");
submitted_headers.fatal_error =
submit_substrate_header(&mut header_submitter, &mut submitted_headers, id, header).await;

if submitted_headers.fatal_error.is_some() {
submitted_headers.rejected.extend(ids);
break;
}
}

submitted_headers
}

/// Submit single Substrate header.
async fn submit_substrate_header(
header_submitter: &mut impl HeadersSubmitter,
submitted_headers: &mut SubmittedHeaders<SubstrateHeaderId, RpcError>,
id: SubstrateHeaderId,
header: QueuedSubstrateHeader,
) -> Option<RpcError> {
// if parent of this header is either incomplete, or rejected, we assume that contract
// will reject this header as well
let parent_id = header.parent_id();
if submitted_headers.rejected.contains(&parent_id) || submitted_headers.incomplete.contains(&parent_id) {
submitted_headers.rejected.push(id);
return None;
}

// check if this header is incomplete
let is_header_incomplete = match header_submitter.is_header_incomplete(&header).await {
Ok(true) => true,
Ok(false) => false,
Err(error) => {
// contract has rejected this header => we do not want to submit it
submitted_headers.rejected.push(id);
if error.is_connection_error() {
return Some(error);
} else {
return None;
}
}
};

// submit header and update submitted headers
match header_submitter.submit_header(header).await {
Ok(_) => {
submitted_headers.submitted.push(id);
if is_header_incomplete {
submitted_headers.incomplete.push(id);
}
None
}
Err(error) => {
submitted_headers.rejected.push(id);
Some(error)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::substrate_types::{Header as SubstrateHeader, Number as SubstrateBlockNumber};
use sp_runtime::traits::Header;

struct TestHeadersSubmitter {
incomplete: Vec<SubstrateHeaderId>,
failed: Vec<SubstrateHeaderId>,
}

#[async_trait]
impl HeadersSubmitter for TestHeadersSubmitter {
async fn is_header_incomplete(&self, header: &QueuedSubstrateHeader) -> Result<bool> {
if self.incomplete.iter().any(|i| i.0 == header.id().0) {
Ok(true)
} else {
Ok(false)
}
}

async fn submit_header(&mut self, header: QueuedSubstrateHeader) -> Result<()> {
if self.failed.iter().any(|i| i.0 == header.id().0) {
Err(RpcError::Ethereum(EthereumNodeError::InvalidSubstrateBlockNumber))
} else {
Ok(())
}
}
}

fn header(number: SubstrateBlockNumber) -> QueuedSubstrateHeader {
QueuedSubstrateHeader::new(SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
))
}

#[test]
fn descendants_of_incomplete_headers_are_not_submitted() {
let submitted_headers = async_std::task::block_on(submit_substrate_headers(
TestHeadersSubmitter {
incomplete: vec![header(5).id()],
failed: vec![],
},
vec![header(5), header(6)],
));
assert_eq!(submitted_headers.submitted, vec![header(5).id()]);
assert_eq!(submitted_headers.incomplete, vec![header(5).id()]);
assert_eq!(submitted_headers.rejected, vec![header(6).id()]);
assert!(submitted_headers.fatal_error.is_none());
}

#[test]
fn headers_after_fatal_error_are_not_submitted() {
let submitted_headers = async_std::task::block_on(submit_substrate_headers(
TestHeadersSubmitter {
incomplete: vec![],
failed: vec![header(6).id()],
},
vec![header(5), header(6), header(7)],
));
assert_eq!(submitted_headers.submitted, vec![header(5).id()]);
assert_eq!(submitted_headers.incomplete, vec![]);
assert_eq!(submitted_headers.rejected, vec![header(6).id(), header(7).id()]);
assert!(submitted_headers.fatal_error.is_some());
}
}
7 changes: 5 additions & 2 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::substrate_client::{
use crate::substrate_types::into_substrate_ethereum_header;
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::SourceHeader;
use crate::sync_types::{SourceHeader, SubmittedHeaders};

use async_trait::async_trait;
use web3::types::H256;
Expand Down Expand Up @@ -155,7 +155,10 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
Ok((id, self.client.ethereum_header_known(id).await?))
}

async fn submit_headers(&self, headers: Vec<QueuedEthereumHeader>) -> Result<Vec<EthereumHeaderId>, Self::Error> {
async fn submit_headers(
&self,
headers: Vec<QueuedEthereumHeader>,
) -> SubmittedHeaders<EthereumHeaderId, Self::Error> {
let (sign_params, sign_transactions) = (self.sign_params.clone(), self.sign_transactions.clone());
self.client
.submit_ethereum_headers(sign_params, headers, sign_transactions)
Expand Down
Loading

0 comments on commit 052f1e2

Please sign in to comment.