Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered blockstore write optimization #1059

Merged
merged 1 commit into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ where

// Construct receipt root from receipts
let rect_root = Amt::new_from_iter(self.blockstore(), receipts)?;

// Flush changes to blockstore
let state_root = vm.flush()?;
// Persist changes connected to root
Expand Down
17 changes: 10 additions & 7 deletions ipld/blockstore/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cid::{Cid, Code, DAG_CBOR};
use db::{Error, Store};
use std::collections::HashMap;
use std::error::Error as StdError;
use std::io::{BufRead, Seek};
use std::io::{BufRead, BufReader, Seek};
use std::{cell::RefCell, convert::TryFrom, io::Cursor};

/// Wrapper around `BlockStore` to limit and have control over when values are written.
Expand All @@ -34,8 +34,10 @@ where
/// This will recursively traverse the cache and write all data connected by links to this
/// root Cid.
pub fn flush(&mut self, root: &Cid) -> Result<(), Box<dyn StdError>> {
copy_rec(self.base, &self.write.borrow(), *root)?;
let mut buffer = Vec::new();
copy_rec(self.base, &self.write.borrow(), *root, &mut buffer)?;

self.base.bulk_write(&buffer)?;
self.write = Default::default();

Ok(())
Expand Down Expand Up @@ -148,6 +150,7 @@ fn copy_rec<BS>(
base: &BS,
cache: &HashMap<Cid, Vec<u8>>,
root: Cid,
buffer: &mut Vec<(Vec<u8>, Vec<u8>)>,
) -> Result<(), Box<dyn StdError>>
where
BS: BlockStore,
Expand All @@ -159,7 +162,7 @@ where
let block = cache
.get(&root)
.ok_or_else(|| format!("Invalid link ({}) in flushing buffered store", root))?;
let links = scan_for_links(&mut std::io::BufReader::new(Cursor::new(block)))?;
let links = scan_for_links(&mut BufReader::new(Cursor::new(block)))?;

// Go through all the links recursively
for link in links.iter() {
Expand All @@ -168,14 +171,14 @@ where
}
// DB reads are expensive. So we check if it exists in the cache.
// If it doesnt exist in the DB, which is likely, we proceed with using the cache.
// So makes sense to check the cache and then check the DB if needed.
if !cache.contains_key(&link) && base.exists(link.to_bytes())? {
if !cache.contains_key(&link) {
continue;
}
// Recursively find more links under the links we're iterating over.
copy_rec(base, cache, *link)?;
copy_rec(base, cache, *link, buffer)?;
}
base.write(&root.to_bytes(), block)?;

buffer.push((root.to_bytes(), block.to_vec()));

Ok(())
}
Expand Down