Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ordered channels more resilient in the face of failing packets #3610

Merged
merged 48 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f0281a8
Start scaffolding ica_ordered_channel test
seanchen1991 Sep 1, 2023
92c2ec7
Disable packet clearing
seanchen1991 Sep 8, 2023
27f762a
Add ica_ordered_channel test
seanchen1991 Sep 13, 2023
d54a891
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Sep 13, 2023
54abddf
Move some imports around
seanchen1991 Sep 13, 2023
7eefadc
Clean up imports
seanchen1991 Sep 14, 2023
bc16d41
Add sleep calls in between supervisor runs
seanchen1991 Sep 18, 2023
94f7f83
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Sep 18, 2023
083b81a
Formatting
romac Oct 2, 2023
f649204
Fix compilation issues
romac Oct 2, 2023
98c89b6
Merge branch 'master' into sean/ordered-channels-resilience
romac Oct 12, 2023
3facd4e
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Oct 16, 2023
08b479d
Emphasize wording in documentation
seanchen1991 Oct 17, 2023
1781c20
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Oct 17, 2023
ed173a2
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Oct 30, 2023
e1d48d2
Fill in code from discussion
seanchen1991 Oct 30, 2023
7480c1f
Rename TrakingId::ClearId to TrackingId::PacketClearing
seanchen1991 Nov 2, 2023
6fc48b4
Compile ica ordered channel test under the ica feature flag
seanchen1991 Nov 2, 2023
53df768
Cargo fmt
seanchen1991 Nov 2, 2023
0538481
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 2, 2023
7d9d367
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 8, 2023
25f0974
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 9, 2023
f927d0f
Move interchain_send_tx fn to test-framework crate
seanchen1991 Nov 10, 2023
9eeb84e
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Nov 10, 2023
3cd60e2
Cargo fmt
seanchen1991 Nov 10, 2023
fca4417
Update relayer config for consumer chain
seanchen1991 Nov 13, 2023
85466b0
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 14, 2023
ccf329f
Move ica_ordered_channel test under the ica feature
seanchen1991 Nov 14, 2023
6f9f691
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Nov 14, 2023
112c4f8
Move ica_transfer test under ica feature
seanchen1991 Nov 14, 2023
1dbe79c
Check that ICA channel is eventually established using the supervisor
seanchen1991 Nov 14, 2023
fdc352b
Fix clippy warnings
seanchen1991 Nov 14, 2023
50379f9
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 28, 2023
a7836e1
Merge branch 'master' into sean/ordered-channels-resilience
romac Jan 5, 2024
cc81934
Improve logs
romac Jan 12, 2024
7cbad88
Merge branch 'master' into sean/ordered-channels-resilience
romac Jan 12, 2024
19d1065
Add changelog entry
romac Jan 12, 2024
8e85859
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Feb 15, 2024
107a175
Merge branch 'master' into sean/ordered-channels-resilience
romac Feb 20, 2024
a6506bf
Fix compilation of ICA tests
romac Feb 27, 2024
11abdd7
Add `force_disable_clear_on_start` config option, only available in t…
romac Feb 27, 2024
7acbfa4
Cleanup
romac Feb 27, 2024
3a5acfc
Check whether packet clear is needed instead of reacting to error whe…
romac Feb 27, 2024
a364e6e
Merge branch 'master' into sean/ordered-channels-resilience
romac Feb 27, 2024
b2c7fea
Force disable clear on start in ICA ordered channel test
romac Feb 27, 2024
92fe20e
Update changelog entry
romac Feb 27, 2024
00745da
Improve ICA ordered channel test asserts
ljoss17 Feb 28, 2024
dd579ec
Merge branch 'master' into sean/ordered-channels-resilience
ljoss17 Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Improve resilience when relaying on ordered channels. When a packet
fails to be relayed on an ordered channel, this used to block the
channel until either another relayer relayed the packet successfully or
until packet clearing kicked off. Hermes will now detect the failure
and attempt to clear packets on the channel in order to unblock it.
([\#3540](https://github.com/informalsystems/hermes/issues/3540))
13 changes: 9 additions & 4 deletions crates/relayer/src/chain/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum TrackingId {
/// the CLI or during packet clearing.
Static(&'static str),
/// Random identifier used to track latency of packet clearing.
ClearedUuid(Uuid),
PacketClearing(Uuid),
}

impl TrackingId {
Expand All @@ -29,8 +29,13 @@ impl TrackingId {
Self::Static(s)
}

pub fn new_cleared_uuid() -> Self {
Self::ClearedUuid(Uuid::new_v4())
pub fn new_packet_clearing() -> Self {
Self::PacketClearing(Uuid::new_v4())
}

/// Indicates whether a packet clearing process is currently in-progress.
pub fn is_clearing(&self) -> bool {
matches!(self, Self::PacketClearing(_))
}
}

Expand All @@ -43,7 +48,7 @@ impl Display for TrackingId {
s.fmt(f)
}
TrackingId::Static(s) => s.fmt(f),
TrackingId::ClearedUuid(u) => {
TrackingId::PacketClearing(u) => {
let mut uuid = "cleared/".to_owned();
let mut s = u.to_string();
s.truncate(8);
Expand Down
20 changes: 15 additions & 5 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
fn relay_pending_packets(&self, height: Option<Height>) -> Result<(), LinkError> {
let _span = span!(Level::ERROR, "relay_pending_packets", ?height).entered();

let tracking_id = TrackingId::new_cleared_uuid();
let tracking_id = TrackingId::new_packet_clearing();
telemetry!(received_event_batch, tracking_id);

let src_config = self.src_chain().config().map_err(LinkError::relayer)?;
Expand Down Expand Up @@ -676,12 +676,22 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {

return Ok(reply);
}
Err(LinkError(error::LinkErrorDetail::Send(e), _)) => {
// This error means we could retry
error!("error {}", e.event);
Err(LinkError(error::LinkErrorDetail::Send(_), _)) => {
// If on an ordered channel, perform a packet clearing, but only if we
// are not in the middle of another packet clearing process
if self.ordered_channel() && i == 0 && !odata.tracking_id.is_clearing() {
warn!("Failed to relay to ordered channel, attempting to recover by clearing packets");

// We do need to specify the height for the packet clearing,
// since no progress will have been made on the clearing process
self.relay_pending_packets(None)?;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo, we should not check on error but rather when we attempt to clear a packet on an IBC event.
Here are some tentative diffs:

index 2eb85f633..4d4471143 100644
--- a/crates/relayer/src/worker/packet.rs
+++ b/crates/relayer/src/worker/packet.rs
@@ -17,12 +17,14 @@ use ibc_proto::ibc::apps::fee::v1::{IdentifiedPacketFees, QueryIncentivizedPacke
 use ibc_proto::ibc::core::channel::v1::PacketId;
 use ibc_relayer_types::applications::ics29_fee::events::IncentivizedPacket;
 use ibc_relayer_types::applications::transfer::{Amount, Coin, RawCoin};
+use ibc_relayer_types::core::ics04_channel::channel::Ordering;
 use ibc_relayer_types::core::ics04_channel::events::WriteAcknowledgement;
 use ibc_relayer_types::core::ics04_channel::packet::Sequence;
 use ibc_relayer_types::events::{IbcEvent, IbcEventType};
 use ibc_relayer_types::Height;

 use crate::chain::handle::ChainHandle;
+use crate::chain::requests::{IncludeProof, QueryHeight, QueryNextSequenceReceiveRequest};
 use crate::config::filter::FeePolicy;
 use crate::event::source::EventBatch;
 use crate::foreign_client::HasExpiredOrFrozenError;
@@ -186,7 +188,7 @@ pub fn spawn_incentivized_packet_cmd_worker<ChainA: ChainHandle, ChainB: ChainHa
 /// Receives worker commands and handles them accordingly.
 ///
 /// Given an `IbcEvent` command, updates the schedule and initiates
-/// packet clearing if the `should_clear_on_start` flag has been toggled.
+/// packet clearing if the `should_clear_on_start` flag has been set or dependent packets must be cleared on ordred channels.
 ///
 /// Given a `NewBlock` command, checks if packet clearing should occur
 /// and performs it if so.
@@ -205,7 +207,19 @@ fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
     // Handle packet clearing which is triggered from a command
     let (do_clear, maybe_height) = match &cmd {
         WorkerCmd::IbcEvents { batch } => {
-            if *should_clear_on_start {
+            let channel_ordering = link.a_to_b.channel().ordering;
+            let (_next_seq, _) = link.a_to_b.dst_chain().query_next_sequence_receive(
+                    QueryNextSequenceReceiveRequest {
+                        port_id: link.a_to_b.dst_port_id().clone(),
+                        channel_id: link.a_to_b.dst_channel_id().clone(),
+                        height: QueryHeight::Specific(batch.height),
+                    },
+                    IncludeProof::No,
+                )
+                .map_err(|e| LinkError::query(link.a_to_b.dst_chain().id(), e))?;
+            // TODO - if _next_seq is smaller than the sequence of the first packet in batch, then we should clear
+            let ordered_clear_required = channel_ordering == Ordering::Ordered;
+            if *should_clear_on_start || ordered_clear_required {
                 (true, Some(batch.height))
             } else {
                 (false, None)

if i + 1 == MAX_RETRIES {
error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES)
error!("{}/{} retries exhausted, giving up", i + 1, MAX_RETRIES)
} else {
debug!("{}/{} retries exhausted, retrying with newly-generated operational data", i + 1, MAX_RETRIES);

// If we haven't exhausted all retries, regenerate the op. data & retry
match self.regenerate_operational_data(odata.clone()) {
None => return Ok(S::Reply::empty()), // Nothing to retry
Expand Down
27 changes: 1 addition & 26 deletions tools/integration-test/src/tests/ica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ use std::collections::HashMap;
use std::str::FromStr;

use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::chain::tracking::TrackedMsgs;
use ibc_relayer::config::{
filter::{ChannelFilters, ChannelPolicy, FilterPattern},
ChainConfig, PacketFilter,
};
use ibc_relayer::event::IbcEventWithHeight;
use ibc_relayer_types::applications::ics27_ica::msgs::send_tx::MsgSendTx;
use ibc_relayer_types::applications::ics27_ica::packet_data::InterchainAccountPacketData;
use ibc_relayer_types::applications::{
ics27_ica::cosmos_tx::CosmosTx,
Expand All @@ -26,6 +23,7 @@ use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::{
assert_eventually_channel_established, query_channel_end,
};
use ibc_test_framework::util::interchain_security::interchain_send_tx;

#[test]
fn test_ica_filter_default() -> Result<(), Error> {
Expand Down Expand Up @@ -178,29 +176,6 @@ impl BinaryConnectionTest for IcaFilterTestAllow {
}
}

fn interchain_send_tx<ChainA: ChainHandle>(
chain: &ChainA,
from: &Signer,
connection: &ConnectionId,
msg: InterchainAccountPacketData,
relative_timeout: Timestamp,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let msg = MsgSendTx {
owner: from.clone(),
connection_id: connection.clone(),
packet_data: msg,
relative_timeout,
};

let msg_any = msg.to_any();

let tm = TrackedMsgs::new_static(vec![msg_any], "SendTx");

chain
.send_messages_and_wait_commit(tm)
.map_err(Error::relayer)
}

#[test]
fn test_ica_filter_deny() -> Result<(), Error> {
run_binary_connection_test(&IcaFilterTestDeny)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//! Verifies the behaviour of ordered channels with Interchain Accounts.
//!
//! In order to ensure that ordered channels correctly clear packets on ICA
//! channels, this test sends some sequential packets with the supervisor enabled,
//! sends the next packet *without* the supervisor enabled, then sends additional
//! packets with the supervisor enabled again. The pending packet that was sent
//! without the supervisor enabled should be relayed in order along with the
//! other packets, as expected of ordered channel behaviour.

use std::str::FromStr;

use ibc_relayer_types::applications::ics27_ica::cosmos_tx::CosmosTx;
use ibc_relayer_types::applications::ics27_ica::packet_data::InterchainAccountPacketData;
use ibc_relayer_types::applications::transfer::msgs::send::MsgSend;
use ibc_relayer_types::applications::transfer::{Amount, Coin};
use ibc_relayer_types::bigint::U256;
use ibc_relayer_types::signer::Signer;
use ibc_relayer_types::timestamp::Timestamp;
use ibc_relayer_types::tx_msg::Msg;
use ibc_test_framework::chain::ext::ica::register_interchain_account;
use ibc_test_framework::framework::binary::channel::run_binary_interchain_security_channel_test;
use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::assert_eventually_channel_established;
use ibc_test_framework::relayer::channel::query_channel_end;
use ibc_test_framework::util::interchain_security::{
interchain_send_tx, update_genesis_for_consumer_chain, update_relayer_config_for_consumer_chain,
};

#[test]
fn test_ica_ordered_channel() -> Result<(), Error> {
run_binary_interchain_security_channel_test(&IcaOrderedChannelTest)
}

struct IcaOrderedChannelTest;

impl TestOverrides for IcaOrderedChannelTest {
fn modify_genesis_file(&self, genesis: &mut serde_json::Value) -> Result<(), Error> {
use serde_json::Value;

// Allow MsgSend messages over ICA
let allow_messages = genesis
.get_mut("app_state")
.and_then(|app_state| app_state.get_mut("interchainaccounts"))
.and_then(|ica| ica.get_mut("host_genesis_state"))
.and_then(|state| state.get_mut("params"))
.and_then(|params| params.get_mut("allow_messages"))
.and_then(|allow_messages| allow_messages.as_array_mut());

if let Some(allow_messages) = allow_messages {
allow_messages.push(Value::String("/cosmos.bank.v1beta1.MsgSend".to_string()));
} else {
return Err(Error::generic(eyre!("failed to update genesis file")));
}

update_genesis_for_consumer_chain(genesis)?;

Ok(())
}

fn modify_relayer_config(&self, config: &mut Config) {
config.mode.channels.enabled = true;

// Disable packet clearing so that packets sent without the supervisor
// enabled enter a pending state.
config.mode.packets.enabled = true;
config.mode.packets.clear_on_start = false;
config.mode.packets.clear_interval = 0;

update_relayer_config_for_consumer_chain(config);
}

fn should_spawn_supervisor(&self) -> bool {
false
}
}

impl BinaryChannelTest for IcaOrderedChannelTest {
fn run<ChainA: ChainHandle, ChainB: ChainHandle>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove most of the sleeps in the test

&self,
_config: &TestConfig,
relayer: RelayerDriver,
chains: ConnectedChains<ChainA, ChainB>,
channel: ConnectedChannel<ChainA, ChainB>,
) -> Result<(), Error> {
let connection_b_to_a = channel.connection.clone().flip();
let (wallet, channel_id, port_id) =
register_interchain_account(&chains.node_b, chains.handle_b(), &connection_b_to_a)?;

relayer.with_supervisor(|| {
// Check that the corresponding ICA channel is eventually established.
let _counterparty_channel_id = assert_eventually_channel_established(
chains.handle_b(),
chains.handle_a(),
&channel_id.as_ref(),
&port_id.as_ref(),
)?;

Ok(())
})?;

// Assert that the channel returned by `register_interchain_account` is an ordered channel
let channel_end =
query_channel_end(chains.handle_b(), &channel_id.as_ref(), &port_id.as_ref())?;

assert_eq!(channel_end.value().ordering(), &Ordering::Ordered);

// Query the controller chain for the address of the ICA wallet on the host chain.
let ica_address = chains.node_b.chain_driver().query_interchain_account(
&wallet.address(),
&channel.connection.connection_id_b.as_ref(),
)?;

let stake_denom: MonoTagged<ChainA, Denom> = MonoTagged::new(Denom::base("stake"));

chains.node_a.chain_driver().assert_eventual_wallet_amount(
&ica_address.as_ref(),
&stake_denom.with_amount(0u64).as_ref(),
)?;

// Send funds to the interchain account.
let ica_fund = 42000u64;

chains.node_a.chain_driver().local_transfer_token(
&chains.node_a.wallets().user1(),
&ica_address.as_ref(),
&stake_denom.with_amount(ica_fund).as_ref(),
)?;

chains.node_a.chain_driver().assert_eventual_wallet_amount(
&ica_address.as_ref(),
&stake_denom.with_amount(ica_fund).as_ref(),
)?;

let amount = 1200;

let msg = MsgSend {
from_address: ica_address.to_string(),
to_address: chains.node_a.wallets().user2().address().to_string(),
amount: vec![Coin {
denom: stake_denom.to_string(),
amount: Amount(U256::from(amount)),
}],
};

let raw_msg = msg.to_any();

let cosmos_tx = CosmosTx {
messages: vec![raw_msg],
};

let raw_cosmos_tx = cosmos_tx.to_any();

let interchain_account_packet_data = InterchainAccountPacketData::new(raw_cosmos_tx.value);

let signer = Signer::from_str(&wallet.address().to_string()).unwrap();

sleep(Duration::from_secs(5));

relayer.with_supervisor(|| {
sleep(Duration::from_secs(1));

let ica_events = interchain_send_tx(
chains.handle_b(),
&signer,
&channel.connection.connection_id_b.0,
interchain_account_packet_data.clone(),
Timestamp::from_nanoseconds(120000000000).unwrap(),
)?;

info!("First ICA transfer made with supervisor: {ica_events:#?}");

sleep(Duration::from_secs(5));

Ok(())
})?;

sleep(Duration::from_secs(1));

let ica_events = interchain_send_tx(
chains.handle_b(),
&signer,
&channel.connection.connection_id_b.0,
interchain_account_packet_data.clone(),
Timestamp::from_nanoseconds(120000000000).unwrap(),
)?;

info!("Second ICA transfer made without supervisor: {ica_events:#?}");

sleep(Duration::from_secs(5));

relayer.with_supervisor(|| {
sleep(Duration::from_secs(1));

let ica_events = interchain_send_tx(
chains.handle_b(),
&signer,
&channel.connection.connection_id_b.0,
interchain_account_packet_data,
Timestamp::from_nanoseconds(120000000000).unwrap(),
)?;

info!("Third ICA transfer made with supervisor: {ica_events:#?}");

sleep(Duration::from_secs(5));

Ok(())
})?;

// Check that the ICA account's balance has been debited the sent amount.
chains.node_a.chain_driver().assert_eventual_wallet_amount(
&ica_address.as_ref(),
&stake_denom.with_amount(ica_fund - 3 * amount).as_ref(),
)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move this in the supervisor block to insure that the clearing is done, and maybe add an assertion that the amount is correctly received


Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
//! the second chain a Consumer chain.
use std::str::FromStr;

use ibc_relayer::chain::tracking::TrackedMsgs;
use ibc_relayer::event::IbcEventWithHeight;
use ibc_relayer_types::applications::ics27_ica::cosmos_tx::CosmosTx;
use ibc_relayer_types::applications::ics27_ica::msgs::send_tx::MsgSendTx;
use ibc_relayer_types::applications::ics27_ica::packet_data::InterchainAccountPacketData;
use ibc_relayer_types::applications::transfer::msgs::send::MsgSend;
use ibc_relayer_types::applications::transfer::{Amount, Coin};
Expand All @@ -19,7 +16,7 @@ use ibc_test_framework::framework::binary::channel::run_binary_interchain_securi
use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::assert_eventually_channel_established;
use ibc_test_framework::util::interchain_security::{
update_genesis_for_consumer_chain, update_relayer_config_for_consumer_chain,
interchain_send_tx, update_genesis_for_consumer_chain, update_relayer_config_for_consumer_chain,
};

#[test]
Expand Down Expand Up @@ -151,29 +148,7 @@ impl BinaryChannelTest for InterchainSecurityIcaTransferTest {
&ica_address.as_ref(),
&stake_denom.with_amount(ica_fund - amount).as_ref(),
)?;

Ok(())
}
}

fn interchain_send_tx<ChainA: ChainHandle>(
chain: &ChainA,
from: &Signer,
connection: &ConnectionId,
msg: InterchainAccountPacketData,
relative_timeout: Timestamp,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let msg = MsgSendTx {
owner: from.clone(),
connection_id: connection.clone(),
packet_data: msg,
relative_timeout,
};

let msg_any = msg.to_any();

let tm = TrackedMsgs::new_static(vec![msg_any], "SendTx");

chain
.send_messages_and_wait_commit(tm)
.map_err(Error::relayer)
}
Loading
Loading