Skip to content

Commit

Permalink
Structured logs for relayer logic (#1491)
Browse files Browse the repository at this point in the history
* Printing tx hashes from SendPacket events.

This was inspired from commit b63335b
(see rpc.rs therein).

* log the tx hashes in ibc_channel event SendPacket

* Redo displaying for `OperationalData`

Add `OperationalInfo` that can hold the displayable data on the batch,
either borrowed or owned with transforming from first to the other.
Implement `Display` on the `OperationalInfo` instead of `OperationalData`
for clarity.

* Improve logging of operational data

Use the `odata` key in tracing.

* Use a tracing span for task log messages

The task lifetime is better tracked with a tracing span,
which also reduces code repetition in tracking macros by putting in
the task name once.

spawn_background_task receives a Span constructed by the caller.
This allows embedding contextual information for the task, which is
used to reduce repetition in logging macros for the workers.

* Erase Display impl on RelayPath, use spans instead

With spans injecting all of the information that was formatted
using the RelayPath Display impl, this is redundant.

* Erase [rest] prefixes from log messages

We have the span now, these are redundant and bad style.

Co-authored-by: Mikhail Zabaluev <mikhail@informal.systems>
  • Loading branch information
adizere and mzabaluev authored Jan 17, 2022
1 parent a193c66 commit 2757031
Show file tree
Hide file tree
Showing 32 changed files with 631 additions and 428 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- More structural logging in relayer, using tracing spans and key-value pairs.
([#1491](https://github.com/informalsystems/ibc-rs/pull/1491))
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket'

# Specify the maximum amount of time (duration) that the RPC requests should
# take before timing out. Default: 10s (10 seconds)
# Note: Hermes uses this parameter _only_ in `start` mode; for all other CLIs,
# Hermes uses a large preconfigured timeout (on the order of minutes).
rpc_timeout = '10s'

# Specify the prefix used by the chain. Required
Expand Down
10 changes: 9 additions & 1 deletion relayer-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! See the `impl Configurable` below for how to specify the path to the
//! application's configuration file.

use core::time::Duration;
use std::path::PathBuf;

use abscissa_core::{config::Override, Clap, Command, Configurable, FrameworkError, Runnable};
Expand Down Expand Up @@ -138,6 +139,14 @@ impl Configurable<Config> for CliCmd {
ccfg.memo_prefix.apply_suffix(&suffix);
}

// For all commands except for `start` Hermes retries
// for a prolonged period of time.
if !matches!(self, CliCmd::Start(_)) {
for c in config.chains.iter_mut() {
c.rpc_timeout = Duration::from_secs(120);
}
}

match self {
CliCmd::Tx(cmd) => cmd.override_config(config),
// CliCmd::Help(cmd) => cmd.override_config(config),
Expand All @@ -146,7 +155,6 @@ impl Configurable<Config> for CliCmd {
// CliCmd::Update(cmd) => cmd.override_config(config),
// CliCmd::Upgrade(cmd) => cmd.override_config(config),
// CliCmd::Start(cmd) => cmd.override_config(config),
// CliCmd::StartMulti(cmd) => cmd.override_config(config),
// CliCmd::Query(cmd) => cmd.override_config(config),
// CliCmd::Listen(cmd) => cmd.override_config(config),
// CliCmd::Misbehaviour(cmd) => cmd.override_config(config),
Expand Down
1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ anyhow = "1.0"
semver = "1.0"
uint = "0.9"
humantime = "2.1.0"
nanoid = "0.4.0"

[dependencies.num-bigint]
version = "0.4"
Expand Down
8 changes: 5 additions & 3 deletions relayer/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloc::sync::Arc;
use prost_types::Any;
use tendermint::block::Height;
use tokio::runtime::Runtime as TokioRuntime;

Expand Down Expand Up @@ -46,10 +45,13 @@ use crate::keyring::{KeyEntry, KeyRing};
use crate::light_client::LightClient;
use crate::{config::ChainConfig, event::monitor::EventReceiver};

use self::tx::TrackedMsgs;

pub mod cosmos;
pub mod counterparty;
pub mod handle;
pub mod runtime;
pub mod tx;

#[cfg(test)]
pub mod mock;
Expand Down Expand Up @@ -125,14 +127,14 @@ pub trait ChainEndpoint: Sized {
// synchronously wait for it to be committed.
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEvent>, Error>;

/// Sends one or more transactions with `msgs` to chain.
/// Non-blocking alternative to `send_messages_and_wait_commit` interface.
fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<TxResponse>, Error>;

fn get_signer(&mut self) -> Result<Signer, Error>;
Expand Down
67 changes: 30 additions & 37 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tendermint_rpc::{
};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::codegen::http::Uri;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, span, trace, warn, Level};

use ibc::clients::ics07_tendermint::client_state::{AllowUpdate, ClientState};
use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState;
Expand Down Expand Up @@ -94,7 +94,7 @@ use crate::{
sdk_error::sdk_error_from_tx_sync_error_code,
};

use super::{ChainEndpoint, HealthCheck};
use super::{tx::TrackedMsgs, ChainEndpoint, HealthCheck};
use ibc::core::ics24_host::path::{
AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath,
ConnectionsPath, ReceiptsPath, SeqRecvsPath,
Expand Down Expand Up @@ -303,20 +303,15 @@ impl CosmosSdkChain {
account_seq: u64,
) -> Result<Response, Error> {
debug!(
"[{}] send_tx: sending {} messages using account sequence {}",
self.id(),
"sending {} messages using account sequence {}",
proto_msgs.len(),
account_seq,
);

let signer_info = self.signer(account_seq)?;
let max_fee = self.max_fee();

debug!(
"[{}] send_tx: max fee, for use in tx simulation: {}",
self.id(),
PrettyFee(&max_fee)
);
debug!("max fee, for use in tx simulation: {}", PrettyFee(&max_fee));

let (body, body_buf) = tx_body_and_bytes(proto_msgs, self.tx_memo())?;

Expand Down Expand Up @@ -345,8 +340,7 @@ impl CosmosSdkChain {
let adjusted_fee = self.fee_with_gas(estimated_gas);

debug!(
"[{}] send_tx: using {} gas, fee {}",
self.id(),
"using {} gas, fee {}",
estimated_gas,
PrettyFee(&adjusted_fee)
);
Expand Down Expand Up @@ -404,7 +398,7 @@ impl CosmosSdkChain {
// and refresh the s.n., to allow proceeding to the other transactions. A separate
// retry at the worker-level will handle retrying.
Err(e) if mismatching_account_sequence_number(&e) => {
warn!("send_tx failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number");
warn!("failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number");
self.refresh_account()?;
// Note: propagating error here can lead to bug & dropped packets:
// https://github.com/informalsystems/ibc-rs/issues/1153
Expand All @@ -416,7 +410,7 @@ impl CosmosSdkChain {
Ok(response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => {
if retry_counter < retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY {
let retry_counter = retry_counter + 1;
warn!("send_tx failed at broadcast step with incorrect account sequence. retrying ({}/{})",
warn!("failed at broadcast step with incorrect account sequence. retrying ({}/{})",
retry_counter, retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY);
// Backoff & re-fetch the account s.n.
let backoff = (retry_counter as u64)
Expand All @@ -431,7 +425,7 @@ impl CosmosSdkChain {
// we ignore the error and return the original response to downstream.
// We do not return an error here, because the current convention
// let the caller handle error responses separately.
error!("failed to send_tx due to account sequence errors. the relayer wallet may be used elsewhere concurrently.");
error!("failed due to account sequence errors. the relayer wallet may be used elsewhere concurrently.");
Ok(response)
}
}
Expand All @@ -442,7 +436,7 @@ impl CosmosSdkChain {
// Complete success.
match response.code {
tendermint::abci::Code::Ok => {
debug!("[{}] send_tx: broadcast_tx_sync: {:?}", self.id(), response);
debug!("broadcast_tx_sync: {:?}", response);

self.incr_account_sequence();
Ok(response)
Expand All @@ -452,8 +446,7 @@ impl CosmosSdkChain {
// Avoid increasing the account s.n. if CheckTx failed
// Log the error
error!(
"[{}] send_tx: broadcast_tx_sync: {:?}: diagnostic: {:?}",
self.id(),
"broadcast_tx_sync: {:?}: diagnostic: {:?}",
response,
sdk_error_from_tx_sync_error_code(code)
);
Expand All @@ -470,6 +463,8 @@ impl CosmosSdkChain {

fn send_tx(&mut self, proto_msgs: Vec<Any>) -> Result<Response, Error> {
crate::time!("send_tx");
let _span = span!(Level::ERROR, "send_tx", id = %self.id()).entered();

self.send_tx_with_account_sequence_retry(proto_msgs, 0)
}

Expand All @@ -483,12 +478,12 @@ impl CosmosSdkChain {
/// In this case we use the `default_gas` param.
fn estimate_gas(&mut self, tx: Tx) -> Result<u64, Error> {
let simulated_gas = self.send_tx_simulate(tx).map(|sr| sr.gas_info);
let _span = span!(Level::ERROR, "estimate_gas").entered();

match simulated_gas {
Ok(Some(gas_info)) => {
debug!(
"[{}] estimate_gas: tx simulation successful, gas amount used: {:?}",
self.id(),
"tx simulation successful, gas amount used: {:?}",
gas_info.gas_used
);

Expand All @@ -497,8 +492,7 @@ impl CosmosSdkChain {

Ok(None) => {
warn!(
"[{}] estimate_gas: tx simulation successful but no gas amount used was returned, falling back on default gas: {}",
self.id(),
"tx simulation successful but no gas amount used was returned, falling back on default gas: {}",
self.default_gas()
);

Expand All @@ -510,8 +504,7 @@ impl CosmosSdkChain {
// See `can_recover_from_simulation_failure` for more info.
Err(e) if can_recover_from_simulation_failure(&e) => {
warn!(
"[{}] estimate_gas: failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}",
self.id(),
"failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}",
e.detail()
);

Expand All @@ -520,8 +513,7 @@ impl CosmosSdkChain {

Err(e) => {
error!(
"[{}] estimate_gas: failed to simulate tx. propagating error to caller: {}",
self.id(),
"failed to simulate tx. propagating error to caller: {}",
e.detail()
);
// Propagate the error, the retrying mechanism at caller may catch & retry.
Expand Down Expand Up @@ -705,8 +697,7 @@ impl CosmosSdkChain {
info!(
sequence = %account.sequence,
number = %account.account_number,
"[{}] refresh: retrieved account",
self.id()
"refresh: retrieved account",
);

self.account = Some(account);
Expand Down Expand Up @@ -1059,13 +1050,14 @@ impl ChainEndpoint for CosmosSdkChain {
/// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded.
fn send_messages_and_wait_commit(
&mut self,
proto_msgs: Vec<Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_messages_and_wait_commit");
debug!(
"send_messages_and_wait_commit with {} messages",
proto_msgs.len()
);

let _span =
span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered();

let proto_msgs = tracked_msgs.messages();

if proto_msgs.is_empty() {
return Ok(vec![]);
Expand Down Expand Up @@ -1116,13 +1108,14 @@ impl ChainEndpoint for CosmosSdkChain {

fn send_messages_and_wait_check_tx(
&mut self,
proto_msgs: Vec<Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<Response>, Error> {
crate::time!("send_messages_and_wait_check_tx");
debug!(
"send_messages_and_wait_check_tx with {} messages",
proto_msgs.len()
);

let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id());
let _enter = span.enter();

let proto_msgs = tracked_msgs.messages();

if proto_msgs.is_empty() {
return Ok(vec![]);
Expand Down
10 changes: 5 additions & 5 deletions relayer/src/chain/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::{
keyring::KeyEntry,
};

use super::HealthCheck;
use super::{tx::TrackedMsgs, HealthCheck};

mod prod;
pub mod requests;
Expand Down Expand Up @@ -111,12 +111,12 @@ pub enum ChainRequest {
},

SendMessagesAndWaitCommit {
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
reply_to: ReplyTo<Vec<IbcEvent>>,
},

SendMessagesAndWaitCheckTx {
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
reply_to: ReplyTo<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>>,
},

Expand Down Expand Up @@ -350,7 +350,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static {
/// and return the list of events emitted by the chain after the transaction was committed.
fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEvent>, Error>;

/// Submit messages asynchronously.
Expand All @@ -359,7 +359,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static {
/// returns a set of transaction hashes.
fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error>;

fn get_signer(&self) -> Result<Signer, Error>;
Expand Down
9 changes: 5 additions & 4 deletions relayer/src/chain/handle/prod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof;
use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest;
use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest;

use crate::chain::tx::TrackedMsgs;
use crate::{
chain::handle::requests::AppVersion, chain::StatusResponse, config::ChainConfig,
connection::ConnectionMsgType, error::Error, keyring::KeyEntry,
Expand Down Expand Up @@ -98,20 +99,20 @@ impl ChainHandle for ProdChainHandle {

fn send_messages_and_wait_commit(
&self,
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEvent>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit {
proto_msgs,
tracked_msgs,
reply_to,
})
}

fn send_messages_and_wait_check_tx(
&self,
proto_msgs: Vec<prost_types::Any>,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<tendermint_rpc::endpoint::broadcast::tx_sync::Response>, Error> {
self.send(|reply_to| ChainRequest::SendMessagesAndWaitCheckTx {
proto_msgs,
tracked_msgs,
reply_to,
})
}
Expand Down
Loading

0 comments on commit 2757031

Please sign in to comment.