Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow importing snapshot from URL + Progress bar (#762) #811

Merged
merged 6 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ members = [
"utils/commcid",
"utils/json_utils",
"utils/genesis",
"utils/net_utils",
"utils/statediff",
"types",
"key_management",
Expand Down
3 changes: 2 additions & 1 deletion forest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ wallet = { package = "key_management", path = "../key_management" }
jsonrpc-v2 = { version = "0.5.2", git = "https://github.com/ChainSafe/jsonrpc-v2", features = ["easy-errors", "macros"], default-features = false }
uuid = { version = "0.8.1", features = ["v4"] }
auth = { path = "../utils/auth"}
net_utils = { path = "../utils/net_utils" }
actor = { path = "../vm/actor/" }
genesis = { path = "../utils/genesis" }
paramfetch = { path = "../utils/paramfetch" }
encoding = { package = "forest_encoding", path = "../encoding" }
encoding = { package = "forest_encoding", path = "../encoding" }
4 changes: 2 additions & 2 deletions forest/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ pub struct DaemonOpts {
pub kademlia: Option<bool>,
#[structopt(short, long, help = "Allow MDNS (default = true)")]
pub mdns: Option<bool>,
#[structopt(long, help = "Import a snapshot from a CAR file")]
#[structopt(long, help = "Import a snapshot from a local CAR file or url")]
pub import_snapshot: Option<String>,
#[structopt(long, help = "Import a chain from CAR file")]
#[structopt(long, help = "Import a chain from a local CAR file or url")]
pub import_chain: Option<String>,
}

Expand Down
19 changes: 4 additions & 15 deletions forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

use super::cli::{block_until_sigint, Config};
use actor::EPOCH_DURATION_SECONDS;
use async_std::sync::RwLock;
use async_std::task;
use async_std::{sync::RwLock, task};
use auth::{generate_priv_key, JWT_IDENTIFIER};
use beacon::{DrandBeacon, DEFAULT_DRAND_URL};
use chain::ChainStore;
Expand All @@ -20,8 +19,6 @@ use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use paramfetch::{get_params_default, SectorSizeOpt};
use rpc::{start_rpc, RpcState};
use state_manager::StateManager;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use utils::write_to_file;
use wallet::{KeyStore, PersistentKeyStore};
Expand Down Expand Up @@ -76,9 +73,7 @@ pub(super) async fn start(config: Config) {

// Sync from snapshot
if let Some(path) = &config.snapshot_path {
let file = File::open(path).expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&state_manager, reader, Some(0))
import_chain::<FullVerifier, _>(&state_manager, path, Some(0))
.await
.unwrap();
}
Expand Down Expand Up @@ -192,17 +187,13 @@ pub(super) async fn start(config: Config) {
mod test {
use super::*;
use db::MemoryDB;
use std::fs::File;
use std::io::BufReader;

#[async_std::test]
async fn import_snapshot_from_file() {
let db = Arc::new(MemoryDB::default());
let cs = Arc::new(ChainStore::new(db));
let sm = Arc::new(StateManager::new(cs));
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&sm, reader, None)
import_chain::<FullVerifier, _>(&sm, "test_files/chain4.car", None)
.await
.expect("Failed to import chain");
}
Expand All @@ -211,9 +202,7 @@ mod test {
let db = Arc::new(MemoryDB::default());
let cs = Arc::new(ChainStore::new(db));
let sm = Arc::new(StateManager::new(cs));
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(&sm, reader, Some(0))
import_chain::<FullVerifier, _>(&sm, "test_files/chain4.car", Some(0))
.await
.expect("Failed to import chain");
}
Expand Down
2 changes: 2 additions & 0 deletions utils/genesis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ blocks = { package = "forest_blocks", path = "../../blockchain/blocks" }
chain = { path = "../../blockchain/chain" }
fil_types = { path = "../../types" }
encoding = { path = "../../encoding", package = "forest_encoding" }
net_utils = { path = "../net_utils" }
url = "2.1.1"
22 changes: 19 additions & 3 deletions utils/genesis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use fil_types::verifier::ProofVerifier;
use forest_car::load_car;
use ipld_blockstore::BlockStore;
use log::{debug, info};
use net_utils::FetchProgress;
use state_manager::StateManager;
use std::convert::TryFrom;
use std::error::Error as StdError;
use std::fs::File;
use std::include_bytes;
use std::io::{BufReader, Read};
use std::sync::Arc;
use url::Url;

#[cfg(feature = "testing")]
pub const EXPORT_SR_40: &[u8; 1226395] = include_bytes!("mainnet/export40.car");
Expand Down Expand Up @@ -89,17 +92,29 @@ where

/// Import a chain from a CAR file. If the snapshot boolean is set, it will not verify the chain
/// state and instead accept the largest height as genesis.
pub async fn import_chain<V: ProofVerifier, R: Read, DB>(
pub async fn import_chain<V: ProofVerifier, DB>(
sm: &Arc<StateManager<DB>>,
reader: R,
path: &str,
validate_height: Option<i64>,
) -> Result<(), Box<dyn std::error::Error>>
where
DB: BlockStore + Send + Sync + 'static,
{
let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://");

info!("Importing chain from snapshot");
// start import
let cids = load_car(sm.blockstore(), reader)?;
let cids = if is_remote_file {
let url = Url::parse(path).expect("URL is invalid");
info!("Downloading file...");
let reader = FetchProgress::try_from(url)?;
load_car(sm.blockstore(), reader)?
} else {
let file = File::open(&path).expect("Snapshot file path not found!");
info!("Reading file...");
let reader = FetchProgress::try_from(file)?;
load_car(sm.blockstore(), reader)?
};
let ts = sm
.chain_store()
.tipset_from_keys(&TipsetKeys::new(cids))
Expand All @@ -109,6 +124,7 @@ where
.tipset_by_height(0, &ts, true)
.await?
.unwrap();

if let Some(height) = validate_height {
info!("Validating imported chain");
sm.validate_chain::<V>(ts.clone(), height).await?;
Expand Down
15 changes: 15 additions & 0 deletions utils/net_utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "net_utils"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
isahc = "0.9.11"
snaumov marked this conversation as resolved.
Show resolved Hide resolved
url = "2.1.1"
log = "0.4.8"
thiserror = "1.0"
pbr = "1.0.3"
pin-project-lite = "0.1"
async-std = { version = "1.6.3", features = ["attributes"] }
futures = "0.3.5"
114 changes: 114 additions & 0 deletions utils/net_utils/src/download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use async_std::io::BufRead;
use futures::prelude::*;
use isahc::{Body, HttpClient};
use pbr::ProgressBar;
use pin_project_lite::pin_project;
use std::convert::TryFrom;
use std::fs::File;
use std::io::{self, BufReader, Read, Result as IOResult, Stdout, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use thiserror::Error;
use url::Url;

#[derive(Debug, Error)]
enum DownloadError {
#[error("Cannot read a file header")]
HeaderError,
}

pin_project! {
/// Holds a Reader, tracks read progress and draw a progress bar.
pub struct FetchProgress<R, W: Write> {
#[pin]
pub inner: R,
pub progress_bar: ProgressBar<W>,
}
}

impl<R, W: Write> FetchProgress<R, W> {
pub fn finish(&mut self) {
self.progress_bar.finish();
}
}

impl<R: AsyncRead + Unpin, W: Write> AsyncRead for FetchProgress<R, W> {
fn poll_read(
snaumov marked this conversation as resolved.
Show resolved Hide resolved
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let r = Pin::new(&mut self.inner).poll_read(cx, buf);
if let Poll::Ready(Ok(size)) = r {
self.progress_bar.add(size as u64);
}
r
}
}

impl<R: BufRead + Unpin, W: Write> BufRead for FetchProgress<R, W> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IOResult<&'_ [u8]>> {
let this = self.project();
this.inner.poll_fill_buf(cx)
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut self.inner).consume(amt)
}
}

impl<R: Read, W: Write> Read for FetchProgress<R, W> {
fn read(&mut self, buf: &mut [u8]) -> IOResult<usize> {
self.inner.read(buf).map(|n| {
self.progress_bar.add(n as u64);
n
})
}
}

impl TryFrom<Url> for FetchProgress<Body, Stdout> {
type Error = Box<dyn std::error::Error>;

fn try_from(url: Url) -> Result<Self, Self::Error> {
let client = HttpClient::new()?;
let total_size = {
let resp = client.head(url.as_str())?;
if resp.status().is_success() {
resp.headers()
.get("content-length")
.and_then(|ct_len| ct_len.to_str().ok())
.and_then(|ct_len| ct_len.parse().ok())
.unwrap_or(0)
} else {
return Err(Box::new(DownloadError::HeaderError));
}
};

let request = client.get(url.as_str())?;

let pb = ProgressBar::new(total_size);

Ok(FetchProgress {
progress_bar: pb,
inner: request.into_body(),
})
}
}

impl TryFrom<File> for FetchProgress<BufReader<File>, Stdout> {
type Error = Box<dyn std::error::Error>;

fn try_from(file: File) -> Result<Self, Self::Error> {
let total_size = file.metadata()?.len();

let pb = ProgressBar::new(total_size);

Ok(FetchProgress {
progress_bar: pb,
inner: BufReader::new(file),
})
}
}
6 changes: 6 additions & 0 deletions utils/net_utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod download;

pub use self::download::*;
3 changes: 3 additions & 0 deletions utils/paramfetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ log = "0.4.8"
blake2b_simd = "0.5.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
isahc = "0.9.11"
url = "2.1.1"
net_utils = { path = "../net_utils" }
Loading