From d690d484ba1fdcd895ca56c21e177a24cd65f4b5 Mon Sep 17 00:00:00 2001 From: Austin Abell Date: Tue, 24 Nov 2020 12:45:52 -0500 Subject: [PATCH] Implement chain export functionality and car writer (#861) * Implement car exporting functionality * Tests and cleanup functionality * Fix bug * make loading car async * fmt * impl AsyncRead for gzip decoder --- Cargo.lock | 16 +- blockchain/chain/Cargo.toml | 2 + blockchain/chain/src/store/chain_store.rs | 160 +++++++++++++++++- blockchain/chain/src/store/errors.rs | 7 + .../src/sync_worker/full_sync_test.rs | 6 +- .../src/sync_worker/validate_block_test.rs | 4 +- blockchain/message_pool/src/msgpool/mod.rs | 1 + .../message_pool/src/msgpool/selection.rs | 3 + forest/src/daemon.rs | 5 +- ipld/car/Cargo.toml | 4 +- ipld/car/src/error.rs | 4 + ipld/car/src/lib.rs | 114 +++++++++++-- ipld/car/src/util.rs | 52 ++++-- ipld/car/tests/car_file_test.rs | 12 +- node/forest_libp2p/Cargo.toml | 1 + .../src/chain_exchange/provider.rs | 18 +- tests/conformance_tests/Cargo.toml | 4 +- .../tests/conformance_runner.rs | 46 +++-- utils/genesis/Cargo.toml | 1 + utils/genesis/src/lib.rs | 27 +-- utils/net_utils/src/download.rs | 28 +-- 21 files changed, 412 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fd2171c4ad6b..6ca4b47eb53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -950,9 +950,11 @@ dependencies = [ "forest_address", "forest_bigint", "forest_blocks", + "forest_car", "forest_cid", "forest_crypto", "forest_encoding", + "forest_ipld", "forest_message", "futures 0.3.8", "interpreter", @@ -1166,6 +1168,7 @@ dependencies = [ "forest_encoding", "forest_message", "forest_vm", + "futures 0.3.8", "interpreter", "ipld_blockstore", "lazy_static", @@ -2279,13 +2282,15 @@ dependencies = [ name = "forest_car" version = "0.1.0" dependencies = [ + "async-std", "db", "forest_cid", "forest_encoding", + "futures 0.3.8", + "integer-encoding", "ipld_blockstore", "serde", "thiserror", - "unsigned-varint 0.5.1", ] [[package]] @@ -2663,6 +2668,7 @@ dependencies = [ "forest_car", "forest_cid", "forest_encoding", + "futures 0.3.8", "ipld_blockstore", "log", "net_utils", @@ -3101,6 +3107,10 @@ name = "integer-encoding" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ddae3dfefb8ba2cb7c2bab20594847cea19f0cd0f28d57a4e79a12ddcfa9940" +dependencies = [ + "async-trait", + "futures-util", +] [[package]] name = "interpreter" @@ -6697,10 +6707,6 @@ name = "unsigned-varint" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fdeedbf205afadfe39ae559b75c3240f24e257d0ca27e85f85cb82aa19ac35" -dependencies = [ - "bytes", - "futures_codec", -] [[package]] name = "untrusted" diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index f8e8abcb39a2..c8ac97bb7e8b 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -33,6 +33,8 @@ lazy_static = "1.4" interpreter = { path = "../../vm/interpreter/" } lru = "0.6" rayon = "1.3" +forest_car = { path = "../../ipld/car" } +forest_ipld = { path = "../../ipld" } [features] json = [] diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index d3a66647e0ba..cbc7b792ef7b 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -4,7 +4,7 @@ use super::{ChainIndex, Error}; use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR}; use address::Address; -use async_std::sync::RwLock; +use async_std::sync::{channel, RwLock}; use async_std::task; use beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use blake2b_simd::Params; @@ -16,7 +16,9 @@ use clock::ChainEpoch; use crypto::DomainSeparationTag; use encoding::{blake2b_256, de::DeserializeOwned, from_slice, Cbor}; use flo_stream::{MessagePublisher, Publisher, Subscriber}; -use futures::{future, StreamExt}; +use forest_car::CarHeader; +use forest_ipld::Ipld; +use futures::{future, AsyncWrite, StreamExt}; use interpreter::BlockMessages; use ipld_amt::Amt; use ipld_blockstore::BlockStore; @@ -28,7 +30,8 @@ use num_traits::Zero; use rayon::prelude::*; use serde::Serialize; use state_tree::StateTree; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::error::Error as StdError; use std::io::Write; use std::sync::Arc; use types::WINNING_POST_SECTOR_SET_LOOKBACK; @@ -488,6 +491,152 @@ where let bmsgs = self.block_msgs_for_tipset(ts)?; Ok(bmsgs.into_iter().map(|bm| bm.messages).flatten().collect()) } + + /// Exports a range of tipsets, as well as the state roots based on the `recent_roots`. + pub async fn export( + &self, + tipset: &Tipset, + recent_roots: ChainEpoch, + skip_old_msgs: bool, + mut writer: W, + ) -> Result<(), Error> + where + W: AsyncWrite + Send + Unpin + 'static, + { + // Channel cap is equal to buffered write size + const CHANNEL_CAP: usize = 1000; + let (tx, mut rx) = channel(CHANNEL_CAP); + let header = CarHeader::from(tipset.key().cids().to_vec()); + let write_task = + task::spawn(async move { header.write_stream_async(&mut writer, &mut rx).await }); + + // TODO add timer for export + info!("chain export started"); + Self::walk_snapshot(tipset, recent_roots, skip_old_msgs, |cid| { + let block = self + .blockstore() + .get_bytes(&cid)? + .ok_or_else(|| format!("Cid {} not found in blockstore", cid))?; + + // * If cb can return a generic type, deserializing would remove need to clone. + task::block_on(tx.send((cid, block.clone()))); + Ok(block) + }) + .await?; + + // Drop sender, to close the channel to write task, which will end when finished writing + drop(tx); + + // Await on values being written. + write_task + .await + .map_err(|e| Error::Other(format!("Failed to write blocks in export: {}", e)))?; + info!("export finished"); + + Ok(()) + } + + async fn walk_snapshot( + tipset: &Tipset, + recent_roots: ChainEpoch, + skip_old_msgs: bool, + mut load_block: F, + ) -> Result<(), Error> + where + F: FnMut(Cid) -> Result, Box>, + { + let mut seen = HashSet::::new(); + let mut blocks_to_walk: VecDeque = tipset.cids().to_vec().into(); + let mut current_min_height = tipset.epoch(); + let incl_roots_epoch = tipset.epoch() - recent_roots; + + while let Some(next) = blocks_to_walk.pop_front() { + if !seen.insert(next) { + continue; + } + + let data = load_block(next)?; + + let h = BlockHeader::unmarshal_cbor(&data)?; + + if current_min_height > h.epoch() { + current_min_height = h.epoch(); + } + + if !skip_old_msgs || h.epoch() > incl_roots_epoch { + recurse_links(&mut seen, *h.messages(), &mut load_block)?; + } + + if h.epoch() > 0 { + for p in h.parents().cids() { + blocks_to_walk.push_back(*p); + } + } else { + for p in h.parents().cids() { + load_block(*p)?; + } + } + + if h.epoch() == 0 || h.epoch() > incl_roots_epoch { + recurse_links(&mut seen, *h.state_root(), &mut load_block)?; + } + } + Ok(()) + } +} + +fn traverse_ipld_links( + walked: &mut HashSet, + load_block: &mut F, + ipld: &Ipld, +) -> Result<(), Box> +where + F: FnMut(Cid) -> Result, Box>, +{ + match ipld { + Ipld::Map(m) => { + for (_, v) in m.iter() { + traverse_ipld_links(walked, load_block, v)?; + } + } + Ipld::List(list) => { + for v in list.iter() { + traverse_ipld_links(walked, load_block, v)?; + } + } + Ipld::Link(cid) => { + if cid.codec() == cid::DAG_CBOR { + if !walked.insert(*cid) { + return Ok(()); + } + let bytes = load_block(*cid)?; + let ipld = Ipld::unmarshal_cbor(&bytes)?; + traverse_ipld_links(walked, load_block, &ipld)?; + } + } + _ => (), + } + Ok(()) +} + +fn recurse_links(walked: &mut HashSet, root: Cid, load_block: &mut F) -> Result<(), Error> +where + F: FnMut(Cid) -> Result, Box>, +{ + if !walked.insert(root) { + // Cid has already been traversed + return Ok(()); + } + if root.codec() != cid::DAG_CBOR { + return Ok(()); + } + + let bytes = load_block(root)?; + let ipld = Ipld::unmarshal_cbor(&bytes)?; + + traverse_ipld_links(walked, load_block, &ipld)?; + + Ok(()) } pub(crate) type TipsetCache = RwLock>>; @@ -575,7 +724,10 @@ where let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?; Ok((bls_cids, secpk_cids)) } else { - Err(Error::UndefinedKey("no msgs with that key".to_string())) + Err(Error::UndefinedKey(format!( + "no msg root with cid {}", + msg_cid + ))) } } diff --git a/blockchain/chain/src/store/errors.rs b/blockchain/chain/src/store/errors.rs index 123a666839c8..55e9f5808fc4 100644 --- a/blockchain/chain/src/store/errors.rs +++ b/blockchain/chain/src/store/errors.rs @@ -6,6 +6,7 @@ use cid::Error as CidErr; use db::Error as DbErr; use encoding::{error::Error as SerdeErr, Error as EncErr}; use ipld_amt::Error as AmtErr; +use std::error::Error as StdError; use thiserror::Error; /// Chain error @@ -63,3 +64,9 @@ impl From for Error { Error::Other(e) } } + +impl From> for Error { + fn from(e: Box) -> Self { + Error::Other(e.to_string()) + } +} diff --git a/blockchain/chain_sync/src/sync_worker/full_sync_test.rs b/blockchain/chain_sync/src/sync_worker/full_sync_test.rs index 8274bfc58cb8..c9fb3cc19d02 100644 --- a/blockchain/chain_sync/src/sync_worker/full_sync_test.rs +++ b/blockchain/chain_sync/src/sync_worker/full_sync_test.rs @@ -48,7 +48,7 @@ async fn space_race_full_sync() { let (network_send, network_recv) = channel(20); // Initialize genesis using default (currently space-race) genesis - let (genesis, _) = initialize_genesis(None, &state_manager).unwrap(); + let (genesis, _) = initialize_genesis(None, &state_manager).await.unwrap(); let genesis = Arc::new(genesis); let beacon = Arc::new(DrandBeacon::new( @@ -66,7 +66,9 @@ async fn space_race_full_sync() { let network = SyncNetworkContext::new(network_send, Arc::new(peer_manager), db); let provider_db = Arc::new(MemoryDB::default()); - let cids: Vec = load_car(provider_db.as_ref(), EXPORT_SR_40.as_ref()).unwrap(); + let cids: Vec = load_car(provider_db.as_ref(), EXPORT_SR_40.as_ref()) + .await + .unwrap(); let prov_cs = ChainStore::new(provider_db); let target = prov_cs .tipset_from_keys(&TipsetKeys::new(cids)) diff --git a/blockchain/chain_sync/src/sync_worker/validate_block_test.rs b/blockchain/chain_sync/src/sync_worker/validate_block_test.rs index 3322a0ea1d85..b44130bd8058 100644 --- a/blockchain/chain_sync/src/sync_worker/validate_block_test.rs +++ b/blockchain/chain_sync/src/sync_worker/validate_block_test.rs @@ -23,13 +23,13 @@ async fn validate_specific_block() { let db = Arc::new(MemoryDB::default()); - let cids = load_car(db.as_ref(), EXPORT_SR_40.as_ref()).unwrap(); + let cids = load_car(db.as_ref(), EXPORT_SR_40.as_ref()).await.unwrap(); let chain_store = Arc::new(ChainStore::new(db.clone())); let state_manager = Arc::new(StateManager::new(chain_store.clone())); // Initialize genesis using default (currently space-race) genesis - let (genesis, _) = initialize_genesis(None, &state_manager).unwrap(); + let (genesis, _) = initialize_genesis(None, &state_manager).await.unwrap(); let genesis = Arc::new(genesis); let beacon = Arc::new(DrandBeacon::new( diff --git a/blockchain/message_pool/src/msgpool/mod.rs b/blockchain/message_pool/src/msgpool/mod.rs index d963bd655740..f234e0975d73 100644 --- a/blockchain/message_pool/src/msgpool/mod.rs +++ b/blockchain/message_pool/src/msgpool/mod.rs @@ -1,5 +1,6 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT + mod selection; use super::config::MpoolConfig; use super::errors::Error; diff --git a/blockchain/message_pool/src/msgpool/selection.rs b/blockchain/message_pool/src/msgpool/selection.rs index 161b89fead7c..947ad5936249 100644 --- a/blockchain/message_pool/src/msgpool/selection.rs +++ b/blockchain/message_pool/src/msgpool/selection.rs @@ -1,3 +1,6 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + use super::{allow_negative_chains, create_message_chains, MessagePool, Provider}; use crate::msg_chain::MsgChain; use crate::{run_head_change, Error}; diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 7624129c5275..d6feaa16b946 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -71,8 +71,9 @@ pub(super) async fn start(config: Config) { // Read Genesis file // * When snapshot command implemented, this genesis does not need to be initialized - let (genesis, network_name) = - initialize_genesis(config.genesis_file.as_ref(), &state_manager).unwrap(); + let (genesis, network_name) = initialize_genesis(config.genesis_file.as_ref(), &state_manager) + .await + .unwrap(); // Sync from snapshot if let Some(path) = &config.snapshot_path { diff --git a/ipld/car/Cargo.toml b/ipld/car/Cargo.toml index 240ead2de88f..707847433c21 100644 --- a/ipld/car/Cargo.toml +++ b/ipld/car/Cargo.toml @@ -5,12 +5,14 @@ authors = ["ChainSafe Systems "] edition = "2018" [dependencies] -unsigned-varint = { version = "0.5", features = ["futures-codec"] } cid = { package = "forest_cid", path = "../cid", features = ["cbor"] } forest_encoding = { path = "../../encoding" } blockstore = { package = "ipld_blockstore", path = "../blockstore" } serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" +futures = "0.3.5" +integer-encoding = { version = "2.1", features = ["futures_async"] } [dev-dependencies] db = { path = "../../node/db" } +async-std = { version = "1.6.3", features = ["unstable", "attributes"] } diff --git a/ipld/car/src/error.rs b/ipld/car/src/error.rs index e659881f2a06..7e69fd6f2565 100644 --- a/ipld/car/src/error.rs +++ b/ipld/car/src/error.rs @@ -10,6 +10,10 @@ pub enum Error { ParsingError(String), #[error("Invalid CAR file: {0}")] InvalidFile(String), + #[error("Io error: {0}")] + Io(#[from] std::io::Error), + #[error("Cbor encoding error: {0}")] + Cbor(#[from] forest_encoding::error::Error), #[error("CAR error: {0}")] Other(String), } diff --git a/ipld/car/src/lib.rs b/ipld/car/src/lib.rs index a001b25ba9a2..d75569e9e9fe 100644 --- a/ipld/car/src/lib.rs +++ b/ipld/car/src/lib.rs @@ -7,13 +7,13 @@ mod util; use blockstore::BlockStore; use cid::Cid; use error::*; -use forest_encoding::from_slice; +use forest_encoding::{from_slice, to_vec}; +use futures::{AsyncRead, AsyncWrite, Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use std::io::Read; -use util::{ld_read, read_node}; +use util::{ld_read, ld_write, read_node}; /// CAR file header -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] pub struct CarHeader { pub roots: Vec, pub version: u64, @@ -24,6 +24,34 @@ impl CarHeader { pub fn new(roots: Vec, version: u64) -> Self { Self { roots, version } } + + /// Writes header and stream of data to writer in Car format. + pub async fn write_stream_async( + &self, + writer: &mut W, + stream: &mut S, + ) -> Result<(), Error> + where + W: AsyncWrite + Send + Unpin, + S: Stream)> + Unpin, + { + // Write header bytes + let header_bytes = to_vec(self)?; + ld_write(writer, &header_bytes).await?; + + // Write all key values from the stream + while let Some((cid, bytes)) = stream.next().await { + ld_write(writer, &[cid.to_bytes(), bytes].concat()).await?; + } + + Ok(()) + } +} + +impl From> for CarHeader { + fn from(roots: Vec) -> Self { + Self { roots, version: 1 } + } } /// Reads CAR files that are in a BufReader @@ -34,11 +62,12 @@ pub struct CarReader { impl CarReader where - R: Read, + R: AsyncRead + Send + Unpin, { /// Creates a new CarReader and parses the CarHeader - pub fn new(mut reader: R) -> Result { - let buf = ld_read(&mut reader)? + pub async fn new(mut reader: R) -> Result { + let buf = ld_read(&mut reader) + .await? .ok_or_else(|| Error::ParsingError("failed to parse uvarint for header".to_string()))?; let header: CarHeader = from_slice(&buf).map_err(|e| Error::ParsingError(e.to_string()))?; if header.roots.is_empty() { @@ -51,9 +80,11 @@ where } /// Returns the next IPLD Block in the buffer - pub fn next_block(&mut self) -> Result, Error> { + pub async fn next_block(&mut self) -> Result, Error> { // Read node -> cid, bytes - let block = read_node(&mut self.reader)?.map(|(cid, data)| Block { cid, data }); + let block = read_node(&mut self.reader) + .await? + .map(|(cid, data)| Block { cid, data }); Ok(block) } } @@ -66,12 +97,15 @@ pub struct Block { } /// Loads a CAR buffer into a BlockStore -pub fn load_car(s: &B, reader: R) -> Result, Error> { - let mut car_reader = CarReader::new(reader)?; +pub async fn load_car(s: &B, reader: R) -> Result, Error> +where + R: AsyncRead + Send + Unpin, +{ + let mut car_reader = CarReader::new(reader).await?; // Batch write key value pairs from car file let mut buf: Vec<(Vec, Vec)> = Vec::with_capacity(100); - while let Some(block) = car_reader.next_block()? { + while let Some(block) = car_reader.next_block().await? { buf.push((block.cid.to_bytes(), block.data)); if buf.len() > 1000 { s.bulk_write(&buf) @@ -83,3 +117,59 @@ pub fn load_car(s: &B, reader: R) -> Result, Er .map_err(|e| Error::Other(e.to_string()))?; Ok(car_reader.header.roots) } + +#[cfg(test)] +mod tests { + use super::*; + use async_std::io::Cursor; + use async_std::sync::{channel, RwLock}; + use db::Store; + use std::sync::Arc; + + #[test] + fn symmetric_header() { + let cid = cid::new_from_cbor(b"test", cid::Code::Blake2b256); + + let header = CarHeader { + roots: vec![cid], + version: 1, + }; + + let bytes = to_vec(&header).unwrap(); + assert_eq!(from_slice::(&bytes).unwrap(), header); + } + + #[async_std::test] + async fn car_write_read() { + let buffer: Arc>> = Default::default(); + let cid = cid::new_from_cbor(b"test", cid::Code::Blake2b256); + + let header = CarHeader { + roots: vec![cid], + version: 1, + }; + assert_eq!(to_vec(&header).unwrap().len(), 60); + + let (tx, mut rx) = channel(10); + + let buffer_cloned = buffer.clone(); + let write_task = async_std::task::spawn(async move { + header + .write_stream_async(&mut *buffer_cloned.write().await, &mut rx) + .await + .unwrap() + }); + + tx.send((cid, b"test".to_vec())).await; + drop(tx); + write_task.await; + + let buffer: Vec<_> = buffer.read().await.clone(); + let reader = Cursor::new(&buffer); + + let db = db::MemoryDB::default(); + load_car(&db, reader).await.unwrap(); + + assert_eq!(db.read(&cid.to_bytes()).unwrap(), Some(b"test".to_vec())); + } +} diff --git a/ipld/car/src/util.rs b/ipld/car/src/util.rs index ae4760391689..3513ca030c28 100644 --- a/ipld/car/src/util.rs +++ b/ipld/car/src/util.rs @@ -3,31 +3,46 @@ use super::error::Error; use cid::Cid; -use std::io::Read; -use unsigned_varint::io::ReadError; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use integer_encoding::{VarIntAsyncReader, VarIntAsyncWriter}; -pub(crate) fn ld_read(mut reader: &mut R) -> Result>, Error> { - let l = match unsigned_varint::io::read_u64(&mut reader) { +pub(crate) async fn ld_read(mut reader: &mut R) -> Result>, Error> +where + R: AsyncRead + Send + Unpin, +{ + let l: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await { Ok(len) => len, Err(e) => { - if let ReadError::Io(ioe) = &e { - if ioe.kind() == std::io::ErrorKind::UnexpectedEof { - return Ok(None); - } + if e.kind() == std::io::ErrorKind::UnexpectedEof { + return Ok(None); } return Err(Error::Other(e.to_string())); } }; let mut buf = Vec::with_capacity(l as usize); reader - .take(l) + .take(l as u64) .read_to_end(&mut buf) + .await .map_err(|e| Error::Other(e.to_string()))?; Ok(Some(buf)) } -pub(crate) fn read_node(buf_reader: &mut R) -> Result)>, Error> { - match ld_read(buf_reader)? { +pub(crate) async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error> +where + W: AsyncWrite + Send + Unpin, +{ + writer.write_varint_async(bytes.len()).await?; + writer.write_all(bytes).await?; + writer.flush().await?; + Ok(()) +} + +pub(crate) async fn read_node(buf_reader: &mut R) -> Result)>, Error> +where + R: AsyncRead + Send + Unpin, +{ + match ld_read(buf_reader).await? { Some(buf) => { let cid = Cid::read_bytes(&*buf)?; let len = cid.to_bytes().len(); @@ -36,3 +51,18 @@ pub(crate) fn read_node(buf_reader: &mut R) -> Result Ok(None), } } + +#[cfg(test)] +mod tests { + use super::*; + use async_std::io::Cursor; + + #[async_std::test] + async fn ld_read_write() { + let mut buffer = Vec::::new(); + ld_write(&mut buffer, b"test bytes").await.unwrap(); + let mut reader = Cursor::new(&buffer); + let read = ld_read(&mut reader).await.unwrap(); + assert_eq!(read, Some(b"test bytes".to_vec())); + } +} diff --git a/ipld/car/tests/car_file_test.rs b/ipld/car/tests/car_file_test.rs index b26b803167f5..211acb92547c 100644 --- a/ipld/car/tests/car_file_test.rs +++ b/ipld/car/tests/car_file_test.rs @@ -1,16 +1,16 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use async_std::fs::File; +use async_std::io::BufReader; use db::MemoryDB; use forest_car::*; -use std::fs::File; -use std::io::BufReader; -#[test] -fn load_into_blockstore() { - let file = File::open("tests/test.car").unwrap(); +#[async_std::test] +async fn load_into_blockstore() { + let file = File::open("tests/test.car").await.unwrap(); let buf_reader = BufReader::new(file); let mut bs = MemoryDB::default(); - let _ = load_car(&mut bs, buf_reader).unwrap(); + let _ = load_car(&mut bs, buf_reader).await.unwrap(); } diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index 06451c36d931..6ec63d720a5c 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -48,3 +48,4 @@ forest_address = { path = "../../vm/address" } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } crypto = { package = "forest_crypto", path = "../../crypto" } genesis = { path = "../../utils/genesis", features = ["testing"] } +async-std = { version = "1.6.3", features = ["attributes"] } diff --git a/node/forest_libp2p/src/chain_exchange/provider.rs b/node/forest_libp2p/src/chain_exchange/provider.rs index ecfd426ad205..b4d2f9a345e2 100644 --- a/node/forest_libp2p/src/chain_exchange/provider.rs +++ b/node/forest_libp2p/src/chain_exchange/provider.rs @@ -151,33 +151,33 @@ where mod tests { use super::super::BLOCKS_MESSAGES; use super::*; - use async_std::task; + use async_std::io::BufReader; use db::MemoryDB; use forest_car::load_car; use genesis::EXPORT_SR_40; - use std::io::BufReader; use std::sync::Arc; - fn populate_db() -> (Vec, MemoryDB) { + async fn populate_db() -> (Vec, MemoryDB) { let db = MemoryDB::default(); let reader = BufReader::<&[u8]>::new(EXPORT_SR_40.as_ref()); // The cids are the tipset cids of the most recent tipset (39th) - let cids: Vec = load_car(&db, reader).unwrap(); + let cids: Vec = load_car(&db, reader).await.unwrap(); return (cids, db); } - #[test] - fn compact_messages_test() { - let (cids, db) = populate_db(); + #[async_std::test] + async fn compact_messages_test() { + let (cids, db) = populate_db().await; - let response = task::block_on(make_chain_exchange_response( + let response = make_chain_exchange_response( &ChainStore::new(Arc::new(db)), &ChainExchangeRequest { start: cids, request_len: 2, options: BLOCKS_MESSAGES, }, - )); + ) + .await; // The response will be loaded with tipsets 39 and 38. // See: diff --git a/tests/conformance_tests/Cargo.toml b/tests/conformance_tests/Cargo.toml index f378e4f7a4d3..eaf21a4dd91d 100644 --- a/tests/conformance_tests/Cargo.toml +++ b/tests/conformance_tests/Cargo.toml @@ -25,6 +25,7 @@ submodule_tests = [ "state_manager", "state_tree", "forest_message", + "futures", "crypto" ] @@ -53,6 +54,7 @@ fil_types = { path = "../../types", optional = true } forest_message = { path = "../../vm/message", features = ["json"], optional = true } state_tree = { path = "../../vm/state_tree/", optional = true } chain = { path = "../../blockchain/chain", optional = true } +futures = { version = "0.3.5", optional = true } [dev-dependencies] regex = { version = "1.0" } @@ -64,7 +66,7 @@ lazy_static = "1.4" pretty_env_logger = "0.4.0" log = "0.4" paramfetch = { path = "../../utils/paramfetch" } -async-std = "1.6" +async-std = { version = "1.6", features = ["attributes"] } forest_blocks = { path = "../../blockchain/blocks" } chain_sync = { path = "../../blockchain/chain_sync" } statediff = { path = "../../utils/statediff" } diff --git a/tests/conformance_tests/tests/conformance_runner.rs b/tests/conformance_tests/tests/conformance_runner.rs index 31233270c428..aacce420c5d5 100644 --- a/tests/conformance_tests/tests/conformance_runner.rs +++ b/tests/conformance_tests/tests/conformance_runner.rs @@ -13,6 +13,7 @@ use encoding::Cbor; use fil_types::TOTAL_FILECOIN; use flate2::read::GzDecoder; use forest_message::{MessageReceipt, UnsignedMessage}; +use futures::AsyncRead; use interpreter::ApplyRet; use num_bigint::{BigInt, ToBigInt}; use paramfetch::{get_params_default, SectorSizeOpt}; @@ -22,7 +23,9 @@ use std::error::Error as StdError; use std::fmt; use std::fs::File; use std::io::BufReader; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use walkdir::{DirEntry, WalkDir}; lazy_static! { @@ -59,14 +62,26 @@ fn is_valid_file(entry: &DirEntry) -> bool { file_name.ends_with(".json") } -fn load_car(gzip_bz: &[u8]) -> Result> { +struct GzipDecoder(GzDecoder); + +impl AsyncRead for GzipDecoder { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Poll::Ready(std::io::Read::read(&mut self.0, buf)) + } +} + +async fn load_car(gzip_bz: &[u8]) -> Result> { let bs = db::MemoryDB::default(); // Decode gzip bytes - let d = GzDecoder::new(gzip_bz); + let d = GzipDecoder(GzDecoder::new(gzip_bz)); // Load car file with bytes - forest_car::load_car(&bs, d)?; + forest_car::load_car(&bs, d).await?; Ok(bs) } @@ -127,7 +142,7 @@ fn compare_state_roots(bs: &db::MemoryDB, root: &Cid, expected_root: &Cid) -> Re Ok(()) } -fn execute_message_vector( +async fn execute_message_vector( selector: &Option, car: &[u8], root_cid: Cid, @@ -138,7 +153,7 @@ fn execute_message_vector( randomness: &Randomness, variant: &Variant, ) -> Result<(), Box> { - let bs = load_car(car)?; + let bs = load_car(car).await?; let mut base_epoch: ChainEpoch = variant.epoch; let mut root = root_cid; @@ -177,7 +192,7 @@ fn execute_message_vector( Ok(()) } -fn execute_tipset_vector( +async fn execute_tipset_vector( _selector: &Option, car: &[u8], root_cid: Cid, @@ -185,7 +200,8 @@ fn execute_tipset_vector( postconditions: &PostConditions, variant: &Variant, ) -> Result<(), Box> { - let bs = Arc::new(load_car(car)?); + let bs = load_car(car).await?; + let bs = Arc::new(bs); let base_epoch = variant.epoch; let mut root = root_cid; @@ -229,12 +245,14 @@ fn execute_tipset_vector( Ok(()) } -#[test] -fn conformance_test_runner() { +#[async_std::test] +async fn conformance_test_runner() { pretty_env_logger::init(); // Retrieve verification params - async_std::task::block_on(get_params_default(SectorSizeOpt::Keys, false)).unwrap(); + get_params_default(SectorSizeOpt::Keys, false) + .await + .unwrap(); let walker = WalkDir::new("test-vectors/corpus").into_iter(); let mut failed = Vec::new(); @@ -270,7 +288,9 @@ fn conformance_test_runner() { &postconditions, &randomness, &variant, - ) { + ) + .await + { failed.push(( format!("{} variant {}", test_name, variant.id), meta.clone(), @@ -302,7 +322,9 @@ fn conformance_test_runner() { &apply_tipsets, &postconditions, &variant, - ) { + ) + .await + { failed.push(( format!("{} variant {}", test_name, variant.id), meta.clone(), diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index a7e3980e6d8d..e7c6b625e951 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -20,3 +20,4 @@ fil_types = { path = "../../types" } encoding = { path = "../../encoding", package = "forest_encoding" } net_utils = { path = "../net_utils" } url = "2.1.1" +futures = "0.3.5" diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index c3cc57c1f8b0..c6fc7943a952 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -1,21 +1,22 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use async_std::fs::File; +use async_std::io::BufReader; use blocks::{BlockHeader, Tipset, TipsetKeys}; use chain::ChainStore; use cid::Cid; use encoding::Cbor; use fil_types::verifier::ProofVerifier; use forest_car::load_car; +use futures::AsyncRead; use ipld_blockstore::BlockStore; use log::{debug, info}; use net_utils::FetchProgress; use state_manager::StateManager; use std::convert::TryFrom; use std::error::Error as StdError; -use std::fs::File; use std::include_bytes; -use std::io::{BufReader, Read}; use std::sync::Arc; use url::Url; @@ -24,7 +25,7 @@ pub const EXPORT_SR_40: &[u8; 1226395] = include_bytes!("mainnet/export40.car"); /// Uses an optional file path or the default genesis to parse the genesis and determine if /// chain store has existing data for the given genesis. -pub fn initialize_genesis( +pub async fn initialize_genesis( genesis_fp: Option<&String>, state_manager: &StateManager, ) -> Result<(Tipset, String), Box> @@ -33,15 +34,15 @@ where { let genesis = match genesis_fp { Some(path) => { - let file = File::open(path)?; + let file = File::open(path).await?; let reader = BufReader::new(file); - process_car(reader, state_manager.chain_store())? + process_car(reader, state_manager.chain_store()).await? } None => { debug!("No specified genesis in config. Using default genesis."); let bz = include_bytes!("mainnet/genesis.car"); let reader = BufReader::<&[u8]>::new(bz.as_ref()); - process_car(reader, state_manager.chain_store())? + process_car(reader, state_manager.chain_store()).await? } }; @@ -54,16 +55,16 @@ where Ok((Tipset::new(vec![genesis])?, network_name)) } -fn process_car( +async fn process_car( reader: R, chain_store: &ChainStore, ) -> Result> where - R: Read, + R: AsyncRead + Send + Unpin, BS: BlockStore + Send + Sync + 'static, { // Load genesis state into the database and get the Cid - let genesis_cids: Vec = load_car(chain_store.blockstore(), reader)?; + let genesis_cids: Vec = load_car(chain_store.blockstore(), reader).await?; if genesis_cids.len() != 1 { panic!("Invalid Genesis. Genesis Tipset must have only 1 Block."); } @@ -108,12 +109,14 @@ where let url = Url::parse(path).expect("URL is invalid"); info!("Downloading file..."); let reader = FetchProgress::try_from(url)?; - load_car(sm.blockstore(), reader)? + load_car(sm.blockstore(), reader).await? } else { - let file = File::open(&path).expect("Snapshot file path not found!"); + let file = File::open(&path) + .await + .expect("Snapshot file path not found!"); info!("Reading file..."); let reader = FetchProgress::try_from(file)?; - load_car(sm.blockstore(), reader)? + load_car(sm.blockstore(), reader).await? }; let ts = sm .chain_store() diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index 298d65dd6d33..ba9a3d4294d2 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -1,14 +1,14 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use async_std::io::BufRead; +use async_std::fs::File; +use async_std::io::BufReader; use futures::prelude::*; use isahc::{Body, HttpClient}; use pbr::ProgressBar; use pin_project_lite::pin_project; use std::convert::TryFrom; -use std::fs::File; -use std::io::{self, BufReader, Read, Result as IOResult, Stdout, Write}; +use std::io::{self, Stdout, Write}; use std::pin::Pin; use std::task::{Context, Poll}; use thiserror::Error; @@ -49,26 +49,6 @@ impl AsyncRead for FetchProgress { } } -impl BufRead for FetchProgress { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - this.inner.poll_fill_buf(cx) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) - } -} - -impl Read for FetchProgress { - fn read(&mut self, buf: &mut [u8]) -> IOResult { - self.inner.read(buf).map(|n| { - self.progress_bar.add(n as u64); - n - }) - } -} - impl TryFrom for FetchProgress { type Error = Box; @@ -102,7 +82,7 @@ impl TryFrom for FetchProgress, Stdout> { type Error = Box; fn try_from(file: File) -> Result { - let total_size = file.metadata()?.len(); + let total_size = async_std::task::block_on(file.metadata())?.len(); let pb = ProgressBar::new(total_size);