Skip to content

Commit

Permalink
Hermes retrying mechanism for packet clearing (#1146)
Browse files Browse the repository at this point in the history
* temp change for ft-transfer to send one msg/Tx

* event monitor: Bulk events from all transactions included in a block

* Update changelog

* temp change for ft-transfer to send one msg/Tx

* Optimize spawning of workers - draft

* Add back check to start workers only if channel is open

* Cleanup

* Check connection state

* temp change for ft-transfer to send one msg/Tx

* Improve config loading message (#933)

* Improve config load message

* Raised log level to error. Log link to config example in the guide.

* Changelog

Co-authored-by: Adi Seredinschi <adi@informal.systems>

* Migration to tx sync

* Add Tx simulate

* Add adjustment to the tx simulate result, some cleanup

* Nitpick in CosmosSdkChain::key_and_bytes

* Small cleanup

* Remove duplicate send_msgs method

* Cleanup config file

* Fix typo after refactoring

* Fix `query packet tx` description

* Rework `wait_for_block_commits` to use the `retry` crate

* Compute tx fee based on gas from tx simulate and gas price

* Re-add missing error type

* Combine `fee_denom` and `gas_price` into `gas_price` config option

* Add tests for `mul_ceil`

* Fix config serialization

* Remove `fee_amount` config property

* Update changelog

* Avoid op data regeneration if retries exhausted.

* Increase the number of retries while checking Tx is included in a block.

* Move `query packet tx` to `query tx events`

* better error msgs

* Add Display instance for GasPrice

* Fix default gas price denomination

* Improve some debug messages

* Rename `gas_price.amount` to `gase_price.price`

* Add configurable fee adjustment to make it more likely gas estimate ends up being enough

* Add Tx hash to ChainErr

* Fix config files

* Masked tonic::Code::NotFound result for query_client_connections.

* Modified cfg option from gas to max_gas

* Consistent trust_threshold in default config.toml

* Revert guide updates

* Nit: Imports for query.rs

* Print info message when Hermes starts

* Implement basic filtering based on channel identifiers

* Add per chain filters, only channel based filtering support

* Fix gas adjustement to be percentage on top of computed gas

* Attempt to fix gas_limit

* Fix chain spawn unrwap for QueryUnreceivedPacketsCmd

* Retry on no confirmation

* Print simulation errors. Trim clear packets output.

* Retry timeout parametrized parametrized by rpc_timeout

* Bring back NoConfirmation error and don't retry.

Trigger packet clearing every 100 blocks.

* Don't log during clearing packets unless there is work to do.

* Revert printing of simulation error.

Revert commit 054ff2a partly.
Reason: printing of simulation error is handled separately
in parallel PR #1137.

* Cleanup

* Consolidated output for packet clearing methods

* Cleaner code around clear_packets method

* Undid -k option for packet-recv

* More context loaded into TxNoConfirmation

* More context when uni path/link worker encounters error

* Improve tx no confirmation error message

* Make clearing packets interval configurable

* Update changelog

* Update CI config

* Better defaults

* Remove backup file

* Formatting

Co-authored-by: Anca Zamfir <zamfiranca@gmail.com>
Co-authored-by: Romain Ruetschi <romain@informal.systems>
  • Loading branch information
3 people authored Jul 7, 2021
1 parent 05afbfe commit d9eba70
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 112 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ reliability of Hermes ([#697]).
- Update to `tendermint-rs` v0.20.0 ([#1125])
- Add inline documentation to config.toml ([#1127])

- [ibc-relayer]
- Hermes will now clear pending packets at a configurable interval ([#1124])

### BUG FIXES

- [ibc-relayer]
Expand All @@ -29,6 +32,7 @@ reliability of Hermes ([#697]).
[#1062]: https://github.com/informalsystems/ibc-rs/issues/1062
[#1057]: https://github.com/informalsystems/ibc-rs/issues/1057
[#1125]: https://github.com/informalsystems/ibc-rs/issues/1125
[#1124]: https://github.com/informalsystems/ibc-rs/issues/1124
[#1127]: https://github.com/informalsystems/ibc-rs/issues/1127
[#1140]: https://github.com/informalsystems/ibc-rs/issues/1140
[#1143]: https://github.com/informalsystems/ibc-rs/issues/1143
Expand Down
1 change: 1 addition & 0 deletions ci/simple_config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[global]
strategy = 'all'
log_level = 'info'
clear_packets_interval = 100

[[chains]]
id = 'ibc-0'
Expand Down
3 changes: 3 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ filter = true
# Valid options are 'error', 'warn', 'info', 'debug', 'trace'.
log_level = 'info'

# Interval (in number of blocks) at which pending packets
# should be eagerly cleared. Default: 100
clear_packets_interval = 100

# The telemetry section defines parameters for Hermes' built-in telemetry capabilities.
# https://hermes.informal.systems/telemetry.html
Expand Down
90 changes: 52 additions & 38 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
use anomaly::fail;
use bech32::{ToBase32, Variant};
use bitcoin::hashes::hex::ToHex;
use itertools::Itertools;
use prost::Message;
use prost_types::Any;
use tendermint::abci::Path as TendermintABCIPath;
Expand Down Expand Up @@ -96,9 +97,10 @@ mod retry_strategy {
use crate::util::retry::Fixed;
use std::time::Duration;

pub fn wait_for_block_commits() -> impl Iterator<Item = Duration> {
// The total time should be higher than the full node timeout which defaults to 10sec.
Fixed::from_millis(300).take(40) // 12 seconds
pub fn wait_for_block_commits(max_total_wait: Duration) -> impl Iterator<Item = Duration> {
let backoff_millis = 300; // The periodic backoff
let count: usize = (max_total_wait.as_millis() / backoff_millis as u128) as usize;
Fixed::from_millis(backoff_millis).take(count)
}
}

Expand Down Expand Up @@ -598,57 +600,69 @@ impl CosmosSdkChain {

trace!("waiting for commit of block(s)");

let hashes = tx_sync_results
.iter()
.map(|res| res.response.hash.to_string())
.join(", ");

// Wait a little bit initially
thread::sleep(Duration::from_millis(200));

let start = Instant::now();
let result = retry_with_index(
retry_strategy::wait_for_block_commits(self.config.rpc_timeout),
|index| {
if all_tx_results_found(&tx_sync_results) {
trace!(
"wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
tx_sync_results.len(),
index,
start.elapsed().as_millis()
);

let result = retry_with_index(retry_strategy::wait_for_block_commits(), |index| {
if all_tx_results_found(&tx_sync_results) {
trace!(
"wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)",
tx_sync_results.len(),
index,
start.elapsed().as_millis()
);

// All transactions confirmed
return RetryResult::Ok(());
}
// All transactions confirmed
return RetryResult::Ok(());
}

for TxSyncResult { response, events } in tx_sync_results.iter_mut() {
// If this transaction was not committed, determine whether it was because it failed
// or because it hasn't been committed yet.
if empty_event_present(&events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx for Tx hash {} reports error: code={:?}, log={:?}",
response.hash, response.code, response.log
for TxSyncResult { response, events } in tx_sync_results.iter_mut() {
// If this transaction was not committed, determine whether it was because it failed
// or because it hasn't been committed yet.
if empty_event_present(&events) {
// If the transaction failed, replace the events with an error,
// so that we don't attempt to resolve the transaction later on.
if response.code.value() != 0 {
*events = vec![IbcEvent::ChainError(format!(
"deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
self.id(), response.hash, response.code, response.log
))];

// Otherwise, try to resolve transaction hash to the corresponding events.
} else if let Ok(events_per_tx) =
self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash)))
{
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
// Otherwise, try to resolve transaction hash to the corresponding events.
} else if let Ok(events_per_tx) =
self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash)))
{
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
}
}
}
}
}
RetryResult::Retry(index)
});
RetryResult::Retry(index)
},
);

match result {
// All transactions confirmed
Ok(()) => Ok(tx_sync_results),
// Did not find confirmation
Err(_) => Err(Kind::TxNoConfirmation.into()),
Err(_) => Err(Kind::TxNoConfirmation(format!(
"from chain {} for hash(es) {}",
self.id(),
hashes
))
.into()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl Channel {

match self.handshake_step(state) {
Err(e) => {
error!("Failed {:?} with error {}", state, e);
error!("Failed Chan{:?} with error: {}", state, e);
RetryResult::Retry(index)
}
Ok(ev) => {
Expand Down
17 changes: 14 additions & 3 deletions relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ impl Default for ChainFilters {
pub mod default {
use super::*;

pub fn filter() -> bool {
false
}

pub fn clear_packets_interval() -> u64 {
100
}

pub fn rpc_timeout() -> Duration {
Duration::from_secs(10)
}
Expand Down Expand Up @@ -134,17 +142,20 @@ impl fmt::Display for LogLevel {
#[serde(default, deny_unknown_fields)]
pub struct GlobalConfig {
pub strategy: Strategy,
#[serde(default)]
pub filter: bool,
pub log_level: LogLevel,
#[serde(default = "default::filter")]
pub filter: bool,
#[serde(default = "default::clear_packets_interval")]
pub clear_packets_interval: u64,
}

impl Default for GlobalConfig {
fn default() -> Self {
Self {
strategy: Strategy::default(),
filter: false,
log_level: LogLevel::default(),
filter: default::filter(),
clear_packets_interval: default::clear_packets_interval(),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions relayer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub enum Kind {
#[error("Failed to create client state")]
BuildClientStateFailure,

/// Did not find tx confirmation
#[error("did not find tx confirmation {0}")]
TxNoConfirmation(String),

/// Gas estimate from simulated Tx exceeds the maximum configured
#[error("{chain_id} gas estimate {estimated_gas} from simulated Tx exceeds the maximum configured {max_gas}")]
TxSimulateGasEstimateExceeded {
Expand All @@ -86,10 +90,6 @@ pub enum Kind {
max_gas: u64,
},

/// Tx failure for lack of confirmation
#[error("Failed Tx: no confirmation")]
TxNoConfirmation,

/// Create client failure
#[error("Failed to create client {0}")]
CreateClient(String),
Expand Down
Loading

0 comments on commit d9eba70

Please sign in to comment.