Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ordered channels more resilient in the face of failing packets #3610

Merged
merged 48 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f0281a8
Start scaffolding ica_ordered_channel test
seanchen1991 Sep 1, 2023
92c2ec7
Disable packet clearing
seanchen1991 Sep 8, 2023
27f762a
Add ica_ordered_channel test
seanchen1991 Sep 13, 2023
d54a891
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Sep 13, 2023
54abddf
Move some imports around
seanchen1991 Sep 13, 2023
7eefadc
Clean up imports
seanchen1991 Sep 14, 2023
bc16d41
Add sleep calls in between supervisor runs
seanchen1991 Sep 18, 2023
94f7f83
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Sep 18, 2023
083b81a
Formatting
romac Oct 2, 2023
f649204
Fix compilation issues
romac Oct 2, 2023
98c89b6
Merge branch 'master' into sean/ordered-channels-resilience
romac Oct 12, 2023
3facd4e
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Oct 16, 2023
08b479d
Emphasize wording in documentation
seanchen1991 Oct 17, 2023
1781c20
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Oct 17, 2023
ed173a2
Merge branch 'master' of https://github.com/informalsystems/hermes in…
seanchen1991 Oct 30, 2023
e1d48d2
Fill in code from discussion
seanchen1991 Oct 30, 2023
7480c1f
Rename TrakingId::ClearId to TrackingId::PacketClearing
seanchen1991 Nov 2, 2023
6fc48b4
Compile ica ordered channel test under the ica feature flag
seanchen1991 Nov 2, 2023
53df768
Cargo fmt
seanchen1991 Nov 2, 2023
0538481
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 2, 2023
7d9d367
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 8, 2023
25f0974
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 9, 2023
f927d0f
Move interchain_send_tx fn to test-framework crate
seanchen1991 Nov 10, 2023
9eeb84e
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Nov 10, 2023
3cd60e2
Cargo fmt
seanchen1991 Nov 10, 2023
fca4417
Update relayer config for consumer chain
seanchen1991 Nov 13, 2023
85466b0
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 14, 2023
ccf329f
Move ica_ordered_channel test under the ica feature
seanchen1991 Nov 14, 2023
6f9f691
Merge branch 'sean/ordered-channels-resilience' of https://github.com…
seanchen1991 Nov 14, 2023
112c4f8
Move ica_transfer test under ica feature
seanchen1991 Nov 14, 2023
1dbe79c
Check that ICA channel is eventually established using the supervisor
seanchen1991 Nov 14, 2023
fdc352b
Fix clippy warnings
seanchen1991 Nov 14, 2023
50379f9
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Nov 28, 2023
a7836e1
Merge branch 'master' into sean/ordered-channels-resilience
romac Jan 5, 2024
cc81934
Improve logs
romac Jan 12, 2024
7cbad88
Merge branch 'master' into sean/ordered-channels-resilience
romac Jan 12, 2024
19d1065
Add changelog entry
romac Jan 12, 2024
8e85859
Merge branch 'master' into sean/ordered-channels-resilience
seanchen1991 Feb 15, 2024
107a175
Merge branch 'master' into sean/ordered-channels-resilience
romac Feb 20, 2024
a6506bf
Fix compilation of ICA tests
romac Feb 27, 2024
11abdd7
Add `force_disable_clear_on_start` config option, only available in t…
romac Feb 27, 2024
7acbfa4
Cleanup
romac Feb 27, 2024
3a5acfc
Check whether packet clear is needed instead of reacting to error whe…
romac Feb 27, 2024
a364e6e
Merge branch 'master' into sean/ordered-channels-resilience
romac Feb 27, 2024
b2c7fea
Force disable clear on start in ICA ordered channel test
romac Feb 27, 2024
92fe20e
Update changelog entry
romac Feb 27, 2024
00745da
Improve ICA ordered channel test asserts
ljoss17 Feb 28, 2024
dd579ec
Merge branch 'master' into sean/ordered-channels-resilience
ljoss17 Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Improve resilience when relaying on ordered channels.
When relaying packets on an ordered channel, Hermes will now attempt
to detect whether the next message to send has the sequence number
expected on that channel. If there is a mismatch, then Hermes will trigger a packet
clear on the channel to unblock it before resuming operations on that channel.
([\#3540](https://github.com/informalsystems/hermes/issues/3540))
13 changes: 9 additions & 4 deletions crates/relayer/src/chain/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum TrackingId {
/// the CLI or during packet clearing.
Static(&'static str),
/// Random identifier used to track latency of packet clearing.
ClearedUuid(Uuid),
PacketClearing(Uuid),
}

impl TrackingId {
Expand All @@ -29,8 +29,13 @@ impl TrackingId {
Self::Static(s)
}

pub fn new_cleared_uuid() -> Self {
Self::ClearedUuid(Uuid::new_v4())
pub fn new_packet_clearing() -> Self {
Self::PacketClearing(Uuid::new_v4())
}

/// Indicates whether a packet clearing process is currently in-progress.
pub fn is_clearing(&self) -> bool {
matches!(self, Self::PacketClearing(_))
}
}

Expand All @@ -43,7 +48,7 @@ impl Display for TrackingId {
s.fmt(f)
}
TrackingId::Static(s) => s.fmt(f),
TrackingId::ClearedUuid(u) => {
TrackingId::PacketClearing(u) => {
let mut uuid = "cleared/".to_owned();
let mut s = u.to_string();
s.truncate(8);
Expand Down
4 changes: 4 additions & 0 deletions crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ pub struct Packets {
pub ics20_max_memo_size: Ics20FieldSizeLimit,
#[serde(default = "default::ics20_max_receiver_size")]
pub ics20_max_receiver_size: Ics20FieldSizeLimit,

#[serde(skip)]
pub force_disable_clear_on_start: bool,
}

impl Default for Packets {
Expand All @@ -426,6 +429,7 @@ impl Default for Packets {
auto_register_counterparty_payee: default::auto_register_counterparty_payee(),
ics20_max_memo_size: default::ics20_max_memo_size(),
ics20_max_receiver_size: default::ics20_max_receiver_size(),
force_disable_clear_on_start: false,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/relayer/src/link/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ define_error! {
Send
{ event: IbcEvent }
|e| {
format!("chain error when sending messages: {0}", e.event)
format!("failed to send message: {0}", e.event)
},

MissingChannelId
Expand Down
10 changes: 5 additions & 5 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
fn relay_pending_packets(&self, height: Option<Height>) -> Result<(), LinkError> {
let _span = span!(Level::ERROR, "relay_pending_packets", ?height).entered();

let tracking_id = TrackingId::new_cleared_uuid();
let tracking_id = TrackingId::new_packet_clearing();
telemetry!(received_event_batch, tracking_id);

let src_config = self.src_chain().config().map_err(LinkError::relayer)?;
Expand Down Expand Up @@ -710,12 +710,12 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {

return Ok(reply);
}
Err(LinkError(error::LinkErrorDetail::Send(e), _)) => {
// This error means we could retry
error!("error {}", e.event);
Err(LinkError(error::LinkErrorDetail::Send(_), _)) => {
if i + 1 == MAX_RETRIES {
error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES)
error!("{}/{} retries exhausted, giving up", i + 1, MAX_RETRIES)
} else {
debug!("{}/{} retries exhausted, retrying with newly-generated operational data", i + 1, MAX_RETRIES);

// If we haven't exhausted all retries, regenerate the op. data & retry
match self.regenerate_operational_data(odata.clone()) {
None => return Ok(S::Reply::empty()), // Nothing to retry
Expand Down
5 changes: 3 additions & 2 deletions crates/relayer/src/link/relay_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ impl Submit for AsyncSender {
type Reply = AsyncReply;

fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result<Self::Reply, LinkError> {
let a = target
let responses = target
.send_messages_and_wait_check_tx(msgs)
.map_err(LinkError::relayer)?;
let reply = AsyncReply { responses: a };

let reply = AsyncReply { responses };

// Note: There may be errors in the reply, for example:
// `Response { code: Err(11), data: Data([]), log: Log("Too much gas wanted: 35000000, maximum is 25000000: out of gas")`
Expand Down
10 changes: 9 additions & 1 deletion crates/relayer/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(
Ok(link) => {
let channel_ordering = link.a_to_b.channel().ordering;
let should_clear_on_start =
packets_config.clear_on_start || channel_ordering == Ordering::Ordered;
should_clear_on_start(&packets_config, channel_ordering);

let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
let link = Arc::new(Mutex::new(link));
Expand Down Expand Up @@ -217,3 +217,11 @@ pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(

WorkerHandle::new(id, object, data, cmd_tx, task_handles)
}

fn should_clear_on_start(config: &crate::config::Packets, channel_ordering: Ordering) -> bool {
if config.force_disable_clear_on_start {
false
} else {
config.clear_on_start || channel_ordering == Ordering::Ordered
}
}
49 changes: 49 additions & 0 deletions crates/relayer/src/worker/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ use ibc_proto::ibc::apps::fee::v1::{IdentifiedPacketFees, QueryIncentivizedPacke
use ibc_proto::ibc::core::channel::v1::PacketId;
use ibc_relayer_types::applications::ics29_fee::events::IncentivizedPacket;
use ibc_relayer_types::applications::transfer::{Amount, Coin, RawCoin};
use ibc_relayer_types::core::ics04_channel::channel::Ordering;
use ibc_relayer_types::core::ics04_channel::events::WriteAcknowledgement;
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use ibc_relayer_types::events::{IbcEvent, IbcEventType};
use ibc_relayer_types::Height;

use crate::chain::handle::ChainHandle;
use crate::chain::requests::QueryHeight;
use crate::config::filter::FeePolicy;
use crate::event::source::EventBatch;
use crate::event::IbcEventWithHeight;
use crate::foreign_client::HasExpiredOrFrozenError;
use crate::link::Resubmit;
use crate::link::{error::LinkError, Link};
Expand Down Expand Up @@ -204,6 +207,24 @@ fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
) -> Result<(), TaskError<RunError>> {
// Handle packet clearing which is triggered from a command
let (do_clear, maybe_height) = match &cmd {
WorkerCmd::IbcEvents { batch } if link.a_to_b.channel().ordering == Ordering::Ordered => {
let lowest_sequence = lowest_sequence(&batch.events);

let next_sequence = query_next_sequence_receive(
link.a_to_b.dst_chain(),
link.a_to_b.dst_port_id(),
link.a_to_b.dst_channel_id(),
QueryHeight::Specific(batch.height),
)
.ok();

if *should_clear_on_start || next_sequence < lowest_sequence {
(true, Some(batch.height))
} else {
(false, None)
}
}

WorkerCmd::IbcEvents { batch } => {
if *should_clear_on_start {
(true, Some(batch.height))
Expand Down Expand Up @@ -439,6 +460,34 @@ fn handle_execute_schedule<ChainA: ChainHandle, ChainB: ChainHandle>(
Ok(())
}

fn query_next_sequence_receive<Chain: ChainHandle>(
chain: &Chain,
port_id: &PortId,
channel_id: &ChannelId,
height: QueryHeight,
) -> Result<Sequence, LinkError> {
use crate::chain::requests::{IncludeProof, QueryNextSequenceReceiveRequest};

chain
.query_next_sequence_receive(
QueryNextSequenceReceiveRequest {
port_id: port_id.clone(),
channel_id: channel_id.clone(),
height,
},
IncludeProof::No,
)
.map(|(seq, _height)| seq)
.map_err(|e| LinkError::query(chain.id(), e))
}

fn lowest_sequence(events: &[IbcEventWithHeight]) -> Option<Sequence> {
events
.iter()
.flat_map(|event| event.event.packet().map(|p| p.sequence))
.min()
}

#[cfg(feature = "telemetry")]
use crate::link::RelaySummary;

Expand Down
28 changes: 2 additions & 26 deletions tools/integration-test/src/tests/ica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ use std::collections::HashMap;
use std::str::FromStr;

use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::chain::tracking::TrackedMsgs;
use ibc_relayer::config::{
filter::{ChannelFilters, ChannelPolicy, FilterPattern},
ChainConfig, PacketFilter,
};
use ibc_relayer::event::IbcEventWithHeight;
use ibc_relayer_types::applications::ics27_ica::msgs::send_tx::MsgSendTx;
use ibc_relayer_types::applications::ics27_ica::packet_data::InterchainAccountPacketData;
use ibc_relayer_types::applications::{
ics27_ica::cosmos_tx::CosmosTx,
Expand All @@ -26,6 +23,7 @@ use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::{
assert_eventually_channel_closed, assert_eventually_channel_established, query_channel_end,
};
use ibc_test_framework::util::interchain_security::interchain_send_tx;

#[test]
fn test_ica_filter_default() -> Result<(), Error> {
Expand Down Expand Up @@ -187,6 +185,7 @@ impl BinaryConnectionTest for IcaFilterTestAllow {
Ok(())
}
}

pub struct IcaFilterTestDeny;

impl TestOverrides for IcaFilterTestDeny {
Expand Down Expand Up @@ -373,26 +372,3 @@ impl BinaryConnectionTest for ICACloseChannelTest {
})
}
}

fn interchain_send_tx<ChainA: ChainHandle>(
chain: &ChainA,
from: &Signer,
connection: &ConnectionId,
msg: InterchainAccountPacketData,
relative_timeout: Timestamp,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let msg = MsgSendTx {
owner: from.clone(),
connection_id: connection.clone(),
packet_data: msg,
relative_timeout,
};

let msg_any = msg.to_any();

let tm = TrackedMsgs::new_static(vec![msg_any], "SendTx");

chain
.send_messages_and_wait_commit(tm)
.map_err(Error::relayer)
}
Loading
Loading