Implement chain export functionality and car writer (#861)
* Implement car exporting functionality

* Tests and cleanup functionality

* Fix bug

* make loading car async

* fmt

* impl AsyncRead for gzip decoder
austinabell authored Nov 24, 2020
1 parent 1b4dd61 commit d690d48
Showing 21 changed files with 412 additions and 103 deletions.
16 changes: 11 additions & 5 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions blockchain/chain/Cargo.toml
@@ -33,6 +33,8 @@ lazy_static = "1.4"
interpreter = { path = "../../vm/interpreter/" }
lru = "0.6"
rayon = "1.3"
forest_car = { path = "../../ipld/car" }
forest_ipld = { path = "../../ipld" }

[features]
json = []
160 changes: 156 additions & 4 deletions blockchain/chain/src/store/chain_store.rs
@@ -4,7 +4,7 @@
use super::{ChainIndex, Error};
use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR};
use address::Address;
-use async_std::sync::RwLock;
+use async_std::sync::{channel, RwLock};
use async_std::task;
use beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use blake2b_simd::Params;
@@ -16,7 +16,9 @@ use clock::ChainEpoch;
use crypto::DomainSeparationTag;
use encoding::{blake2b_256, de::DeserializeOwned, from_slice, Cbor};
use flo_stream::{MessagePublisher, Publisher, Subscriber};
-use futures::{future, StreamExt};
+use forest_car::CarHeader;
+use forest_ipld::Ipld;
+use futures::{future, AsyncWrite, StreamExt};
use interpreter::BlockMessages;
use ipld_amt::Amt;
use ipld_blockstore::BlockStore;
@@ -28,7 +30,8 @@ use num_traits::Zero;
use rayon::prelude::*;
use serde::Serialize;
use state_tree::StateTree;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::error::Error as StdError;
use std::io::Write;
use std::sync::Arc;
use types::WINNING_POST_SECTOR_SET_LOOKBACK;
@@ -488,6 +491,152 @@ where
let bmsgs = self.block_msgs_for_tipset(ts)?;
Ok(bmsgs.into_iter().map(|bm| bm.messages).flatten().collect())
}

/// Exports the chain from `tipset` back to genesis as a CAR stream written to `writer`.
/// State roots are only walked in full for the most recent `recent_roots` epochs, and
/// messages older than that window are skipped when `skip_old_msgs` is set.
pub async fn export<W>(
&self,
tipset: &Tipset,
recent_roots: ChainEpoch,
skip_old_msgs: bool,
mut writer: W,
) -> Result<(), Error>
where
W: AsyncWrite + Send + Unpin + 'static,
{
// Channel cap is equal to buffered write size
const CHANNEL_CAP: usize = 1000;
let (tx, mut rx) = channel(CHANNEL_CAP);
let header = CarHeader::from(tipset.key().cids().to_vec());
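// The writer task consumes blocks from the bounded channel concurrently with the traversal below.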
let write_task =
task::spawn(async move { header.write_stream_async(&mut writer, &mut rx).await });

// TODO add timer for export
info!("chain export started");
Self::walk_snapshot(tipset, recent_roots, skip_old_msgs, |cid| {
let block = self
.blockstore()
.get_bytes(&cid)?
.ok_or_else(|| format!("Cid {} not found in blockstore", cid))?;

// * If the callback could return a generic type, deserializing here would remove the need to clone.
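// The callback is synchronous, so `block_on` drives the async send; the bounded channel
// back-pressures the walk against the writer.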
task::block_on(tx.send((cid, block.clone())));
Ok(block)
})
.await?;

// Drop the sender to close the channel; the write task finishes once the remaining blocks are written.
drop(tx);

// Await on values being written.
write_task
.await
.map_err(|e| Error::Other(format!("Failed to write blocks in export: {}", e)))?;
info!("export finished");

Ok(())
}

async fn walk_snapshot<F>(
tipset: &Tipset,
recent_roots: ChainEpoch,
skip_old_msgs: bool,
mut load_block: F,
) -> Result<(), Error>
where
F: FnMut(Cid) -> Result<Vec<u8>, Box<dyn StdError>>,
{
let mut seen = HashSet::<Cid>::new();
let mut blocks_to_walk: VecDeque<Cid> = tipset.cids().to_vec().into();
let mut current_min_height = tipset.epoch();
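// Cutoff epoch: only blocks above this epoch have their messages and state roots walked in full.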
let incl_roots_epoch = tipset.epoch() - recent_roots;

while let Some(next) = blocks_to_walk.pop_front() {
if !seen.insert(next) {
continue;
}

let data = load_block(next)?;

let h = BlockHeader::unmarshal_cbor(&data)?;

if current_min_height > h.epoch() {
current_min_height = h.epoch();
}

if !skip_old_msgs || h.epoch() > incl_roots_epoch {
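// Walk the links under this block's message root (old blocks are skipped when `skip_old_msgs` is set).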
recurse_links(&mut seen, *h.messages(), &mut load_block)?;
}

if h.epoch() > 0 {
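// Queue the parent tipset's blocks so the walk continues back toward genesis.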
for p in h.parents().cids() {
blocks_to_walk.push_back(*p);
}
} else {
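// At genesis, the parent blocks are loaded (and therefore exported) but not walked further.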
for p in h.parents().cids() {
load_block(*p)?;
}
}

if h.epoch() == 0 || h.epoch() > incl_roots_epoch {
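// Include the full state tree for genesis and for blocks above the cutoff epoch.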
recurse_links(&mut seen, *h.state_root(), &mut load_block)?;
}
}
Ok(())
}
}

fn traverse_ipld_links<F>(
walked: &mut HashSet<Cid>,
load_block: &mut F,
ipld: &Ipld,
) -> Result<(), Box<dyn StdError>>
where
F: FnMut(Cid) -> Result<Vec<u8>, Box<dyn StdError>>,
{
match ipld {
Ipld::Map(m) => {
for (_, v) in m.iter() {
traverse_ipld_links(walked, load_block, v)?;
}
}
Ipld::List(list) => {
for v in list.iter() {
traverse_ipld_links(walked, load_block, v)?;
}
}
Ipld::Link(cid) => {
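// Only DAG-CBOR links are decoded and traversed; links with other codecs are not followed.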
if cid.codec() == cid::DAG_CBOR {
if !walked.insert(*cid) {
return Ok(());
}
let bytes = load_block(*cid)?;
let ipld = Ipld::unmarshal_cbor(&bytes)?;
traverse_ipld_links(walked, load_block, &ipld)?;
}
}
_ => (),
}
Ok(())
}

fn recurse_links<F>(walked: &mut HashSet<Cid>, root: Cid, load_block: &mut F) -> Result<(), Error>
where
F: FnMut(Cid) -> Result<Vec<u8>, Box<dyn StdError>>,
{
if !walked.insert(root) {
// Cid has already been traversed
return Ok(());
}
if root.codec() != cid::DAG_CBOR {
return Ok(());
}

let bytes = load_block(root)?;
let ipld = Ipld::unmarshal_cbor(&bytes)?;

traverse_ipld_links(walked, load_block, &ipld)?;

Ok(())
}

pub(crate) type TipsetCache = RwLock<LruCache<TipsetKeys, Arc<Tipset>>>;
@@ -575,7 +724,10 @@ where
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey("no msgs with that key".to_string()))
Err(Error::UndefinedKey(format!(
"no msg root with cid {}",
msg_cid
)))
}
}

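For illustration only (not part of this commit): a minimal sketch of driving the new `export` method to write a snapshot file. It assumes a caller inside the `chain` crate, uses `async_std::fs::File` as the writer (which satisfies the `AsyncWrite + Send + Unpin + 'static` bound), and picks an arbitrary file name, epoch window, and function name.

use async_std::fs::File;

// Write a CAR snapshot of `tipset` to disk, keeping full message and state
// roots only for the most recent 2000 epochs.
async fn write_snapshot<DB>(cs: &ChainStore<DB>, tipset: &Tipset) -> Result<(), Error>
where
    DB: BlockStore + Send + Sync + 'static,
{
    let file = File::create("snapshot.car")
        .await
        .map_err(|e| Error::Other(e.to_string()))?;
    cs.export(tipset, 2000, true, file).await
}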
7 changes: 7 additions & 0 deletions blockchain/chain/src/store/errors.rs
@@ -6,6 +6,7 @@ use cid::Error as CidErr;
use db::Error as DbErr;
use encoding::{error::Error as SerdeErr, Error as EncErr};
use ipld_amt::Error as AmtErr;
use std::error::Error as StdError;
use thiserror::Error;

/// Chain error
@@ -63,3 +64,9 @@ impl From<String> for Error {
Error::Other(e)
}
}

impl From<Box<dyn StdError>> for Error {
fn from(e: Box<dyn StdError>) -> Self {
Error::Other(e.to_string())
}
}
6 changes: 4 additions & 2 deletions blockchain/chain_sync/src/sync_worker/full_sync_test.rs
@@ -48,7 +48,7 @@ async fn space_race_full_sync() {
let (network_send, network_recv) = channel(20);

// Initialize genesis using default (currently space-race) genesis
-let (genesis, _) = initialize_genesis(None, &state_manager).unwrap();
+let (genesis, _) = initialize_genesis(None, &state_manager).await.unwrap();
let genesis = Arc::new(genesis);

let beacon = Arc::new(DrandBeacon::new(
@@ -66,7 +66,9 @@
let network = SyncNetworkContext::new(network_send, Arc::new(peer_manager), db);

let provider_db = Arc::new(MemoryDB::default());
-let cids: Vec<Cid> = load_car(provider_db.as_ref(), EXPORT_SR_40.as_ref()).unwrap();
+let cids: Vec<Cid> = load_car(provider_db.as_ref(), EXPORT_SR_40.as_ref())
+    .await
+    .unwrap();
let prov_cs = ChainStore::new(provider_db);
let target = prov_cs
.tipset_from_keys(&TipsetKeys::new(cids))
4 changes: 2 additions & 2 deletions blockchain/chain_sync/src/sync_worker/validate_block_test.rs
@@ -23,13 +23,13 @@ async fn validate_specific_block() {

let db = Arc::new(MemoryDB::default());

-let cids = load_car(db.as_ref(), EXPORT_SR_40.as_ref()).unwrap();
+let cids = load_car(db.as_ref(), EXPORT_SR_40.as_ref()).await.unwrap();

let chain_store = Arc::new(ChainStore::new(db.clone()));
let state_manager = Arc::new(StateManager::new(chain_store.clone()));

// Initialize genesis using default (currently space-race) genesis
-let (genesis, _) = initialize_genesis(None, &state_manager).unwrap();
+let (genesis, _) = initialize_genesis(None, &state_manager).await.unwrap();
let genesis = Arc::new(genesis);

let beacon = Arc::new(DrandBeacon::new(
1 change: 1 addition & 0 deletions blockchain/message_pool/src/msgpool/mod.rs
@@ -1,5 +1,6 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod selection;
use super::config::MpoolConfig;
use super::errors::Error;
3 changes: 3 additions & 0 deletions blockchain/message_pool/src/msgpool/selection.rs
@@ -1,3 +1,6 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{allow_negative_chains, create_message_chains, MessagePool, Provider};
use crate::msg_chain::MsgChain;
use crate::{run_head_change, Error};
5 changes: 3 additions & 2 deletions forest/src/daemon.rs
@@ -71,8 +71,9 @@ pub(super) async fn start(config: Config) {

// Read Genesis file
// * When snapshot command implemented, this genesis does not need to be initialized
-let (genesis, network_name) =
-    initialize_genesis(config.genesis_file.as_ref(), &state_manager).unwrap();
+let (genesis, network_name) = initialize_genesis(config.genesis_file.as_ref(), &state_manager)
+    .await
+    .unwrap();

// Sync from snapshot
if let Some(path) = &config.snapshot_path {
4 changes: 3 additions & 1 deletion ipld/car/Cargo.toml
@@ -5,12 +5,14 @@ authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
unsigned-varint = { version = "0.5", features = ["futures-codec"] }
cid = { package = "forest_cid", path = "../cid", features = ["cbor"] }
forest_encoding = { path = "../../encoding" }
blockstore = { package = "ipld_blockstore", path = "../blockstore" }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
futures = "0.3.5"
integer-encoding = { version = "2.1", features = ["futures_async"] }

[dev-dependencies]
db = { path = "../../node/db" }
async-std = { version = "1.6.3", features = ["unstable", "attributes"] }
4 changes: 4 additions & 0 deletions ipld/car/src/error.rs
@@ -10,6 +10,10 @@ pub enum Error {
ParsingError(String),
#[error("Invalid CAR file: {0}")]
InvalidFile(String),
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
#[error("Cbor encoding error: {0}")]
Cbor(#[from] forest_encoding::error::Error),
#[error("CAR error: {0}")]
Other(String),
}
(Remaining changed files not shown.)
