Skip to content

Commit

Permalink
Add indexerHints to manifest with the ability to configure pruning
Browse files Browse the repository at this point in the history
* graph, core : new structure for historyBlocks in manifest

* graph: Add history_blocks_override and min_history_blocks env vars

* graph, store: refactor history_blocks setting and wire overrides

* graph: bump spec version to v1

* graph: min_history_blocks to default to 2 * reorg_threshold

* fix parse_indexer_hints unit tests

* node/graphman: graphman prune to default to min_history_blocks

* graph: change historyBlocks syntax to use the term `prune`

* node: better docstring graphman prune
  • Loading branch information
incrypto32 authored Jan 16, 2024
1 parent b197682 commit 6fe6676
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 34 deletions.
9 changes: 4 additions & 5 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
debug_fork: Option<DeploymentHash>,
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
history_blocks: Option<i32>,
history_blocks_override: Option<i32>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
Expand All @@ -613,8 +613,6 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

let history_blocks = history_blocks.or(manifest.history_blocks());

let network_name = manifest.network_name();

let chain = chains
Expand Down Expand Up @@ -687,8 +685,9 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
.graft(base_block)
.debug(debug_fork)
.entities_with_causality_region(needs_causality_region);
if let Some(history_blocks) = history_blocks {
deployment = deployment.with_history_blocks(history_blocks);

if let Some(history_blocks) = history_blocks_override {
deployment = deployment.with_history_blocks_override(history_blocks);
}

deployment_store
Expand Down
2 changes: 1 addition & 1 deletion graph/src/data/subgraph/api_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub const SPEC_VERSION_0_0_8: Version = Version::new(0, 0, 8);
pub const SPEC_VERSION_0_0_9: Version = Version::new(0, 0, 9);

// Enables `indexerHints` feature.
pub const SPEC_VERSION_0_1_0: Version = Version::new(0, 1, 0);
pub const SPEC_VERSION_1_0_0: Version = Version::new(1, 0, 0);

pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);

Expand Down
98 changes: 89 additions & 9 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ pub mod status;

pub use features::{SubgraphFeature, SubgraphFeatureValidationError};

use crate::object;
use crate::{components::store::BLOCK_NUMBER_MAX, object};
use anyhow::{anyhow, Context, Error};
use futures03::{future::try_join, stream::FuturesOrdered, TryStreamExt as _};
use itertools::Itertools;
use semver::Version;
use serde::{de, ser};
use serde::{
de::{self, Visitor},
ser,
};
use serde_yaml;
use slog::Logger;
use stable_hash::{FieldAddress, StableHash};
Expand Down Expand Up @@ -548,7 +551,83 @@ pub struct BaseSubgraphManifest<C, S, D, T> {
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexerHints {
pub history_blocks: Option<BlockNumber>,
prune: Option<Prune>,
}

impl IndexerHints {
pub fn history_blocks(&self) -> BlockNumber {
match self.prune {
Some(ref hb) => hb.history_blocks(),
None => BLOCK_NUMBER_MAX,
}
}
}

#[derive(Debug)]
pub enum Prune {
Auto,
Never,
Blocks(BlockNumber),
}

impl Prune {
pub fn history_blocks(&self) -> BlockNumber {
match self {
Prune::Never => BLOCK_NUMBER_MAX,
Prune::Auto => ENV_VARS.min_history_blocks,
Prune::Blocks(x) => *x,
}
}
}

impl<'de> de::Deserialize<'de> for Prune {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
struct HistoryBlocksVisitor;

const ERROR_MSG: &str = "expected 'all', 'min', or a number for history blocks";

impl<'de> Visitor<'de> for HistoryBlocksVisitor {
type Value = Prune;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string or an integer for history blocks")
}

fn visit_str<E>(self, value: &str) -> Result<Prune, E>
where
E: de::Error,
{
match value {
"never" => Ok(Prune::Never),
"auto" => Ok(Prune::Auto),
_ => value
.parse::<i32>()
.map(Prune::Blocks)
.map_err(|_| E::custom(ERROR_MSG)),
}
}

fn visit_i32<E>(self, value: i32) -> Result<Prune, E>
where
E: de::Error,
{
Ok(Prune::Blocks(value))
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
let i = v.try_into().map_err(|_| E::custom(ERROR_MSG))?;
Ok(Prune::Blocks(i))
}
}

deserializer.deserialize_any(HistoryBlocksVisitor)
}
}

/// SubgraphManifest with IPFS links unresolved
Expand Down Expand Up @@ -681,10 +760,11 @@ impl<C: Blockchain> SubgraphManifest<C> {
.collect()
}

pub fn history_blocks(&self) -> Option<BlockNumber> {
self.indexer_hints
.as_ref()
.and_then(|hints| hints.history_blocks)
pub fn history_blocks(&self) -> BlockNumber {
match self.indexer_hints {
Some(ref hints) => hints.history_blocks(),
None => BLOCK_NUMBER_MAX,
}
}

pub fn api_versions(&self) -> impl Iterator<Item = semver::Version> + '_ {
Expand Down Expand Up @@ -872,10 +952,10 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
);
}

if spec_version < SPEC_VERSION_0_1_0 && indexer_hints.is_some() {
if spec_version < SPEC_VERSION_1_0_0 && indexer_hints.is_some() {
bail!(
"`indexerHints` are not supported prior to {}",
SPEC_VERSION_0_1_0
SPEC_VERSION_1_0_0
);
}

Expand Down
10 changes: 5 additions & 5 deletions graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct DeploymentCreate {
pub graft_base: Option<DeploymentHash>,
pub graft_block: Option<BlockPtr>,
pub debug_fork: Option<DeploymentHash>,
pub history_blocks: Option<i32>,
pub history_blocks_override: Option<i32>,
}

impl DeploymentCreate {
Expand All @@ -120,12 +120,12 @@ impl DeploymentCreate {
graft_base: None,
graft_block: None,
debug_fork: None,
history_blocks: None,
history_blocks_override: None,
}
}

pub fn with_history_blocks(mut self, blocks: i32) -> Self {
self.history_blocks = Some(blocks);
pub fn with_history_blocks_override(mut self, blocks: i32) -> Self {
self.history_blocks_override = Some(blocks);
self
}

Expand Down Expand Up @@ -201,7 +201,7 @@ impl SubgraphManifestEntity {
schema: manifest.schema.document_string(),
raw_yaml: Some(raw_yaml),
entities_with_causality_region,
history_blocks: BLOCK_NUMBER_MAX,
history_blocks: manifest.history_blocks(),
}
}

Expand Down
18 changes: 17 additions & 1 deletion graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ pub struct EnvVars {
/// is there to simplify development and will be changed to `false` when
/// we get closer to release
pub enable_timeseries: bool,
/// Set by the env var `GRAPH_HISTORY_BLOCKS_OVERRIDE`. Defaults to None
/// Sets an override for the amount history to keep regardless of the
/// historyBlocks set in the manifest
pub history_blocks_override: Option<BlockNumber>,
/// Set by the env var `GRAPH_MIN_HISTORY_BLOCKS`
/// The amount of history to keep when using 'min' historyBlocks
/// in the manifest
pub min_history_blocks: BlockNumber,
}

impl EnvVars {
Expand Down Expand Up @@ -258,6 +266,10 @@ impl EnvVars {
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
enable_gas_metrics: inner.enable_gas_metrics.0,
enable_timeseries: inner.enable_timeseries.unwrap_or(cfg!(debug_assertions)),
history_blocks_override: inner.history_blocks_override,
min_history_blocks: inner
.min_history_blocks
.unwrap_or(2 * inner.reorg_threshold),
})
}

Expand Down Expand Up @@ -310,7 +322,7 @@ struct Inner {
default = "false"
)]
allow_non_deterministic_fulltext_search: EnvVarBoolean,
#[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "0.0.9")]
#[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.0.0")]
max_spec_version: Version,
#[envconfig(from = "GRAPH_LOAD_WINDOW_SIZE", default = "300")]
load_window_size_in_secs: u64,
Expand Down Expand Up @@ -391,6 +403,10 @@ struct Inner {
enable_gas_metrics: EnvVarBoolean,
#[envconfig(from = "GRAPH_EXPERIMENTAL_TIMESERIES")]
enable_timeseries: Option<bool>,
#[envconfig(from = "GRAPH_HISTORY_BLOCKS_OVERRIDE")]
history_blocks_override: Option<BlockNumber>,
#[envconfig(from = "GRAPH_MIN_HISTORY_BLOCKS")]
min_history_blocks: Option<BlockNumber>,
}

#[derive(Clone, Debug)]
Expand Down
9 changes: 6 additions & 3 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use config::PoolSize;
use git_testament::{git_testament, render_testament};
use graph::bail;
use graph::endpoint::EndpointMetrics;
use graph::env::ENV_VARS;
use graph::log::logger_with_levels;
use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
Expand Down Expand Up @@ -310,9 +311,10 @@ pub enum Command {
/// GRAPH_STORE_HISTORY_DELETE_THRESHOLD
#[clap(long, short)]
delete_threshold: Option<f64>,
/// How much history to keep in blocks
#[clap(long, short = 'y', default_value = "10000")]
history: usize,
/// How much history to keep in blocks. Defaults to
/// GRAPH_MIN_HISTORY_BLOCKS
#[clap(long, short = 'y')]
history: Option<usize>,
/// Prune only this once
#[clap(long, short)]
once: bool,
Expand Down Expand Up @@ -1493,6 +1495,7 @@ async fn main() -> anyhow::Result<()> {
once,
} => {
let (store, primary_pool) = ctx.store_and_primary();
let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?);
commands::prune::run(
store,
primary_pool,
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ pub fn create_deployment(
graft_base,
graft_block,
debug_fork,
history_blocks: history_blocks_override,
history_blocks_override,
} = deployment;
let earliest_block_number = start_block.as_ref().map(|ptr| ptr.number).unwrap_or(0);
let entities_with_causality_region = Vec::from_iter(
Expand Down
10 changes: 10 additions & 0 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ impl DeploymentStore {
// Create (or update) the metadata. Update only happens in tests
let entities_with_causality_region =
deployment.manifest.entities_with_causality_region.clone();

// If `GRAPH_HISTORY_BLOCKS_OVERRIDE` is set, override the history_blocks
// setting with the value of the environment variable.
let deployment =
if let Some(history_blocks_global_override) = ENV_VARS.history_blocks_override {
deployment.with_history_blocks_override(history_blocks_global_override)
} else {
deployment
};

if replace || !exists {
deployment::create_deployment(&conn, &site, deployment, exists, replace)?;
};
Expand Down
4 changes: 1 addition & 3 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,16 +630,14 @@ impl SubgraphStoreInner {
)));
}

let history_blocks = deployment.manifest.history_blocks;

// Transmogrify the deployment into a new one
let deployment = DeploymentCreate {
manifest: deployment.manifest,
start_block: deployment.start_block.clone(),
graft_base: Some(src.deployment.clone()),
graft_block: Some(block),
debug_fork: deployment.debug_fork,
history_blocks: Some(history_blocks),
history_blocks_override: None,
};

let graft_base = self.layout(&src.deployment)?;
Expand Down
48 changes: 42 additions & 6 deletions store/test-store/tests/chain/ethereum/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ use std::sync::Arc;
use std::time::Duration;

use graph::blockchain::DataSource;
use graph::components::store::BLOCK_NUMBER_MAX;
use graph::data::store::scalar::Bytes;
use graph::data::store::Value;
use graph::data::subgraph::schema::SubgraphError;
use graph::data::subgraph::{
SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, SPEC_VERSION_0_0_9,
SPEC_VERSION_0_1_0,
Prune, SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, SPEC_VERSION_0_0_9,
SPEC_VERSION_1_0_0,
};
use graph::data_source::offchain::OffchainDataSourceKind;
use graph::data_source::DataSourceTemplate;
use graph::entity;
use graph::env::ENV_VARS;
use graph::prelude::{
anyhow, async_trait, serde_yaml, tokio, BigDecimal, BigInt, DeploymentHash, Link, Logger,
SubgraphManifest, SubgraphManifestValidationError, SubgraphStore, UnvalidatedSubgraphManifest,
Expand Down Expand Up @@ -198,14 +200,48 @@ schema:
graft:
base: Qmbase
block: 12345
specVersion: 0.1.0
specVersion: 1.0.0
indexerHints:
historyBlocks: 100
prune: 100
";

let manifest = resolve_manifest(YAML, SPEC_VERSION_0_1_0).await;
let manifest = resolve_manifest(YAML, SPEC_VERSION_1_0_0).await;

assert_eq!(manifest.history_blocks().unwrap(), 100);
assert_eq!(manifest.history_blocks(), 100);

let yaml: &str = "
dataSources: []
schema:
file:
/: /ipfs/Qmschema
graft:
base: Qmbase
block: 12345
specVersion: 1.0.0
indexerHints:
prune: auto
";

let manifest = resolve_manifest(yaml, SPEC_VERSION_1_0_0).await;
Prune::Auto.history_blocks();
assert_eq!(manifest.history_blocks(), ENV_VARS.min_history_blocks);

let yaml: &str = "
dataSources: []
schema:
file:
/: /ipfs/Qmschema
graft:
base: Qmbase
block: 12345
specVersion: 1.0.0
indexerHints:
prune: never
";

let manifest = resolve_manifest(yaml, SPEC_VERSION_1_0_0).await;

assert_eq!(manifest.history_blocks(), BLOCK_NUMBER_MAX);
}

#[test]
Expand Down

0 comments on commit 6fe6676

Please sign in to comment.