Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous tx confirmations #1278

Merged
merged 32 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d4d0532
Added non-blocking interface to submit msgs (stub)
adizere Jul 15, 2021
f123fab
Basic impl for submit_msgs
adizere Jul 15, 2021
8bf7287
Minor improvements.
adizere Jul 20, 2021
6c4d5e8
Avoid cloning in assemble_msgs
adizere Jul 17, 2021
caa5f0f
relay_from_operational_data is now generic
adizere Jul 20, 2021
2403639
TxHash wrapper
adizere Jul 20, 2021
8e8615b
unconfirmed module and mediator stub
adizere Jul 21, 2021
c76b639
Implemented unconfirmed::Mediator corner-cases
adizere Jul 28, 2021
5591e30
Moved from TxHash to TxHashes for better output
adizere Jul 28, 2021
ce3f169
More comments & ideas
adizere Jul 28, 2021
ca9a93f
Added minimum backoff
adizere Jul 28, 2021
2825f4d
Fixed ordering bug
adizere Aug 11, 2021
e150bcf
Merge branch 'master' into adi/nowait_localized
adizere Aug 12, 2021
3ed45e4
Undo var renaming for easier review
adizere Aug 12, 2021
ab86ac6
Fix type errors
soareschen Aug 16, 2021
930f7fa
WIP refactoring
soareschen Aug 17, 2021
de69974
Refactor mediator code
soareschen Aug 17, 2021
5205edd
Add some comments
soareschen Aug 17, 2021
a2a0613
Refactor relay_path methods to not require &mut self
soareschen Aug 18, 2021
cb7f8ab
Use CPS to retry submitting unconfirmed transactions
soareschen Aug 18, 2021
479a9f4
Fix clippy
soareschen Aug 18, 2021
b254618
Check that channel has valid channel ID in RelayPath::new()
soareschen Aug 18, 2021
7d5611c
Display more information in logs
soareschen Aug 18, 2021
3a5391c
Rename send_msgs and submit_msgs with send_messages_and_wait_commit/c…
soareschen Aug 18, 2021
729920b
Remove min backoff parameter
soareschen Aug 18, 2021
ac2998a
Fix E2E test
soareschen Aug 18, 2021
a22ab36
Handle error repsponse in pending tx separately
soareschen Aug 19, 2021
abb0685
Log RelaySummary in PacketWorker
soareschen Aug 19, 2021
cf3b8d2
Revert change to backoff duration in PacketWorker
soareschen Aug 19, 2021
9492c3a
Merge remote-tracking branch 'origin/master' into adi/nowait_localized
soareschen Aug 23, 2021
e98d806
Minor cleanup
soareschen Aug 23, 2021
c9ee093
Add logging message for when send_messages_* methods are called
soareschen Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion relayer/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof;
use ibc_proto::ibc::core::connection::v1::{
QueryClientConnectionsRequest, QueryConnectionsRequest,
};
use tendermint_rpc::endpoint::broadcast::tx_sync::Response as TxResponse;

use crate::connection::ConnectionMsgType;
use crate::error::Error;
Expand Down Expand Up @@ -106,8 +107,19 @@ pub trait ChainEndpoint: Sized {
/// Returns the chain's keybase, mutably
fn keybase_mut(&mut self) -> &mut KeyRing;

/// Sends one or more transactions with `msgs` to chain and
// synchronously wait for it to be committed.
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error>;

/// Sends one or more transactions with `msgs` to chain.
fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error>;
/// Non-blocking alternative to `send_messages_and_wait_commit` interface.
fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<TxResponse>, Error>;

fn get_signer(&mut self) -> Result<Signer, Error>;

Expand Down
56 changes: 53 additions & 3 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,11 @@ impl CosmosSdkChain {
.map(|res| res.response.hash.to_string())
.join(", ");

debug!("[{}] waiting for commit of block(s) {}", self.id(), hashes);
warn!(
"[{}] waiting for commit of tx hashes(s) {}",
self.id(),
hashes
);

// Wait a little bit initially
thread::sleep(Duration::from_millis(200));
Expand Down Expand Up @@ -793,8 +797,15 @@ impl ChainEndpoint for CosmosSdkChain {
/// then it returns error.
/// TODO - more work is required here for a smarter split maybe iteratively accumulating/ evaluating
/// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded.
fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_msgs");
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_messages_and_wait_commit");
debug!(
"send_messages_and_wait_commit with {} messages",
proto_msgs.len()
);

if proto_msgs.is_empty() {
return Ok(vec![]);
Expand Down Expand Up @@ -842,6 +853,45 @@ impl ChainEndpoint for CosmosSdkChain {
Ok(events)
}

fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<Response>, Error> {
crate::time!("send_messages_and_wait_check_tx");
debug!(
"send_messages_and_wait_check_tx with {} messages",
proto_msgs.len()
);

if proto_msgs.is_empty() {
return Ok(vec![]);
}
let mut responses = vec![];

let mut n = 0;
let mut size = 0;
let mut msg_batch = vec![];
for msg in proto_msgs.iter() {
msg_batch.push(msg.clone());
let mut buf = Vec::new();
prost::Message::encode(msg, &mut buf).unwrap();
n += 1;
size += buf.len();
if n >= self.max_msg_num() || size >= self.max_tx_size() {
// Send the tx and enqueue the resulting response
responses.push(self.send_tx(msg_batch)?);
n = 0;
size = 0;
msg_batch = vec![];
}
}
if !msg_batch.is_empty() {
responses.push(self.send_tx(msg_batch)?);
}

Ok(responses)
}

/// Get the account for the signer
fn get_signer(&mut self) -> Result<Signer, Error> {
crate::time!("get_signer");
Expand Down
21 changes: 19 additions & 2 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,16 @@ pub enum ChainRequest {
reply_to: ReplyTo<Subscription>,
},

SendMsgs {
SendMessagesAndWaitCommit {
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<IbcEvent>>,
},

SendMessagesAndWaitCheckTx {
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
},

Signer {
reply_to: ReplyTo<Signer>,
},
Expand Down Expand Up @@ -314,7 +319,19 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug {

/// Send the given `msgs` to the chain, packaged as one or more transactions,
/// and return the list of events emitted by the chain after the transaction was committed.
fn send_msgs(&self, proto_msgs: Vec<prost_types::Any>) -> Result<Vec<IbcEvent>, Error>;
fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<IbcEvent>, Error>;

/// Submit messages asynchronously.
/// Does not block waiting on the chain to produce the
/// resulting events. Instead of events, this method
/// returns a set of transaction hashes.
fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error>;

fn get_signer(&self) -> Result<Signer, Error>;

Expand Down
17 changes: 15 additions & 2 deletions relayer/src/chain/handle/prod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,21 @@ impl ChainHandle for ProdChainHandle {
self.send(|reply_to| ChainRequest::Subscribe { reply_to })
}

fn send_msgs(&self, proto_msgs: Vec<prost_types::Any>) -> Result<Vec<IbcEvent>, Error> {
self.send(|reply_to| ChainRequest::SendMsgs {
fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<IbcEvent>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit {
proto_msgs,
reply_to,
})
}

fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCheckTx {
proto_msgs,
reply_to,
})
Expand Down
12 changes: 11 additions & 1 deletion relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,23 @@ impl ChainEndpoint for MockChain {
unimplemented!()
}

fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error> {
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<IbcEvent>, Error> {
// Use the ICS18Context interface to submit the set of messages.
let events = self.context.send(proto_msgs).map_err(Error::ics18)?;

Ok(events)
}

fn send_messages_and_wait_check_tx(
&mut self,
_proto_msgs: Vec<Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
todo!()
}

fn get_signer(&mut self) -> Result<Signer, Error> {
Ok(get_dummy_account_id())
}
Expand Down
24 changes: 20 additions & 4 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ where
self.subscribe(reply_to)?
},

Ok(ChainRequest::SendMsgs { proto_msgs, reply_to }) => {
self.send_msgs(proto_msgs, reply_to)?
Ok(ChainRequest::SendMessagesAndWaitCommit { proto_msgs, reply_to }) => {
self.send_messages_and_wait_commit(proto_msgs, reply_to)?
},

Ok(ChainRequest::SendMessagesAndWaitCheckTx { proto_msgs, reply_to }) => {
self.send_messages_and_wait_check_tx(proto_msgs, reply_to)?
},

Ok(ChainRequest::Signer { reply_to }) => {
Expand Down Expand Up @@ -360,12 +364,24 @@ where
Ok(())
}

fn send_msgs(
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<IbcEvent>>,
) -> Result<(), Error> {
let result = self.chain.send_msgs(proto_msgs);
let result = self.chain.send_messages_and_wait_commit(proto_msgs);

reply_to.send(result).map_err(Error::send)?;

Ok(())
}

fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
) -> Result<(), Error> {
let result = self.chain.send_messages_and_wait_check_tx(proto_msgs);

reply_to.send(result).map_err(Error::send)?;

Expand Down
12 changes: 6 additions & 6 deletions relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel open init
Expand Down Expand Up @@ -860,7 +860,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel open try
Expand Down Expand Up @@ -944,7 +944,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = channel
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?;

// Find the relevant event for channel open ack
Expand Down Expand Up @@ -1040,7 +1040,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = channel
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?;

// Find the relevant event for channel open confirm
Expand Down Expand Up @@ -1103,7 +1103,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel close init
Expand Down Expand Up @@ -1182,7 +1182,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for channel close confirm
Expand Down
12 changes: 6 additions & 6 deletions relayer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection init
Expand Down Expand Up @@ -812,7 +812,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
self.src_chain()
.send_msgs(client_msgs)
.send_messages_and_wait_commit(client_msgs)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;

let query_height = self
Expand Down Expand Up @@ -883,7 +883,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection try transaction
Expand Down Expand Up @@ -929,7 +929,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
self.src_chain()
.send_msgs(client_msgs)
.send_messages_and_wait_commit(client_msgs)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;

let query_height = self
Expand Down Expand Up @@ -974,7 +974,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection ack
Expand Down Expand Up @@ -1051,7 +1051,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {

let events = self
.dst_chain()
.send_msgs(dst_msgs)
.send_messages_and_wait_commit(dst_msgs)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;

// Find the relevant event for connection confirm
Expand Down
Loading