Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous tx confirmations #1278

Merged
merged 32 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d4d0532
Added non-blocking interface to submit msgs (stub)
adizere Jul 15, 2021
f123fab
Basic impl for submit_msgs
adizere Jul 15, 2021
8bf7287
Minor improvements.
adizere Jul 20, 2021
6c4d5e8
Avoid cloning in assemble_msgs
adizere Jul 17, 2021
caa5f0f
relay_from_operational_data is now generic
adizere Jul 20, 2021
2403639
TxHash wrapper
adizere Jul 20, 2021
8e8615b
unconfirmed module and mediator stub
adizere Jul 21, 2021
c76b639
Implemented unconfirmed::Mediator corner-cases
adizere Jul 28, 2021
5591e30
Moved from TxHash to TxHashes for better output
adizere Jul 28, 2021
ce3f169
More comments & ideas
adizere Jul 28, 2021
ca9a93f
Added minimum backoff
adizere Jul 28, 2021
2825f4d
Fixed ordering bug
adizere Aug 11, 2021
e150bcf
Merge branch 'master' into adi/nowait_localized
adizere Aug 12, 2021
3ed45e4
Undo var renaming for easier review
adizere Aug 12, 2021
ab86ac6
Fix type errors
soareschen Aug 16, 2021
930f7fa
WIP refactoring
soareschen Aug 17, 2021
de69974
Refactor mediator code
soareschen Aug 17, 2021
5205edd
Add some comments
soareschen Aug 17, 2021
a2a0613
Refactor relay_path methods to not require &mut self
soareschen Aug 18, 2021
cb7f8ab
Use CPS to retry submitting unconfirmed transactions
soareschen Aug 18, 2021
479a9f4
Fix clippy
soareschen Aug 18, 2021
b254618
Check that channel has valid channel ID in RelayPath::new()
soareschen Aug 18, 2021
7d5611c
Display more information in logs
soareschen Aug 18, 2021
3a5391c
Rename send_msgs and submit_msgs with send_messages_and_wait_commit/c…
soareschen Aug 18, 2021
729920b
Remove min backoff parameter
soareschen Aug 18, 2021
ac2998a
Fix E2E test
soareschen Aug 18, 2021
a22ab36
Handle error repsponse in pending tx separately
soareschen Aug 19, 2021
abb0685
Log RelaySummary in PacketWorker
soareschen Aug 19, 2021
cf3b8d2
Revert change to backoff duration in PacketWorker
soareschen Aug 19, 2021
9492c3a
Merge remote-tracking branch 'origin/master' into adi/nowait_localized
soareschen Aug 23, 2021
e98d806
Minor cleanup
soareschen Aug 23, 2021
c9ee093
Add logging message for when send_messages_* methods are called
soareschen Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions relayer/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ pub trait ChainEndpoint: Sized {
/// Sends one or more transactions with `msgs` to chain.
fn send_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<IbcEvent>, Error>;

/// Sends one or more transactions with `msgs` to chain.
/// Non-blocking alternative to `send_msgs` interface.
fn submit_msgs(
&mut self,
proto_msgs: Vec<Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error>;

fn get_signer(&mut self) -> Result<Signer, Error>;

fn get_key(&mut self) -> Result<KeyEntry, Error>;
Expand Down
39 changes: 38 additions & 1 deletion relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,11 @@ impl CosmosSdkChain {
.map(|res| res.response.hash.to_string())
.join(", ");

debug!("[{}] waiting for commit of block(s) {}", self.id(), hashes);
warn!(
"[{}] waiting for commit of tx hashes(s) {}",
self.id(),
hashes
);

// Wait a little bit initially
thread::sleep(Duration::from_millis(200));
Expand Down Expand Up @@ -655,6 +659,7 @@ impl CosmosSdkChain {
if empty_event_present(events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
// !! TODO(Adi): This check must be part of `submit_msgs` method!!
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
Expand Down Expand Up @@ -839,6 +844,38 @@ impl ChainEndpoint for CosmosSdkChain {
Ok(events)
}

fn submit_msgs(&mut self, proto_msgs: Vec<Any>) -> Result<Vec<Response>, Error> {
crate::time!("submit_msgs");

if proto_msgs.is_empty() {
return Ok(vec![]);
}
let mut responses = vec![];

let mut n = 0;
let mut size = 0;
let mut msg_batch = vec![];
for msg in proto_msgs.iter() {
msg_batch.push(msg.clone());
let mut buf = Vec::new();
prost::Message::encode(msg, &mut buf).unwrap();
n += 1;
size += buf.len();
if n >= self.max_msg_num() || size >= self.max_tx_size() {
// Send the tx and enqueue the resulting response
responses.push(self.send_tx(msg_batch)?);
n = 0;
size = 0;
msg_batch = vec![];
}
}
if !msg_batch.is_empty() {
responses.push(self.send_tx(msg_batch)?);
}

Ok(responses)
}

/// Get the account for the signer
fn get_signer(&mut self) -> Result<Signer, Error> {
crate::time!("get_signer");
Expand Down
14 changes: 14 additions & 0 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ pub enum ChainRequest {
reply_to: ReplyTo<Vec<IbcEvent>>,
},

SubmitMsgs {
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
},

Signer {
reply_to: ReplyTo<Signer>,
},
Expand Down Expand Up @@ -316,6 +321,15 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug {
/// and return the list of events emitted by the chain after the transaction was committed.
fn send_msgs(&self, proto_msgs: Vec<prost_types::Any>) -> Result<Vec<IbcEvent>, Error>;

/// Submit messages asynchronously.
/// Does not block waiting on the chain to produce the
/// resulting events. Instead of events, this method
/// returns a set of transaction hashes.
fn submit_msgs(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error>;

fn get_signer(&self) -> Result<Signer, Error>;

fn get_key(&self) -> Result<KeyEntry, Error>;
Expand Down
10 changes: 10 additions & 0 deletions relayer/src/chain/handle/prod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ impl ChainHandle for ProdChainHandle {
})
}

fn submit_msgs(
&self,
proto_msgs: Vec<prost_types::Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
self.send(|reply_to| ChainRequest::SubmitMsgs {
proto_msgs,
reply_to,
})
}

fn get_signer(&self) -> Result<Signer, Error> {
self.send(|reply_to| ChainRequest::Signer { reply_to })
}
Expand Down
7 changes: 7 additions & 0 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ impl ChainEndpoint for MockChain {
Ok(events)
}

fn submit_msgs(
&mut self,
_proto_msgs: Vec<Any>,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
todo!()
}

fn get_signer(&mut self) -> Result<Signer, Error> {
Ok(get_dummy_account_id())
}
Expand Down
16 changes: 16 additions & 0 deletions relayer/src/chain/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ where
self.send_msgs(proto_msgs, reply_to)?
},

Ok(ChainRequest::SubmitMsgs { proto_msgs, reply_to }) => {
self.submit_msgs(proto_msgs, reply_to)?
},

Ok(ChainRequest::Signer { reply_to }) => {
self.get_signer(reply_to)?
}
Expand Down Expand Up @@ -372,6 +376,18 @@ where
Ok(())
}

fn submit_msgs(
&mut self,
proto_msgs: Vec<prost_types::Any>,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
) -> Result<(), Error> {
let result = self.chain.submit_msgs(proto_msgs);

reply_to.send(result).map_err(Error::send)?;

Ok(())
}

fn query_latest_height(&self, reply_to: ReplyTo<Height>) -> Result<(), Error> {
let latest_height = self.chain.query_latest_height();

Expand Down
14 changes: 12 additions & 2 deletions relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ use crate::link::relay_path::RelayPath;
pub mod error;
mod operational_data;
mod relay_path;
mod relay_sender;
mod relay_summary;
mod tx_hashes;
mod unconfirmed;
use tx_hashes::TxHashes;

// Re-export the telemetries summary
pub use relay_summary::RelaySummary;
Expand Down Expand Up @@ -150,28 +154,34 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Link<ChainA, ChainB> {
Ok(Link::new(channel))
}

/// Implements the `packet-recv` CLI
pub fn build_and_send_recv_packet_messages(&mut self) -> Result<Vec<IbcEvent>, LinkError> {
self.a_to_b.build_recv_packet_and_timeout_msgs(None)?;

let mut results = vec![];

// Block waiting for all of the scheduled data (until `None` is returned)
while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() {
let mut last_res = self.a_to_b.relay_from_operational_data(odata)?;
let mut last_res = self
.a_to_b
.relay_from_operational_data::<relay_sender::SyncSender>(odata)?;
results.append(&mut last_res.events);
}

Ok(results)
}

/// Implements the `packet-ack` CLI
pub fn build_and_send_ack_packet_messages(&mut self) -> Result<Vec<IbcEvent>, LinkError> {
self.a_to_b.build_packet_ack_msgs(None)?;

let mut results = vec![];

// Block waiting for all of the scheduled data
while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data() {
let mut last_res = self.a_to_b.relay_from_operational_data(odata)?;
let mut last_res = self
.a_to_b
.relay_from_operational_data::<relay_sender::SyncSender>(odata)?;
results.append(&mut last_res.events);
}

Expand Down
17 changes: 11 additions & 6 deletions relayer/src/link/operational_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ impl OperationalData {
return Ok(vec![]);
}

let mut msgs: Vec<Any> = self.batch.iter().map(|gm| gm.msg.clone()).collect();

// For zero delay we prepend the client update msgs.
if relay_path.zero_delay() {
let client_update_msg = if relay_path.zero_delay() {
let update_height = self.proofs_height.increment();

info!(
Expand All @@ -98,9 +96,16 @@ impl OperationalData {
}
};

if let Some(client_update) = client_update_opt.pop() {
msgs.insert(0, client_update);
}
client_update_opt.pop()
} else {
None
};

let mut msgs: Vec<Any> = self.batch.iter().map(|gm| gm.msg.clone()).collect();
if let Some(client_update) = client_update_msg {
// SAFETY: inserting at position `0` should not panic because
// we already checked that the `batch` vector is non-empty.
msgs.insert(0, client_update);
}

info!(
Expand Down
Loading