Skip to content

Commit

Permalink
Allow importing snapshot from URL + Progress bar (ChainSafe#762)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaumov committed Nov 5, 2020
1 parent b31fa0a commit 66f8ec3
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 63 deletions.
Binary file added 5MB.zip
Binary file not shown.
162 changes: 117 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ members = [
"utils/commcid",
"utils/json_utils",
"utils/genesis",
"utils/net_utils",
"utils/statediff",
"types",
"key_management",
Expand Down
3 changes: 3 additions & 0 deletions forest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ wallet = { package = "key_management", path = "../key_management" }
jsonrpc-v2 = { version = "0.5.2", git = "https://github.com/ChainSafe/jsonrpc-v2", features = ["easy-errors", "macros"], default-features = false }
uuid = { version = "0.8.1", features = ["v4"] }
auth = { path = "../utils/auth"}
net_utils = { path = "../utils/net_utils" }
actor = { path = "../vm/actor/" }
genesis = { path = "../utils/genesis" }
paramfetch = { path = "../utils/paramfetch" }
encoding = { package = "forest_encoding", path = "../encoding" }
reqwest = "0.10.8"
indicatif = "0.15.0"
4 changes: 2 additions & 2 deletions forest/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ pub struct DaemonOpts {
pub kademlia: Option<bool>,
#[structopt(short, long, help = "Allow MDNS (default = true)")]
pub mdns: Option<bool>,
#[structopt(long, help = "Import a snapshot from a CAR file")]
#[structopt(long, help = "Import a snapshot from a local CAR file or url")]
pub import_snapshot: Option<String>,
#[structopt(long, help = "Import a chain from CAR file")]
#[structopt(long, help = "Import a chain from a local CAR file or url")]
pub import_chain: Option<String>,
}

Expand Down
48 changes: 33 additions & 15 deletions forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use ipld_blockstore::BlockStore;
use libp2p::identity::{ed25519, Keypair};
use log::{debug, info, trace};
use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use net_utils::download_file;
use paramfetch::{get_params_default, SectorSizeOpt};
use rpc::{start_rpc, RpcState};
use state_manager::StateManager;
use std::fs::File;
use std::fs::{remove_file, File};
use std::io::BufReader;
use std::io::Read;
use std::sync::Arc;
use utils::write_to_file;
use wallet::{KeyStore, PersistentKeyStore};
Expand All @@ -36,11 +36,23 @@ use wallet::{KeyStore, PersistentKeyStore};
const WORKER_TASKS: usize = 1;

/// Import a chain from a CAR file
async fn import_chain<V: ProofVerifier, R: Read, DB: BlockStore>(
async fn import_chain<V: ProofVerifier, DB: BlockStore>(
bs: Arc<DB>,
reader: R,
path: String,
snapshot: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://");

let mut file_path = path;
if is_remote_file {
match download_file(file_path).await {
Ok(file) => file_path = file,
Err(err) => return Err(err),
}
}

let file = File::open(&file_path).expect("Snapshot file path not found!");
let reader = BufReader::new(file);
info!("Importing chain from snapshot");
// start import
let cids = load_car(bs.as_ref(), reader)?;
Expand All @@ -58,6 +70,10 @@ async fn import_chain<V: ProofVerifier, R: Read, DB: BlockStore>(
ts.cids(),
gen_cid
);

if is_remote_file {
remove_file(file_path).unwrap();
}
Ok(())
}

Expand Down Expand Up @@ -96,9 +112,7 @@ pub(super) async fn start(config: Config) {

// Sync from snapshot
if let Some(path) = &config.snapshot_path {
let file = File::open(path).expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(Arc::clone(&db), reader, false)
import_chain::<FullVerifier, _>(Arc::clone(&db), path.to_string(), false)
.await
.unwrap();
}
Expand Down Expand Up @@ -234,19 +248,23 @@ mod test {
#[async_std::test]
async fn import_snapshot_from_file() {
let db = Arc::new(MemoryDB::default());
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(Arc::clone(&db), reader, true)
// let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
// let reader = BufReader::new(file);
import_chain::<FullVerifier, _>(Arc::clone(&db), "test_files/chain4.car".to_string(), true)
.await
.expect("Failed to import chain");
}
#[async_std::test]
async fn import_chain_from_file() {
let db = Arc::new(MemoryDB::default());
let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
let reader = BufReader::new(file);
import_chain::<FullVerifier, _, _>(Arc::clone(&db), reader, false)
.await
.expect("Failed to import chain");
// let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!");
// let reader = BufReader::new(file);
import_chain::<FullVerifier, _>(
Arc::clone(&db),
"test_files/chain4.car".to_string(),
false,
)
.await
.expect("Failed to import chain");
}
}
2 changes: 1 addition & 1 deletion tests/conformance_tests/test-vectors
Submodule test-vectors updated 1006 files
14 changes: 14 additions & 0 deletions utils/net_utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "net_utils"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
indicatif = "0.15.0"
futures = "0.3.5"
async-std = { version = "1.6.3" }
isahc = "0.9.11"
url = "2.1.1"
log = "0.4.8"
thiserror = "1.0"
103 changes: 103 additions & 0 deletions utils/net_utils/src/download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use async_std::{
fs,
io::{self, copy, Read as AsyncRead},
};
use futures::task::{Context, Poll};
use indicatif::{ProgressBar, ProgressStyle};
use isahc::HttpClient;
use log::info;
use std::{marker::Unpin, path::Path, pin::Pin};
use thiserror::Error;
use url::Url;

/// Contains progress bar and reader.
struct DownloadProgress<S>
where
S: Unpin,
{
inner: S,
progress_bar: ProgressBar,
}

#[derive(Debug, Error)]
enum DownloadError {
#[error("Cannot read a file header")]
HeaderError,
#[error("Filename encoding error")]
EncodingError,
}

impl<S: AsyncRead + Unpin> AsyncRead for DownloadProgress<S> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(size)) => {
if size == 0 {
self.progress_bar.finish();
} else {
self.progress_bar.inc(size as u64);
}
Poll::Ready(Ok(size))
}
rest => rest,
}
}
}

/// Downloads the file and returns the path where it's saved.
pub async fn download_file(raw_url: String) -> Result<String, Box<dyn std::error::Error>> {
let url = Url::parse(raw_url.as_str())?;

let client = HttpClient::new()?;
let total_size = {
let resp = client.head(url.as_str())?;
if resp.status().is_success() {
resp.headers()
.get("content-length")
.and_then(|ct_len| ct_len.to_str().ok())
.and_then(|ct_len| ct_len.parse().ok())
.unwrap_or(0)
} else {
return Err(Box::new(DownloadError::HeaderError));
}
};

info!("Downloading file...");
let mut request = client.get(url.as_str())?;

let pb = ProgressBar::new(total_size);
pb.set_style(ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.progress_chars("#>-"));

let file = Path::new(
url.path_segments()
.and_then(|segments| segments.last())
.unwrap_or("tmp.bin"),
);

let mut source = DownloadProgress {
progress_bar: pb,
inner: request.body_mut(),
};

let mut dest = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&file)
.await?;

let _ = copy(&mut source, &mut dest).await?;

info!("File has been downloaded");
match file.to_str() {
Some(st) => Ok(st.to_string()),
None => Err(Box::new(DownloadError::EncodingError)),
}
}
6 changes: 6 additions & 0 deletions utils/net_utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod download;

pub use self::download::*;

0 comments on commit 66f8ec3

Please sign in to comment.