Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with import and cleanup #844

Merged
merged 3 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,9 +908,10 @@ where
pub async fn validate_chain<V: ProofVerifier>(
self: &Arc<Self>,
mut ts: Tipset,
height: i64,
) -> Result<(), Box<dyn StdError>> {
let mut ts_chain = Vec::<Tipset>::new();
while ts.epoch() != 0 {
while ts.epoch() != height {
let next = self.cs.tipset_from_keys(ts.parents())?;
ts_chain.push(std::mem::replace(&mut ts, next));
}
Expand Down
53 changes: 10 additions & 43 deletions forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@ use async_std::sync::RwLock;
use async_std::task;
use auth::{generate_priv_key, JWT_IDENTIFIER};
use beacon::{DrandBeacon, DEFAULT_DRAND_URL};
use blocks::TipsetKeys;
use chain::ChainStore;
use chain_sync::ChainSyncer;
use db::RocksDb;
use encoding::Cbor;
use fil_types::verifier::{FullVerifier, ProofVerifier};
use fil_types::verifier::FullVerifier;
use flo_stream::{MessagePublisher, Publisher};
use forest_car::load_car;
use forest_libp2p::{get_keypair, Libp2pService};
use genesis::initialize_genesis;
use ipld_blockstore::BlockStore;
use genesis::{import_chain, initialize_genesis};
use libp2p::identity::{ed25519, Keypair};
use log::{debug, info, trace};
use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
Expand All @@ -26,7 +22,6 @@ use rpc::{start_rpc, RpcState};
use state_manager::StateManager;
use std::fs::File;
use std::io::BufReader;
use std::io::Read;
use std::sync::Arc;
use utils::write_to_file;
use wallet::{KeyStore, PersistentKeyStore};
Expand All @@ -35,35 +30,6 @@ use wallet::{KeyStore, PersistentKeyStore};
// TODO benchmark and/or add this as a config option. (1 is temporary value to avoid overlap)
const WORKER_TASKS: usize = 1;

/// Import a chain from a CAR file
async fn import_chain<V: ProofVerifier, R: Read, DB>(
sm: &Arc<StateManager<DB>>,
reader: R,
snapshot: bool,
) -> Result<(), Box<dyn std::error::Error>>
where
DB: BlockStore + Send + Sync + 'static,
{
info!("Importing chain from snapshot");
// start import
let cids = load_car(sm.blockstore(), reader)?;
let ts = sm.chain_store().tipset_from_keys(&TipsetKeys::new(cids))?;
let gb = sm.chain_store().tipset_by_height(0, &ts, true)?.unwrap();
if !snapshot {
info!("Validating imported chain");
sm.validate_chain::<V>(ts.clone()).await?;
}
let gen_cid = sm.chain_store().set_genesis(&gb.blocks()[0])?;
sm.blockstore()
.write(chain::HEAD_KEY, ts.key().marshal_cbor()?)?;
info!(
"Accepting {:?} as new head with genesis {:?}",
ts.cids(),
gen_cid
);
Ok(())
}

/// Starts daemon process
pub(super) async fn start(config: Config) {
info!("Starting Forest daemon");
Expand Down Expand Up @@ -101,19 +67,20 @@ pub(super) async fn start(config: Config) {
let chain_store = Arc::new(ChainStore::new(Arc::clone(&db)));
let state_manager = Arc::new(StateManager::new(Arc::clone(&chain_store)));

// Read Genesis file
// * When snapshot command implemented, this genesis does not need to be initialized
let (genesis, network_name) =
initialize_genesis(config.genesis_file.as_ref(), &state_manager).unwrap();

// Sync from snapshot
if let Some(path) = &config.snapshot_path {
let file = File::open(path).expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&state_manager, reader, false)
import_chain::<FullVerifier, _, _>(&state_manager, reader, Some(0))
.await
.unwrap();
}

// Read Genesis file
let (genesis, network_name) =
initialize_genesis(config.genesis_file.as_ref(), &state_manager).unwrap();

// Fetch and ensure verification keys are downloaded
get_params_default(SectorSizeOpt::Keys, false)
.await
Expand Down Expand Up @@ -228,7 +195,7 @@ mod test {
let sm = Arc::new(StateManager::new(cs));
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&sm, reader, true)
import_chain::<FullVerifier, _, _>(&sm, reader, None)
.await
.expect("Failed to import chain");
}
Expand All @@ -239,7 +206,7 @@ mod test {
let sm = Arc::new(StateManager::new(cs));
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&sm, reader, false)
import_chain::<FullVerifier, _, _>(&sm, reader, Some(0))
.await
.expect("Failed to import chain");
}
Expand Down
6 changes: 0 additions & 6 deletions node/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ pub use memory::MemoryDB;
#[cfg(feature = "rocksdb")]
pub use rocks::{RocksDb, WriteBatch};

pub trait DatabaseService {
fn open(&mut self) -> Result<(), Error> {
Ok(())
}
}

/// Store interface used as a KV store implementation
pub trait Store {
/// Read single value from data store and return `None` if key doesn't exist.
Expand Down
14 changes: 2 additions & 12 deletions node/db/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{DatabaseService, Error, Store};
use super::{Error, Store};
use parking_lot::RwLock;
use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};

/// A thread-safe `HashMap` wrapper.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MemoryDB {
db: RwLock<HashMap<u64, Vec<u8>>>,
}
Expand All @@ -31,16 +31,6 @@ impl Clone for MemoryDB {
}
}

impl Default for MemoryDB {
fn default() -> Self {
Self {
db: RwLock::new(HashMap::new()),
}
}
}

impl DatabaseService for MemoryDB {}

impl Store for MemoryDB {
fn write<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
Expand Down
8 changes: 1 addition & 7 deletions node/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![cfg(feature = "rocksdb")]

use super::errors::Error;
use super::{DatabaseService, Store};
use super::Store;
pub use rocksdb::{Options, WriteBatch, DB};
use std::env::temp_dir;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -67,12 +67,6 @@ impl RocksDb {
}
}

impl DatabaseService for RocksDb {
fn open(&mut self) -> Result<(), Error> {
self.open()
}
}

impl Store for RocksDb {
fn write<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
Expand Down
8 changes: 0 additions & 8 deletions node/db/tests/mem_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ mod subtests;

use db::MemoryDB;

#[test]
fn mem_db_open() {
let mut db = MemoryDB::default();
subtests::open(&mut db);
// Calling open on opened db should not error
subtests::open(&mut db);
}

#[test]
fn mem_db_write() {
let db = MemoryDB::default();
Expand Down
25 changes: 8 additions & 17 deletions node/db/tests/rocks_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,75 +9,66 @@ mod subtests;
use db::RocksDb;
use db_utils::DBPath;

#[test]
fn rocks_db_open() {
let path = DBPath::new("start_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
// Calling open on opened db should not error
subtests::open(&mut db);
}

#[test]
fn rocks_db_write() {
let path = DBPath::new("write_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::write(&db);
}

#[test]
fn rocks_db_read() {
let path = DBPath::new("read_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::read(&db);
}

#[test]
fn rocks_db_exists() {
let path = DBPath::new("exists_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::exists(&db);
}

#[test]
fn rocks_db_does_not_exist() {
let path = DBPath::new("does_not_exists_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::does_not_exist(&db);
}

#[test]
fn rocks_db_delete() {
let path = DBPath::new("delete_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::delete(&db);
}

#[test]
fn rocks_db_bulk_write() {
let path = DBPath::new("bulk_write_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::bulk_write(&db);
}

#[test]
fn rocks_db_bulk_read() {
let path = DBPath::new("bulk_read_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::bulk_read(&db);
}

#[test]
fn rocks_db_bulk_delete() {
let path = DBPath::new("bulk_delete_rocks_test");
let mut db = RocksDb::new(path.as_ref());
subtests::open(&mut db);
db.open().unwrap();
subtests::bulk_delete(&db);
}
9 changes: 1 addition & 8 deletions node/db/tests/subtests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use db::{DatabaseService, Store};

pub fn open<DB>(db: &mut DB)
where
DB: DatabaseService,
{
db.open().unwrap();
}
use db::Store;

pub fn write<DB>(db: &DB)
where
Expand Down
2 changes: 2 additions & 0 deletions utils/genesis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ state_manager = { path = "../../blockchain/state_manager" }
cid = { package = "forest_cid", path = "../../ipld/cid" }
blocks = { package = "forest_blocks", path = "../../blockchain/blocks" }
chain = { path = "../../blockchain/chain" }
fil_types = { path = "../../types" }
encoding = { path = "../../encoding", package = "forest_encoding" }
34 changes: 33 additions & 1 deletion utils/genesis/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use blocks::{BlockHeader, Tipset};
use blocks::{BlockHeader, Tipset, TipsetKeys};
use chain::ChainStore;
use cid::Cid;
use encoding::Cbor;
use fil_types::verifier::ProofVerifier;
use forest_car::load_car;
use ipld_blockstore::BlockStore;
use log::{debug, info};
Expand Down Expand Up @@ -84,3 +86,33 @@ where
Ok(genesis_block)
}
}

/// Import a chain from a CAR file into the store backing `sm`.
///
/// If `validate_height` is `Some(height)`, the imported chain is validated
/// backwards from the snapshot head down to epoch `height` (so `Some(0)`
/// validates all the way to genesis). If it is `None`, validation is skipped
/// entirely and the snapshot is accepted as-is.
pub async fn import_chain<V: ProofVerifier, R: Read, DB>(
    sm: &Arc<StateManager<DB>>,
    reader: R,
    validate_height: Option<i64>,
) -> Result<(), Box<dyn std::error::Error>>
where
    DB: BlockStore + Send + Sync + 'static,
{
    info!("Importing chain from snapshot");
    // start import
    // Load every block from the CAR file into the blockstore; the returned
    // CIDs are the CAR roots, i.e. the keys of the snapshot's head tipset.
    let cids = load_car(sm.blockstore(), reader)?;
    let ts = sm.chain_store().tipset_from_keys(&TipsetKeys::new(cids))?;
    // The genesis tipset is the tipset at height 0 on the imported chain.
    // NOTE(review): the `unwrap` assumes every snapshot CAR reaches back to
    // height 0 — confirm snapshots are guaranteed to include genesis.
    let gb = sm.chain_store().tipset_by_height(0, &ts, true)?.unwrap();
    if let Some(height) = validate_height {
        info!("Validating imported chain");
        sm.validate_chain::<V>(ts.clone(), height).await?;
    }
    let gen_cid = sm.chain_store().set_genesis(&gb.blocks()[0])?;
    // Persist the snapshot head under HEAD_KEY so the daemon resumes from it.
    sm.blockstore()
        .write(chain::HEAD_KEY, ts.key().marshal_cbor()?)?;
    info!(
        "Accepting {:?} as new head with genesis {:?}",
        ts.cids(),
        gen_cid
    );
    Ok(())
}