diff --git a/relayer-cli/src/commands/tx/transfer.rs b/relayer-cli/src/commands/tx/transfer.rs
index 40a44006a8..973a87b800 100644
--- a/relayer-cli/src/commands/tx/transfer.rs
+++ b/relayer-cli/src/commands/tx/transfer.rs
@@ -1,6 +1,7 @@
 use abscissa_core::clap::Parser;
 use abscissa_core::{config::Override, Command, FrameworkErrorKind, Runnable};
+use core::time::Duration;
 
 use ibc::{
     core::{
         ics02_client::client_state::ClientState,
@@ -141,7 +142,7 @@ impl TxIcs20MsgTransferCmd {
             denom,
             receiver: self.receiver.clone(),
             timeout_height_offset: self.timeout_height_offset,
-            timeout_seconds: core::time::Duration::from_secs(self.timeout_seconds),
+            timeout_duration: Duration::from_secs(self.timeout_seconds),
             number_msgs,
         };
 
@@ -226,7 +227,7 @@ impl Runnable for TxIcs20MsgTransferCmd {
         // Checks pass, build and send the tx
         let res: Result<Vec<IbcEvent>, Error> =
             build_and_send_transfer_messages(&chains.src, &chains.dst, &opts)
-                .map_err(Error::packet);
+                .map_err(Error::transfer);
 
         match res {
             Ok(ev) => Output::success(ev).exit(),
diff --git a/relayer-cli/src/error.rs b/relayer-cli/src/error.rs
index fdec0e4287..8565a1f7b8 100644
--- a/relayer-cli/src/error.rs
+++ b/relayer-cli/src/error.rs
@@ -7,7 +7,7 @@ use ibc_relayer::error::Error as RelayerError;
 use ibc_relayer::foreign_client::ForeignClientError;
 use ibc_relayer::link::error::LinkError;
 use ibc_relayer::supervisor::Error as SupervisorError;
-use ibc_relayer::transfer::PacketError;
+use ibc_relayer::transfer::TransferError;
 use ibc_relayer::upgrade_chain::UpgradeChainError;
 use tendermint::Error as TendermintError;
@@ -69,9 +69,9 @@ define_error! {
        [ ConnectionError ]
        |_| { "connection error" },
 
-    Packet
-        [ PacketError ]
-        |_| { "packet error" },
+    Transfer
+        [ TransferError ]
+        |_| { "transfer error" },
 
     Channel
        [ ChannelError ]
diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs
index 2b7237396b..f94a45fa77 100644
--- a/relayer/src/chain.rs
+++ b/relayer/src/chain.rs
@@ -67,7 +67,7 @@ pub enum HealthCheck {
 
 /// The result of a chain status query.
 #[derive(Clone, Debug)]
-pub struct StatusResponse {
+pub struct ChainStatus {
     pub height: ICSHeight,
     pub timestamp: Timestamp,
 }
@@ -160,7 +160,7 @@ pub trait ChainEndpoint: Sized {
     }
 
     /// Query the latest height and timestamp the chain is at
-    fn query_status(&self) -> Result<StatusResponse, Error>;
+    fn query_status(&self) -> Result<ChainStatus, Error>;
 
     /// Performs a query to retrieve the state of all clients that a chain hosts.
     fn query_clients(
diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs
index ba9ff59da7..2e16815ff5 100644
--- a/relayer/src/chain/cosmos.rs
+++ b/relayer/src/chain/cosmos.rs
@@ -1,45 +1,35 @@
 use alloc::sync::Arc;
 use core::{
-    cmp::min,
     convert::{TryFrom, TryInto},
     future::Future,
     str::FromStr,
     time::Duration,
 };
 use num_bigint::BigInt;
-use num_rational::BigRational;
-use std::{fmt, thread, time::Instant};
+use std::thread;
 
-use bech32::{ToBase32, Variant};
 use bitcoin::hashes::hex::ToHex;
-use ibc_proto::google::protobuf::Any;
-use itertools::Itertools;
-use tendermint::abci::{Code, Event, Path as TendermintABCIPath};
-use tendermint::account::Id as AccountId;
 use tendermint::block::Height;
-use tendermint::node::info::TxIndexStatus;
+use tendermint::{
+    abci::{Event, Path as TendermintABCIPath},
+    node::info::TxIndexStatus,
+};
 use tendermint_light_client_verifier::types::LightBlock as TMLightBlock;
 use tendermint_proto::Protobuf;
-use tendermint_rpc::endpoint::tx::Response as ResultTx;
-use tendermint_rpc::query::Query;
-use tendermint_rpc::Url;
 use tendermint_rpc::{
     endpoint::broadcast::tx_sync::Response, endpoint::status, Client, HttpClient, Order,
 };
 use tokio::runtime::Runtime as TokioRuntime;
 use tonic::codegen::http::Uri;
-use tracing::{debug, error, info, span, trace, warn, Level};
+use tracing::{span, warn, Level};
 
 use ibc::clients::ics07_tendermint::client_state::{AllowUpdate, ClientState};
 use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState;
 use ibc::clients::ics07_tendermint::header::Header as TmHeader;
-use ibc::core::ics02_client::client_consensus::{
-    AnyConsensusState, AnyConsensusStateWithHeight, QueryClientEventRequest,
-};
+use ibc::core::ics02_client::client_consensus::{AnyConsensusState, AnyConsensusStateWithHeight};
 use ibc::core::ics02_client::client_state::{AnyClientState, IdentifiedAnyClientState};
 use ibc::core::ics02_client::client_type::ClientType;
 use ibc::core::ics02_client::error::Error as ClientError;
-use ibc::core::ics02_client::events as ClientEvents;
 use ibc::core::ics03_connection::connection::{ConnectionEnd, IdentifiedConnectionEnd};
 use ibc::core::ics04_channel::channel::{
     ChannelEnd, IdentifiedChannelEnd, QueryPacketEventDataRequest,
@@ -47,23 +37,18 @@ use ibc::core::ics04_channel::channel::{
 use ibc::core::ics04_channel::events as ChannelEvents;
 use ibc::core::ics04_channel::packet::{Packet, PacketMsgType, Sequence};
 use ibc::core::ics23_commitment::commitment::CommitmentPrefix;
-use ibc::core::ics23_commitment::merkle::convert_tm_to_ics_merkle_proof;
 use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId};
+use ibc::core::ics24_host::path::{
+    AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath,
+    ConnectionsPath, ReceiptsPath, SeqRecvsPath,
+};
 use ibc::core::ics24_host::{ClientUpgradePath, Path, IBC_QUERY_PATH, SDK_UPGRADE_QUERY_PATH};
-use ibc::events::{from_tx_response_event, IbcEvent};
+use ibc::events::IbcEvent;
 use ibc::query::QueryBlockRequest;
-use ibc::query::{QueryTxHash, QueryTxRequest};
+use ibc::query::QueryTxRequest;
 use ibc::signer::Signer;
 use ibc::Height as ICSHeight;
-use ibc_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient;
-use ibc_proto::cosmos::base::tendermint::v1beta1::GetNodeInfoRequest;
-use ibc_proto::cosmos::base::v1beta1::Coin;
 use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams;
-use ibc_proto::cosmos::tx::v1beta1::mode_info::{Single, Sum};
-use
ibc_proto::cosmos::tx::v1beta1::{ - AuthInfo, Fee, ModeInfo, SignDoc, SignerInfo, SimulateRequest, SimulateResponse, Tx, TxBody, - TxRaw, -}; use ibc_proto::ibc::core::channel::v1::{ PacketState, QueryChannelClientStateRequest, QueryChannelsRequest, QueryConnectionChannelsRequest, QueryNextSequenceReceiveRequest, @@ -76,65 +61,45 @@ use ibc_proto::ibc::core::connection::v1::{ QueryClientConnectionsRequest, QueryConnectionsRequest, }; -use crate::config::types::Memo; -use crate::config::{AddressType, ChainConfig, GasPrice}; +use crate::chain::client::ClientSettings; +use crate::chain::cosmos::batch::{ + send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, +}; +use crate::chain::cosmos::encode::encode_to_bech32; +use crate::chain::cosmos::gas::{calculate_fee, mul_ceil}; +use crate::chain::cosmos::query::account::get_or_fetch_account; +use crate::chain::cosmos::query::status::query_status; +use crate::chain::cosmos::query::tx::query_txs; +use crate::chain::cosmos::query::{abci_query, fetch_version_specs, packet_query}; +use crate::chain::cosmos::types::account::Account; +use crate::chain::cosmos::types::gas::{default_gas_from_config, max_gas_from_config}; +use crate::chain::tx::TrackedMsgs; +use crate::chain::{ChainEndpoint, HealthCheck}; +use crate::chain::{ChainStatus, QueryResponse}; +use crate::config::ChainConfig; use crate::error::Error; use crate::event::monitor::{EventMonitor, EventReceiver, TxMonitorCmd}; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::tendermint::LightClient as TmLightClient; use crate::light_client::{LightClient, Verified}; -use crate::sdk_error::sdk_error_from_tx_sync_error_code; -use crate::util::retry::{retry_with_index, RetryResult}; - -use ibc::core::ics24_host::path::{ - AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath, - ConnectionsPath, ReceiptsPath, SeqRecvsPath, -}; -use self::account::{query_account, Account, AccountNumber, AccountSequence}; -use super::client::ClientSettings; -use super::tx::TrackedMsgs; -use super::{ChainEndpoint, HealthCheck, QueryResponse, StatusResponse}; - -pub mod account; +pub mod batch; pub mod client; pub mod compatibility; +pub mod encode; +pub mod estimate; +pub mod gas; +pub mod query; +pub mod retry; +pub mod simulate; +pub mod tx; +pub mod types; pub mod version; +pub mod wait; -/// Default gas limit when submitting a transaction. -const DEFAULT_MAX_GAS: u64 = 400_000; - -/// Fraction of the estimated gas to add to the estimated gas amount when submitting a transaction. -const DEFAULT_GAS_PRICE_ADJUSTMENT: f64 = 0.1; - -const DEFAULT_FEE_GRANTER: &str = ""; - -/// Upper limit on the size of transactions submitted by Hermes, expressed as a /// fraction of the maximum block size defined in the Tendermint core consensus parameters. pub const GENESIS_MAX_BYTES_MAX_FRACTION: f64 = 0.9; - -// The error "incorrect account sequence" is defined as the unique error code 32 in cosmos-sdk: // https://github.com/cosmos/cosmos-sdk/blob/v0.44.0/types/errors/errors.go#L115-L117 -pub const INCORRECT_ACCOUNT_SEQUENCE_ERR: u32 = 32; - -mod retry_strategy { - use crate::util::retry::Fixed; - use core::time::Duration; - - // Maximum number of retries for send_tx in the case of - // an account sequence mismatch at broadcast step. - pub const MAX_ACCOUNT_SEQUENCE_RETRY: u32 = 1; - - // Backoff multiplier to apply while retrying in the case - // of account sequence mismatch. 
- pub const BACKOFF_MULTIPLIER_ACCOUNT_SEQUENCE_RETRY: u64 = 300; - - pub fn wait_for_block_commits(max_total_wait: Duration) -> impl Iterator { - let backoff_millis = 300; // The periodic backoff - let count: usize = (max_total_wait.as_millis() / backoff_millis as u128) as usize; - Fixed::from_millis(backoff_millis).take(count) - } -} pub struct CosmosSdkChain { config: ChainConfig, @@ -186,14 +151,17 @@ impl CosmosSdkChain { ); } + let max_gas = max_gas_from_config(&self.config); + let default_gas = default_gas_from_config(&self.config); + // If the default gas is strictly greater than the max gas and the tx simulation fails, // Hermes won't be able to ever submit that tx because the gas amount wanted will be // greater than the max gas. - if self.default_gas() > self.max_gas() { + if default_gas > max_gas { return Err(Error::config_validation_default_gas_too_high( self.id().clone(), - self.default_gas(), - self.max_gas(), + default_gas, + max_gas, )); } @@ -234,10 +202,12 @@ impl CosmosSdkChain { .try_into() .expect("cannot over or underflow because it is positive"); - if self.max_gas() > consensus_max_gas { + let max_gas = max_gas_from_config(&self.config); + + if max_gas > consensus_max_gas { return Err(Error::config_validation_max_gas_too_high( self.id().clone(), - self.max_gas(), + max_gas, result.consensus_params.block.max_gas, )); } @@ -301,296 +271,6 @@ impl CosmosSdkChain { self.rt.block_on(f) } - fn send_tx_with_account_sequence( - &mut self, - proto_msgs: Vec, - account_seq: AccountSequence, - ) -> Result { - debug!( - "sending {} messages using account sequence {}", - proto_msgs.len(), - account_seq, - ); - - let signer_info = self.signer(account_seq)?; - let max_fee = self.max_fee(); - - debug!("max fee, for use in tx simulation: {}", PrettyFee(&max_fee)); - - let (body, body_buf) = tx_body_and_bytes(proto_msgs, self.tx_memo())?; - let account_number = self.account_number()?; - - let (auth_info, auth_buf) = auth_info_and_bytes(signer_info.clone(), max_fee)?; - let signed_doc = self.signed_doc(body_buf.clone(), auth_buf, account_number)?; - - let simulate_tx = Tx { - body: Some(body), - auth_info: Some(auth_info), - signatures: vec![signed_doc], - }; - - // This may result in an account sequence mismatch error - let estimated_gas = self.estimate_gas(simulate_tx)?; - - if estimated_gas > self.max_gas() { - debug!( - id = %self.id(), estimated = ?estimated_gas, max = ?self.max_gas(), - "send_tx: estimated gas is higher than max gas" - ); - - return Err(Error::tx_simulate_gas_estimate_exceeded( - self.id().clone(), - estimated_gas, - self.max_gas(), - )); - } - - let adjusted_fee = self.fee_with_gas(estimated_gas); - - debug!( - id = %self.id(), - "send_tx: using {} gas, fee {}", - estimated_gas, - PrettyFee(&adjusted_fee) - ); - - let (_auth_adjusted, auth_buf_adjusted) = auth_info_and_bytes(signer_info, adjusted_fee)?; - let signed_doc = - self.signed_doc(body_buf.clone(), auth_buf_adjusted.clone(), account_number)?; - - let tx_raw = TxRaw { - body_bytes: body_buf, - auth_info_bytes: auth_buf_adjusted, - signatures: vec![signed_doc], - }; - - let mut tx_bytes = Vec::new(); - prost::Message::encode(&tx_raw, &mut tx_bytes) - .map_err(|e| Error::protobuf_encode(String::from("Transaction"), e))?; - - self.block_on(broadcast_tx_sync( - &self.rpc_client, - &self.config.rpc_addr, - tx_bytes, - )) - } - - /// Try to `send_tx` with retry on account sequence error. - /// An account sequence error can occur if the account sequence that - /// the relayer caches becomes outdated. 
This may happen if the relayer - /// wallet is used concurrently elsewhere, or when tx fees are mis-configured - /// leading to transactions hanging in the mempool. - /// - /// Account sequence mismatch error can occur at two separate steps: - /// 1. as Err variant, propagated from the `estimate_gas` step. - /// 2. as an Ok variant, with an Code::Err response, propagated from - /// the `broadcast_tx_sync` step. - /// - /// We treat both cases by re-fetching the account sequence number - /// from the full node. - /// Upon case #1, we do not retry submitting the same tx (retry happens - /// nonetheless at the worker `step` level). Upon case #2, we retry - /// submitting the same transaction. - fn send_tx_with_account_sequence_retry( - &mut self, - proto_msgs: Vec, - retry_counter: u32, - ) -> Result { - let account_sequence = self.account_sequence()?; - - match self.send_tx_with_account_sequence(proto_msgs.clone(), account_sequence) { - // Gas estimation failed with acct. s.n. mismatch at estimate gas step. - // This indicates that the full node did not yet push the previous tx out of its - // mempool. Possible explanations: fees too low, network congested, or full node - // congested. Whichever the case, it is more expedient in production to drop the tx - // and refresh the s.n., to allow proceeding to the other transactions. A separate - // retry at the worker-level will handle retrying. - Err(e) if mismatching_account_sequence_number(&e) => { - warn!("failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); - self.refresh_account()?; - // Note: propagating error here can lead to bug & dropped packets: - // https://github.com/informalsystems/ibc-rs/issues/1153 - // But periodic packet clearing will catch any dropped packets. - Err(e) - } - - // Gas estimation succeeded. Broadcasting failed with a retry-able error. - Ok(response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => { - if retry_counter < retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY { - let retry_counter = retry_counter + 1; - warn!("failed at broadcast step with incorrect account sequence. retrying ({}/{})", - retry_counter, retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY); - // Backoff & re-fetch the account s.n. - let backoff = (retry_counter as u64) - * retry_strategy::BACKOFF_MULTIPLIER_ACCOUNT_SEQUENCE_RETRY; - thread::sleep(Duration::from_millis(backoff)); - self.refresh_account()?; - - // Now retry. - self.send_tx_with_account_sequence_retry(proto_msgs, retry_counter + 1) - } else { - // If after the max retry we still get an account sequence mismatch error, - // we ignore the error and return the original response to downstream. - // We do not return an error here, because the current convention - // let the caller handle error responses separately. - error!("failed due to account sequence errors. the relayer wallet may be used elsewhere concurrently."); - Ok(response) - } - } - - // Catch-all arm for the Ok variant. - // This is the case when gas estimation succeeded. - Ok(response) => { - // Complete success. - match response.code { - tendermint::abci::Code::Ok => { - debug!("broadcast_tx_sync: {:?}", response); - - self.incr_account_sequence(); - Ok(response) - } - // Gas estimation succeeded, but broadcasting failed with unrecoverable error. - tendermint::abci::Code::Err(code) => { - // Avoid increasing the account s.n. 
if CheckTx failed - // Log the error - error!( - "broadcast_tx_sync: {:?}: diagnostic: {:?}", - response, - sdk_error_from_tx_sync_error_code(code) - ); - Ok(response) - } - } - } - - // Catch-all case for the Err variant. - // Gas estimation failure or other unrecoverable error, propagate. - Err(e) => Err(e), - } - } - - fn send_tx(&mut self, proto_msgs: Vec) -> Result { - crate::time!("send_tx"); - let _span = span!(Level::ERROR, "send_tx", id = %self.id()).entered(); - - self.send_tx_with_account_sequence_retry(proto_msgs, 0) - } - - /// Try to simulate the given tx in order to estimate how much gas will be needed to submit it. - /// - /// It is possible that a batch of messages are fragmented by the caller (`send_msgs`) such that - /// they do not individually verify. For example for the following batch: - /// [`MsgUpdateClient`, `MsgRecvPacket`, ..., `MsgRecvPacket`] - /// - /// If the batch is split in two TX-es, the second one will fail the simulation in `deliverTx` check. - /// In this case we use the `default_gas` param. - fn estimate_gas(&mut self, tx: Tx) -> Result { - let simulated_gas = self.send_tx_simulate(tx).map(|sr| sr.gas_info); - let _span = span!(Level::ERROR, "estimate_gas").entered(); - - match simulated_gas { - Ok(Some(gas_info)) => { - debug!( - "tx simulation successful, gas amount used: {:?}", - gas_info.gas_used - ); - - Ok(gas_info.gas_used) - } - - Ok(None) => { - warn!( - "tx simulation successful but no gas amount used was returned, falling back on default gas: {}", - self.default_gas() - ); - - Ok(self.default_gas()) - } - - // If there is a chance that the tx will be accepted once actually submitted, we fall - // back on the max gas and will attempt to send it anyway. - // See `can_recover_from_simulation_failure` for more info. - Err(e) if can_recover_from_simulation_failure(&e) => { - warn!( - "failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", - e.detail() - ); - - Ok(self.default_gas()) - } - - Err(e) => { - error!( - "failed to simulate tx. propagating error to caller: {}", - e.detail() - ); - // Propagate the error, the retrying mechanism at caller may catch & retry. - Err(e) - } - } - } - - /// The default amount of gas the relayer is willing to pay for a transaction, - /// when it cannot simulate the tx and therefore estimate the gas amount needed. - fn default_gas(&self) -> u64 { - self.config.default_gas.unwrap_or_else(|| self.max_gas()) - } - - /// The maximum amount of gas the relayer is willing to pay for a transaction - fn max_gas(&self) -> u64 { - self.config.max_gas.unwrap_or(DEFAULT_MAX_GAS) - } - - /// Get the fee granter address - fn fee_granter(&self) -> &str { - self.config - .fee_granter - .as_deref() - .unwrap_or(DEFAULT_FEE_GRANTER) - } - - /// The gas price - fn gas_price(&self) -> &GasPrice { - &self.config.gas_price - } - - /// The gas price adjustment - fn gas_adjustment(&self) -> f64 { - self.config - .gas_adjustment - .unwrap_or(DEFAULT_GAS_PRICE_ADJUSTMENT) - } - - /// Adjusts the fee based on the configured `gas_adjustment` to prevent out of gas errors. - /// The actual gas cost, when a transaction is executed, may be slightly higher than the - /// one returned by the simulation. 
- fn apply_adjustment_to_gas(&self, gas_amount: u64) -> u64 { - assert!(self.gas_adjustment() <= 1.0); - - let (_, digits) = mul_ceil(gas_amount, self.gas_adjustment()).to_u64_digits(); - assert!(digits.len() == 1); - - let adjustment = digits[0]; - let gas = gas_amount.checked_add(adjustment).unwrap_or(u64::MAX); - - min(gas, self.max_gas()) - } - - /// The maximum fee the relayer pays for a transaction - fn max_fee_in_coins(&self) -> Coin { - calculate_fee(self.max_gas(), self.gas_price()) - } - - /// The fee in coins based on gas amount - fn fee_from_gas_in_coins(&self, gas: u64) -> Coin { - calculate_fee(gas, self.gas_price()) - } - - /// The maximum number of messages included in a transaction - fn max_msg_num(&self) -> usize { - self.config.max_msg_num.into() - } - /// The maximum size of any transaction sent by the relayer to this chain fn max_tx_size(&self) -> usize { self.config.max_tx_size.into() @@ -615,7 +295,14 @@ impl CosmosSdkChain { return Err(Error::private_store()); } - let response = self.block_on(abci_query(self, path, data.to_string(), height, prove))?; + let response = self.block_on(abci_query( + &self.rpc_client, + &self.config.rpc_addr, + path, + data.to_string(), + height, + prove, + ))?; // TODO - Verify response proof, if requested. if prove {} @@ -640,7 +327,8 @@ impl CosmosSdkChain { let path = TendermintABCIPath::from_str(SDK_UPGRADE_QUERY_PATH) .expect("Turning SDK upgrade query path constant into a Tendermint ABCI path"); let response: QueryResponse = self.block_on(abci_query( - self, + &self.rpc_client, + &self.config.rpc_addr, path, Path::Upgrade(data).to_string(), prev_height, @@ -652,253 +340,18 @@ impl CosmosSdkChain { Ok((response.value, proof)) } - fn send_tx_simulate(&self, tx: Tx) -> Result { - crate::time!("send_tx_simulate"); - - use ibc_proto::cosmos::tx::v1beta1::service_client::ServiceClient; - - // The `tx` field of `SimulateRequest` was deprecated in Cosmos SDK 0.43 in favor of `tx_bytes`. - let mut tx_bytes = vec![]; - prost::Message::encode(&tx, &mut tx_bytes) - .map_err(|e| Error::protobuf_encode(String::from("Transaction"), e))?; - - #[allow(deprecated)] - let req = SimulateRequest { - tx: Some(tx), // needed for simulation to go through with Cosmos SDK < 0.43 - tx_bytes, // needed for simulation to go through with Cosmos SDk >= 0.43 - }; - - let mut client = self - .block_on(ServiceClient::connect(self.grpc_addr.clone())) - .map_err(Error::grpc_transport)?; - - let request = tonic::Request::new(req); - let response = self - .block_on(client.simulate(request)) - .map_err(Error::grpc_status)? 
- .into_inner(); - - Ok(response) - } - fn key(&self) -> Result { self.keybase() .get_key(&self.config.key_name) .map_err(Error::key_base) } - fn key_bytes(&self, key: &KeyEntry) -> Result, Error> { - let mut pk_buf = Vec::new(); - prost::Message::encode(&key.public_key.public_key.to_bytes(), &mut pk_buf) - .map_err(|e| Error::protobuf_encode(String::from("Key bytes"), e))?; - Ok(pk_buf) - } - - fn key_and_bytes(&self) -> Result<(KeyEntry, Vec), Error> { - let key = self.key()?; - let key_bytes = self.key_bytes(&key)?; - Ok((key, key_bytes)) - } - - fn refresh_account(&mut self) -> Result<(), Error> { - let account = self.block_on(query_account(self, self.key()?.account))?; - - info!( - sequence = %account.sequence, - number = %account.account_number, - "refresh: retrieved account", - ); - - self.account = Some(account.into()); - - Ok(()) - } - - fn account(&mut self) -> Result<&Account, Error> { - if self.account == None { - self.refresh_account()?; - } - - Ok(self - .account - .as_ref() - .expect("account was supposedly just cached")) - } - - fn account_number(&mut self) -> Result { - Ok(self.account()?.number) - } - - fn account_sequence(&mut self) -> Result { - Ok(self.account()?.sequence) - } - - fn incr_account_sequence(&mut self) { - if let Some(account) = &mut self.account { - account.sequence = account.sequence.increment(); - } - } - - fn signer(&self, sequence: AccountSequence) -> Result { - let (_key, pk_buf) = self.key_and_bytes()?; - let pk_type = match &self.config.address_type { - AddressType::Cosmos => "/cosmos.crypto.secp256k1.PubKey".to_string(), - AddressType::Ethermint { pk_type } => pk_type.clone(), - }; - // Create a MsgSend proto Any message - let pk_any = Any { - type_url: pk_type, - value: pk_buf, - }; - - let single = Single { mode: 1 }; - let sum_single = Some(Sum::Single(single)); - let mode = Some(ModeInfo { sum: sum_single }); - let signer_info = SignerInfo { - public_key: Some(pk_any), - mode_info: mode, - sequence: sequence.to_u64(), - }; - Ok(signer_info) - } - - fn max_fee(&self) -> Fee { - Fee { - amount: vec![self.max_fee_in_coins()], - gas_limit: self.max_gas(), - payer: "".to_string(), - granter: self.fee_granter().to_string(), - } - } - - fn fee_with_gas(&self, gas_limit: u64) -> Fee { - let adjusted_gas_limit = self.apply_adjustment_to_gas(gas_limit); - - Fee { - amount: vec![self.fee_from_gas_in_coins(adjusted_gas_limit)], - gas_limit: adjusted_gas_limit, - payer: "".to_string(), - granter: self.fee_granter().to_string(), - } - } - - fn signed_doc( - &self, - body_bytes: Vec, - auth_info_bytes: Vec, - account_number: AccountNumber, - ) -> Result, Error> { - let sign_doc = SignDoc { - body_bytes, - auth_info_bytes, - chain_id: self.config.clone().id.to_string(), - account_number: account_number.to_u64(), - }; - - // A protobuf serialization of a SignDoc - let mut signdoc_buf = Vec::new(); - prost::Message::encode(&sign_doc, &mut signdoc_buf) - .map_err(|e| Error::protobuf_encode(String::from("SignDoc"), e))?; - - // Sign doc - let signed = self - .keybase - .sign_msg( - &self.config.key_name, - signdoc_buf, - &self.config.address_type, - ) - .map_err(Error::key_base)?; - - Ok(signed) - } - - /// Given a vector of `TxSyncResult` elements, - /// each including a transaction response hash for one or more messages, periodically queries the chain - /// with the transaction hashes to get the list of IbcEvents included in those transactions. 
- pub fn wait_for_block_commits( - &self, - mut tx_sync_results: Vec, - ) -> Result, Error> { - let hashes = tx_sync_results - .iter() - .map(|res| res.response.hash.to_string()) - .join(", "); - - info!( - id = %self.id(), - "wait_for_block_commits: waiting for commit of tx hashes(s) {}", - hashes - ); - - // Wait a little bit initially - thread::sleep(Duration::from_millis(200)); - - let start = Instant::now(); - let result = retry_with_index( - retry_strategy::wait_for_block_commits(self.config.rpc_timeout), - |index| { - if all_tx_results_found(&tx_sync_results) { - trace!( - id = %self.id(), - "wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", - tx_sync_results.len(), - index, - start.elapsed().as_millis() - ); - - // All transactions confirmed - return RetryResult::Ok(()); - } - - for TxSyncResult { response, events } in tx_sync_results.iter_mut() { - // If this transaction was not committed, determine whether it was because it failed - // or because it hasn't been committed yet. - if empty_event_present(events) { - // If the transaction failed, replace the events with an error, - // so that we don't attempt to resolve the transaction later on. - if response.code.value() != 0 { - *events = vec![IbcEvent::ChainError(format!( - "deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}", - self.id(), response.hash, response.code, response.log - ))]; - - // Otherwise, try to resolve transaction hash to the corresponding events. - } else if let Ok(events_per_tx) = - self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash))) - { - // If we get events back, progress was made, so we replace the events - // with the new ones. in both cases we will check in the next iteration - // whether or not the transaction was fully committed. - if !events_per_tx.is_empty() { - *events = events_per_tx; - } - } - } - } - RetryResult::Retry(index) - }, - ); - - match result { - // All transactions confirmed - Ok(()) => Ok(tx_sync_results), - // Did not find confirmation - Err(_) => Err(Error::tx_no_confirmation()), - } - } - fn trusting_period(&self, unbonding_period: Duration) -> Duration { self.config .trusting_period .unwrap_or(2 * unbonding_period / 3) } - /// Returns the preconfigured memo to be used for every submitted transaction - fn tx_memo(&self) -> &Memo { - &self.config.memo_prefix - } - /// Query the chain status via an RPC query. 
/// /// Returns an error if the node is still syncing and has not caught up, @@ -923,23 +376,71 @@ impl CosmosSdkChain { crate::time!("query_latest_height"); crate::telemetry!(query, self.id(), "query_latest_height"); - let status = self.status()?; + let status = self.rt.block_on(query_status( + self.id(), + &self.rpc_client, + &self.config.rpc_addr, + ))?; - Ok(ICSHeight { - revision_number: ChainId::chain_version(status.node_info.network.as_str()), - revision_height: u64::from(status.sync_info.latest_block_height), - }) + Ok(status.height) } -} -fn empty_event_present(events: &[IbcEvent]) -> bool { - events.iter().any(|ev| matches!(ev, IbcEvent::Empty(_))) -} + async fn do_send_messages_and_wait_commit( + &mut self, + tracked_msgs: TrackedMsgs, + ) -> Result, Error> { + crate::time!("send_messages_and_wait_commit"); + + let _span = + span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered(); + + let proto_msgs = tracked_msgs.msgs; + + let key_entry = self.key()?; + + let account = + get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?; + + send_batched_messages_and_wait_commit( + &self.config, + &self.rpc_client, + &self.config.rpc_addr, + &self.grpc_addr, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } + + async fn do_send_messages_and_wait_check_tx( + &mut self, + tracked_msgs: TrackedMsgs, + ) -> Result, Error> { + crate::time!("send_messages_and_wait_check_tx"); + + let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id()); + let _enter = span.enter(); + + let proto_msgs = tracked_msgs.msgs; + + let key_entry = self.key()?; -fn all_tx_results_found(tx_sync_results: &[TxSyncResult]) -> bool { - tx_sync_results - .iter() - .all(|r| !empty_event_present(&r.events)) + let account = + get_or_fetch_account(&self.grpc_addr, &key_entry.account, &mut self.account).await?; + + send_batched_messages_and_wait_check_tx( + &self.config, + &self.rpc_client, + &self.grpc_addr, + &key_entry, + account, + &self.config.memo_prefix, + proto_msgs, + ) + .await + } } impl ChainEndpoint for CosmosSdkChain { @@ -1069,98 +570,18 @@ impl ChainEndpoint for CosmosSdkChain { &mut self, tracked_msgs: TrackedMsgs, ) -> Result, Error> { - crate::time!("send_messages_and_wait_commit"); + let runtime = self.rt.clone(); - let _span = - span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered(); - - let proto_msgs = tracked_msgs.messages(); - - if proto_msgs.is_empty() { - return Ok(vec![]); - } - let mut tx_sync_results = vec![]; - - let mut n = 0; - let mut size = 0; - let mut msg_batch = vec![]; - for msg in proto_msgs.iter() { - msg_batch.push(msg.clone()); - let mut buf = Vec::new(); - prost::Message::encode(msg, &mut buf) - .map_err(|e| Error::protobuf_encode(String::from("Message"), e))?; - n += 1; - size += buf.len(); - if n >= self.max_msg_num() || size >= self.max_tx_size() { - let events_per_tx = vec![IbcEvent::default(); msg_batch.len()]; - let tx_sync_result = self.send_tx(msg_batch)?; - tx_sync_results.push(TxSyncResult { - response: tx_sync_result, - events: events_per_tx, - }); - n = 0; - size = 0; - msg_batch = vec![]; - } - } - if !msg_batch.is_empty() { - let events_per_tx = vec![IbcEvent::default(); msg_batch.len()]; - let tx_sync_result = self.send_tx(msg_batch)?; - tx_sync_results.push(TxSyncResult { - response: tx_sync_result, - events: events_per_tx, - }); - } - - let tx_sync_results = self.wait_for_block_commits(tx_sync_results)?; - - let events = 
tx_sync_results - .into_iter() - .flat_map(|el| el.events) - .collect(); - - Ok(events) + runtime.block_on(self.do_send_messages_and_wait_commit(tracked_msgs)) } fn send_messages_and_wait_check_tx( &mut self, tracked_msgs: TrackedMsgs, ) -> Result, Error> { - crate::time!("send_messages_and_wait_check_tx"); + let runtime = self.rt.clone(); - let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id()); - let _enter = span.enter(); - - let proto_msgs = tracked_msgs.messages(); - - if proto_msgs.is_empty() { - return Ok(vec![]); - } - let mut responses = vec![]; - - let mut n = 0; - let mut size = 0; - let mut msg_batch = vec![]; - for msg in proto_msgs.iter() { - msg_batch.push(msg.clone()); - let mut buf = Vec::new(); - prost::Message::encode(msg, &mut buf) - .map_err(|e| Error::protobuf_encode(String::from("Messages"), e))?; - n += 1; - size += buf.len(); - if n >= self.max_msg_num() || size >= self.max_tx_size() { - // Send the tx and enqueue the resulting response - responses.push(self.send_tx(msg_batch)?); - n = 0; - size = 0; - msg_batch = vec![]; - } - } - if !msg_batch.is_empty() { - responses.push(self.send_tx(msg_batch)?); - } - - Ok(responses) + runtime.block_on(self.do_send_messages_and_wait_check_tx(tracked_msgs)) } /// Get the account for the signer @@ -1218,22 +639,15 @@ impl ChainEndpoint for CosmosSdkChain { } /// Query the chain status - fn query_status(&self) -> Result { + fn query_status(&self) -> Result { crate::time!("query_status"); crate::telemetry!(query, self.id(), "query_status"); - let status = self.status()?; - - let time = status.sync_info.latest_block_time; - let height = ICSHeight { - revision_number: ChainId::chain_version(status.node_info.network.as_str()), - revision_height: u64::from(status.sync_info.latest_block_height), - }; - - Ok(StatusResponse { - height, - timestamp: time.into(), - }) + self.rt.block_on(query_status( + self.id(), + &self.rpc_client, + &self.config.rpc_addr, + )) } fn query_clients( @@ -1770,99 +1184,12 @@ impl ChainEndpoint for CosmosSdkChain { crate::time!("query_txs"); crate::telemetry!(query, self.id(), "query_txs"); - match request { - QueryTxRequest::Packet(request) => { - crate::time!("query_txs: query packet events"); - - let mut result: Vec = vec![]; - - for seq in &request.sequences { - // query first (and only) Tx that includes the event specified in the query request - let response = self - .block_on(self.rpc_client.tx_search( - packet_query(&request, *seq), - false, - 1, - 1, // get only the first Tx matching the query - Order::Ascending, - )) - .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; - - assert!( - response.txs.len() <= 1, - "packet_from_tx_search_response: unexpected number of txs" - ); - - if response.txs.is_empty() { - continue; - } - - if let Some(event) = packet_from_tx_search_response( - self.id(), - &request, - *seq, - response.txs[0].clone(), - ) { - result.push(event); - } - } - Ok(result) - } - - QueryTxRequest::Client(request) => { - crate::time!("query_txs: single client update event"); - - // query the first Tx that includes the event matching the client request - // Note: it is possible to have multiple Tx-es for same client and consensus height. - // In this case it must be true that the client updates were performed with tha - // same header as the first one, otherwise a subsequent transaction would have - // failed on chain. Therefore only one Tx is of interest and current API returns - // the first one. 
- let mut response = self - .block_on(self.rpc_client.tx_search( - header_query(&request), - false, - 1, - 1, // get only the first Tx matching the query - Order::Ascending, - )) - .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; - - if response.txs.is_empty() { - return Ok(vec![]); - } - - // the response must include a single Tx as specified in the query. - assert!( - response.txs.len() <= 1, - "packet_from_tx_search_response: unexpected number of txs" - ); - - let tx = response.txs.remove(0); - let event = update_client_from_tx_search_response(self.id(), &request, tx); - - Ok(event.into_iter().collect()) - } - - QueryTxRequest::Transaction(tx) => { - let mut response = self - .block_on(self.rpc_client.tx_search( - tx_hash_query(&tx), - false, - 1, - 1, // get only the first Tx matching the query - Order::Ascending, - )) - .map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?; - - if response.txs.is_empty() { - Ok(vec![]) - } else { - let tx = response.txs.remove(0); - Ok(all_ibc_events_from_tx_search_response(self.id(), tx)) - } - } - } + self.block_on(query_txs( + self.id(), + &self.rpc_client, + &self.config.rpc_addr, + request, + )) } fn query_blocks( @@ -2121,129 +1448,6 @@ impl ChainEndpoint for CosmosSdkChain { } } -fn packet_query(request: &QueryPacketEventDataRequest, seq: Sequence) -> Query { - tendermint_rpc::query::Query::eq( - format!("{}.packet_src_channel", request.event_id.as_str()), - request.source_channel_id.to_string(), - ) - .and_eq( - format!("{}.packet_src_port", request.event_id.as_str()), - request.source_port_id.to_string(), - ) - .and_eq( - format!("{}.packet_dst_channel", request.event_id.as_str()), - request.destination_channel_id.to_string(), - ) - .and_eq( - format!("{}.packet_dst_port", request.event_id.as_str()), - request.destination_port_id.to_string(), - ) - .and_eq( - format!("{}.packet_sequence", request.event_id.as_str()), - seq.to_string(), - ) -} - -fn header_query(request: &QueryClientEventRequest) -> Query { - tendermint_rpc::query::Query::eq( - format!("{}.client_id", request.event_id.as_str()), - request.client_id.to_string(), - ) - .and_eq( - format!("{}.consensus_height", request.event_id.as_str()), - format!( - "{}-{}", - request.consensus_height.revision_number, request.consensus_height.revision_height - ), - ) -} - -fn tx_hash_query(request: &QueryTxHash) -> Query { - tendermint_rpc::query::Query::eq("tx.hash", request.0.to_string()) -} - -// Extract the packet events from the query_txs RPC response. For any given -// packet query, there is at most one Tx matching such query. Moreover, a Tx may -// contain several events, but a single one must match the packet query. -// For example, if we're querying for the packet with sequence 3 and this packet -// was committed in some Tx along with the packet with sequence 4, the response -// will include both packets. For this reason, we iterate all packets in the Tx, -// searching for those that match (which must be a single one). -fn packet_from_tx_search_response( - chain_id: &ChainId, - request: &QueryPacketEventDataRequest, - seq: Sequence, - response: ResultTx, -) -> Option { - let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); - if request.height != ICSHeight::zero() && height > request.height { - return None; - } - - response - .tx_result - .events - .into_iter() - .find_map(|ev| filter_matching_event(ev, request, seq)) -} - -// Extracts from the Tx the update client event for the requested client and height. 
-// Note: in the Tx, there may have been multiple events, some of them may be -// for update of other clients that are not relevant to the request. -// For example, if we're querying for a transaction that includes the update for client X at -// consensus height H, it is possible that the transaction also includes an update client -// for client Y at consensus height H'. This is the reason the code iterates all event fields in the -// returned Tx to retrieve the relevant ones. -// Returns `None` if no matching event was found. -fn update_client_from_tx_search_response( - chain_id: &ChainId, - request: &QueryClientEventRequest, - response: ResultTx, -) -> Option { - let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); - if request.height != ICSHeight::zero() && height > request.height { - return None; - } - - response - .tx_result - .events - .into_iter() - .filter(|event| event.type_str == request.event_id.as_str()) - .flat_map(|event| ClientEvents::try_from_tx(&event)) - .flat_map(|event| match event { - IbcEvent::UpdateClient(mut update) => { - update.common.height = height; - Some(update) - } - _ => None, - }) - .find(|update| { - update.common.client_id == request.client_id - && update.common.consensus_height == request.consensus_height - }) - .map(IbcEvent::UpdateClient) -} - -fn all_ibc_events_from_tx_search_response(chain_id: &ChainId, response: ResultTx) -> Vec { - let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); - let deliver_tx_result = response.tx_result; - if deliver_tx_result.code.is_err() { - return vec![IbcEvent::ChainError(format!( - "deliver_tx for {} reports error: code={:?}, log={:?}", - response.hash, deliver_tx_result.code, deliver_tx_result.log - ))]; - } - - let mut result = vec![]; - for event in deliver_tx_result.events { - if let Some(ibc_ev) = from_tx_response_event(height, &event) { - result.push(ibc_ev); - } - } - result -} - fn filter_matching_event( event: Event, request: &QueryPacketEventDataRequest, @@ -2279,76 +1483,6 @@ fn filter_matching_event( } } -/// Perform a generic `abci_query`, and return the corresponding deserialized response data. -async fn abci_query( - chain: &CosmosSdkChain, - path: TendermintABCIPath, - data: String, - height: Height, - prove: bool, -) -> Result { - let height = if height.value() == 0 { - None - } else { - Some(height) - }; - - // Use the Tendermint-rs RPC client to do the query. - let response = chain - .rpc_client - .abci_query(Some(path), data.into_bytes(), height, prove) - .await - .map_err(|e| Error::rpc(chain.config.rpc_addr.clone(), e))?; - - if !response.code.is_ok() { - // Fail with response log. - return Err(Error::abci_query(response)); - } - - if prove && response.proof.is_none() { - // Fail due to empty proof - return Err(Error::empty_response_proof()); - } - - let proof = response - .proof - .map(|p| convert_tm_to_ics_merkle_proof(&p)) - .transpose() - .map_err(Error::ics23)?; - - let response = QueryResponse { - value: response.value, - height: response.height, - proof, - }; - - Ok(response) -} - -/// Perform a `broadcast_tx_sync`, and return the corresponding deserialized response data. 
-pub async fn broadcast_tx_sync( - rpc_client: &HttpClient, - rpc_address: &Url, - data: Vec, -) -> Result { - let response = rpc_client - .broadcast_tx_sync(data.into()) - .await - .map_err(|e| Error::rpc(rpc_address.clone(), e))?; - - Ok(response) -} - -fn encode_to_bech32(address: &str, account_prefix: &str) -> Result { - let account = AccountId::from_str(address) - .map_err(|e| Error::invalid_key_address(address.to_string(), e))?; - - let encoded = bech32::encode(account_prefix, account.to_base32(), Variant::Bech32) - .map_err(Error::bech32_encoding)?; - - Ok(encoded) -} - /// Returns the suffix counter for a CosmosSDK client id. /// Returns `None` if the client identifier is malformed /// and the suffix could not be parsed. @@ -2360,46 +1494,6 @@ fn client_id_suffix(client_id: &ClientId) -> Option { .and_then(|e| e.parse::().ok()) } -pub struct TxSyncResult { - // the broadcast_tx_sync response - response: Response, - // the events generated by a Tx once executed - events: Vec, -} - -pub fn auth_info_and_bytes( - signer_info: SignerInfo, - fee: Fee, -) -> Result<(AuthInfo, Vec), Error> { - let auth_info = AuthInfo { - signer_infos: vec![signer_info], - fee: Some(fee), - }; - - // A protobuf serialization of a AuthInfo - let mut auth_buf = Vec::new(); - prost::Message::encode(&auth_info, &mut auth_buf) - .map_err(|e| Error::protobuf_encode(String::from("AuthInfo"), e))?; - Ok((auth_info, auth_buf)) -} - -pub fn tx_body_and_bytes(proto_msgs: Vec, memo: &Memo) -> Result<(TxBody, Vec), Error> { - // Create TxBody - let body = TxBody { - messages: proto_msgs.to_vec(), - memo: memo.to_string(), - timeout_height: 0_u64, - extension_options: Vec::::new(), - non_critical_extension_options: Vec::::new(), - }; - - // A protobuf serialization of a TxBody - let mut body_buf = Vec::new(); - prost::Message::encode(&body, &mut body_buf) - .map_err(|e| Error::protobuf_encode(String::from("TxBody"), e))?; - Ok((body, body_buf)) -} - fn do_health_check(chain: &CosmosSdkChain) -> Result<(), Error> { let chain_id = chain.id(); let grpc_address = chain.grpc_addr.to_string(); @@ -2443,107 +1537,6 @@ fn do_health_check(chain: &CosmosSdkChain) -> Result<(), Error> { Ok(()) } -/// Queries the chain to obtain the version information. -async fn fetch_version_specs( - chain_id: &ChainId, - grpc_address: &Uri, -) -> Result { - let grpc_addr_string = grpc_address.to_string(); - - // Construct a gRPC client - let mut client = ServiceClient::connect(grpc_address.clone()) - .await - .map_err(|e| { - Error::fetch_version_grpc_transport( - chain_id.clone(), - grpc_addr_string.clone(), - "tendermint::ServiceClient".to_string(), - e, - ) - })?; - - let request = tonic::Request::new(GetNodeInfoRequest {}); - - let response = client.get_node_info(request).await.map_err(|e| { - Error::fetch_version_grpc_status( - chain_id.clone(), - grpc_addr_string.clone(), - "tendermint::ServiceClient".to_string(), - e, - ) - })?; - - let version = response.into_inner().application_version.ok_or_else(|| { - Error::fetch_version_invalid_version_response( - chain_id.clone(), - grpc_addr_string.clone(), - "tendermint::GetNodeInfoRequest".to_string(), - ) - })?; - - // Parse the raw version info into a domain-type `version::Specs` - version - .try_into() - .map_err(|e| Error::fetch_version_parsing(chain_id.clone(), grpc_addr_string.clone(), e)) -} - -/// Determine whether the given error yielded by `tx_simulate` -/// can be recovered from by submitting the tx anyway. 
-fn can_recover_from_simulation_failure(e: &Error) -> bool { - use crate::error::ErrorDetail::*; - - match e.detail() { - GrpcStatus(detail) => detail.is_client_state_height_too_low(), - _ => false, - } -} - -/// Determine whether the given error yielded by `tx_simulate` -/// indicates that the current sequence number cached in Hermes -/// may be out-of-sync with the full node's version of the s.n. -fn mismatching_account_sequence_number(e: &Error) -> bool { - use crate::error::ErrorDetail::*; - - match e.detail() { - GrpcStatus(detail) => detail.is_account_sequence_mismatch(), - _ => false, - } -} - -struct PrettyFee<'a>(&'a Fee); - -impl fmt::Display for PrettyFee<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let amount = match self.0.amount.get(0) { - Some(coin) => format!("{}{}", coin.amount, coin.denom), - None => "".to_string(), - }; - - f.debug_struct("Fee") - .field("amount", &amount) - .field("gas_limit", &self.0.gas_limit) - .finish() - } -} - -fn calculate_fee(adjusted_gas_amount: u64, gas_price: &GasPrice) -> Coin { - let fee_amount = mul_ceil(adjusted_gas_amount, gas_price.price); - - Coin { - denom: gas_price.denom.to_string(), - amount: fee_amount.to_string(), - } -} - -/// Multiply `a` with `f` and round the result up to the nearest integer. -fn mul_ceil(a: u64, f: f64) -> BigInt { - assert!(f.is_finite()); - - let a = BigInt::from(a); - let f = BigRational::from_float(f).expect("f is finite"); - (f * a).ceil().to_integer() -} - #[cfg(test)] mod tests { use ibc::{ diff --git a/relayer/src/chain/cosmos/account.rs b/relayer/src/chain/cosmos/account.rs deleted file mode 100644 index ee669d17f7..0000000000 --- a/relayer/src/chain/cosmos/account.rs +++ /dev/null @@ -1,108 +0,0 @@ -use core::fmt; - -use ibc_proto::cosmos::auth::v1beta1::BaseAccount; -use prost::Message; - -use crate::error::Error; - -use super::CosmosSdkChain; - -/// Wrapper for account number and sequence number. -/// -/// More fields may be added later. 
-#[derive(Clone, Debug, PartialEq)] -pub struct Account { - // pub address: String, - // pub pub_key: Option, - pub number: AccountNumber, - pub sequence: AccountSequence, -} - -impl From for Account { - fn from(value: BaseAccount) -> Self { - Self { - number: AccountNumber::new(value.account_number), - sequence: AccountSequence::new(value.sequence), - } - } -} - -/// Newtype for account numbers -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct AccountNumber(u64); - -impl AccountNumber { - pub fn new(number: u64) -> Self { - Self(number) - } - - pub fn to_u64(self) -> u64 { - self.0 - } -} - -impl fmt::Display for AccountNumber { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self) - } -} - -/// Newtype for account sequence numbers -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct AccountSequence(u64); - -impl AccountSequence { - pub fn new(sequence: u64) -> Self { - Self(sequence) - } - - pub fn to_u64(self) -> u64 { - self.0 - } - - pub fn increment(self) -> Self { - Self(self.0 + 1) - } -} - -impl fmt::Display for AccountSequence { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self) - } -} - -/// Uses the GRPC client to retrieve the account sequence -pub async fn query_account(chain: &CosmosSdkChain, address: String) -> Result { - use ibc_proto::cosmos::auth::v1beta1::query_client::QueryClient; - use ibc_proto::cosmos::auth::v1beta1::{EthAccount, QueryAccountRequest}; - - crate::telemetry!(query, &chain.config.id, "query_account"); - - let mut client = QueryClient::connect(chain.grpc_addr.clone()) - .await - .map_err(Error::grpc_transport)?; - - let request = tonic::Request::new(QueryAccountRequest { - address: address.clone(), - }); - - let response = client.account(request).await; - - // Querying for an account might fail, i.e. if the account doesn't actually exist - let resp_account = match response.map_err(Error::grpc_status)?.into_inner().account { - Some(account) => account, - None => return Err(Error::empty_query_account(address)), - }; - - if resp_account.type_url == "/cosmos.auth.v1beta1.BaseAccount" { - Ok(BaseAccount::decode(resp_account.value.as_slice()) - .map_err(|e| Error::protobuf_decode("BaseAccount".to_string(), e))?) - } else if resp_account.type_url.ends_with(".EthAccount") { - Ok(EthAccount::decode(resp_account.value.as_slice()) - .map_err(|e| Error::protobuf_decode("EthAccount".to_string(), e))? - .base_account - .ok_or_else(Error::empty_base_account)?) 
- } else { - Err(Error::unknown_account_type(resp_account.type_url)) - } -} diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs new file mode 100644 index 0000000000..6a36fb8639 --- /dev/null +++ b/relayer/src/chain/cosmos/batch.rs @@ -0,0 +1,166 @@ +use ibc::events::IbcEvent; +use ibc_proto::google::protobuf::Any; +use prost::Message; +use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tendermint_rpc::{HttpClient, Url}; +use tonic::codegen::http::Uri; + +use crate::chain::cosmos::retry::send_tx_with_account_sequence_retry; +use crate::chain::cosmos::types::account::Account; +use crate::chain::cosmos::types::tx::TxSyncResult; +use crate::chain::cosmos::wait::wait_for_block_commits; +use crate::config::types::Memo; +use crate::config::ChainConfig; +use crate::error::Error; +use crate::keyring::KeyEntry; + +pub async fn send_batched_messages_and_wait_commit( + config: &ChainConfig, + rpc_client: &HttpClient, + rpc_address: &Url, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let mut tx_sync_results = send_messages_as_batches( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + messages, + ) + .await?; + + wait_for_block_commits( + &config.id, + rpc_client, + rpc_address, + &config.rpc_timeout, + &mut tx_sync_results, + ) + .await?; + + let events = tx_sync_results + .into_iter() + .flat_map(|el| el.events) + .collect(); + + Ok(events) +} + +pub async fn send_batched_messages_and_wait_check_tx( + config: &ChainConfig, + rpc_client: &HttpClient, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let batches = batch_messages(config, messages)?; + + let mut responses = Vec::new(); + + for batch in batches { + let response = send_tx_with_account_sequence_retry( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + batch, + 0, + ) + .await?; + + responses.push(response); + } + + Ok(responses) +} + +async fn send_messages_as_batches( + config: &ChainConfig, + rpc_client: &HttpClient, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, +) -> Result, Error> { + if messages.is_empty() { + return Ok(Vec::new()); + } + + let batches = batch_messages(config, messages)?; + + let mut tx_sync_results = Vec::new(); + + for batch in batches { + let events_per_tx = vec![IbcEvent::default(); batch.len()]; + + let response = send_tx_with_account_sequence_retry( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + batch, + 0, + ) + .await?; + + let tx_sync_result = TxSyncResult { + response, + events: events_per_tx, + }; + + tx_sync_results.push(tx_sync_result); + } + + Ok(tx_sync_results) +} + +fn batch_messages(config: &ChainConfig, messages: Vec) -> Result>, Error> { + let max_message_count = config.max_msg_num.0; + let max_tx_size = config.max_tx_size.into(); + + let mut batches = vec![]; + + let mut current_count = 0; + let mut current_size = 0; + let mut current_batch = vec![]; + + for message in messages.into_iter() { + current_count += 1; + current_size += message.encoded_len(); + current_batch.push(message); + + if current_count >= max_message_count || current_size >= max_tx_size { + let insert_batch = current_batch.drain(..).collect(); + 
batches.push(insert_batch); + current_count = 0; + current_size = 0; + } + } + + if !current_batch.is_empty() { + batches.push(current_batch); + } + + Ok(batches) +} diff --git a/relayer/src/chain/cosmos/encode.rs b/relayer/src/chain/cosmos/encode.rs new file mode 100644 index 0000000000..b66265c4bc --- /dev/null +++ b/relayer/src/chain/cosmos/encode.rs @@ -0,0 +1,179 @@ +use bech32::{ToBase32, Variant}; +use core::str::FromStr; +use ibc::core::ics24_host::identifier::ChainId; +use ibc_proto::cosmos::tx::v1beta1::mode_info::{Single, Sum}; +use ibc_proto::cosmos::tx::v1beta1::{AuthInfo, Fee, ModeInfo, SignDoc, SignerInfo, TxBody, TxRaw}; +use ibc_proto::google::protobuf::Any; +use tendermint::account::Id as AccountId; + +use crate::chain::cosmos::types::account::{Account, AccountNumber, AccountSequence}; +use crate::chain::cosmos::types::tx::SignedTx; +use crate::config::types::Memo; +use crate::config::AddressType; +use crate::config::ChainConfig; +use crate::error::Error; +use crate::keyring::{sign_message, KeyEntry}; + +pub fn sign_and_encode_tx( + config: &ChainConfig, + key_entry: &KeyEntry, + account: &Account, + tx_memo: &Memo, + messages: Vec, + fee: &Fee, +) -> Result, Error> { + let signed_tx = sign_tx(config, key_entry, account, tx_memo, messages, fee)?; + + let tx_raw = TxRaw { + body_bytes: signed_tx.body_bytes, + auth_info_bytes: signed_tx.auth_info_bytes, + signatures: signed_tx.signatures, + }; + + encode_tx_raw(tx_raw) +} + +pub fn sign_tx( + config: &ChainConfig, + key_entry: &KeyEntry, + account: &Account, + tx_memo: &Memo, + messages: Vec, + fee: &Fee, +) -> Result { + let key_bytes = encode_key_bytes(key_entry)?; + + let signer = encode_signer_info(&config.address_type, account.sequence, key_bytes)?; + + let (body, body_bytes) = tx_body_and_bytes(messages, tx_memo)?; + + let (auth_info, auth_info_bytes) = auth_info_and_bytes(signer, fee.clone())?; + + let signed_doc = encode_sign_doc( + &config.id, + key_entry, + &config.address_type, + account.number, + auth_info_bytes.clone(), + body_bytes.clone(), + )?; + + Ok(SignedTx { + body, + body_bytes, + auth_info, + auth_info_bytes, + signatures: vec![signed_doc], + }) +} + +fn encode_key_bytes(key: &KeyEntry) -> Result, Error> { + let mut pk_buf = Vec::new(); + + prost::Message::encode(&key.public_key.public_key.to_bytes(), &mut pk_buf) + .map_err(|e| Error::protobuf_encode("PublicKey".into(), e))?; + + Ok(pk_buf) +} + +fn encode_sign_doc( + chain_id: &ChainId, + key: &KeyEntry, + address_type: &AddressType, + account_number: AccountNumber, + auth_info_bytes: Vec, + body_bytes: Vec, +) -> Result, Error> { + let sign_doc = SignDoc { + body_bytes, + auth_info_bytes, + chain_id: chain_id.to_string(), + account_number: account_number.to_u64(), + }; + + // A protobuf serialization of a SignDoc + let mut signdoc_buf = Vec::new(); + prost::Message::encode(&sign_doc, &mut signdoc_buf).unwrap(); + + let signed = sign_message(key, signdoc_buf, address_type).map_err(Error::key_base)?; + + Ok(signed) +} + +fn encode_signer_info( + address_type: &AddressType, + sequence: AccountSequence, + key_bytes: Vec, +) -> Result { + let pk_type = match address_type { + AddressType::Cosmos => "/cosmos.crypto.secp256k1.PubKey".to_string(), + AddressType::Ethermint { pk_type } => pk_type.clone(), + }; + // Create a MsgSend proto Any message + let pk_any = Any { + type_url: pk_type, + value: key_bytes, + }; + + let single = Single { mode: 1 }; + let sum_single = Some(Sum::Single(single)); + let mode = Some(ModeInfo { sum: sum_single }); + let 
signer_info = SignerInfo { + public_key: Some(pk_any), + mode_info: mode, + sequence: sequence.to_u64(), + }; + Ok(signer_info) +} + +fn encode_tx_raw(tx_raw: TxRaw) -> Result, Error> { + let mut tx_bytes = Vec::new(); + prost::Message::encode(&tx_raw, &mut tx_bytes) + .map_err(|e| Error::protobuf_encode("Transaction".to_string(), e))?; + + Ok(tx_bytes) +} + +pub fn encode_to_bech32(address: &str, account_prefix: &str) -> Result { + let account = AccountId::from_str(address) + .map_err(|e| Error::invalid_key_address(address.to_string(), e))?; + + let encoded = bech32::encode(account_prefix, account.to_base32(), Variant::Bech32) + .map_err(Error::bech32_encoding)?; + + Ok(encoded) +} + +fn auth_info_and_bytes(signer_info: SignerInfo, fee: Fee) -> Result<(AuthInfo, Vec), Error> { + let auth_info = AuthInfo { + signer_infos: vec![signer_info], + fee: Some(fee), + }; + + // A protobuf serialization of a AuthInfo + let mut auth_buf = Vec::new(); + + prost::Message::encode(&auth_info, &mut auth_buf) + .map_err(|e| Error::protobuf_encode(String::from("AuthInfo"), e))?; + + Ok((auth_info, auth_buf)) +} + +fn tx_body_and_bytes(proto_msgs: Vec, memo: &Memo) -> Result<(TxBody, Vec), Error> { + // Create TxBody + let body = TxBody { + messages: proto_msgs.to_vec(), + memo: memo.to_string(), + timeout_height: 0_u64, + extension_options: Vec::::new(), + non_critical_extension_options: Vec::::new(), + }; + + // A protobuf serialization of a TxBody + let mut body_buf = Vec::new(); + + prost::Message::encode(&body, &mut body_buf) + .map_err(|e| Error::protobuf_encode(String::from("TxBody"), e))?; + + Ok((body, body_buf)) +} diff --git a/relayer/src/chain/cosmos/estimate.rs b/relayer/src/chain/cosmos/estimate.rs new file mode 100644 index 0000000000..22faf937ea --- /dev/null +++ b/relayer/src/chain/cosmos/estimate.rs @@ -0,0 +1,155 @@ +use ibc::core::ics24_host::identifier::ChainId; +use ibc_proto::cosmos::tx::v1beta1::{Fee, Tx}; +use ibc_proto::google::protobuf::Any; +use tonic::codegen::http::Uri; +use tracing::{debug, error, span, warn, Level}; + +use crate::chain::cosmos::encode::sign_tx; +use crate::chain::cosmos::gas::{gas_amount_to_fees, PrettyFee}; +use crate::chain::cosmos::simulate::send_tx_simulate; +use crate::chain::cosmos::types::account::Account; +use crate::chain::cosmos::types::gas::GasConfig; +use crate::config::types::Memo; +use crate::config::ChainConfig; +use crate::error::Error; +use crate::keyring::KeyEntry; + +pub async fn estimate_tx_fees( + config: &ChainConfig, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &Account, + tx_memo: &Memo, + messages: Vec, +) -> Result { + let gas_config = GasConfig::from_chain_config(config); + + debug!( + "max fee, for use in tx simulation: {}", + PrettyFee(&gas_config.max_fee) + ); + + let signed_tx = sign_tx( + config, + key_entry, + account, + tx_memo, + messages, + &gas_config.max_fee, + )?; + + let tx = Tx { + body: Some(signed_tx.body), + auth_info: Some(signed_tx.auth_info), + signatures: signed_tx.signatures, + }; + + let estimated_fee = estimate_fee_with_tx(&gas_config, grpc_address, &config.id, tx).await?; + + Ok(estimated_fee) +} + +async fn estimate_fee_with_tx( + gas_config: &GasConfig, + grpc_address: &Uri, + chain_id: &ChainId, + tx: Tx, +) -> Result { + let estimated_gas = estimate_gas_with_tx(gas_config, grpc_address, tx).await?; + + if estimated_gas > gas_config.max_gas { + debug!( + id = %chain_id, estimated = ?estimated_gas, max = ?gas_config.max_gas, + "send_tx: estimated gas is higher than max gas" + ); + + return 
Err(Error::tx_simulate_gas_estimate_exceeded( + chain_id.clone(), + estimated_gas, + gas_config.max_gas, + )); + } + + let adjusted_fee = gas_amount_to_fees(gas_config, estimated_gas); + + debug!( + id = %chain_id, + "send_tx: using {} gas, fee {}", + estimated_gas, + PrettyFee(&adjusted_fee) + ); + + Ok(adjusted_fee) +} + +/// Try to simulate the given tx in order to estimate how much gas will be needed to submit it. +/// +/// It is possible that a batch of messages are fragmented by the caller (`send_msgs`) such that +/// they do not individually verify. For example for the following batch: +/// [`MsgUpdateClient`, `MsgRecvPacket`, ..., `MsgRecvPacket`] +/// +/// If the batch is split in two TX-es, the second one will fail the simulation in `deliverTx` check. +/// In this case we use the `default_gas` param. +async fn estimate_gas_with_tx( + gas_config: &GasConfig, + grpc_address: &Uri, + tx: Tx, +) -> Result { + let simulated_gas = send_tx_simulate(grpc_address, tx) + .await + .map(|sr| sr.gas_info); + + let _span = span!(Level::ERROR, "estimate_gas").entered(); + + match simulated_gas { + Ok(Some(gas_info)) => { + debug!( + "tx simulation successful, gas amount used: {:?}", + gas_info.gas_used + ); + + Ok(gas_info.gas_used) + } + + Ok(None) => { + warn!( + "tx simulation successful but no gas amount used was returned, falling back on default gas: {}", + gas_config.default_gas + ); + + Ok(gas_config.default_gas) + } + + // If there is a chance that the tx will be accepted once actually submitted, we fall + // back on the max gas and will attempt to send it anyway. + // See `can_recover_from_simulation_failure` for more info. + Err(e) if can_recover_from_simulation_failure(&e) => { + warn!( + "failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", + e.detail() + ); + + Ok(gas_config.default_gas) + } + + Err(e) => { + error!( + "failed to simulate tx. propagating error to caller: {}", + e.detail() + ); + // Propagate the error, the retrying mechanism at caller may catch & retry. + Err(e) + } + } +} + +/// Determine whether the given error yielded by `tx_simulate` +/// can be recovered from by submitting the tx anyway. 
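+///
+/// At the moment the only case treated as recoverable is a gRPC status indicating that the
+/// client state height is too low.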
+fn can_recover_from_simulation_failure(e: &Error) -> bool { + use crate::error::ErrorDetail::*; + + match e.detail() { + GrpcStatus(detail) => detail.is_client_state_height_too_low(), + _ => false, + } +} diff --git a/relayer/src/chain/cosmos/gas.rs b/relayer/src/chain/cosmos/gas.rs new file mode 100644 index 0000000000..9b881845db --- /dev/null +++ b/relayer/src/chain/cosmos/gas.rs @@ -0,0 +1,74 @@ +use core::cmp::min; +use core::fmt; +use ibc_proto::cosmos::base::v1beta1::Coin; +use ibc_proto::cosmos::tx::v1beta1::Fee; +use num_bigint::BigInt; +use num_rational::BigRational; + +use crate::chain::cosmos::types::gas::GasConfig; +use crate::config::GasPrice; + +pub struct PrettyFee<'a>(pub &'a Fee); + +pub fn gas_amount_to_fees(config: &GasConfig, gas_amount: u64) -> Fee { + let adjusted_gas_limit = adjust_gas_with_simulated_fees(config, gas_amount); + + // The fee in coins based on gas amount + let amount = calculate_fee(adjusted_gas_limit, &config.gas_price); + + Fee { + amount: vec![amount], + gas_limit: adjusted_gas_limit, + payer: "".to_string(), + granter: config.fee_granter.clone(), + } +} + +pub fn calculate_fee(adjusted_gas_amount: u64, gas_price: &GasPrice) -> Coin { + let fee_amount = mul_ceil(adjusted_gas_amount, gas_price.price); + + Coin { + denom: gas_price.denom.to_string(), + amount: fee_amount.to_string(), + } +} + +/// Multiply `a` with `f` and round the result up to the nearest integer. +pub fn mul_ceil(a: u64, f: f64) -> BigInt { + assert!(f.is_finite()); + + let a = BigInt::from(a); + let f = BigRational::from_float(f).expect("f is finite"); + (f * a).ceil().to_integer() +} + +/// Adjusts the fee based on the configured `gas_adjustment` to prevent out of gas errors. +/// The actual gas cost, when a transaction is executed, may be slightly higher than the +/// one returned by the simulation. 
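+///
+/// For example (illustrative numbers): with a simulated gas amount of 100_000 and a
+/// `gas_adjustment` of 0.1, `mul_ceil` yields an adjustment of 10_000, so the adjusted
+/// amount is 110_000, which is then capped at the configured `max_gas`.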
+fn adjust_gas_with_simulated_fees(config: &GasConfig, gas_amount: u64) -> u64 { + let gas_adjustment = config.gas_adjustment; + + assert!(gas_adjustment <= 1.0); + + let (_, digits) = mul_ceil(gas_amount, gas_adjustment).to_u64_digits(); + assert!(digits.len() == 1); + + let adjustment = digits[0]; + let gas = gas_amount.checked_add(adjustment).unwrap_or(u64::MAX); + + min(gas, config.max_gas) +} + +impl fmt::Display for PrettyFee<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let amount = match self.0.amount.get(0) { + Some(coin) => format!("{}{}", coin.amount, coin.denom), + None => "".to_string(), + }; + + f.debug_struct("Fee") + .field("amount", &amount) + .field("gas_limit", &self.0.gas_limit) + .finish() + } +} diff --git a/relayer/src/chain/cosmos/query.rs b/relayer/src/chain/cosmos/query.rs new file mode 100644 index 0000000000..15d700b173 --- /dev/null +++ b/relayer/src/chain/cosmos/query.rs @@ -0,0 +1,149 @@ +use http::uri::Uri; +use ibc::core::ics02_client::client_consensus::QueryClientEventRequest; +use ibc::core::ics04_channel::channel::QueryPacketEventDataRequest; +use ibc::core::ics04_channel::packet::Sequence; +use ibc::core::ics23_commitment::merkle::convert_tm_to_ics_merkle_proof; +use ibc::core::ics24_host::identifier::ChainId; +use ibc::query::QueryTxHash; +use ibc_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient; +use ibc_proto::cosmos::base::tendermint::v1beta1::GetNodeInfoRequest; +use tendermint::abci::Path as TendermintABCIPath; +use tendermint::block::Height; +use tendermint_rpc::query::Query; +use tendermint_rpc::{Client, HttpClient, Url}; + +use crate::chain::cosmos::version::Specs; +use crate::chain::QueryResponse; +use crate::error::Error; + +pub mod account; +pub mod status; +pub mod tx; + +pub fn packet_query(request: &QueryPacketEventDataRequest, seq: Sequence) -> Query { + Query::eq( + format!("{}.packet_src_channel", request.event_id.as_str()), + request.source_channel_id.to_string(), + ) + .and_eq( + format!("{}.packet_src_port", request.event_id.as_str()), + request.source_port_id.to_string(), + ) + .and_eq( + format!("{}.packet_dst_channel", request.event_id.as_str()), + request.destination_channel_id.to_string(), + ) + .and_eq( + format!("{}.packet_dst_port", request.event_id.as_str()), + request.destination_port_id.to_string(), + ) + .and_eq( + format!("{}.packet_sequence", request.event_id.as_str()), + seq.to_string(), + ) +} + +pub fn header_query(request: &QueryClientEventRequest) -> Query { + Query::eq( + format!("{}.client_id", request.event_id.as_str()), + request.client_id.to_string(), + ) + .and_eq( + format!("{}.consensus_height", request.event_id.as_str()), + format!( + "{}-{}", + request.consensus_height.revision_number, request.consensus_height.revision_height + ), + ) +} + +pub fn tx_hash_query(request: &QueryTxHash) -> Query { + Query::eq("tx.hash", request.0.to_string()) +} + +/// Perform a generic `abci_query`, and return the corresponding deserialized response data. +pub async fn abci_query( + rpc_client: &HttpClient, + rpc_address: &Url, + path: TendermintABCIPath, + data: String, + height: Height, + prove: bool, +) -> Result { + let height = if height.value() == 0 { + None + } else { + Some(height) + }; + + // Use the Tendermint-rs RPC client to do the query. + let response = rpc_client + .abci_query(Some(path), data.into_bytes(), height, prove) + .await + .map_err(|e| Error::rpc(rpc_address.clone(), e))?; + + if !response.code.is_ok() { + // Fail with response log. 
+ return Err(Error::abci_query(response)); + } + + if prove && response.proof.is_none() { + // Fail due to empty proof + return Err(Error::empty_response_proof()); + } + + let proof = response + .proof + .map(|p| convert_tm_to_ics_merkle_proof(&p)) + .transpose() + .map_err(Error::ics23)?; + + let response = QueryResponse { + value: response.value, + height: response.height, + proof, + }; + + Ok(response) +} + +/// Queries the chain to obtain the version information. +pub async fn fetch_version_specs(chain_id: &ChainId, grpc_address: &Uri) -> Result { + let grpc_addr_string = grpc_address.to_string(); + + // Construct a gRPC client + let mut client = ServiceClient::connect(grpc_address.clone()) + .await + .map_err(|e| { + Error::fetch_version_grpc_transport( + chain_id.clone(), + grpc_addr_string.clone(), + "tendermint::ServiceClient".to_string(), + e, + ) + })?; + + let request = tonic::Request::new(GetNodeInfoRequest {}); + + let response = client.get_node_info(request).await.map_err(|e| { + Error::fetch_version_grpc_status( + chain_id.clone(), + grpc_addr_string.clone(), + "tendermint::ServiceClient".to_string(), + e, + ) + })?; + + let version = response.into_inner().application_version.ok_or_else(|| { + Error::fetch_version_invalid_version_response( + chain_id.clone(), + grpc_addr_string.clone(), + "tendermint::GetNodeInfoRequest".to_string(), + ) + })?; + + // Parse the raw version info into a domain-type `version::Specs` + version + .try_into() + .map_err(|e| Error::fetch_version_parsing(chain_id.clone(), grpc_addr_string.clone(), e)) +} diff --git a/relayer/src/chain/cosmos/query/account.rs b/relayer/src/chain/cosmos/query/account.rs new file mode 100644 index 0000000000..2cc7579481 --- /dev/null +++ b/relayer/src/chain/cosmos/query/account.rs @@ -0,0 +1,80 @@ +use http::uri::Uri; +use ibc_proto::cosmos::auth::v1beta1::query_client::QueryClient; +use ibc_proto::cosmos::auth::v1beta1::{BaseAccount, EthAccount, QueryAccountRequest}; +use prost::Message; +use tracing::info; + +use crate::chain::cosmos::types::account::Account; +use crate::error::Error; + +/// Get a `&mut Account` from an `&mut Option` if it is `Some(Account)`. +/// Otherwise query for the account information, update the `Option` to `Some`, +/// and return the underlying `&mut` reference. +pub async fn get_or_fetch_account<'a>( + grpc_address: &Uri, + account_address: &str, + m_account: &'a mut Option, +) -> Result<&'a mut Account, Error> { + match m_account { + Some(account) => Ok(account), + None => { + let account = query_account(grpc_address, account_address).await?; + *m_account = Some(account.into()); + + Ok(m_account + .as_mut() + .expect("account was supposedly just cached")) + } + } +} + +/// Refresh the account sequence behind the `&mut Account` by refetching the +/// account and updating the `&mut` reference. 
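+///
+/// In practice this is invoked from the `send_tx` retry path (see `retry.rs`) whenever an
+/// account sequence mismatch suggests that the cached sequence number has gone stale.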
+pub async fn refresh_account<'a>(
+    grpc_address: &Uri,
+    account_address: &str,
+    m_account: &'a mut Account,
+) -> Result<(), Error> {
+    let account = query_account(grpc_address, account_address).await?;
+
+    info!(
+        sequence = %account.sequence,
+        number = %account.account_number,
+        "refresh: retrieved account",
+    );
+
+    *m_account = account.into();
+
+    Ok(())
+}
+
+/// Uses the gRPC client to retrieve the account sequence
+async fn query_account(grpc_address: &Uri, account_address: &str) -> Result<BaseAccount, Error> {
+    let mut client = QueryClient::connect(grpc_address.clone())
+        .await
+        .map_err(Error::grpc_transport)?;
+
+    let request = tonic::Request::new(QueryAccountRequest {
+        address: account_address.to_string(),
+    });
+
+    let response = client.account(request).await;
+
+    // Querying for an account might fail, i.e. if the account doesn't actually exist
+    let resp_account = match response.map_err(Error::grpc_status)?.into_inner().account {
+        Some(account) => account,
+        None => return Err(Error::empty_query_account(account_address.to_string())),
+    };
+
+    if resp_account.type_url == "/cosmos.auth.v1beta1.BaseAccount" {
+        Ok(BaseAccount::decode(resp_account.value.as_slice())
+            .map_err(|e| Error::protobuf_decode("BaseAccount".to_string(), e))?)
+    } else if resp_account.type_url.ends_with(".EthAccount") {
+        Ok(EthAccount::decode(resp_account.value.as_slice())
+            .map_err(|e| Error::protobuf_decode("EthAccount".to_string(), e))?
+            .base_account
+            .ok_or_else(Error::empty_base_account)?)
+    } else {
+        Err(Error::unknown_account_type(resp_account.type_url))
+    }
+}
diff --git a/relayer/src/chain/cosmos/query/status.rs b/relayer/src/chain/cosmos/query/status.rs
new file mode 100644
index 0000000000..8efc5c57c4
--- /dev/null
+++ b/relayer/src/chain/cosmos/query/status.rs
@@ -0,0 +1,40 @@
+use ibc::core::ics24_host::identifier::ChainId;
+use ibc::Height;
+use tendermint_rpc::{Client, HttpClient, Url};
+
+use crate::chain::ChainStatus;
+use crate::error::Error;
+
+/// Query the chain status via an RPC query.
+///
+/// Returns an error if the node is still syncing and has not caught up,
+/// i.e. if `sync_info.catching_up` is `true`.
+pub async fn query_status(
+    chain_id: &ChainId,
+    rpc_client: &HttpClient,
+    rpc_address: &Url,
+) -> Result<ChainStatus, Error> {
+    let response = rpc_client
+        .status()
+        .await
+        .map_err(|e| Error::rpc(rpc_address.clone(), e))?;
+
+    if response.sync_info.catching_up {
+        return Err(Error::chain_not_caught_up(
+            rpc_address.to_string(),
+            chain_id.clone(),
+        ));
+    }
+
+    let time = response.sync_info.latest_block_time;
+
+    let height = Height {
+        revision_number: ChainId::chain_version(response.node_info.network.as_str()),
+        revision_height: u64::from(response.sync_info.latest_block_height),
+    };
+
+    Ok(ChainStatus {
+        height,
+        timestamp: time.into(),
+    })
+}
diff --git a/relayer/src/chain/cosmos/query/tx.rs b/relayer/src/chain/cosmos/query/tx.rs
new file mode 100644
index 0000000000..dab694a53c
--- /dev/null
+++ b/relayer/src/chain/cosmos/query/tx.rs
@@ -0,0 +1,250 @@
+use ibc::core::ics02_client::client_consensus::QueryClientEventRequest;
+use ibc::core::ics02_client::events as ClientEvents;
+use ibc::core::ics04_channel::channel::QueryPacketEventDataRequest;
+use ibc::core::ics04_channel::events as ChannelEvents;
+use ibc::core::ics04_channel::packet::{Packet, Sequence};
+use ibc::core::ics24_host::identifier::ChainId;
+use ibc::events::{from_tx_response_event, IbcEvent};
+use ibc::query::QueryTxRequest;
+use ibc::Height as ICSHeight;
+use tendermint::abci::Event;
+use tendermint_rpc::endpoint::tx::Response as ResultTx;
+use tendermint_rpc::{Client, HttpClient, Order, Url};
+
+use crate::chain::cosmos::query::{header_query, packet_query, tx_hash_query};
+use crate::error::Error;
+
+/// This function queries transactions for events matching certain criteria:
+/// 1. Client update request - returns a vector with at most one update client event.
+/// 2. Packet event request - returns at most one packet event for each sequence specified
+///    in the request.
+///
+/// Note: there is no way to format the packet query such that it asks for Tx-es with either
+/// sequence (the query conditions can only be AND-ed).
+/// There is a possibility to include "<=" and ">=" conditions, but it doesn't work with
+/// string attributes (sequence is emitted as a string).
+/// Therefore, for packets we perform one `tx_search` for each sequence.
+/// Alternatively, a single query for all packets could be performed, but it would return all
+/// packets ever sent.
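+///
+/// As an illustration (hypothetical identifiers, and assuming the packet event id renders as
+/// `send_packet`), the per-sequence query built by `packet_query` has the shape:
+/// `send_packet.packet_src_channel = 'channel-0' AND send_packet.packet_src_port = 'transfer'
+/// AND ... AND send_packet.packet_sequence = '3'`.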
+pub async fn query_txs( + chain_id: &ChainId, + rpc_client: &HttpClient, + rpc_address: &Url, + request: QueryTxRequest, +) -> Result, Error> { + crate::time!("query_txs"); + crate::telemetry!(query, chain_id, "query_txs"); + + match request { + QueryTxRequest::Packet(request) => { + crate::time!("query_txs: query packet events"); + + let mut result: Vec = vec![]; + + for seq in &request.sequences { + // query first (and only) Tx that includes the event specified in the query request + let response = rpc_client + .tx_search( + packet_query(&request, *seq), + false, + 1, + 1, // get only the first Tx matching the query + Order::Ascending, + ) + .await + .map_err(|e| Error::rpc(rpc_address.clone(), e))?; + + assert!( + response.txs.len() <= 1, + "packet_from_tx_search_response: unexpected number of txs" + ); + + if response.txs.is_empty() { + continue; + } + + if let Some(event) = packet_from_tx_search_response( + chain_id, + &request, + *seq, + response.txs[0].clone(), + ) { + result.push(event); + } + } + Ok(result) + } + + QueryTxRequest::Client(request) => { + crate::time!("query_txs: single client update event"); + + // query the first Tx that includes the event matching the client request + // Note: it is possible to have multiple Tx-es for same client and consensus height. + // In this case it must be true that the client updates were performed with tha + // same header as the first one, otherwise a subsequent transaction would have + // failed on chain. Therefore only one Tx is of interest and current API returns + // the first one. + let mut response = rpc_client + .tx_search( + header_query(&request), + false, + 1, + 1, // get only the first Tx matching the query + Order::Ascending, + ) + .await + .map_err(|e| Error::rpc(rpc_address.clone(), e))?; + + if response.txs.is_empty() { + return Ok(vec![]); + } + + // the response must include a single Tx as specified in the query. + assert!( + response.txs.len() <= 1, + "packet_from_tx_search_response: unexpected number of txs" + ); + + let tx = response.txs.remove(0); + let event = update_client_from_tx_search_response(chain_id, &request, tx); + + Ok(event.into_iter().collect()) + } + + QueryTxRequest::Transaction(tx) => { + let mut response = rpc_client + .tx_search( + tx_hash_query(&tx), + false, + 1, + 1, // get only the first Tx matching the query + Order::Ascending, + ) + .await + .map_err(|e| Error::rpc(rpc_address.clone(), e))?; + + if response.txs.is_empty() { + Ok(vec![]) + } else { + let tx = response.txs.remove(0); + Ok(all_ibc_events_from_tx_search_response(chain_id, tx)) + } + } + } +} + +// Extracts from the Tx the update client event for the requested client and height. +// Note: in the Tx, there may have been multiple events, some of them may be +// for update of other clients that are not relevant to the request. +// For example, if we're querying for a transaction that includes the update for client X at +// consensus height H, it is possible that the transaction also includes an update client +// for client Y at consensus height H'. This is the reason the code iterates all event fields in the +// returned Tx to retrieve the relevant ones. +// Returns `None` if no matching event was found. 
+fn update_client_from_tx_search_response( + chain_id: &ChainId, + request: &QueryClientEventRequest, + response: ResultTx, +) -> Option { + let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); + if request.height != ICSHeight::zero() && height > request.height { + return None; + } + + response + .tx_result + .events + .into_iter() + .filter(|event| event.type_str == request.event_id.as_str()) + .flat_map(|event| ClientEvents::try_from_tx(&event)) + .flat_map(|event| match event { + IbcEvent::UpdateClient(mut update) => { + update.common.height = height; + Some(update) + } + _ => None, + }) + .find(|update| { + update.common.client_id == request.client_id + && update.common.consensus_height == request.consensus_height + }) + .map(IbcEvent::UpdateClient) +} + +// Extract the packet events from the query_txs RPC response. For any given +// packet query, there is at most one Tx matching such query. Moreover, a Tx may +// contain several events, but a single one must match the packet query. +// For example, if we're querying for the packet with sequence 3 and this packet +// was committed in some Tx along with the packet with sequence 4, the response +// will include both packets. For this reason, we iterate all packets in the Tx, +// searching for those that match (which must be a single one). +fn packet_from_tx_search_response( + chain_id: &ChainId, + request: &QueryPacketEventDataRequest, + seq: Sequence, + response: ResultTx, +) -> Option { + let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); + if request.height != ICSHeight::zero() && height > request.height { + return None; + } + + response + .tx_result + .events + .into_iter() + .find_map(|ev| filter_matching_event(ev, request, seq)) +} + +fn filter_matching_event( + event: Event, + request: &QueryPacketEventDataRequest, + seq: Sequence, +) -> Option { + fn matches_packet( + request: &QueryPacketEventDataRequest, + seq: Sequence, + packet: &Packet, + ) -> bool { + packet.source_port == request.source_port_id + && packet.source_channel == request.source_channel_id + && packet.destination_port == request.destination_port_id + && packet.destination_channel == request.destination_channel_id + && packet.sequence == seq + } + + if event.type_str != request.event_id.as_str() { + return None; + } + + let ibc_event = ChannelEvents::try_from_tx(&event)?; + match ibc_event { + IbcEvent::SendPacket(ref send_ev) if matches_packet(request, seq, &send_ev.packet) => { + Some(ibc_event) + } + IbcEvent::WriteAcknowledgement(ref ack_ev) + if matches_packet(request, seq, &ack_ev.packet) => + { + Some(ibc_event) + } + _ => None, + } +} + +fn all_ibc_events_from_tx_search_response(chain_id: &ChainId, response: ResultTx) -> Vec { + let height = ICSHeight::new(chain_id.version(), u64::from(response.height)); + let deliver_tx_result = response.tx_result; + if deliver_tx_result.code.is_err() { + return vec![IbcEvent::ChainError(format!( + "deliver_tx for {} reports error: code={:?}, log={:?}", + response.hash, deliver_tx_result.code, deliver_tx_result.log + ))]; + } + + let mut result = vec![]; + for event in deliver_tx_result.events { + if let Some(ibc_ev) = from_tx_response_event(height, &event) { + result.push(ibc_ev); + } + } + result +} diff --git a/relayer/src/chain/cosmos/retry.rs b/relayer/src/chain/cosmos/retry.rs new file mode 100644 index 0000000000..9960455969 --- /dev/null +++ b/relayer/src/chain/cosmos/retry.rs @@ -0,0 +1,199 @@ +use core::future::Future; +use core::pin::Pin; +use 
core::time::Duration; +use ibc_proto::google::protobuf::Any; +use std::thread; +use tendermint::abci::Code; +use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tendermint_rpc::HttpClient; +use tonic::codegen::http::Uri; +use tracing::{debug, error, span, warn, Level}; + +use crate::chain::cosmos::query::account::refresh_account; +use crate::chain::cosmos::tx::estimate_fee_and_send_tx; +use crate::chain::cosmos::types::account::Account; +use crate::config::types::Memo; +use crate::config::ChainConfig; +use crate::error::Error; +use crate::keyring::KeyEntry; +use crate::sdk_error::sdk_error_from_tx_sync_error_code; + +// Maximum number of retries for send_tx in the case of +// an account sequence mismatch at broadcast step. +const MAX_ACCOUNT_SEQUENCE_RETRY: u64 = 1; + +// Backoff multiplier to apply while retrying in the case +// of account sequence mismatch. +const BACKOFF_MULTIPLIER_ACCOUNT_SEQUENCE_RETRY: u64 = 300; + +// The error "incorrect account sequence" is defined as the unique error code 32 in cosmos-sdk: +// https://github.com/cosmos/cosmos-sdk/blob/v0.44.0/types/errors/errors.go#L115-L117 +const INCORRECT_ACCOUNT_SEQUENCE_ERR: u32 = 32; + +/// Try to `send_tx` with retry on account sequence error. +/// An account sequence error can occur if the account sequence that +/// the relayer caches becomes outdated. This may happen if the relayer +/// wallet is used concurrently elsewhere, or when tx fees are mis-configured +/// leading to transactions hanging in the mempool. +/// +/// Account sequence mismatch error can occur at two separate steps: +/// 1. as Err variant, propagated from the `estimate_gas` step. +/// 2. as an Ok variant, with an Code::Err response, propagated from +/// the `broadcast_tx_sync` step. +/// +/// We treat both cases by re-fetching the account sequence number +/// from the full node. +/// Upon case #1, we do not retry submitting the same tx (retry happens +/// nonetheless at the worker `step` level). Upon case #2, we retry +/// submitting the same transaction. +pub async fn send_tx_with_account_sequence_retry( + config: &ChainConfig, + rpc_client: &HttpClient, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &mut Account, + tx_memo: &Memo, + messages: Vec, + retry_counter: u64, +) -> Result { + crate::time!("send_tx_with_account_sequence_retry"); + let _span = + span!(Level::ERROR, "send_tx_with_account_sequence_retry", id = %config.id).entered(); + + do_send_tx_with_account_sequence_retry( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + messages, + retry_counter, + ) + .await +} + +// We have to do explicit return of `Box` because Rust +// do not currently support recursive async functions behind the +// `async fn` syntactic sugar. +fn do_send_tx_with_account_sequence_retry<'a>( + config: &'a ChainConfig, + rpc_client: &'a HttpClient, + grpc_address: &'a Uri, + key_entry: &'a KeyEntry, + account: &'a mut Account, + tx_memo: &'a Memo, + messages: Vec, + retry_counter: u64, +) -> Pin> + 'a>> { + Box::pin(async move { + debug!( + "sending {} messages using account sequence {}", + messages.len(), + account.sequence, + ); + + let tx_result = estimate_fee_and_send_tx( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + messages.clone(), + ) + .await; + + match tx_result { + // Gas estimation failed with acct. s.n. mismatch at estimate gas step. + // This indicates that the full node did not yet push the previous tx out of its + // mempool. 
Possible explanations: fees too low, network congested, or full node + // congested. Whichever the case, it is more expedient in production to drop the tx + // and refresh the s.n., to allow proceeding to the other transactions. A separate + // retry at the worker-level will handle retrying. + Err(e) if mismatching_account_sequence_number(&e) => { + warn!("failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); + refresh_account(grpc_address, &key_entry.account, account).await?; + // Note: propagating error here can lead to bug & dropped packets: + // https://github.com/informalsystems/ibc-rs/issues/1153 + // But periodic packet clearing will catch any dropped packets. + Err(e) + } + + // Gas estimation succeeded. Broadcasting failed with a retry-able error. + Ok(response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => { + if retry_counter < MAX_ACCOUNT_SEQUENCE_RETRY { + let retry_counter = retry_counter + 1; + warn!("failed at broadcast step with incorrect account sequence. retrying ({}/{})", + retry_counter, MAX_ACCOUNT_SEQUENCE_RETRY); + // Backoff & re-fetch the account s.n. + let backoff = retry_counter * BACKOFF_MULTIPLIER_ACCOUNT_SEQUENCE_RETRY; + + thread::sleep(Duration::from_millis(backoff)); + refresh_account(grpc_address, &key_entry.account, account).await?; + + // Now retry. + do_send_tx_with_account_sequence_retry( + config, + rpc_client, + grpc_address, + key_entry, + account, + tx_memo, + messages, + retry_counter + 1, + ) + .await + } else { + // If after the max retry we still get an account sequence mismatch error, + // we ignore the error and return the original response to downstream. + // We do not return an error here, because the current convention + // let the caller handle error responses separately. + error!("failed due to account sequence errors. the relayer wallet may be used elsewhere concurrently."); + Ok(response) + } + } + + // Catch-all arm for the Ok variant. + // This is the case when gas estimation succeeded. + Ok(response) => { + // Complete success. + match response.code { + Code::Ok => { + debug!("broadcast_tx_sync: {:?}", response); + + account.sequence.increment_mut(); + Ok(response) + } + // Gas estimation succeeded, but broadcasting failed with unrecoverable error. + Code::Err(code) => { + // Avoid increasing the account s.n. if CheckTx failed + // Log the error + error!( + "broadcast_tx_sync: {:?}: diagnostic: {:?}", + response, + sdk_error_from_tx_sync_error_code(code) + ); + Ok(response) + } + } + } + + // Catch-all case for the Err variant. + // Gas estimation failure or other unrecoverable error, propagate. + Err(e) => Err(e), + } + }) +} + +/// Determine whether the given error yielded by `tx_simulate` +/// indicates hat the current sequence number cached in Hermes +/// may be out-of-sync with the full node's version of the s.n. 
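+// Note: with `MAX_ACCOUNT_SEQUENCE_RETRY` set to 1 and a backoff multiplier of 300 ms, the
+// broadcast-step retry above amounts to at most one resubmission after a 300 ms pause.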
+fn mismatching_account_sequence_number(e: &Error) -> bool { + use crate::error::ErrorDetail::*; + + match e.detail() { + GrpcStatus(detail) => detail.is_account_sequence_mismatch(), + _ => false, + } +} diff --git a/relayer/src/chain/cosmos/simulate.rs b/relayer/src/chain/cosmos/simulate.rs new file mode 100644 index 0000000000..ce249a44db --- /dev/null +++ b/relayer/src/chain/cosmos/simulate.rs @@ -0,0 +1,33 @@ +use ibc_proto::cosmos::tx::v1beta1::service_client::ServiceClient; +use ibc_proto::cosmos::tx::v1beta1::{SimulateRequest, SimulateResponse, Tx}; +use tonic::codegen::http::Uri; + +use crate::error::Error; + +pub async fn send_tx_simulate(grpc_address: &Uri, tx: Tx) -> Result { + crate::time!("send_tx_simulate"); + + // The `tx` field of `SimulateRequest` was deprecated in Cosmos SDK 0.43 in favor of `tx_bytes`. + let mut tx_bytes = vec![]; + prost::Message::encode(&tx, &mut tx_bytes) + .map_err(|e| Error::protobuf_encode(String::from("Transaction"), e))?; + + #[allow(deprecated)] + let req = SimulateRequest { + tx: Some(tx), // needed for simulation to go through with Cosmos SDK < 0.43 + tx_bytes, // needed for simulation to go through with Cosmos SDk >= 0.43 + }; + + let mut client = ServiceClient::connect(grpc_address.clone()) + .await + .map_err(Error::grpc_transport)?; + + let request = tonic::Request::new(req); + let response = client + .simulate(request) + .await + .map_err(Error::grpc_status)? + .into_inner(); + + Ok(response) +} diff --git a/relayer/src/chain/cosmos/tx.rs b/relayer/src/chain/cosmos/tx.rs new file mode 100644 index 0000000000..0ac651d270 --- /dev/null +++ b/relayer/src/chain/cosmos/tx.rs @@ -0,0 +1,68 @@ +use ibc_proto::cosmos::tx::v1beta1::Fee; +use ibc_proto::google::protobuf::Any; +use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tendermint_rpc::{Client, HttpClient, Url}; +use tonic::codegen::http::Uri; + +use crate::chain::cosmos::encode::sign_and_encode_tx; +use crate::chain::cosmos::estimate::estimate_tx_fees; +use crate::chain::cosmos::types::account::Account; +use crate::config::types::Memo; +use crate::config::ChainConfig; +use crate::error::Error; +use crate::keyring::KeyEntry; + +pub async fn estimate_fee_and_send_tx( + config: &ChainConfig, + rpc_client: &HttpClient, + grpc_address: &Uri, + key_entry: &KeyEntry, + account: &Account, + tx_memo: &Memo, + messages: Vec, +) -> Result { + let fee = estimate_tx_fees( + config, + grpc_address, + key_entry, + account, + tx_memo, + messages.clone(), + ) + .await?; + + send_tx_with_fee( + config, rpc_client, key_entry, account, tx_memo, messages, &fee, + ) + .await +} + +async fn send_tx_with_fee( + config: &ChainConfig, + rpc_client: &HttpClient, + key_entry: &KeyEntry, + account: &Account, + tx_memo: &Memo, + messages: Vec, + fee: &Fee, +) -> Result { + let tx_bytes = sign_and_encode_tx(config, key_entry, account, tx_memo, messages, fee)?; + + let response = broadcast_tx_sync(rpc_client, &config.rpc_addr, tx_bytes).await?; + + Ok(response) +} + +/// Perform a `broadcast_tx_sync`, and return the corresponding deserialized response data. 
+async fn broadcast_tx_sync( + rpc_client: &HttpClient, + rpc_address: &Url, + data: Vec, +) -> Result { + let response = rpc_client + .broadcast_tx_sync(data.into()) + .await + .map_err(|e| Error::rpc(rpc_address.clone(), e))?; + + Ok(response) +} diff --git a/relayer/src/chain/cosmos/types/account.rs b/relayer/src/chain/cosmos/types/account.rs new file mode 100644 index 0000000000..5350c7b373 --- /dev/null +++ b/relayer/src/chain/cosmos/types/account.rs @@ -0,0 +1,70 @@ +use core::fmt; +use ibc_proto::cosmos::auth::v1beta1::BaseAccount; + +/// Wrapper for account number and sequence number. +/// +/// More fields may be added later. +#[derive(Clone, Debug, PartialEq)] +pub struct Account { + // pub address: String, + // pub pub_key: Option, + pub number: AccountNumber, + pub sequence: AccountSequence, +} + +impl From for Account { + fn from(value: BaseAccount) -> Self { + Self { + number: AccountNumber::new(value.account_number), + sequence: AccountSequence::new(value.sequence), + } + } +} + +/// Newtype for account numbers +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct AccountNumber(u64); + +impl AccountNumber { + pub fn new(number: u64) -> Self { + Self(number) + } + + pub fn to_u64(self) -> u64 { + self.0 + } +} + +impl fmt::Display for AccountNumber { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +/// Newtype for account sequence numbers +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct AccountSequence(u64); + +impl AccountSequence { + pub fn new(sequence: u64) -> Self { + Self(sequence) + } + + pub fn to_u64(self) -> u64 { + self.0 + } + + pub fn increment(self) -> Self { + Self(self.0 + 1) + } + + pub fn increment_mut(&mut self) { + self.0 += 1 + } +} + +impl fmt::Display for AccountSequence { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/relayer/src/chain/cosmos/types/gas.rs b/relayer/src/chain/cosmos/types/gas.rs new file mode 100644 index 0000000000..55625dab3c --- /dev/null +++ b/relayer/src/chain/cosmos/types/gas.rs @@ -0,0 +1,79 @@ +use ibc_proto::cosmos::tx::v1beta1::Fee; + +use crate::chain::cosmos::calculate_fee; +use crate::config::{ChainConfig, GasPrice}; + +/// Default gas limit when submitting a transaction. +const DEFAULT_MAX_GAS: u64 = 400_000; + +/// Fraction of the estimated gas to add to the estimated gas amount when submitting a transaction. +const DEFAULT_GAS_PRICE_ADJUSTMENT: f64 = 0.1; + +const DEFAULT_FEE_GRANTER: &str = ""; + +pub struct GasConfig { + pub default_gas: u64, + pub max_gas: u64, + pub gas_adjustment: f64, + pub gas_price: GasPrice, + pub max_fee: Fee, + pub fee_granter: String, +} + +impl GasConfig { + pub fn from_chain_config(config: &ChainConfig) -> GasConfig { + GasConfig { + default_gas: default_gas_from_config(config), + max_gas: max_gas_from_config(config), + gas_adjustment: gas_adjustment_from_config(config), + gas_price: config.gas_price.clone(), + max_fee: max_fee_from_config(config), + fee_granter: fee_granter_from_config(config), + } + } +} + +/// The default amount of gas the relayer is willing to pay for a transaction, +/// when it cannot simulate the tx and therefore estimate the gas amount needed. 
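+///
+/// If `default_gas` is not set in the config it falls back to `max_gas`, which in turn falls
+/// back to `DEFAULT_MAX_GAS` (400_000) when unset.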
+pub fn default_gas_from_config(config: &ChainConfig) -> u64 { + config + .default_gas + .unwrap_or_else(|| max_gas_from_config(config)) +} + +/// The maximum amount of gas the relayer is willing to pay for a transaction +pub fn max_gas_from_config(config: &ChainConfig) -> u64 { + config.max_gas.unwrap_or(DEFAULT_MAX_GAS) +} + +/// The gas price adjustment +fn gas_adjustment_from_config(config: &ChainConfig) -> f64 { + config + .gas_adjustment + .unwrap_or(DEFAULT_GAS_PRICE_ADJUSTMENT) +} + +/// Get the fee granter address +fn fee_granter_from_config(config: &ChainConfig) -> String { + config + .fee_granter + .as_deref() + .unwrap_or(DEFAULT_FEE_GRANTER) + .to_string() +} + +fn max_fee_from_config(config: &ChainConfig) -> Fee { + let max_gas = max_gas_from_config(config); + + // The maximum fee the relayer pays for a transaction + let max_fee_in_coins = calculate_fee(max_gas, &config.gas_price); + + let fee_granter = fee_granter_from_config(config); + + Fee { + amount: vec![max_fee_in_coins], + gas_limit: max_gas, + payer: "".to_string(), + granter: fee_granter, + } +} diff --git a/relayer/src/chain/cosmos/types/mod.rs b/relayer/src/chain/cosmos/types/mod.rs new file mode 100644 index 0000000000..4d91a59966 --- /dev/null +++ b/relayer/src/chain/cosmos/types/mod.rs @@ -0,0 +1,3 @@ +pub mod account; +pub mod gas; +pub mod tx; diff --git a/relayer/src/chain/cosmos/types/tx.rs b/relayer/src/chain/cosmos/types/tx.rs new file mode 100644 index 0000000000..9509c9826c --- /dev/null +++ b/relayer/src/chain/cosmos/types/tx.rs @@ -0,0 +1,18 @@ +use ibc::events::IbcEvent; +use ibc_proto::cosmos::tx::v1beta1::{AuthInfo, TxBody}; +use tendermint_rpc::endpoint::broadcast::tx_sync::Response; + +pub struct SignedTx { + pub body: TxBody, + pub body_bytes: Vec, + pub auth_info: AuthInfo, + pub auth_info_bytes: Vec, + pub signatures: Vec>, +} + +pub struct TxSyncResult { + // the broadcast_tx_sync response + pub response: Response, + // the events generated by a Tx once executed + pub events: Vec, +} diff --git a/relayer/src/chain/cosmos/wait.rs b/relayer/src/chain/cosmos/wait.rs new file mode 100644 index 0000000000..953395fef2 --- /dev/null +++ b/relayer/src/chain/cosmos/wait.rs @@ -0,0 +1,114 @@ +use core::time::Duration; +use ibc::core::ics24_host::identifier::ChainId; +use ibc::events::IbcEvent; +use ibc::query::{QueryTxHash, QueryTxRequest}; +use itertools::Itertools; +use std::thread; +use std::time::Instant; +use tendermint_rpc::{HttpClient, Url}; +use tracing::{info, trace}; + +use crate::chain::cosmos::query::tx::query_txs; +use crate::chain::cosmos::types::tx::TxSyncResult; +use crate::error::Error; + +const WAIT_BACKOFF: Duration = Duration::from_millis(300); + +/// Given a vector of `TxSyncResult` elements, +/// each including a transaction response hash for one or more messages, periodically queries the chain +/// with the transaction hashes to get the list of IbcEvents included in those transactions. 
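+///
+/// Polling happens every `WAIT_BACKOFF` (300 ms) and gives up with a `tx_no_confirmation`
+/// error once `rpc_timeout` elapses before all transactions are confirmed.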
+pub async fn wait_for_block_commits( + chain_id: &ChainId, + rpc_client: &HttpClient, + rpc_address: &Url, + rpc_timeout: &Duration, + tx_sync_results: &mut [TxSyncResult], +) -> Result<(), Error> { + let start_time = Instant::now(); + + let hashes = tx_sync_results + .iter() + .map(|res| res.response.hash.to_string()) + .join(", "); + + info!( + id = %chain_id, + "wait_for_block_commits: waiting for commit of tx hashes(s) {}", + hashes + ); + + loop { + let elapsed = start_time.elapsed(); + + if all_tx_results_found(tx_sync_results) { + trace!( + id = %chain_id, + "wait_for_block_commits: retrieved {} tx results after {}ms", + tx_sync_results.len(), + elapsed.as_millis(), + ); + + return Ok(()); + } else if &elapsed > rpc_timeout { + return Err(Error::tx_no_confirmation()); + } else { + thread::sleep(WAIT_BACKOFF); + + for tx_sync_result in tx_sync_results.iter_mut() { + // ignore error + let _ = + update_tx_sync_result(chain_id, rpc_client, rpc_address, tx_sync_result).await; + } + } + } +} + +async fn update_tx_sync_result( + chain_id: &ChainId, + rpc_client: &HttpClient, + rpc_address: &Url, + tx_sync_result: &mut TxSyncResult, +) -> Result<(), Error> { + let TxSyncResult { response, events } = tx_sync_result; + + // If this transaction was not committed, determine whether it was because it failed + // or because it hasn't been committed yet. + if empty_event_present(events) { + // If the transaction failed, replace the events with an error, + // so that we don't attempt to resolve the transaction later on. + if response.code.value() != 0 { + *events = vec![IbcEvent::ChainError(format!( + "deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}", + chain_id, response.hash, response.code, response.log + ))]; + } + + // Otherwise, try to resolve transaction hash to the corresponding events. + let events_per_tx = query_txs( + chain_id, + rpc_client, + rpc_address, + QueryTxRequest::Transaction(QueryTxHash(response.hash)), + ) + .await?; + + // If we get events back, progress was made, so we replace the events + // with the new ones. in both cases we will check in the next iteration + // whether or not the transaction was fully committed. + if !events_per_tx.is_empty() { + *events = events_per_tx; + } + } + + Ok(()) +} + +fn empty_event_present(events: &[IbcEvent]) -> bool { + events.iter().any(|ev| matches!(ev, IbcEvent::Empty(_))) +} + +fn all_tx_results_found(tx_sync_results: &[TxSyncResult]) -> bool { + tx_sync_results + .iter() + .all(|r| !empty_event_present(&r.events)) +} diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index d88e7c0825..100a1c249c 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -53,7 +53,7 @@ use crate::{ use super::client::ClientSettings; use super::tx::TrackedMsgs; -use super::{HealthCheck, StatusResponse}; +use super::{ChainStatus, HealthCheck}; mod base; mod cache; @@ -150,7 +150,7 @@ pub enum ChainRequest { }, QueryStatus { - reply_to: ReplyTo, + reply_to: ReplyTo, }, QueryClients { @@ -384,7 +384,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { /// Return the version of the IBC protocol that this chain is running, if known. 
fn ibc_version(&self) -> Result, Error>; - fn query_status(&self) -> Result; + fn query_status(&self) -> Result; fn query_latest_height(&self) -> Result { Ok(self.query_status()?.height) diff --git a/relayer/src/chain/handle/base.rs b/relayer/src/chain/handle/base.rs index 9dbf33c6dc..984144c69c 100644 --- a/relayer/src/chain/handle/base.rs +++ b/relayer/src/chain/handle/base.rs @@ -37,7 +37,7 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; use crate::{ - chain::{client::ClientSettings, tx::TrackedMsgs, StatusResponse}, + chain::{client::ClientSettings, tx::TrackedMsgs, ChainStatus}, config::ChainConfig, connection::ConnectionMsgType, error::Error, @@ -142,7 +142,7 @@ impl ChainHandle for BaseChainHandle { self.send(|reply_to| ChainRequest::IbcVersion { reply_to }) } - fn query_status(&self) -> Result { + fn query_status(&self) -> Result { self.send(|reply_to| ChainRequest::QueryStatus { reply_to }) } diff --git a/relayer/src/chain/handle/cache.rs b/relayer/src/chain/handle/cache.rs index 9a3870d7b7..d7e1b83654 100644 --- a/relayer/src/chain/handle/cache.rs +++ b/relayer/src/chain/handle/cache.rs @@ -38,7 +38,7 @@ use crate::cache::{Cache, CacheStatus}; use crate::chain::client::ClientSettings; use crate::chain::handle::{ChainHandle, ChainRequest, Subscription}; use crate::chain::tx::TrackedMsgs; -use crate::chain::{HealthCheck, StatusResponse}; +use crate::chain::{ChainStatus, HealthCheck}; use crate::config::ChainConfig; use crate::error::Error; use crate::telemetry; @@ -127,7 +127,7 @@ impl ChainHandle for CachingChainHandle { self.inner().ibc_version() } - fn query_status(&self) -> Result { + fn query_status(&self) -> Result { self.inner().query_status() } diff --git a/relayer/src/chain/handle/counting.rs b/relayer/src/chain/handle/counting.rs index 8a8b192c97..d094d04c84 100644 --- a/relayer/src/chain/handle/counting.rs +++ b/relayer/src/chain/handle/counting.rs @@ -38,7 +38,7 @@ use tracing::debug; use crate::chain::client::ClientSettings; use crate::chain::handle::{ChainHandle, ChainRequest, Subscription}; use crate::chain::tx::TrackedMsgs; -use crate::chain::{HealthCheck, StatusResponse}; +use crate::chain::{ChainStatus, HealthCheck}; use crate::config::ChainConfig; use crate::error::Error; use crate::util::lock::LockExt; @@ -155,7 +155,7 @@ impl ChainHandle for CountingChainHandle { self.inner().ibc_version() } - fn query_status(&self) -> Result { + fn query_status(&self) -> Result { self.inc_metric("query_status"); self.inner().query_status() } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 52627d2c9c..f26f60ce36 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -40,7 +40,7 @@ use ibc_proto::ibc::core::connection::v1::{ }; use crate::chain::client::ClientSettings; -use crate::chain::{ChainEndpoint, StatusResponse}; +use crate::chain::{ChainEndpoint, ChainStatus}; use crate::config::ChainConfig; use crate::error::Error; use crate::event::monitor::{EventReceiver, EventSender, TxMonitorCmd}; @@ -170,8 +170,8 @@ impl ChainEndpoint for MockChain { unimplemented!() } - fn query_status(&self) -> Result { - Ok(StatusResponse { + fn query_status(&self) -> Result { + Ok(ChainStatus { height: self.context.host_height(), timestamp: self.context.host_timestamp(), }) diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 09224c241f..e4f74aba8c 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -44,7 
+44,7 @@ use ibc_proto::ibc::core::{ }; use crate::{ - chain::{client::ClientSettings, StatusResponse}, + chain::{client::ClientSettings, ChainStatus}, config::ChainConfig, connection::ConnectionMsgType, error::Error, @@ -465,7 +465,7 @@ where reply_to.send(result).map_err(Error::send) } - fn query_status(&self, reply_to: ReplyTo) -> Result<(), Error> { + fn query_status(&self, reply_to: ReplyTo) -> Result<(), Error> { let latest_timestamp = self.chain.query_status(); reply_to.send(latest_timestamp).map_err(Error::send) } diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs index e234af229f..dc9d23f86e 100644 --- a/relayer/src/chain/tx.rs +++ b/relayer/src/chain/tx.rs @@ -9,8 +9,8 @@ use ibc_proto::google::protobuf::Any; /// by sharing the same `tracking_id`. #[derive(Debug, Clone)] pub struct TrackedMsgs { - msgs: Vec, - tracking_id: String, + pub msgs: Vec, + pub tracking_id: String, } impl TrackedMsgs { diff --git a/relayer/src/keyring.rs b/relayer/src/keyring.rs index 389c616528..1dee320b0c 100644 --- a/relayer/src/keyring.rs +++ b/relayer/src/keyring.rs @@ -360,25 +360,7 @@ impl KeyRing { ) -> Result, Error> { let key = self.get_key(key_name)?; - let private_key_bytes = key.private_key.private_key.to_bytes(); - match address_type { - AddressType::Ethermint { ref pk_type } if pk_type.ends_with(".ethsecp256k1.PubKey") => { - let hash = keccak256_hash(msg.as_slice()); - let s = Secp256k1::signing_only(); - // SAFETY: hash is 32 bytes, as expected in `Message::from_slice` -- see `keccak256_hash`, hence `unwrap` - let sign_msg = Message::from_slice(hash.as_slice()).unwrap(); - let key = SecretKey::from_slice(private_key_bytes.as_slice()) - .map_err(Error::invalid_key_raw)?; - let (_, sig_bytes) = s.sign_recoverable(&sign_msg, &key).serialize_compact(); - Ok(sig_bytes.to_vec()) - } - AddressType::Cosmos | AddressType::Ethermint { .. } => { - let signing_key = SigningKey::from_bytes(private_key_bytes.as_slice()) - .map_err(Error::invalid_key)?; - let signature: Signature = signing_key.sign(&msg); - Ok(signature.as_ref().to_vec()) - } - } + sign_message(&key, msg, address_type) } pub fn account_prefix(&self) -> &str { @@ -389,6 +371,33 @@ impl KeyRing { } } +/// Sign a message +pub fn sign_message( + key: &KeyEntry, + msg: Vec, + address_type: &AddressType, +) -> Result, Error> { + let private_key_bytes = key.private_key.private_key.to_bytes(); + match address_type { + AddressType::Ethermint { ref pk_type } if pk_type.ends_with(".ethsecp256k1.PubKey") => { + let hash = keccak256_hash(msg.as_slice()); + let s = Secp256k1::signing_only(); + // SAFETY: hash is 32 bytes, as expected in `Message::from_slice` -- see `keccak256_hash`, hence `unwrap` + let sign_msg = Message::from_slice(hash.as_slice()).unwrap(); + let key = SecretKey::from_slice(private_key_bytes.as_slice()) + .map_err(Error::invalid_key_raw)?; + let (_, sig_bytes) = s.sign_recoverable(&sign_msg, &key).serialize_compact(); + Ok(sig_bytes.to_vec()) + } + AddressType::Cosmos | AddressType::Ethermint { .. 
} => { + let signing_key = + SigningKey::from_bytes(private_key_bytes.as_slice()).map_err(Error::invalid_key)?; + let signature: Signature = signing_key.sign(&msg); + Ok(signature.as_ref().to_vec()) + } + } +} + /// Decode an extended private key from a mnemonic fn private_key_from_mnemonic( mnemonic_words: &str, diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index 47552d930c..7d99879458 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -7,6 +7,7 @@ unused_qualifications, rust_2018_idioms )] +#![allow(clippy::too_many_arguments)] // TODO: disable unwraps: // https://github.com/informalsystems/ibc-rs/issues/987 // #![cfg_attr(not(test), deny(clippy::unwrap_used))] diff --git a/relayer/src/link/error.rs b/relayer/src/link/error.rs index 67b11869bc..bf2f0803b2 100644 --- a/relayer/src/link/error.rs +++ b/relayer/src/link/error.rs @@ -9,7 +9,7 @@ use crate::connection::ConnectionError; use crate::error::Error; use crate::foreign_client::{ForeignClientError, HasExpiredOrFrozenError}; use crate::supervisor::Error as SupervisorError; -use crate::transfer::PacketError; +use crate::transfer::TransferError; define_error! { LinkError { @@ -64,7 +64,7 @@ define_error! { |_| { "failed during a client operation" }, Packet - [ PacketError ] + [ TransferError ] |_| { "packet error" }, OldPacketClearingFailed diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 0561d3ceae..519fcd46d8 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -42,7 +42,7 @@ use crate::chain::counterparty::{ }; use crate::chain::handle::ChainHandle; use crate::chain::tx::TrackedMsgs; -use crate::chain::StatusResponse; +use crate::chain::ChainStatus; use crate::channel::error::ChannelError; use crate::channel::Channel; use crate::event::monitor::EventBatch; @@ -1317,7 +1317,7 @@ impl RelayPath { fn build_timeout_from_send_packet_event( &self, event: &SendPacket, - dst_info: &StatusResponse, + dst_info: &ChainStatus, ) -> Result, LinkError> { let packet = event.packet.clone(); if self @@ -1335,7 +1335,7 @@ impl RelayPath { fn build_recv_or_timeout_from_send_packet_event( &self, event: &SendPacket, - dst_info: &StatusResponse, + dst_info: &ChainStatus, ) -> Result<(Option, Option), LinkError> { let timeout = self.build_timeout_from_send_packet_event(event, dst_info)?; if timeout.is_some() { diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index f6e928431a..e9d950e1e5 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -13,11 +13,12 @@ use uint::FromStrRadixErr; use crate::chain::handle::ChainHandle; use crate::chain::tx::TrackedMsgs; +use crate::chain::ChainStatus; use crate::error::Error; use crate::util::bigint::U256; define_error! { - PacketError { + TransferError { Relayer [ Error ] |_| { "relayer error" }, @@ -51,6 +52,9 @@ define_error! { format!("internal error, expected IBCEvent::ChainError, got {:?}", e.event) }, + + ZeroTimeout + | _ | { "packet timeout height and packet timeout timestamp cannot both be 0" }, } } @@ -71,6 +75,48 @@ impl FromStr for Amount { } } +#[derive(Copy, Clone)] +pub struct TransferTimeout { + pub timeout_height: Height, + pub timeout_timestamp: Timestamp, +} + +impl TransferTimeout { + /** + Construct the transfer timeout parameters from the given timeout + height offset, timeout duration, and the latest chain status + containing the latest time of the destination chain. 
+ + The height offset and duration are optional, with zero indicating + that the packet do not get expired at the given height or time. + If both height offset and duration are zero, then the packet will + never expire. + */ + pub fn new( + timeout_height_offset: u64, + timeout_duration: Duration, + destination_chain_status: &ChainStatus, + ) -> Result { + let timeout_height = if timeout_height_offset == 0 { + Height::zero() + } else { + destination_chain_status.height.add(timeout_height_offset) + }; + + let timeout_timestamp = if timeout_duration == Duration::ZERO { + Timestamp::none() + } else { + (destination_chain_status.timestamp + timeout_duration) + .map_err(TransferError::timestamp_overflow)? + }; + + Ok(TransferTimeout { + timeout_height, + timeout_timestamp, + }) + } +} + #[derive(Clone, Debug)] pub struct TransferOptions { pub packet_src_port_id: PortId, @@ -79,7 +125,7 @@ pub struct TransferOptions { pub denom: String, pub receiver: Option, pub timeout_height_offset: u64, - pub timeout_seconds: Duration, + pub timeout_duration: Duration, pub number_msgs: usize, } @@ -87,28 +133,23 @@ pub fn build_and_send_transfer_messages Result, PacketError> { +) -> Result, TransferError> { let receiver = match &opts.receiver { - None => packet_dst_chain.get_signer().map_err(PacketError::key)?, + None => packet_dst_chain.get_signer().map_err(TransferError::key)?, Some(r) => r.clone().into(), }; - let sender = packet_src_chain.get_signer().map_err(PacketError::key)?; + let sender = packet_src_chain.get_signer().map_err(TransferError::key)?; - let timeout_timestamp = if opts.timeout_seconds == Duration::from_secs(0) { - Timestamp::none() - } else { - (Timestamp::now() + opts.timeout_seconds).map_err(PacketError::timestamp_overflow)? - }; + let chain_status = packet_dst_chain + .query_status() + .map_err(TransferError::relayer)?; - let timeout_height = if opts.timeout_height_offset == 0 { - Height::zero() - } else { - packet_dst_chain - .query_latest_height() - .map_err(PacketError::relayer)? - .add(opts.timeout_height_offset) - }; + let timeout = TransferTimeout::new( + opts.timeout_height_offset, + opts.timeout_duration, + &chain_status, + )?; let msg = MsgTransfer { source_port: opts.packet_src_port_id.clone(), @@ -119,8 +160,8 @@ pub fn build_and_send_transfer_messages Ok(events), Some(err) => { if let IbcEvent::ChainError(err) = err { - Err(PacketError::tx_response(err.clone())) + Err(TransferError::tx_response(err.clone())) } else { panic!( "internal error, expected IBCEvent::ChainError, got {:?}", diff --git a/tools/test-framework/src/error.rs b/tools/test-framework/src/error.rs index 2683989201..d8d8db24c3 100644 --- a/tools/test-framework/src/error.rs +++ b/tools/test-framework/src/error.rs @@ -7,7 +7,7 @@ use ibc_relayer::channel::error::ChannelError; use ibc_relayer::connection::ConnectionError; use ibc_relayer::error::Error as RelayerError; use ibc_relayer::supervisor::error::Error as SupervisorError; -use ibc_relayer::transfer::PacketError; +use ibc_relayer::transfer::TransferError; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; define_error! { @@ -45,9 +45,9 @@ define_error! { [ ConnectionError ] | _ | { "connection error"}, - Packet - [ PacketError ] - | _ | { "packet error"}, + Transfer + [ TransferError ] + | _ | { "transfer error"}, Retry { @@ -60,7 +60,7 @@ define_error! 
{ e.attempts, e.task_name ) - } + }, } } @@ -111,8 +111,8 @@ impl From for Error { } } -impl From for Error { - fn from(e: PacketError) -> Self { - Error::packet(e) +impl From for Error { + fn from(e: TransferError) -> Self { + Error::transfer(e) } } diff --git a/tools/test-framework/src/relayer/chain.rs b/tools/test-framework/src/relayer/chain.rs index bca4a5bf34..3d2c00be6b 100644 --- a/tools/test-framework/src/relayer/chain.rs +++ b/tools/test-framework/src/relayer/chain.rs @@ -57,7 +57,7 @@ use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; use ibc_relayer::chain::client::ClientSettings; use ibc_relayer::chain::handle::{ChainHandle, ChainRequest, Subscription}; use ibc_relayer::chain::tx::TrackedMsgs; -use ibc_relayer::chain::{HealthCheck, StatusResponse}; +use ibc_relayer::chain::{ChainStatus, HealthCheck}; use ibc_relayer::config::ChainConfig; use ibc_relayer::error::Error; use ibc_relayer::{connection::ConnectionMsgType, keyring::KeyEntry}; @@ -128,7 +128,7 @@ where self.value().ibc_version() } - fn query_status(&self) -> Result { + fn query_status(&self) -> Result { self.value().query_status() } diff --git a/tools/test-framework/src/relayer/transfer.rs b/tools/test-framework/src/relayer/transfer.rs index cd0575a76e..81643bd4da 100644 --- a/tools/test-framework/src/relayer/transfer.rs +++ b/tools/test-framework/src/relayer/transfer.rs @@ -46,7 +46,7 @@ pub fn tx_raw_ft_transfer( denom: &MonoTagged, amount: u64, timeout_height_offset: u64, - timeout_seconds: Duration, + timeout_duration: Duration, number_messages: usize, ) -> Result, Error> { let transfer_options = TransferOptions { @@ -56,7 +56,7 @@ pub fn tx_raw_ft_transfer( denom: denom.value().to_string(), receiver: Some(recipient.value().0.clone()), timeout_height_offset, - timeout_seconds, + timeout_duration, number_msgs: number_messages, };
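// Illustrative sketch (not part of the diff above) of how the new `TransferTimeout` combines a
// height offset and a duration with the destination chain's status. The values and the helper
// function name are made up; the types are the ones introduced or renamed by this change.

use core::time::Duration;
use ibc::{timestamp::Timestamp, Height};
use ibc_relayer::chain::ChainStatus;
use ibc_relayer::transfer::{TransferError, TransferTimeout};

fn example_timeout() -> Result<TransferTimeout, TransferError> {
    let destination_status = ChainStatus {
        height: Height::new(0, 100),
        timestamp: Timestamp::now(),
    };

    // Expires at destination height 0-150 or 30 seconds past the destination's latest
    // timestamp, whichever is reached first. A zero offset or zero duration disables that
    // dimension; per the doc comment on `TransferTimeout::new`, both being zero means the
    // packet never expires.
    TransferTimeout::new(50, Duration::from_secs(30), &destination_status)
}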