Skip to content

Commit

Permalink
feat: implement deployment cache to reduce IPFS request load
Browse files Browse the repository at this point in the history
Signed-off-by: Tomás Migone <tomas@edgeandnode.com>
  • Loading branch information
tmigone committed Jul 26, 2024
1 parent 61f6aa6 commit 29fff77
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 14 deletions.
86 changes: 73 additions & 13 deletions availability-oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use manifest::{Abi, DataSource, Manifest, Mapping};
use network_subgraph::*;
use secp256k1::SecretKey;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use std::{fmt::Display, str::FromStr};
use structopt::StructOpt;
Expand Down Expand Up @@ -148,6 +149,8 @@ struct Config {
pub oracle_index: Option<u64>,
}

/// Maximum age of a deployment cache entry: 24 hours (60 * 60 * 24 seconds).
/// A cached deployment whose last validation is more recent than this is
/// treated as still valid and is not checked against IPFS again.
const DEPLOYMENT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 24);

#[tokio::main]
async fn main() -> Result<()> {
common::main(run).await
Expand All @@ -159,6 +162,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
let epoch_subgraph =
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
let contract: Box<dyn StateManager> = if config.dry_run {
info!(
logger,
"Running in dry mode: no transactions will be submitted on chain!"
);
Box::new(StateManagerDryRun::new(logger.clone()))
} else {
let signing_key: &SecretKey = &config.signing_key.unwrap().parse()?;
Expand All @@ -181,6 +188,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
if config.period > Duration::from_secs(0) {
let mut interval = tokio::time::interval(config.period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// Deployments that validated successfully are only re-checked once their cache entry is older than DEPLOYMENT_CACHE_TTL
let mut deployment_cache: Vec<(Cid, SystemTime)> = Vec::new();

loop {
interval.tick().await;

Expand All @@ -197,11 +208,16 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
grace_period,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
deployment_cache.clone(),
)
.await
{
Ok(()) => {
Ok(updated_deployment_cache) => {
METRICS.reconcile_runs_ok.inc();
deployment_cache = updated_deployment_cache;
info!(logger, "Deployment cache updated";
"count" => deployment_cache.len()
);
}
Err(e) => {
METRICS.reconcile_runs_err.inc();
Expand All @@ -221,7 +237,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
ipfs.invalidate_cache();
}
}
reconcile_deny_list(
match reconcile_deny_list(
&logger,
&ipfs,
&*contract,
Expand All @@ -230,8 +246,13 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
grace_period,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
Vec::new(),
)
.await
{
Ok(_) => return Ok(()),
Err(e) => return Err(e),
}
}

// This function is used to create a state manager based on the configuration.
Expand Down Expand Up @@ -281,7 +302,8 @@ pub async fn reconcile_deny_list(
grace_period: Duration,
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
supported_ds_kinds: &[String],
) -> Result<(), Error> {
deployment_cache: Vec<(Cid, SystemTime)>,
) -> Result<Vec<(Cid, SystemTime)>, Error> {
let logger = logger.clone();

// Fetch supported networks
Expand All @@ -300,20 +322,35 @@ pub async fn reconcile_deny_list(
);

// Check the availability status of all subgraphs, and gather which should flip the deny flag.
let status_changes: Vec<([u8; 32], bool)> = subgraph
let deployment_status: Vec<([u8; 32], bool, bool, SystemTime)> = subgraph
.deployments_over_threshold(min_signal, grace_period)
.map(|deployment| async {
let deployment = deployment?;
let id = bytes32_to_cid_v0(deployment.id);
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),

// To reduce IPFS requests, a subgraph that validated successfully is only re-checked once its cache entry is older than DEPLOYMENT_CACHE_TTL
let cached = deployment_cache
.iter()
.filter(|(_, last_validated)| {
last_validated.elapsed().unwrap() < DEPLOYMENT_CACHE_TTL
})
.find(|(cid, _)| *cid == id);

if cached.is_some() {
METRICS.deployment_cache_hits.inc();
return Ok((deployment, Valid::Yes, cached.unwrap().1));
} else {
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await
{
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),
};
return Ok((deployment, validity, SystemTime::now()));
};
Result::<_, Error>::Ok((deployment, validity))
})
.buffered(100)
.try_filter_map(move |(deployment, validity)| {
.try_filter_map(move |(deployment, validity, last_validated)| {
let logger = logger.clone();
async move {
info!(logger, "Check subgraph";
Expand All @@ -336,7 +373,7 @@ pub async fn reconcile_deny_list(
);
}
};
None
Some((deployment.id, should_deny, false, last_validated))
}

// The validity status changed, flip the deny flag.
Expand All @@ -347,15 +384,32 @@ pub async fn reconcile_deny_list(
"status" => should_deny,
"reason" => validity.to_string(),
);
Some((deployment.id, should_deny))
Some((deployment.id, should_deny, true, last_validated))
}
})
}
})
.try_collect()
.await?;

state_manager.deny_many(status_changes).await
// Flip the on-chain deny status of the deployments whose status changed
let changed_deployments = deployment_status
.iter()
.filter(|(_, _, status_changed, _)| *status_changed)
.map(|(cid, should_deny, _, _)| (*cid, *should_deny))
.collect();
match state_manager.deny_many(changed_deployments).await {
Ok(_) => {}
Err(e) => return Err(e),
};

// Return updated deployment cache
let updated_deployment_cache: Vec<(Cid, SystemTime)> = deployment_status
.iter()
.filter(|(_, should_deny, _, _)| !*should_deny)
.map(|(cid, _, _, last_validated)| (bytes32_to_cid_v0(*cid), *last_validated))
.collect();
Ok(updated_deployment_cache)
}

enum Valid {
Expand Down Expand Up @@ -537,6 +591,7 @@ struct Metrics {
reconcile_runs_total: prometheus::IntCounter,
reconcile_runs_ok: prometheus::IntCounter,
reconcile_runs_err: prometheus::IntCounter,
deployment_cache_hits: prometheus::IntCounter,
}

lazy_static! {
Expand All @@ -561,6 +616,11 @@ impl Metrics {
"Total reconcile runs with errors"
)
.unwrap(),
deployment_cache_hits: prometheus::register_int_counter!(
"deployment_cache_hits",
"Total deployment cache hits"
)
.unwrap(),
}
}
}
4 changes: 3 additions & 1 deletion availability-oracle/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod tests {
use futures::Stream;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::{pin::Pin, str::FromStr};
use tiny_cid::Cid;

Expand Down Expand Up @@ -61,9 +62,10 @@ mod tests {
"file/ipfs".into(),
"substreams".into(),
],
vec![]
)
.await
.unwrap()
.unwrap();
}

struct MockSubgraph;
Expand Down

0 comments on commit 29fff77

Please sign in to comment.