Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hermes retrying mechanism for packet clearing #1146

Merged
merged 85 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
a3f2698
temp change for ft-transfer to send one msg/Tx
ancazamfir May 18, 2021
0362b44
event monitor: Bulk events from all transactions included in a block
romac May 19, 2021
c1fb08a
Update changelog
romac May 19, 2021
532a828
temp change for ft-transfer to send one msg/Tx
ancazamfir May 18, 2021
272209a
Merge branch 'anca/evets_multi_tx' of https://github.com/informalsyst…
ancazamfir May 25, 2021
a138b51
Merge branch 'master' into anca/evets_multi_tx
ancazamfir May 25, 2021
104843b
Optimize spawning of workers - draft
ancazamfir May 25, 2021
3214571
Add back check to start workers only if channel is open
ancazamfir May 25, 2021
a7020fa
Merge branch 'master' into anca/optimize_worker_spawn
ancazamfir May 26, 2021
8642269
Cleanup
ancazamfir May 26, 2021
92f82d0
Check connection state
ancazamfir May 26, 2021
a31756a
Merge branch 'master' into anca/evets_multi_tx
ancazamfir May 26, 2021
14ebcc4
temp change for ft-transfer to send one msg/Tx
ancazamfir May 18, 2021
2d2a5dd
Improve config loading message (#933)
romac May 26, 2021
b0f4a38
Merge branch 'anca/evets_multi_tx' of https://github.com/informalsyst…
ancazamfir May 26, 2021
6a13f0c
Merge branch 'master' into anca/evets_multi_tx
ancazamfir May 31, 2021
90cd221
Migration to tx sync
ancazamfir Jun 1, 2021
21ba39a
Merge branch 'master' into anca/evets_multi_tx
ancazamfir Jun 1, 2021
412876e
Merge branch 'master' into anca/evets_multi_tx
ancazamfir Jun 9, 2021
490c236
Add Tx simulate
ancazamfir Jun 13, 2021
9e96177
Merge branch 'master' into anca/evets_multi_tx
ancazamfir Jun 15, 2021
de0a85e
Add adjustment to the tx simulate result, some cleanup
ancazamfir Jun 15, 2021
9ed71e2
Nitpick in CosmosSdkChain::key_and_bytes
romac Jun 16, 2021
9452c16
Small cleanup
romac Jun 16, 2021
b00063c
Remove duplicate send_msgs method
romac Jun 16, 2021
529c20d
Cleanup config file
romac Jun 16, 2021
a799a89
Fix typo after refactoring
romac Jun 16, 2021
041c78a
Fix `query packet tx` description
romac Jun 16, 2021
92df6b5
Rework `wait_for_block_commits` to use the `retry` crate
romac Jun 16, 2021
d211189
Compute tx fee based on gas from tx simulate and gas price
ancazamfir Jun 16, 2021
a59e0db
Merge branch 'anca/evets_multi_tx' of https://github.com/informalsyst…
ancazamfir Jun 16, 2021
0729957
Re-add missing error type
romac Jun 16, 2021
298d605
Combine `fee_denom` and `gas_price` into `gas_price` config option
romac Jun 16, 2021
b594072
Add tests for `mul_ceil`
romac Jun 16, 2021
946ac5a
Fix config serialization
romac Jun 16, 2021
abbc83f
Remove `fee_amount` config property
romac Jun 17, 2021
8b7e733
Update changelog
romac Jun 17, 2021
7785370
Avoid op data regeneration if retries exhausted.
adizere Jun 17, 2021
32f3f41
Increase the number of retries while checking Tx is included in a block.
ancazamfir Jun 17, 2021
2a9bdb6
Move `query packet tx` to `query tx events`
romac Jun 17, 2021
99d4226
Merge branch 'master' into anca/evets_multi_tx
romac Jun 17, 2021
7d8eb15
better error msgs
adizere Jun 18, 2021
954b4d6
Merge branch 'master' into anca/evets_multi_tx
adizere Jun 18, 2021
816ae0b
Add Display instance for GasPrice
romac Jun 18, 2021
793f9e6
Fix default gas price denomination
romac Jun 18, 2021
b6307ed
Improve some debug messages
romac Jun 18, 2021
faf55a9
Rename `gas_price.amount` to `gase_price.price`
romac Jun 18, 2021
d2be250
Add configurable fee adjustment to make it more likely gas estimate e…
romac Jun 18, 2021
7c0ac36
Add Tx hash to ChainErr
ancazamfir Jun 18, 2021
eb9d8e1
Merge branch 'anca/evets_multi_tx' of https://github.com/informalsyst…
ancazamfir Jun 18, 2021
d4f2eaa
Fix config files
romac Jun 18, 2021
4a6d2f4
Masked tonic::Code::NotFound result for query_client_connections.
adizere Jun 18, 2021
71c8828
Modified cfg option from gas to max_gas
adizere Jun 18, 2021
1d35f23
Consistent trust_threshold in default config.toml
adizere Jun 18, 2021
4b0a941
Revert guide updates
romac Jun 18, 2021
deb77fd
Nit: Imports for query.rs
adizere Jun 18, 2021
4bd7892
Print info message when Hermes starts
romac Jun 18, 2021
ffe02a0
Implement basic filtering based on channel identifiers
romac Jun 18, 2021
4f1cfb2
Add per chain filters, only channel based filtering support
ancazamfir Jun 18, 2021
115712d
Fix gas adjustement to be percentage on top of computed gas
ancazamfir Jun 19, 2021
0886401
Attempt to fix gas_limit
adizere Jun 21, 2021
fb2cf54
Fix chain spawn unrwap for QueryUnreceivedPacketsCmd
ancazamfir Jun 22, 2021
8a2bfa1
Merge branch 'master' into romac/channel-filter
ancazamfir Jun 24, 2021
cd45364
Retry on no confirmation
ancazamfir Jun 24, 2021
054ff2a
Print simulation errors. Trim clear packets output.
adizere Jun 28, 2021
25e03eb
Retry timeout parametrized parametrized by rpc_timeout
adizere Jun 28, 2021
4c7d9a4
Bring back NoConfirmation error and don't retry.
ancazamfir Jun 28, 2021
d9987d2
Don't log during clearing packets unless there is work to do.
ancazamfir Jun 28, 2021
5db2bb2
Merge branch 'master' into anca/no_confirmation
adizere Jul 2, 2021
7b5075c
Revert printing of simulation error.
adizere Jul 5, 2021
4d90288
Cleanup
adizere Jul 5, 2021
a25fc5b
Merge branch 'master' into anca/no_confirmation
adizere Jul 5, 2021
9e9b935
Consolidated output for packet clearing methods
adizere Jul 5, 2021
67674db
Cleaner code around clear_packets method
adizere Jul 5, 2021
f05a35c
Undid -k option for packet-recv
adizere Jul 5, 2021
89a72e1
More context loaded into TxNoConfirmation
adizere Jul 5, 2021
58c373a
More context when uni path/link worker encounters error
adizere Jul 6, 2021
7d5c6b9
Improve tx no confirmation error message
romac Jul 7, 2021
50eadd6
Make clearing packets interval configurable
romac Jul 7, 2021
4df5e5b
Update changelog
romac Jul 7, 2021
4ebb769
Update CI config
romac Jul 7, 2021
e9dd3ef
Better defaults
romac Jul 7, 2021
83203a7
Remove backup file
romac Jul 7, 2021
d501006
Merge branch 'master' into anca/no_confirmation
romac Jul 7, 2021
6e838f6
Formatting
romac Jul 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion relayer-cli/src/commands/tx/packet.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use abscissa_core::{Command, Options, Runnable};
use abscissa_core::config::Override;
use abscissa_core::{Command, FrameworkErrorKind, Options, Runnable};

use ibc::events::IbcEvent;
use ibc::ics24_host::identifier::{ChainId, ChannelId, PortId};
use ibc_relayer::link::{Link, LinkParameters};

use crate::cli_utils::ChainHandlePair;
use crate::conclude::Output;
use crate::config::Config;
use crate::error::{Error, Kind};
use crate::prelude::*;

Expand All @@ -22,6 +24,26 @@ pub struct TxRawPacketRecvCmd {

#[options(free, required, help = "identifier of the source channel")]
src_channel_id: ChannelId,

#[options(
help = "use the given signing key (default: `key_name` config)",
short = "k"
)]
key: Option<String>,
}

impl Override<Config> for TxRawPacketRecvCmd {
fn override_config(&self, mut config: Config) -> Result<Config, abscissa_core::FrameworkError> {
let src_chain_config = config.find_chain_mut(&self.src_chain_id).ok_or_else(|| {
FrameworkErrorKind::ComponentError.context("missing src chain configuration")
})?;

if let Some(ref key_name) = self.key {
src_chain_config.key_name = key_name.to_string();
}

Ok(config)
}
adizere marked this conversation as resolved.
Show resolved Hide resolved
}

impl Runnable for TxRawPacketRecvCmd {
Expand Down
79 changes: 41 additions & 38 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ mod retry_strategy {
use crate::util::retry::Fixed;
use std::time::Duration;

pub fn wait_for_block_commits() -> impl Iterator<Item = Duration> {
// The total time should be higher than the full node timeout which defaults to 10sec.
Fixed::from_millis(300).take(40) // 12 seconds
pub fn wait_for_block_commits(max_total_wait: Duration) -> impl Iterator<Item = Duration> {
let backoff_millis = 300; // The periodic backoff
let count: usize = (max_total_wait.as_millis() / backoff_millis as u128) as usize;
Fixed::from_millis(backoff_millis).take(count)
}
}

Expand Down Expand Up @@ -495,53 +496,55 @@ impl CosmosSdkChain {
thread::sleep(Duration::from_millis(200));

let start = Instant::now();
let result = retry_with_index(
retry_strategy::wait_for_block_commits(self.config.rpc_timeout),
|index| {
if all_tx_results_found(&tx_sync_results) {
trace!(
"wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
tx_sync_results.len(),
index,
start.elapsed().as_millis()
);

let result = retry_with_index(retry_strategy::wait_for_block_commits(), |index| {
if all_tx_results_found(&tx_sync_results) {
trace!(
"wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
tx_sync_results.len(),
index,
start.elapsed().as_millis()
);

// All transactions confirmed
return RetryResult::Ok(());
}
// All transactions confirmed
return RetryResult::Ok(());
}

for TxSyncResult { response, events } in tx_sync_results.iter_mut() {
// If this transaction was not committed, determine whether it was because it failed
// or because it hasn't been committed yet.
if empty_event_present(&events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx for Tx hash {} reports error: code={:?}, log={:?}",
response.hash, response.code, response.log
for TxSyncResult { response, events } in tx_sync_results.iter_mut() {
// If this transaction was not committed, determine whether it was because it failed
// or because it hasn't been committed yet.
if empty_event_present(&events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
self.id(), response.hash, response.code, response.log
))];

// Otherwise, try to resolve transaction hash to the corresponding events.
} else if let Ok(events_per_tx) =
self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash)))
{
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
// Otherwise, try to resolve transaction hash to the corresponding events.
} else if let Ok(events_per_tx) =
self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash)))
{
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
}
}
}
}
}
RetryResult::Retry(index)
});
RetryResult::Retry(index)
},
);

match result {
// All transactions confirmed
Ok(()) => Ok(tx_sync_results),
// Did not find confirmation
Err(_) => Err(Kind::TxNoConfirmation.into()),
Err(_) => Err(Kind::TxNoConfirmation(format!("from chain {}", self.id())).into()),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub enum Kind {
#[error("Failed to create client state")]
BuildClientStateFailure,

/// Tx failure due to lack of confirmation
#[error("Failed Tx: no confirmation {0}")]
TxNoConfirmation(String),

/// Gas estimate from simulated Tx exceeds the maximum configured
#[error("{chain_id} gas estimate {estimated_gas} from simulated Tx exceeds the maximum configured {max_gas}")]
TxSimulateGasEstimateExceeded {
Expand All @@ -86,10 +90,6 @@ pub enum Kind {
max_gas: u64,
},

/// Tx failure for lack of confirmation
#[error("Failed Tx: no confirmation")]
TxNoConfirmation,

/// Create client failure
#[error("Failed to create client {0}")]
CreateClient(String),
Expand Down
92 changes: 58 additions & 34 deletions relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt;
use std::thread;
use std::time::Instant;

use itertools::Itertools;
use prost_types::Any;
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -397,7 +398,6 @@ impl RelayPath {
}

fn relay_pending_packets(&mut self, height: Height) -> Result<(), LinkError> {
info!("[{}] clearing old packets", self);
for _ in 0..MAX_RETRIES {
if self
.build_recv_packet_and_timeout_msgs(Some(height))
Expand All @@ -412,7 +412,7 @@ impl RelayPath {

/// Should not run more than once per execution.
pub fn clear_packets(&mut self, above_height: Height) -> Result<(), LinkError> {
if self.clear_packets {
if self.clear_packets || above_height.revision_height % 100 == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 should be configurable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what's the best way to make this configurable. Do you suggest we put an option in config.toml? Seems clunky to me.

info!(
"[{}] clearing pending packets from events before height {}",
self, above_height
Expand All @@ -423,7 +423,10 @@ impl RelayPath {

self.relay_pending_packets(clear_height)?;

info!("[{}] finished clearing pending packets", self);
info!(
"[{}] finished scheduling the clearing of pending packets",
self
);

self.clear_packets = false;
}
Expand Down Expand Up @@ -721,7 +724,8 @@ impl RelayPath {
}

/// Sends a transaction to the chain targeted by the operational data `odata`.
/// If the transaction generates an error, returns the error as well as `LinkError::SendError` if input events if a sending failure occurs.
/// If the transaction generates an error, returns the error as well as `LinkError::SendError`
/// if input events if a sending failure occurs.
/// Returns the events generated by the target chain upon success.
fn send_from_operational_data(
&mut self,
Expand Down Expand Up @@ -921,19 +925,13 @@ impl RelayPath {
if packet_commitments.is_empty() {
return Ok((events_result, query_height));
}
let commit_sequences = packet_commitments.iter().map(|p| p.sequence).collect();
debug!(
"[{}] packets that still have commitments on {}: {:?}",
self,
self.src_chain().id(),
commit_sequences
);
let commit_sequences: Vec<u64> = packet_commitments.iter().map(|p| p.sequence).collect();

// Get the packets that have not been received on destination chain
let request = QueryUnreceivedPacketsRequest {
port_id: self.dst_port_id().to_string(),
channel_id: self.dst_channel_id()?.to_string(),
packet_commitment_sequences: commit_sequences,
packet_commitment_sequences: commit_sequences.clone(),
};

let sequences: Vec<Sequence> = self
Expand All @@ -943,17 +941,25 @@ impl RelayPath {
.map(From::from)
.collect();

if sequences.is_empty() {
return Ok((events_result, query_height));
}

debug!(
"[{}] recv packets to send out to {} of the ones with commitments on source {}: {:?}",
"[{}] packets that still have commitments on {}: {} (first 10 shown here; total={})",
self,
self.dst_chain().id(),
self.src_chain().id(),
sequences
commit_sequences.iter().take(10).join(", "),
commit_sequences.len()
);

if sequences.is_empty() {
return Ok((events_result, query_height));
}
debug!(
"[{}] recv packets to send out to {} of the ones with commitments on source {}: {} (first 10 shown here; total={})",
self,
self.dst_chain().id(),
self.src_chain().id(),
sequences.iter().take(10).join(", "), sequences.len()
);

let query = QueryTxRequest::Packet(QueryPacketEventDataRequest {
event_id: IbcEventType::SendPacket,
Expand All @@ -972,8 +978,17 @@ impl RelayPath {
let send_event = downcast!(event => IbcEvent::SendPacket)
.ok_or_else(|| LinkError::Failed("unexpected query tx response".into()))?;
packet_sequences.push(send_event.packet.sequence);
if packet_sequences.len() > 10 {
// Enough to print the first 10
break;
}
}
debug!("[{}] received from query_txs {:?}", self, packet_sequences);
info!(
"[{}] found unprocessed SendPacket events for {:?} (first 10 shown here; total={})",
self,
packet_sequences,
events_result.len()
);

Ok((events_result, query_height))
}
Expand Down Expand Up @@ -1006,18 +1021,13 @@ impl RelayPath {
return Ok((events_result, query_height));
}

let acked_sequences = acks_on_source.iter().map(|p| p.sequence).collect();
debug!(
"[{}] packets that have acknowledgments on {} {:?}",
self,
self.src_chain().id(),
acked_sequences
);
let mut acked_sequences: Vec<u64> = acks_on_source.iter().map(|p| p.sequence).collect();
acked_sequences.sort_unstable();

let request = QueryUnreceivedAcksRequest {
port_id: self.dst_port_id().to_string(),
channel_id: dst_channel_id.to_string(),
packet_ack_sequences: acked_sequences,
packet_ack_sequences: acked_sequences.clone(),
};

let sequences: Vec<Sequence> = self
Expand All @@ -1027,17 +1037,27 @@ impl RelayPath {
.into_iter()
.map(From::from)
.collect();

if sequences.is_empty() {
return Ok((events_result, query_height));
}

debug!(
"[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {:?}",
"[{}] packets that have acknowledgments on {}: [{:?}..{:?}] (total={})",
self,
self.dst_chain().id(),
self.src_chain().id(),
sequences
acked_sequences.first(),
acked_sequences.last(),
acked_sequences.len()
);

if sequences.is_empty() {
return Ok((events_result, query_height));
}
debug!(
"[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})",
self,
self.dst_chain().id(),
self.src_chain().id(),
sequences.iter().take(10).join(", "), sequences.len()
);

events_result = self
.src_chain()
Expand All @@ -1057,8 +1077,12 @@ impl RelayPath {
let write_ack_event = downcast!(event => IbcEvent::WriteAcknowledgement)
.ok_or_else(|| LinkError::Failed("unexpected query tx response".into()))?;
packet_sequences.push(write_ack_event.packet.sequence);
if packet_sequences.len() > 10 {
// Enough to print the first 10
break;
}
}
info!("[{}] received from query_txs {:?}", self, packet_sequences);
info!("[{}] found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", self, packet_sequences, events_result.len());

Ok((events_result, query_height))
}
Expand Down