Skip to content

Commit

Permalink
Handle expired client errors in workers (#1664)
Browse files Browse the repository at this point in the history
* Improve spawning of supervisor worker tasks

* Add test to reproduce client expiry error

* Trying to make connection worker abort on error

* Properly terminate connection worker when client is expired

* Abort channel worker on client expired error

* New issue found in handling client expiration error in workers

* Fix mock test failure

* Terminate packet worker when client is expired

* Do not retry channel creation if client is expired

* Abort connection and channel worker when handshake is completed

* Use better names for worker tasks

* Improve connection expiration test

* Use better names for worker tasks

* Add integration tests for connection and channel workers

* Fix connection and channel workers

* Fix typo

* Reorder arguments in assert_eventually_succeed

* Make task step runner return Next::Continue/Abort

* Make init_connection/channel return initialized Connection/Channel

* Refactor connection/channel established as assertions

* Found a bug in connection handshake code

* Fix incorrect ordering in restore_from_event

* Automate packet worker

* Log handshake step result as info

* Remove connection expiration test

The same test is now within the channel expiration test

* Move client_expiration tests to non-manual

* Try to tame misbehavior task error on expiration

* Make handshake_step return task::Next instead of bool

* Update comment instruction for running expiration tests

* Slightly improve misbehavior task and add failure test

* Slightly simplify misbehavior expiration test

* Add changelog

* Abort connection/channel worker if counterparty state is already Open
  • Loading branch information
soareschen authored Dec 23, 2021
1 parent 080d769 commit 03d4716
Show file tree
Hide file tree
Showing 43 changed files with 1,213 additions and 366 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Handle expired client errors in workers ([#1543](https://github.com/informalsystems/ibc-rs/issues/1543))
2 changes: 1 addition & 1 deletion modules/src/clients/ics07_tendermint/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct ClientState {
pub proof_specs: ProofSpecs,
pub upgrade_path: Vec<String>,
pub allow_update: AllowUpdate,
frozen_height: Option<Height>,
pub frozen_height: Option<Height>,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion relayer-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ fn spawn_telemetry_server(
Ok(())
}

fn make_supervisor<Chain: ChainHandle + 'static>(
fn make_supervisor<Chain: ChainHandle>(
config: Arc<RwLock<Config>>,
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub enum ChainRequest {
},
}

pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug {
pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static {
fn new(chain_id: ChainId, sender: channel::Sender<ChainRequest>) -> Self;

/// Get the [`ChainId`] of this chain.
Expand Down
11 changes: 8 additions & 3 deletions relayer/src/chain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,16 @@ impl ChainEndpoint for MockChain {
/// Performs a query to retrieve the identifiers of all connections.
fn query_consensus_state(
&self,
_client_id: ClientId,
_consensus_height: Height,
client_id: ClientId,
consensus_height: Height,
_query_height: Height,
) -> Result<AnyConsensusState, Error> {
unimplemented!()
let consensus_states = self.context.consensus_states(&client_id);
Ok(consensus_states
.into_iter()
.find(|s| s.height == consensus_height)
.unwrap()
.consensus_state)
}

fn query_upgraded_consensus_state(
Expand Down
98 changes: 67 additions & 31 deletions relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use crate::chain::counterparty::{channel_connection_client, channel_state_on_des
use crate::chain::handle::ChainHandle;
use crate::channel::version::ResolveContext;
use crate::connection::Connection;
use crate::foreign_client::ForeignClient;
use crate::foreign_client::{ForeignClient, HasExpiredOrFrozenError};
use crate::object::Channel as WorkerChannelObject;
use crate::supervisor::error::Error as SupervisorError;
use crate::util::retry::retry_with_index;
use crate::util::retry::RetryResult;
use crate::util::retry::{retry_count, RetryResult};
use crate::util::task::Next;

pub mod error;
mod version;
Expand Down Expand Up @@ -418,10 +419,18 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {

fn do_chan_open_try_and_send_with_retry(&mut self) -> Result<(), ChannelError> {
retry_with_index(retry_strategy::default(), |_| {
self.do_chan_open_try_and_send()
if let Err(e) = self.do_chan_open_try_and_send() {
if e.is_expired_or_frozen_error() {
RetryResult::Err(e)
} else {
RetryResult::Retry(e)
}
} else {
RetryResult::Ok(())
}
})
.map_err(|err| {
error!("failed to open channel after {} retries", err);
error!("failed to open channel after {} retries", retry_count(&err));

from_retry_error(
err,
Expand Down Expand Up @@ -584,15 +593,24 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
///
/// Post-condition: the channel state is `Open` on both ends if successful.
fn do_chan_open_finalize_with_retry(&self) -> Result<(), ChannelError> {
retry_with_index(retry_strategy::default(), |_| self.do_chan_open_finalize()).map_err(
|err| {
error!("failed to open channel after {} retries", err);
from_retry_error(
err,
format!("Failed to finish channel handshake for {:?}", self),
)
},
)?;
retry_with_index(retry_strategy::default(), |_| {
if let Err(e) = self.do_chan_open_finalize() {
if e.is_expired_or_frozen_error() {
RetryResult::Err(e)
} else {
RetryResult::Retry(e)
}
} else {
RetryResult::Ok(())
}
})
.map_err(|err| {
error!("failed to open channel after {} retries", err);
from_retry_error(
err,
format!("Failed to finish channel handshake for {:?}", self),
)
})?;

Ok(())
}
Expand Down Expand Up @@ -622,33 +640,51 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
.map_err(|e| ChannelError::query_channel(channel_id.clone(), e))
}

pub fn handshake_step(&mut self, state: State) -> Result<Vec<IbcEvent>, ChannelError> {
match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Ok(vec![self.build_chan_open_try_and_send()?]),
(State::Init, State::Init) => Ok(vec![self.build_chan_open_try_and_send()?]),
(State::TryOpen, State::Init) => Ok(vec![self.build_chan_open_ack_and_send()?]),
(State::TryOpen, State::TryOpen) => Ok(vec![self.build_chan_open_ack_and_send()?]),
(State::Open, State::TryOpen) => Ok(vec![self.build_chan_open_confirm_and_send()?]),
_ => Ok(vec![]),
}
}
pub fn handshake_step(
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ChannelError> {
let res = match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Some(self.build_chan_open_try_and_send()?),
(State::Init, State::Init) => Some(self.build_chan_open_try_and_send()?),
(State::TryOpen, State::Init) => Some(self.build_chan_open_ack_and_send()?),
(State::TryOpen, State::TryOpen) => Some(self.build_chan_open_ack_and_send()?),
(State::Open, State::TryOpen) => Some(self.build_chan_open_confirm_and_send()?),
(State::Open, State::Open) => return Ok((None, Next::Abort)),

// If the counterparty state is already Open but current state is TryOpen,
// return anyway as the final step is to be done by the counterparty worker.
(State::TryOpen, State::Open) => return Ok((None, Next::Abort)),

_ => None,
};

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<(), u64> {
let done = '🥳';
Ok((res, Next::Continue))
}

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<Next, u64> {
match self.handshake_step(state) {
Err(e) => {
error!("Failed Chan{:?} with error: {}", state, e);
RetryResult::Retry(index)
if e.is_expired_or_frozen_error() {
error!(
"failed to establish channel handshake on frozen client: {}",
e
);
RetryResult::Err(index)
} else {
error!("Failed Chan{:?} with error: {}", state, e);
RetryResult::Retry(index)
}
}
Ok(ev) => {
debug!("{} => {:#?}\n", done, ev);
RetryResult::Ok(())
Ok((Some(ev), handshake_completed)) => {
info!("channel handshake step completed with events: {:#?}\n", ev);
RetryResult::Ok(handshake_completed)
}
Ok((None, handshake_completed)) => RetryResult::Ok(handshake_completed),
}
}

pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult<(), u64> {
pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult<Next, u64> {
let state = match event {
IbcEvent::OpenInitChannel(_) => State::Init,
IbcEvent::OpenTryChannel(_) => State::TryOpen,
Expand Down
17 changes: 16 additions & 1 deletion relayer/src/channel/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ClientId, PortChanne
use ibc::events::IbcEvent;

use crate::error::Error;
use crate::foreign_client::ForeignClientError;
use crate::foreign_client::{ForeignClientError, HasExpiredOrFrozenError};
use crate::supervisor::Error as SupervisorError;

define_error! {
Expand Down Expand Up @@ -206,3 +206,18 @@ define_error! {

}
}

impl HasExpiredOrFrozenError for ChannelErrorDetail {
fn is_expired_or_frozen_error(&self) -> bool {
match self {
Self::ClientOperation(e) => e.source.is_expired_or_frozen_error(),
_ => false,
}
}
}

impl HasExpiredOrFrozenError for ChannelError {
fn is_expired_or_frozen_error(&self) -> bool {
self.detail().is_expired_or_frozen_error()
}
}
78 changes: 57 additions & 21 deletions relayer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use flex_error::define_error;
use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest;
use prost_types::Any;
use serde::Serialize;
use tracing::debug;
use tracing::{error, warn};
use tracing::{error, info, warn};

use ibc::core::ics02_client::height::Height;
use ibc::core::ics03_connection::connection::{
Expand All @@ -24,9 +23,10 @@ use ibc::tx_msg::Msg;

use crate::chain::handle::ChainHandle;
use crate::error::Error as RelayerError;
use crate::foreign_client::{ForeignClient, ForeignClientError};
use crate::foreign_client::{ForeignClient, ForeignClientError, HasExpiredOrFrozenError};
use crate::object::Connection as WorkerConnectionObject;
use crate::supervisor::Error as SupervisorError;
use crate::util::task::Next;

/// Maximum value allowed for packet delay on any new connection that the relayer establishes.
pub const MAX_PACKET_DELAY: Duration = Duration::from_secs(120);
Expand Down Expand Up @@ -190,6 +190,21 @@ define_error! {
}
}

impl HasExpiredOrFrozenError for ConnectionErrorDetail {
fn is_expired_or_frozen_error(&self) -> bool {
match self {
Self::ClientOperation(e) => e.source.is_expired_or_frozen_error(),
_ => false,
}
}
}

impl HasExpiredOrFrozenError for ConnectionError {
fn is_expired_or_frozen_error(&self) -> bool {
self.detail().is_expired_or_frozen_error()
}
}

#[derive(Clone, Debug)]
pub struct ConnectionSide<Chain: ChainHandle> {
pub(crate) chain: Chain,
Expand Down Expand Up @@ -606,33 +621,54 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
.map_err(ConnectionError::supervisor)
}

pub fn handshake_step(&mut self, state: State) -> Result<Vec<IbcEvent>, ConnectionError> {
match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Ok(vec![self.build_conn_try_and_send()?]),
(State::Init, State::Init) => Ok(vec![self.build_conn_try_and_send()?]),
(State::TryOpen, State::Init) => Ok(vec![self.build_conn_ack_and_send()?]),
(State::TryOpen, State::TryOpen) => Ok(vec![self.build_conn_ack_and_send()?]),
(State::Open, State::TryOpen) => Ok(vec![self.build_conn_confirm_and_send()?]),
_ => Ok(vec![]),
}
}
pub fn handshake_step(
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ConnectionError> {
let event = match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Some(self.build_conn_try_and_send()?),
(State::Init, State::Init) => Some(self.build_conn_try_and_send()?),
(State::TryOpen, State::Init) => Some(self.build_conn_ack_and_send()?),
(State::TryOpen, State::TryOpen) => Some(self.build_conn_ack_and_send()?),
(State::Open, State::TryOpen) => Some(self.build_conn_confirm_and_send()?),
(State::Open, State::Open) => return Ok((None, Next::Abort)),

// If the counterparty state is already Open but current state is TryOpen,
// return anyway as the final step is to be done by the counterparty worker.
(State::TryOpen, State::Open) => return Ok((None, Next::Abort)),

_ => None,
};

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<(), u64> {
let done = '🥳';
Ok((event, Next::Continue))
}

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<Next, u64> {
match self.handshake_step(state) {
Err(e) => {
error!("failed {:?} with error {}", state, e);
RetryResult::Retry(index)
if e.is_expired_or_frozen_error() {
error!(
"failed to establish connection handshake on frozen client: {}",
e
);
RetryResult::Err(index)
} else {
error!("failed {:?} with error {}", state, e);
RetryResult::Retry(index)
}
}
Ok(ev) => {
debug!("{} => {:#?}\n", done, ev);
RetryResult::Ok(())
Ok((Some(ev), handshake_completed)) => {
info!(
"connection handshake step completed with events: {:#?}\n",
ev
);
RetryResult::Ok(handshake_completed)
}
Ok((None, handshake_completed)) => RetryResult::Ok(handshake_completed),
}
}

pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult<(), u64> {
pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult<Next, u64> {
let state = match event {
IbcEvent::OpenInitConnection(_) => State::Init,
IbcEvent::OpenTryConnection(_) => State::TryOpen,
Expand Down
Loading

0 comments on commit 03d4716

Please sign in to comment.