Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relayer balance metrics #2976

Merged
merged 24 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
30323f1
feat: scaffolding for generic balance fetcher and cosmos impl
daniel-savu Nov 27, 2023
42d887e
feat: add agent metrics fetchers
daniel-savu Nov 28, 2023
a400dc8
fix: cosmos balance denom usage
daniel-savu Nov 28, 2023
42b45ad
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 28, 2023
1b59e8d
feat: verify relayer balance in cosmos invariants
daniel-savu Nov 29, 2023
7d39a38
fix: cleanup
daniel-savu Nov 29, 2023
a696f69
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 29, 2023
2b949f4
fix: clippy
daniel-savu Nov 29, 2023
e5a41f4
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 29, 2023
d091ced
fix: wait for longer before querying metrics in cosmos e2e
daniel-savu Nov 29, 2023
4bafa76
chore: use a paranoid sleep in cosmos e2e because wasm init may be slow
daniel-savu Nov 30, 2023
bbecadb
fix(cosmos): agent reorg period
daniel-savu Nov 30, 2023
67d22f8
chore(cosmos): re-enable e2e invariants
daniel-savu Nov 30, 2023
dd0e1a3
fix: upgrade osmosis version for non-mac OSs
daniel-savu Nov 30, 2023
2ff104b
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Nov 30, 2023
b1ccae1
feat: query evm balance, clean up
daniel-savu Dec 1, 2023
cc8578a
fix: evm balance metrics querying
daniel-savu Dec 4, 2023
e160a20
fix: seaorm connection timeout hiccups
daniel-savu Dec 4, 2023
fd808be
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Dec 4, 2023
c555e6a
cleanup
daniel-savu Dec 4, 2023
fb614f8
outdated comment
daniel-savu Dec 4, 2023
5de6a05
fix: review comments
daniel-savu Dec 4, 2023
b99e518
Merge branch 'main' into dan/relayer-balance-metrics
daniel-savu Dec 5, 2023
2b4ae36
fix: cosmos metrics balance
daniel-savu Dec 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 50 additions & 14 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{
self,
agent::{AgentMetrics, Metrics, MetricsFetcher},
},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -92,11 +97,15 @@ impl BaseAgent for Relayer {

type Settings = RelayerSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: Metrics,
) -> Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
Expand All @@ -105,18 +114,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -126,7 +135,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand All @@ -136,7 +145,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
Expand Down Expand Up @@ -188,8 +197,28 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut metrics_fetchers = vec![];
// let mut custom_metrics = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();
let agent_metrics_conf = destination_chain_setup
.agent_metrics_conf("relayer".to_owned())
.await?;
// PrometheusAgent
let metrics_fetcher = destination_chain_setup
.build_agent_metrics_fetcher()
.await?;
let agent_metrics =
AgentMetrics::new(agent_metrics.clone(), agent_metrics_conf, metrics_fetcher);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetricsFetcher"));
metrics_fetchers.push(fetcher_task);

let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
Expand Down Expand Up @@ -221,13 +250,13 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
}

Ok(Self {
let relayer = Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
Expand All @@ -242,12 +271,19 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
})
};

Ok((relayer, metrics_fetchers))
}

#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
async fn run(
self,
metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initially set to the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
Expand Down
29 changes: 19 additions & 10 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::agent::{Metrics as AgentMetrics, MetricsFetcher},
run_all,
settings::IndexSettings,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -38,7 +40,8 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
) -> eyre::Result<Self>
agent_metrics: AgentMetrics,
) -> eyre::Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
Expand Down Expand Up @@ -73,16 +76,22 @@ impl BaseAgent for Scraper {

trace!(domain_count = scrapers.len(), "Created scrapers");

Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
})
Ok((
Self {
core,
metrics,
contract_sync_metrics,
scrapers,
},
Default::default(),
))
}

#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn run(
self,
_metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);
Expand Down
22 changes: 17 additions & 5 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::agent::{Metrics as AgentMetrics, MetricsFetcher},
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -51,7 +52,11 @@ impl BaseAgent for Validator {

type Settings = ValidatorSettings;

async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<(Self, Vec<MetricsFetcher>)>
where
Self: Sized,
{
Expand Down Expand Up @@ -88,7 +93,7 @@ impl BaseAgent for Validator {
.await?
.into();

Ok(Self {
let validator = Self {
origin_chain: settings.origin_chain,
core,
db: msg_db,
Expand All @@ -101,12 +106,19 @@ impl BaseAgent for Validator {
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
})
};

Ok((validator, Default::default()))
}

#[allow(clippy::async_yields_async)]
async fn run(mut self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
async fn run(
mut self,
metrics_fetchers: Vec<MetricsFetcher>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initially set to the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;

if let Some(signer_instance) = self.signer_instance.take() {
tasks.push(
Expand Down
13 changes: 9 additions & 4 deletions rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::instrument;
pub struct CosmosAggregationIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: Box<CosmosProvider>,
}

impl CosmosAggregationIsm {
Expand All @@ -28,7 +28,12 @@ impl CosmosAggregationIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
Expand All @@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}

Expand All @@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm {
) -> ChainResult<(Vec<H256>, u8)> {
let payload = ModulesAndThresholdRequest::new(message);

let data = self.provider.wasm_query(payload, None).await?;
let data = self.provider.grpc().wasm_query(payload, None).await?;
let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?;

let modules: ChainResult<Vec<H256>> = response
Expand Down
11 changes: 9 additions & 2 deletions rust/chains/hyperlane-cosmos/src/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
pub struct CosmosInterchainGasPaymaster {
domain: HyperlaneDomain,
address: H256,
provider: CosmosProvider,
}

impl HyperlaneContract for CosmosInterchainGasPaymaster {
Expand All @@ -36,7 +37,7 @@ impl HyperlaneChain for CosmosInterchainGasPaymaster {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}

Expand All @@ -49,11 +50,17 @@ impl CosmosInterchainGasPaymaster {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider,
})
}
}
Expand Down
15 changes: 10 additions & 5 deletions rust/chains/hyperlane-cosmos/src/interchain_security_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct CosmosInterchainSecurityModule {
/// The address of the ISM contract.
address: H256,
/// The provider for the ISM contract.
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}

/// The Cosmos Interchain Security Module Implementation.
Expand All @@ -33,13 +33,17 @@ impl CosmosInterchainSecurityModule {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider: WasmGrpcProvider =
WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;

Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
Expand All @@ -56,7 +60,7 @@ impl HyperlaneChain for CosmosInterchainSecurityModule {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}

Expand All @@ -71,6 +75,7 @@ impl InterchainSecurityModule for CosmosInterchainSecurityModule {

let data = self
.provider
.grpc()
.wasm_query(QueryIsmGeneralRequest { ism: query }, None)
.await?;

Expand Down
Loading
Loading