Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relayer balance metrics #2976

Merged
merged 24 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
30323f1
feat: scaffolding for generic balance fetcher and cosmos impl
daniel-savu Nov 27, 2023
42d887e
feat: add agent metrics fetchers
daniel-savu Nov 28, 2023
a400dc8
fix: cosmos balance denom usage
daniel-savu Nov 28, 2023
42b45ad
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 28, 2023
1b59e8d
feat: verify relayer balance in cosmos invariants
daniel-savu Nov 29, 2023
7d39a38
fix: cleanup
daniel-savu Nov 29, 2023
a696f69
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 29, 2023
2b949f4
fix: clippy
daniel-savu Nov 29, 2023
e5a41f4
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 29, 2023
d091ced
fix: wait for longer before querying metrics in cosmos e2e
daniel-savu Nov 29, 2023
4bafa76
chore: use a paranoid sleep in cosmos e2e because wasm init may be slow
daniel-savu Nov 30, 2023
bbecadb
fix(cosmos): agent reorg period
daniel-savu Nov 30, 2023
67d22f8
chore(cosmos): re-enable e2e invariants
daniel-savu Nov 30, 2023
dd0e1a3
fix: upgrade osmosis version for non-mac OSs
daniel-savu Nov 30, 2023
2ff104b
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 30, 2023
b1ccae1
feat: query evm balance, clean up
daniel-savu Dec 1, 2023
cc8578a
fix: evm balance metrics querying
daniel-savu Dec 4, 2023
e160a20
fix: seaorm connection timeout hiccups
daniel-savu Dec 4, 2023
fd808be
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Dec 4, 2023
c555e6a
cleanup
daniel-savu Dec 4, 2023
fb614f8
outdated comment
daniel-savu Dec 4, 2023
5de6a05
fix: review comments
daniel-savu Dec 4, 2023
b99e518
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Dec 5, 2023
2b4ae36
fix: cosmos metrics balance
daniel-savu Dec 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 53 additions & 20 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
metrics::{AgentMetrics, AgentMetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -49,7 +53,7 @@ struct ContextKey {
#[derive(AsRef)]
pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
Expand All @@ -67,6 +71,8 @@ pub struct Relayer {
transaction_gas_limit: Option<U256>,
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
}

impl Debug for Relayer {
Expand All @@ -92,11 +98,15 @@ impl BaseAgent for Relayer {

type Settings = RelayerSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
Expand All @@ -105,18 +115,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -126,7 +136,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -136,7 +146,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand Down Expand Up @@ -188,9 +198,10 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut destination_chains = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();

destination_chains.insert(destination.clone(), destination_chain_setup.clone());
let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
Expand Down Expand Up @@ -221,7 +232,7 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
Expand All @@ -230,7 +241,7 @@ impl BaseAgent for Relayer {
Ok(Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
destination_chains,
msg_ctxs,
core,
message_syncs,
Expand All @@ -242,6 +253,8 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
})
}

Expand All @@ -251,12 +264,32 @@ impl BaseAgent for Relayer {

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for destination in &self.destination_chains {
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
send_channels.insert(destination.id(), send_channel);
send_channels.insert(dest_domain.id(), send_channel);

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

tasks.push(self.run_destination_submitter(destination, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
}

for origin in &self.origin_chains {
Expand Down Expand Up @@ -330,11 +363,11 @@ impl Relayer {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.iter(),
self.destination_chains.keys(),
);
let destination_ctxs = self
.destination_chains
.iter()
.keys()
.filter(|&destination| destination != origin)
.map(|destination| {
(
Expand Down
8 changes: 6 additions & 2 deletions rust/agents/scraper/migration/bin/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::env;
use std::{env, time::Duration};

use migration::sea_orm::{Database, DatabaseConnection};
pub use migration::{DbErr, Migrator, MigratorTrait as _};
use sea_orm::ConnectOptions;

const LOCAL_DATABASE_URL: &str = "postgresql://postgres:47221c18c610@localhost:5432/postgres";
const CONNECT_TIMEOUT: u64 = 20;

pub fn url() -> String {
env::var("DATABASE_URL").unwrap_or_else(|_| LOCAL_DATABASE_URL.into())
Expand All @@ -16,6 +18,8 @@ pub async fn init() -> Result<DatabaseConnection, DbErr> {
.init();

let url = url();
let mut options: ConnectOptions = url.clone().into();
options.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT));
println!("Connecting to {url}");
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
Database::connect(url).await
Database::connect(options).await
}
5 changes: 3 additions & 2 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -38,6 +38,7 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
Expand Down
7 changes: 6 additions & 1 deletion rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -51,7 +52,11 @@ impl BaseAgent for Validator {

type Settings = ValidatorSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
Expand Down
15 changes: 10 additions & 5 deletions rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use crate::{
address::CosmosAddress,
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::aggregate_ism::{ModulesAndThresholdRequest, ModulesAndThresholdResponse},
ConnectionConf, CosmosProvider, Signer,
};
Expand All @@ -18,7 +18,7 @@ use tracing::instrument;
pub struct CosmosAggregationIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: Box<CosmosProvider>,
}

impl CosmosAggregationIsm {
Expand All @@ -28,7 +28,12 @@ impl CosmosAggregationIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
Expand All @@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
self.provider.clone()
}
}

Expand All @@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm {
) -> ChainResult<(Vec<H256>, u8)> {
let payload = ModulesAndThresholdRequest::new(message);

let data = self.provider.wasm_query(payload, None).await?;
let data = self.provider.grpc().wasm_query(payload, None).await?;
let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?;

let modules: ChainResult<Vec<H256>> = response
Expand Down
12 changes: 9 additions & 3 deletions rust/chains/hyperlane-cosmos/src/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use once_cell::sync::Lazy;
use std::ops::RangeInclusive;

use crate::{
grpc::WasmGrpcProvider,
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer},
signers::Signer,
utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64},
Expand All @@ -22,6 +21,7 @@ use crate::{
pub struct CosmosInterchainGasPaymaster {
domain: HyperlaneDomain,
address: H256,
provider: CosmosProvider,
}

impl HyperlaneContract for CosmosInterchainGasPaymaster {
Expand All @@ -36,7 +36,7 @@ impl HyperlaneChain for CosmosInterchainGasPaymaster {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}

Expand All @@ -49,11 +49,17 @@ impl CosmosInterchainGasPaymaster {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider,
})
}
}
Expand Down
Loading
Loading