Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delete 'shallow' fh eth blks on rpc+ingestor start #4790

Merged
merged 8 commits into from
Aug 7, 2023
12 changes: 12 additions & 0 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ async fn main() {
Some(&eth_firehose_only_networks)
};

if !opt.disable_block_ingestor && eth_networks.networks.len() != 0 {
let eth_network_names = Vec::from_iter(eth_networks.networks.keys());
let fh_only = match eth_firehose_only_networks {
Some(firehose_only) => Some(Vec::from_iter(firehose_only.networks.keys())),
None => None,
};
network_store
.block_store()
.cleanup_ethereum_shallow_blocks(eth_network_names, fh_only)
.unwrap();
}

let ethereum_chains = ethereum_networks_as_chains(
&mut blockchain_map,
&logger,
Expand Down

This file was deleted.

This file was deleted.

36 changes: 35 additions & 1 deletion store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use graph::{
blockchain::ChainIdentifier,
components::store::BlockStore as BlockStoreTrait,
prelude::{error, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
prelude::{error, info, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
};
use graph::{constraint_violation, prelude::CheapClone};
use graph::{
Expand Down Expand Up @@ -442,6 +442,40 @@ impl BlockStore {
Ok(())
}

pub fn cleanup_ethereum_shallow_blocks(
&self,
ethereum_networks: Vec<&String>,
firehose_only_networks: Option<Vec<&String>>,
) -> Result<(), StoreError> {
for store in self.stores.read().unwrap().values() {
if !ethereum_networks.contains(&&store.chain) {
continue;
};
if let Some(fh_nets) = firehose_only_networks.clone() {
if fh_nets.contains(&&store.chain) {
continue;
};
}
match store.chain_head_block(&&store.chain).unwrap_or(None) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

if let Some(head) = store.chain_head_block(&&store.chain)?

Some(head) => {
let lower_bound = head - ENV_VARS.reorg_threshold;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for overflow. Also I'd give some slack here so we don't have to worry about off by one, race conditions or whatever, maybe use 2 * ENV_VARS.reorg_threshold.


info!(&self.logger, "Cleaning shallow blocks and cursor on non-firehose chain"; "network" => &store.chain, "lower_bound" => lower_bound);
let ret = store.remove_cursor();
if ret.is_err() {
return ret;
}
store.cleanup_shallow_blocks(lower_bound)?
}
None => {
info!(&self.logger, "Cleaning any cursor on non-firehose chain"; "network" => &store.chain);
store.remove_cursor()?
}
}
}
Ok(())
}

fn truncate_block_caches(&self) -> Result<(), StoreError> {
for store in self.stores.read().unwrap().values() {
store.truncate_block_cache()?
Expand Down
37 changes: 37 additions & 0 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,31 @@ mod data {
Ok(())
}

pub(super) fn cleanup_shallow_blocks(
&self,
conn: &PgConnection,
lowest_block: i32,
) -> Result<(), StoreError> {
let table_name = match &self {
Storage::Shared => ETHEREUM_BLOCKS_TABLE_NAME,
Storage::Private(Schema { blocks, .. }) => &blocks.qname,
};
conn.batch_execute(&format!(
"delete from {} WHERE number >= {} AND data->'block'->'data' = 'null'::jsonb;",
table_name, lowest_block,
))?;
Ok(())
}

pub(super) fn remove_cursor(&self, conn: &PgConnection) -> Result<(), StoreError> {
use public::ethereum_networks::dsl::*;

update(ethereum_networks)
.set(head_block_cursor.eq(None as Option<String>))
.execute(conn)?;
Ok(())
}

/// Insert a block. If the table already contains a block with the
/// same hash, then overwrite that block since it may be adding
/// transaction receipts. If `overwrite` is `true`, overwrite a
Expand Down Expand Up @@ -1553,6 +1578,18 @@ impl ChainStore {
.delete_blocks_by_hash(&conn, &self.chain, block_hashes)
}

pub fn cleanup_shallow_blocks(&self, lowest_block: i32) -> Result<(), StoreError> {
let conn = self.get_conn()?;
self.storage.cleanup_shallow_blocks(&conn, lowest_block)?;
Ok(())
}

pub fn remove_cursor(&self) -> Result<(), StoreError> {
let conn = self.get_conn()?;
self.storage.remove_cursor(&conn)?;
Ok(())
}

pub fn truncate_block_cache(&self) -> Result<(), StoreError> {
let conn = self.get_conn()?;
self.storage.truncate_block_cache(&conn)?;
Expand Down