Skip to content

Commit

Permalink
Asynchronous tx confirmations (informalsystems#1278)
Browse files Browse the repository at this point in the history
* Added non-blocking interface to submit msgs (stub)

* Basic impl for submit_msgs

* Minor improvements.

Cherry-picked from

commit b63335b
Author: Adi Seredinschi <adi@informal.systems>
Date:   Sat Jul 17 10:36:38 2021 +0200

    From IbcEvent to IbcEventWithHash

* Avoid cloning in assemble_msgs

* relay_from_operational_data is now generic

* TxHash wrapper

* unconfirmed module and mediator stub

* Implemented unconfirmed::Mediator corner-cases

* Moved from TxHash to TxHashes for better output

* More comments & ideas

* Added minimum backoff

* Fixed ordering bug

* Undo var renaming for easier review

* Fix type errors

* WIP refactoring

* Refactor mediator code

* Add some comments

* Refactor relay_path methods to not require &mut self

* Use CPS to retry submitting unconfirmed transactions

* Fix clippy

* Check that channel has valid channel ID in RelayPath::new()

There is no more &mut self reference in RelayPath, so there is
no way self.channel will be updated to contain channel ID later on

* Display more information in logs

* Rename send_msgs and submit_msgs with send_messages_and_wait_commit/check_tx

* Remove min backoff parameter

* Fix E2E test

* Handle error repsponse in pending tx separately

* Log RelaySummary in PacketWorker

* Revert change to backoff duration in PacketWorker

* Minor cleanup

* Add logging message for when send_messages_* methods are called

Co-authored-by: Soares Chen <soares.chen@gmail.com>
Co-authored-by: Soares Chen <soares.chen@maybevoid.com>
  • Loading branch information
3 people authored Aug 23, 2021
1 parent a29805c commit 0ee1788
Show file tree
Hide file tree
Showing 21 changed files with 925 additions and 235 deletions.
14 changes: 13 additions & 1 deletion relayer/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof;
use ibc_proto::ibc::core::connection::v1::{
QueryClientConnectionsRequest, QueryConnectionsRequest,
};
use tendermint_rpc::endpoint::broadcast::tx_sync::Response as TxResponse;

use crate::connection::ConnectionMsgType;
use crate::error::Error;
Expand Down Expand Up @@ -106,8 +107,19 @@ pub trait ChainEndpoint: Sized {
/// Returns the chain's keybase, mutably
fn keybase_mut(&mut self) -> &mut KeyRing;

/// Sends one or more transactions with `msgs` to chain and
// synchronously wait for it to be committed.
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error>;

/// Sends one or more transactions with `msgs` to chain.
fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error>;
/// Non-blocking alternative to `send_messages_and_wait_commit` interface.
fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<TxResponse>, Error>;

fn get_signer(&mut self) -> Result<Signer, Error>;

Expand Down
56 changes: 53 additions & 3 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,11 @@ impl CosmosSdkChain {
.map(|res| res.response.hash.to_string())
.join(", ");

debug!("[{}] waiting for commit of block(s) {}", self.id(), hashes);
warn!(
"[{}] waiting for commit of tx hashes(s) {}",
self.id(),
hashes
);

// Wait a little bit initially
thread::sleep(Duration::from_millis(200));
Expand Down Expand Up @@ -793,8 +797,15 @@ impl ChainEndpoint for CosmosSdkChain {
/// then it returns error.
/// TODO - more work is required here for a smarter split maybe iteratively accumulating/ evaluating
/// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded.
fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_msgs");
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_messages_and_wait_commit");
debug!(
"send_messages_and_wait_commit with {} messages",
proto_msgs.len()
);

if proto_msgs.is_empty() {
return Ok(vec![]);
Expand Down Expand Up @@ -842,6 +853,45 @@ impl ChainEndpoint for CosmosSdkChain {
Ok(events)
}

fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<Response>, Error> {
crate::time!("send_messages_and_wait_check_tx");
debug!(
"send_messages_and_wait_check_tx with {} messages",
proto_msgs.len()
);

if proto_msgs.is_empty() {
return Ok(vec![]);
}
let mut responses = vec![];

let mut n = 0;
let mut size = 0;
let mut msg_batch = vec![];
for msg in proto_msgs.iter() {
msg_batch.push(msg.clone());
let mut buf = Vec::new();
prost::Message::encode(msg, &mut buf).unwrap();
n += 1;
size += buf.len();
if n >= self.max_msg_num() || size >= self.max_tx_size() {
// Send the tx and enqueue the resulting response
responses.push(self.send_tx(msg_batch)?);
n = 0;
size = 0;
msg_batch = vec![];
}
}
if !msg_batch.is_empty() {
responses.push(self.send_tx(msg_batch)?);
}

Ok(responses)
}

/// Get the account for the signer
fn get_signer(&mut self) -> Result<Signer, Error> {
crate::time!("get_signer");
Expand Down
21 changes: 19 additions & 2 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,16 @@ pub enum ChainRequest {
reply_to: ReplyTo<Subscription>,
},

SendMsgs {
SendMessagesAndWaitCommit {
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<IbcEvent>>,
},

SendMessagesAndWaitCheckTx {
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
},

Signer {
reply_to: ReplyTo<Signer>,
},
Expand Down Expand Up @@ -314,7 +319,19 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug {

/// Send the given `msgs` to the chain, packaged as one or more transactions,
/// and return the list of events emitted by the chain after the transaction was committed.
fn send_msgs(&self, proto_msgs: Vec<prost_types::Any>) -> Result<Vec<IbcEvent>, Error>;
fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<IbcEvent>, Error>;

/// Submit messages asynchronously.
/// Does not block waiting on the chain to produce the
/// resulting events. Instead of events, this method
/// returns a set of transaction hashes.
fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error>;

fn get_signer(&self) -> Result<Signer, Error>;

Expand Down
17 changes: 15 additions & 2 deletions relayer/src/chain/handle/prod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,21 @@ impl ChainHandle for ProdChainHandle {
self.send(|reply_to| ChainRequest::Subscribe { reply_to })
}

fn send_msgs(&self, proto_msgs: Vec<prost_types::Any>) -> Result<Vec<IbcEvent>, Error> {
self.send(|reply_to| ChainRequest::SendMsgs {
fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<IbcEvent>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit {
proto_msgs,
reply_to,
})
}

fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCheckTx {
proto_msgs,
reply_to,
})
Expand Down
12 changes: 11 additions & 1 deletion relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,23 @@ impl ChainEndpoint for MockChain {
unimplemented!()
}

fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error> {
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error> {
// Use the ICS18Context interface to submit the set of messages.
let events = self.context.send(proto_msgs).map_err(Error::ics18)?;

Ok(events)
}

fn send_messages_and_wait_check_tx(
&mut self,
_proto_msgs: Vec<Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
todo!()
}

fn get_signer(&mut self) -> Result<Signer, Error> {
Ok(get_dummy_account_id())
}
Expand Down
24 changes: 20 additions & 4 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ where
self.subscribe(reply_to)?
},

Ok(ChainRequest::SendMsgs { proto_msgs, reply_to }) => {
self.send_msgs(proto_msgs, reply_to)?
Ok(ChainRequest::SendMessagesAndWaitCommit { proto_msgs, reply_to }) => {
self.send_messages_and_wait_commit(proto_msgs, reply_to)?
},

Ok(ChainRequest::SendMessagesAndWaitCheckTx { proto_msgs, reply_to }) => {
self.send_messages_and_wait_check_tx(proto_msgs, reply_to)?
},

Ok(ChainRequest::Signer { reply_to }) => {
Expand Down Expand Up @@ -360,12 +364,24 @@ where
Ok(())
}

fn send_msgs(
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<IbcEvent>>,
) -> Result<(), Error> {
let result = self.chain.send_msgs(proto_msgs);
let result = self.chain.send_messages_and_wait_commit(proto_msgs);

reply_to.send(result).map_err(Error::send)?;

Ok(())
}

fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
) -> Result<(), Error> {
let result = self.chain.send_messages_and_wait_check_tx(proto_msgs);

reply_to.send(result).map_err(Error::send)?;

Expand Down
12 changes: 6 additions & 6 deletions relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel open init
Expand Down Expand Up @@ -860,7 +860,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel open try
Expand Down Expand Up @@ -944,7 +944,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = channel
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?;

// Find the relevant event for channel open ack
Expand Down Expand Up @@ -1040,7 +1040,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = channel
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?;

// Find the relevant event for channel open confirm
Expand Down Expand Up @@ -1103,7 +1103,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel close init
Expand Down Expand Up @@ -1182,7 +1182,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel close confirm
Expand Down
12 changes: 6 additions & 6 deletions relayer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection init
Expand Down Expand Up @@ -812,7 +812,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
self.src_chain()
.send_msgs(client_msgs)
.send_messages_and_wait_commit(client_msgs)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;

let query_height = self
Expand Down Expand Up @@ -883,7 +883,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection try transaction
Expand Down Expand Up @@ -929,7 +929,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
self.src_chain()
.send_msgs(client_msgs)
.send_messages_and_wait_commit(client_msgs)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;

let query_height = self
Expand Down Expand Up @@ -974,7 +974,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection ack
Expand Down Expand Up @@ -1051,7 +1051,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection confirm
Expand Down
Loading

0 comments on commit 0ee1788

Please sign in to comment.