fix: reduce memory usage in archive file ingestion (#480)
Previously, when parsing an archive tsv to determine the canonical
fork, we would always start at block 0, even if there was a rocksdb
instance containing stacks chaindata. The entire fork was loaded into
memory and inserted.

Now, we start at the latest confirmed chaintip for this process, which
greatly reduces the memory footprint on subsequent startups of the
Chainhook service. Note: an initial run of the Chainhook service can
still use significant memory.
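
In effect, the tsv scan now resumes from the stored chaintip instead of
genesis. A minimal, self-contained sketch of the idea (names and types
below are illustrative, not the Chainhook API; the real implementation
also back-fills blocks that are missing from rocksdb):

    use std::collections::VecDeque;

    // Illustrative stand-in for a parsed tsv record: (block height, raw blob).
    type TsvRecord = (u64, String);

    // Keep only blocks at or above the last confirmed height (if a previous
    // run left one in rocksdb), so a warm restart queues a short tail of the
    // chain instead of materializing every block since genesis.
    fn canonical_fork_tail(
        records: impl Iterator<Item = TsvRecord>,
        last_confirmed: Option<u64>,
    ) -> VecDeque<TsvRecord> {
        let start_block = last_confirmed.unwrap_or(0); // cold start: block 0
        records
            .filter(|(height, _)| *height >= start_block)
            .collect()
    }

    fn main() {
        let tsv = (0..10u64).map(|h| (h, format!("block {h}")));
        // Warm restart with a confirmed tip at height 7: only 7..=9 are queued.
        assert_eq!(canonical_fork_tail(tsv, Some(7)).len(), 3);
    }

In the actual change, the same cutoff is threaded through
get_canonical_fork_from_tsv as an Option<u64> start_block argument,
seeded from get_last_block_height_inserted.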
MicaiahReid committed Jan 30, 2024
1 parent fc29be7 commit 83af58b
Showing 3 changed files with 65 additions and 13 deletions.
1 change: 1 addition & 0 deletions components/chainhook-cli/src/archive/mod.rs
@@ -175,6 +175,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
std::process::exit(1);
}
}
info!(ctx.expect_logger(), "Successfully downloaded tsv file");
config.add_local_stacks_tsv_source(&tsv_file_path);
}
true
38 changes: 31 additions & 7 deletions components/chainhook-cli/src/scan/stacks.rs
@@ -10,7 +10,7 @@ use crate::{
storage::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
open_readwrite_stacks_db_conn,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
},
};
use chainhook_sdk::types::{BlockIdentifier, Chain};
@@ -62,14 +62,18 @@ pub enum RecordKind {

pub async fn get_canonical_fork_from_tsv(
config: &mut Config,
start_block: Option<u64>,
ctx: &Context,
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();

let (record_tx, record_rx) = std::sync::mpsc::channel();

let start_block = 0;

let mut start_block = start_block.unwrap_or(0);
info!(
ctx.expect_logger(),
"Parsing tsv file to determine canoncial fork"
);
let parsing_handle = hiro_system_kit::thread_named("Stacks chainstate CSV parsing")
.spawn(move || {
let mut reader_builder = csv::ReaderBuilder::default()
@@ -95,6 +99,7 @@ pub async fn get_canonical_fork_from_tsv(
})
.expect("unable to spawn thread");

let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let canonical_fork = {
let mut cursor = BlockIdentifier::default();
let mut dump = HashMap::new();
@@ -114,7 +119,16 @@ };
};

if start_block > block_identifier.index {
continue;
// don't insert blocks that are already in the db,
// but do fill any gaps in our data
if is_stacks_block_present(&block_identifier, 0, &stacks_db)
|| block_identifier.index == 0
{
continue;
} else {
start_block = block_identifier.index;
info!(ctx.expect_logger(), "Found missing block ({start_block}) during tsv parsing; will insert into db",);
}
}

if block_identifier.index > cursor.index {
@@ -140,6 +154,10 @@ };
};
let _ = parsing_handle.join();

info!(
ctx.expect_logger(),
"Finished parsing tsv file to determine canonical fork"
);
Ok(canonical_fork)
}

@@ -442,7 +460,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(

let _ = download_stacks_dataset_if_required(config, ctx).await;

let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let mut canonical_fork = get_canonical_fork_from_tsv(config, None, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());

@@ -538,13 +556,19 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(

let _ = download_stacks_dataset_if_required(config, ctx).await;

let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Begining import of {} Stacks blocks into rocks db", blocks_to_insert
);
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
blocks_read += 1;

@@ -573,7 +597,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks: {}/{}", blocks_read, blocks_to_insert
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
);
let _ = stacks_db_rw.flush();
}
39 changes: 33 additions & 6 deletions components/chainhook-cli/src/storage/mod.rs
@@ -28,6 +28,30 @@ fn get_default_stacks_db_file_path(base_dir: &PathBuf) -> PathBuf {
destination_path
}

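/// Opens a read-only connection to stacks.rocksdb, retrying (with a
/// 2-second pause between attempts) up to `retry` more times after the
/// first failure before giving up.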
pub fn open_readonly_stacks_db_conn_with_retry(
base_dir: &PathBuf,
retry: u8,
ctx: &Context,
) -> Result<DB, String> {
let mut attempt = 0;
loop {
match open_readonly_stacks_db_conn(base_dir, ctx) {
Ok(conn) => return Ok(conn),
Err(e) => {
debug!(
ctx.expect_logger(),
"Failed to open stadcks.rocksdb. Trying again in a few seconds."
);
attempt += 1;
std::thread::sleep(std::time::Duration::from_secs(2));
if attempt > retry {
return Err(e);
}
}
}
}
}

pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result<DB, String> {
let path = get_default_stacks_db_file_path(&base_dir);
let opts = get_db_default_options();
@@ -91,12 +115,15 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB,
stacks_db_rw
.put(&key, &block_bytes.to_string().as_bytes())
.expect("unable to insert blocks");
stacks_db_rw
.put(
get_last_confirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
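// Only advance the recorded chaintip; blocks back-filled into earlier gaps
// arrive out of order and must not move the confirmed height backwards.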
let previous_last_inserted = get_last_block_height_inserted(stacks_db_rw, _ctx).unwrap_or(0);
if block.block_identifier.index > previous_last_inserted {
stacks_db_rw
.put(
get_last_confirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
}
}

pub fn insert_unconfirmed_entry_in_stacks_blocks(
Expand Down
