From f276ad902853616c92b80573f4ec6e481feff0d2 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Tue, 3 Nov 2020 15:55:49 -0500 Subject: [PATCH 1/6] Allow importing snapshot from URL + Progress bar (#762) --- Cargo.lock | 40 ++++++++++++- Cargo.toml | 1 + forest/Cargo.toml | 1 + forest/src/cli/mod.rs | 4 +- forest/src/daemon.rs | 25 ++++---- utils/genesis/src/lib.rs | 41 +++++++++++++ utils/net_utils/Cargo.toml | 14 +++++ utils/net_utils/src/download.rs | 103 ++++++++++++++++++++++++++++++++ utils/net_utils/src/lib.rs | 6 ++ 9 files changed, 218 insertions(+), 17 deletions(-) create mode 100644 utils/net_utils/Cargo.toml create mode 100644 utils/net_utils/src/download.rs create mode 100644 utils/net_utils/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index c3ba4216c64d..e4c923babed0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2057,7 +2057,7 @@ dependencies = [ "groupy", "hex", "humansize", - "indicatif", + "indicatif 0.14.0", "itertools 0.9.0", "lazy_static", "log", @@ -2199,6 +2199,7 @@ dependencies = [ "libp2p", "log", "message_pool", + "net_utils", "paramfetch", "pretty_env_logger", "rpc", @@ -3057,6 +3058,18 @@ dependencies = [ "regex", ] +[[package]] +name = "indicatif" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" +dependencies = [ + "console 0.12.0", + "lazy_static", + "number_prefix", + "regex", +] + [[package]] name = "infer" version = "0.2.3" @@ -3188,6 +3201,7 @@ dependencies = [ "crossbeam-utils 0.8.0", "curl", "curl-sys", + "encoding_rs", "futures-channel", "futures-io", "futures-util", @@ -4156,6 +4170,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "net_utils" +version = "0.1.0" +dependencies = [ + "async-std", + "futures 0.3.6", + "indicatif 0.15.0", + "isahc", + "log", + "thiserror", + "url", +] + [[package]] name = "nix" version = "0.18.0" @@ -4661,6 +4688,17 @@ dependencies = [ "syn 1.0.48", ] +[[package]] +name = "pin-project-internal" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.44", +] + [[package]] name = "pin-project-lite" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index 153b1d7e09c9..d50026a32877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "utils/commcid", "utils/json_utils", "utils/genesis", + "utils/net_utils", "utils/statediff", "types", "key_management", diff --git a/forest/Cargo.toml b/forest/Cargo.toml index 601f1d8d7055..d4f680bdb7d1 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -38,6 +38,7 @@ wallet = { package = "key_management", path = "../key_management" } jsonrpc-v2 = { version = "0.5.2", git = "https://github.com/ChainSafe/jsonrpc-v2", features = ["easy-errors", "macros"], default-features = false } uuid = { version = "0.8.1", features = ["v4"] } auth = { path = "../utils/auth"} +net_utils = { path = "../utils/net_utils" } actor = { path = "../vm/actor/" } genesis = { path = "../utils/genesis" } paramfetch = { path = "../utils/paramfetch" } diff --git a/forest/src/cli/mod.rs b/forest/src/cli/mod.rs index d67557fda810..92f0b51fff8b 100644 --- a/forest/src/cli/mod.rs +++ b/forest/src/cli/mod.rs @@ -72,9 +72,9 @@ pub struct DaemonOpts { pub kademlia: Option, #[structopt(short, long, help = "Allow MDNS (default = true)")] pub mdns: Option, - #[structopt(long, help = "Import a snapshot from a CAR file")] + #[structopt(long, help = "Import a snapshot from a local CAR file or url")] pub import_snapshot: Option, - #[structopt(long, help = "Import a chain from CAR file")] + #[structopt(long, help = "Import a chain from a local CAR file or url")] pub import_chain: Option, } diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 7471143dc552..b3f346fdd21f 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -17,10 +17,11 @@ use genesis::{import_chain, initialize_genesis}; use libp2p::identity::{ed25519, Keypair}; use log::{debug, info, trace}; use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; +use net_utils::download_file; use paramfetch::{get_params_default, SectorSizeOpt}; use rpc::{start_rpc, RpcState}; use state_manager::StateManager; -use std::fs::File; +use std::fs::{remove_file, File}; use std::io::BufReader; use std::sync::Arc; use utils::write_to_file; @@ -76,9 +77,7 @@ pub(super) async fn start(config: Config) { // Sync from snapshot if let Some(path) = &config.snapshot_path { - let file = File::open(path).expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&state_manager, reader, Some(0)) + import_chain::(&state_manager, path.to_string(), Some(0)) .await .unwrap(); } @@ -192,17 +191,13 @@ pub(super) async fn start(config: Config) { mod test { use super::*; use db::MemoryDB; - use std::fs::File; - use std::io::BufReader; #[async_std::test] async fn import_snapshot_from_file() { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&sm, reader, None) + import_chain::(&sm, "test_files/chain4.car".to_string(), None) .await .expect("Failed to import chain"); } @@ -211,10 +206,12 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&sm, reader, Some(0)) - .await - .expect("Failed to import chain"); + import_chain::( + &sm, + "test_files/chain4.car".to_string(), + Some(0), + ) + .await + .expect("Failed to import chain"); } } diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 839c28e9d985..2821f2a94937 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -123,3 +123,44 @@ where ); Ok(()) } +// TODO update +// async fn import_chain( +// bs: Arc, +// path: String, +// snapshot: bool, +// ) -> Result<(), Box> { +// let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); + +// let mut file_path = path; +// if is_remote_file { +// match download_file(file_path).await { +// Ok(file) => file_path = file, +// Err(err) => return Err(err), +// } +// } + +// let file = File::open(&file_path).expect("Snapshot file path not found!"); +// let reader = BufReader::new(file); +// info!("Importing chain from snapshot"); +// // start import +// let cids = load_car(bs.as_ref(), reader)?; +// let ts = chain::tipset_from_keys(bs.as_ref(), &TipsetKeys::new(cids))?; +// let gb = chain::tipset_by_height(bs.as_ref(), 0, &ts, true)?.unwrap(); +// let sm = StateManager::new(bs.clone()); +// if !snapshot { +// info!("Validating imported chain"); +// sm.validate_chain::(ts.clone()).await?; +// } +// let gen_cid = chain::set_genesis(bs.as_ref(), &gb.blocks()[0])?; +// bs.write(chain::HEAD_KEY, ts.key().marshal_cbor()?)?; +// info!( +// "Accepting {:?} as new head with genesis {:?}", +// ts.cids(), +// gen_cid +// ); + +// if is_remote_file { +// remove_file(file_path).unwrap(); +// } +// Ok(()) +// } diff --git a/utils/net_utils/Cargo.toml b/utils/net_utils/Cargo.toml new file mode 100644 index 000000000000..370187c93a02 --- /dev/null +++ b/utils/net_utils/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "net_utils" +version = "0.1.0" +authors = ["ChainSafe Systems "] +edition = "2018" + +[dependencies] +indicatif = "0.15.0" +futures = "0.3.5" +async-std = { version = "1.6.3" } +isahc = "0.9.11" +url = "2.1.1" +log = "0.4.8" +thiserror = "1.0" \ No newline at end of file diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs new file mode 100644 index 000000000000..38a5848cd5b9 --- /dev/null +++ b/utils/net_utils/src/download.rs @@ -0,0 +1,103 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_std::{ + fs, + io::{self, copy, Read as AsyncRead}, +}; +use futures::task::{Context, Poll}; +use indicatif::{ProgressBar, ProgressStyle}; +use isahc::HttpClient; +use log::info; +use std::{marker::Unpin, path::Path, pin::Pin}; +use thiserror::Error; +use url::Url; + +/// Contains progress bar and reader. +struct DownloadProgress +where + S: Unpin, +{ + inner: S, + progress_bar: ProgressBar, +} + +#[derive(Debug, Error)] +enum DownloadError { + #[error("Cannot read a file header")] + HeaderError, + #[error("Filename encoding error")] + EncodingError, +} + +impl AsyncRead for DownloadProgress { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match Pin::new(&mut self.inner).poll_read(cx, buf) { + Poll::Ready(Ok(size)) => { + if size == 0 { + self.progress_bar.finish(); + } else { + self.progress_bar.inc(size as u64); + } + Poll::Ready(Ok(size)) + } + rest => rest, + } + } +} + +/// Downloads the file and returns the path where it's saved. +pub async fn download_file(raw_url: String) -> Result> { + let url = Url::parse(raw_url.as_str())?; + + let client = HttpClient::new()?; + let total_size = { + let resp = client.head(url.as_str())?; + if resp.status().is_success() { + resp.headers() + .get("content-length") + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) + .unwrap_or(0) + } else { + return Err(Box::new(DownloadError::HeaderError)); + } + }; + + info!("Downloading file..."); + let mut request = client.get(url.as_str())?; + + let pb = ProgressBar::new(total_size); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})") + .progress_chars("#>-")); + + let file = Path::new( + url.path_segments() + .and_then(|segments| segments.last()) + .unwrap_or("tmp.bin"), + ); + + let mut source = DownloadProgress { + progress_bar: pb, + inner: request.body_mut(), + }; + + let mut dest = fs::OpenOptions::new() + .create(true) + .append(true) + .open(&file) + .await?; + + let _ = copy(&mut source, &mut dest).await?; + + info!("File has been downloaded"); + match file.to_str() { + Some(st) => Ok(st.to_string()), + None => Err(Box::new(DownloadError::EncodingError)), + } +} diff --git a/utils/net_utils/src/lib.rs b/utils/net_utils/src/lib.rs new file mode 100644 index 000000000000..9762280cc49e --- /dev/null +++ b/utils/net_utils/src/lib.rs @@ -0,0 +1,6 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod download; + +pub use self::download::*; From bc9cb3aaff47bedde03c766508109675c69332c2 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Thu, 12 Nov 2020 20:11:51 -0500 Subject: [PATCH 2/6] second interation --- Cargo.lock | 18 ++------ forest/Cargo.toml | 2 +- forest/src/daemon.rs | 8 ++-- utils/genesis/src/lib.rs | 25 +++++------ utils/net_utils/Cargo.toml | 4 +- utils/net_utils/src/download.rs | 77 ++++++++------------------------- utils/paramfetch/Cargo.toml | 2 + 7 files changed, 41 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4c923babed0..c87b5deb9013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2057,7 +2057,7 @@ dependencies = [ "groupy", "hex", "humansize", - "indicatif 0.14.0", + "indicatif", "itertools 0.9.0", "lazy_static", "log", @@ -3058,18 +3058,6 @@ dependencies = [ "regex", ] -[[package]] -name = "indicatif" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" -dependencies = [ - "console 0.12.0", - "lazy_static", - "number_prefix", - "regex", -] - [[package]] name = "infer" version = "0.2.3" @@ -4176,9 +4164,9 @@ version = "0.1.0" dependencies = [ "async-std", "futures 0.3.6", - "indicatif 0.15.0", "isahc", "log", + "pbr", "thiserror", "url", ] @@ -4502,12 +4490,14 @@ dependencies = [ "blake2b_simd", "fil_types", "futures 0.3.8", + "isahc", "log", "pbr", "pin-project-lite", "serde", "serde_json", "surf", + "url", ] [[package]] diff --git a/forest/Cargo.toml b/forest/Cargo.toml index d4f680bdb7d1..3b19d6a2e6b9 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -42,4 +42,4 @@ net_utils = { path = "../utils/net_utils" } actor = { path = "../vm/actor/" } genesis = { path = "../utils/genesis" } paramfetch = { path = "../utils/paramfetch" } -encoding = { package = "forest_encoding", path = "../encoding" } +encoding = { package = "forest_encoding", path = "../encoding" } \ No newline at end of file diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index b3f346fdd21f..b3361315b131 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -3,8 +3,7 @@ use super::cli::{block_until_sigint, Config}; use actor::EPOCH_DURATION_SECONDS; -use async_std::sync::RwLock; -use async_std::task; +use async_std::{sync::RwLock, task}; use auth::{generate_priv_key, JWT_IDENTIFIER}; use beacon::{DrandBeacon, DEFAULT_DRAND_URL}; use chain::ChainStore; @@ -17,12 +16,11 @@ use genesis::{import_chain, initialize_genesis}; use libp2p::identity::{ed25519, Keypair}; use log::{debug, info, trace}; use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; -use net_utils::download_file; +use net_utils::make_reader; use paramfetch::{get_params_default, SectorSizeOpt}; use rpc::{start_rpc, RpcState}; use state_manager::StateManager; -use std::fs::{remove_file, File}; -use std::io::BufReader; +use std::fs::File; use std::sync::Arc; use utils::write_to_file; use wallet::{KeyStore, PersistentKeyStore}; diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 2821f2a94937..f77689ee3c78 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -124,6 +124,7 @@ where Ok(()) } // TODO update +/// Import a chain from a CAR file // async fn import_chain( // bs: Arc, // path: String, @@ -131,19 +132,16 @@ where // ) -> Result<(), Box> { // let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); -// let mut file_path = path; -// if is_remote_file { -// match download_file(file_path).await { -// Ok(file) => file_path = file, -// Err(err) => return Err(err), -// } -// } - -// let file = File::open(&file_path).expect("Snapshot file path not found!"); -// let reader = BufReader::new(file); // info!("Importing chain from snapshot"); -// // start import -// let cids = load_car(bs.as_ref(), reader)?; +// let cids = if is_remote_file { +// let reader = make_reader(path)?; +// info!("Downloading file..."); +// load_car(bs.as_ref(), reader)? +// } else { +// let reader = File::open(&path).expect("Snapshot file path not found!"); +// load_car(bs.as_ref(), reader)? +// }; + // let ts = chain::tipset_from_keys(bs.as_ref(), &TipsetKeys::new(cids))?; // let gb = chain::tipset_by_height(bs.as_ref(), 0, &ts, true)?.unwrap(); // let sm = StateManager::new(bs.clone()); @@ -159,8 +157,5 @@ where // gen_cid // ); -// if is_remote_file { -// remove_file(file_path).unwrap(); -// } // Ok(()) // } diff --git a/utils/net_utils/Cargo.toml b/utils/net_utils/Cargo.toml index 370187c93a02..47e5a0188ef5 100644 --- a/utils/net_utils/Cargo.toml +++ b/utils/net_utils/Cargo.toml @@ -5,10 +5,10 @@ authors = ["ChainSafe Systems "] edition = "2018" [dependencies] -indicatif = "0.15.0" futures = "0.3.5" async-std = { version = "1.6.3" } isahc = "0.9.11" url = "2.1.1" log = "0.4.8" -thiserror = "1.0" \ No newline at end of file +thiserror = "1.0" +pbr = "1.0.3" \ No newline at end of file diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index 38a5848cd5b9..866d1b7a7850 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -6,52 +6,37 @@ use async_std::{ io::{self, copy, Read as AsyncRead}, }; use futures::task::{Context, Poll}; -use indicatif::{ProgressBar, ProgressStyle}; -use isahc::HttpClient; +use isahc::{Body, HttpClient}; use log::info; -use std::{marker::Unpin, path::Path, pin::Pin}; +use pbr::ProgressBar; +use std::io::{Read, Result as IOResult}; +use std::{io::Stdout, marker::Unpin, path::Path, pin::Pin}; use thiserror::Error; use url::Url; /// Contains progress bar and reader. -struct DownloadProgress -where - S: Unpin, -{ - inner: S, - progress_bar: ProgressBar, +pub struct DownloadProgress { + inner: R, + progress_bar: ProgressBar, } #[derive(Debug, Error)] enum DownloadError { #[error("Cannot read a file header")] HeaderError, - #[error("Filename encoding error")] - EncodingError, } -impl AsyncRead for DownloadProgress { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - match Pin::new(&mut self.inner).poll_read(cx, buf) { - Poll::Ready(Ok(size)) => { - if size == 0 { - self.progress_bar.finish(); - } else { - self.progress_bar.inc(size as u64); - } - Poll::Ready(Ok(size)) - } - rest => rest, - } +impl Read for DownloadProgress { + fn read(&mut self, buf: &mut [u8]) -> IOResult { + self.inner.read(buf).map(|n| { + self.progress_bar.add(n as u64); + n + }) } } -/// Downloads the file and returns the path where it's saved. -pub async fn download_file(raw_url: String) -> Result> { +/// Builds Reader for a provided URL. +pub fn make_reader(raw_url: String) -> Result, Box> { let url = Url::parse(raw_url.as_str())?; let client = HttpClient::new()?; @@ -68,36 +53,12 @@ pub async fn download_file(raw_url: String) -> Result-")); - - let file = Path::new( - url.path_segments() - .and_then(|segments| segments.last()) - .unwrap_or("tmp.bin"), - ); - let mut source = DownloadProgress { + Ok(DownloadProgress { progress_bar: pb, - inner: request.body_mut(), - }; - - let mut dest = fs::OpenOptions::new() - .create(true) - .append(true) - .open(&file) - .await?; - - let _ = copy(&mut source, &mut dest).await?; - - info!("File has been downloaded"); - match file.to_str() { - Some(st) => Ok(st.to_string()), - None => Err(Box::new(DownloadError::EncodingError)), - } + inner: request.into_body(), + }) } diff --git a/utils/paramfetch/Cargo.toml b/utils/paramfetch/Cargo.toml index 50bdf6305bf5..a38f749713d5 100644 --- a/utils/paramfetch/Cargo.toml +++ b/utils/paramfetch/Cargo.toml @@ -15,3 +15,5 @@ log = "0.4.8" blake2b_simd = "0.5.9" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +isahc = "0.9.11" +url = "2.1.1" From 4c0bee07130392723a3976409530198679fb4d9f Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Thu, 12 Nov 2020 21:05:29 -0500 Subject: [PATCH 3/6] rebasing, fixing linters --- Cargo.lock | 15 ++------- forest/src/daemon.rs | 16 +++------ utils/genesis/Cargo.toml | 1 + utils/genesis/src/lib.rs | 57 ++++++++++----------------------- utils/net_utils/Cargo.toml | 2 -- utils/net_utils/src/download.rs | 8 +---- 6 files changed, 26 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c87b5deb9013..5789321ca49a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2653,6 +2653,7 @@ dependencies = [ "forest_encoding", "ipld_blockstore", "log", + "net_utils", "state_manager", ] @@ -3195,6 +3196,7 @@ dependencies = [ "futures-util", "http", "log", + "mime", "once_cell", "slab", "sluice", @@ -4162,8 +4164,6 @@ dependencies = [ name = "net_utils" version = "0.1.0" dependencies = [ - "async-std", - "futures 0.3.6", "isahc", "log", "pbr", @@ -4678,17 +4678,6 @@ dependencies = [ "syn 1.0.48", ] -[[package]] -name = "pin-project-internal" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86" -dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.7", - "syn 1.0.44", -] - [[package]] name = "pin-project-lite" version = "0.1.11" diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index b3361315b131..1d24b39b0c86 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -16,11 +16,9 @@ use genesis::{import_chain, initialize_genesis}; use libp2p::identity::{ed25519, Keypair}; use log::{debug, info, trace}; use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; -use net_utils::make_reader; use paramfetch::{get_params_default, SectorSizeOpt}; use rpc::{start_rpc, RpcState}; use state_manager::StateManager; -use std::fs::File; use std::sync::Arc; use utils::write_to_file; use wallet::{KeyStore, PersistentKeyStore}; @@ -75,7 +73,7 @@ pub(super) async fn start(config: Config) { // Sync from snapshot if let Some(path) = &config.snapshot_path { - import_chain::(&state_manager, path.to_string(), Some(0)) + import_chain::(&state_manager, path.to_string(), Some(0)) .await .unwrap(); } @@ -195,7 +193,7 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - import_chain::(&sm, "test_files/chain4.car".to_string(), None) + import_chain::(&sm, "test_files/chain4.car".to_string(), None) .await .expect("Failed to import chain"); } @@ -204,12 +202,8 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - import_chain::( - &sm, - "test_files/chain4.car".to_string(), - Some(0), - ) - .await - .expect("Failed to import chain"); + import_chain::(&sm, "test_files/chain4.car".to_string(), Some(0)) + .await + .expect("Failed to import chain"); } } diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index bb34de3f93ea..5b3690a80c99 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -18,3 +18,4 @@ blocks = { package = "forest_blocks", path = "../../blockchain/blocks" } chain = { path = "../../blockchain/chain" } fil_types = { path = "../../types" } encoding = { path = "../../encoding", package = "forest_encoding" } +net_utils = { path = "../net_utils" } diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index f77689ee3c78..68b32da98a43 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -9,6 +9,7 @@ use fil_types::verifier::ProofVerifier; use forest_car::load_car; use ipld_blockstore::BlockStore; use log::{debug, info}; +use net_utils::make_reader; use state_manager::StateManager; use std::error::Error as StdError; use std::fs::File; @@ -89,17 +90,26 @@ where /// Import a chain from a CAR file. If the snapshot boolean is set, it will not verify the chain /// state and instead accept the largest height as genesis. -pub async fn import_chain( +pub async fn import_chain( sm: &Arc>, - reader: R, + path: String, validate_height: Option, ) -> Result<(), Box> where DB: BlockStore + Send + Sync + 'static, { + let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); + info!("Importing chain from snapshot"); // start import - let cids = load_car(sm.blockstore(), reader)?; + let cids = if is_remote_file { + let reader = make_reader(path)?; + info!("Downloading file..."); + load_car(sm.blockstore(), reader)? + } else { + let reader = File::open(&path).expect("Snapshot file path not found!"); + load_car(sm.blockstore(), reader)? + }; let ts = sm .chain_store() .tipset_from_keys(&TipsetKeys::new(cids)) @@ -109,6 +119,9 @@ where .tipset_by_height(0, &ts, true) .await? .unwrap(); + + let ts = sm.chain_store().tipset_from_keys(&TipsetKeys::new(cids))?; + let gb = sm.chain_store().tipset_by_height(0, &ts, true)?.unwrap(); if let Some(height) = validate_height { info!("Validating imported chain"); sm.validate_chain::(ts.clone(), height).await?; @@ -122,40 +135,4 @@ where gen_cid ); Ok(()) -} -// TODO update -/// Import a chain from a CAR file -// async fn import_chain( -// bs: Arc, -// path: String, -// snapshot: bool, -// ) -> Result<(), Box> { -// let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); - -// info!("Importing chain from snapshot"); -// let cids = if is_remote_file { -// let reader = make_reader(path)?; -// info!("Downloading file..."); -// load_car(bs.as_ref(), reader)? -// } else { -// let reader = File::open(&path).expect("Snapshot file path not found!"); -// load_car(bs.as_ref(), reader)? -// }; - -// let ts = chain::tipset_from_keys(bs.as_ref(), &TipsetKeys::new(cids))?; -// let gb = chain::tipset_by_height(bs.as_ref(), 0, &ts, true)?.unwrap(); -// let sm = StateManager::new(bs.clone()); -// if !snapshot { -// info!("Validating imported chain"); -// sm.validate_chain::(ts.clone()).await?; -// } -// let gen_cid = chain::set_genesis(bs.as_ref(), &gb.blocks()[0])?; -// bs.write(chain::HEAD_KEY, ts.key().marshal_cbor()?)?; -// info!( -// "Accepting {:?} as new head with genesis {:?}", -// ts.cids(), -// gen_cid -// ); - -// Ok(()) -// } +} \ No newline at end of file diff --git a/utils/net_utils/Cargo.toml b/utils/net_utils/Cargo.toml index 47e5a0188ef5..a9b8d41d32c3 100644 --- a/utils/net_utils/Cargo.toml +++ b/utils/net_utils/Cargo.toml @@ -5,8 +5,6 @@ authors = ["ChainSafe Systems "] edition = "2018" [dependencies] -futures = "0.3.5" -async-std = { version = "1.6.3" } isahc = "0.9.11" url = "2.1.1" log = "0.4.8" diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index 866d1b7a7850..c68dd15070ab 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -1,16 +1,10 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use async_std::{ - fs, - io::{self, copy, Read as AsyncRead}, -}; -use futures::task::{Context, Poll}; use isahc::{Body, HttpClient}; -use log::info; use pbr::ProgressBar; +use std::io::Stdout; use std::io::{Read, Result as IOResult}; -use std::{io::Stdout, marker::Unpin, path::Path, pin::Pin}; use thiserror::Error; use url::Url; From 736177d2657aae5f2268257d94e1a590aac9992d Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Fri, 13 Nov 2020 20:18:16 -0500 Subject: [PATCH 4/6] generic write on FetchProgress --- Cargo.lock | 4 +++ utils/genesis/src/lib.rs | 2 +- utils/net_utils/Cargo.toml | 5 ++- utils/net_utils/src/download.rs | 61 +++++++++++++++++++++++++++------ utils/paramfetch/Cargo.toml | 1 + utils/paramfetch/src/lib.rs | 48 ++------------------------ 6 files changed, 63 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5789321ca49a..4648fc5814f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4164,9 +4164,12 @@ dependencies = [ name = "net_utils" version = "0.1.0" dependencies = [ + "async-std", + "futures 0.3.8", "isahc", "log", "pbr", + "pin-project-lite", "thiserror", "url", ] @@ -4492,6 +4495,7 @@ dependencies = [ "futures 0.3.8", "isahc", "log", + "net_utils", "pbr", "pin-project-lite", "serde", diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 68b32da98a43..bd880fc052c3 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -135,4 +135,4 @@ where gen_cid ); Ok(()) -} \ No newline at end of file +} diff --git a/utils/net_utils/Cargo.toml b/utils/net_utils/Cargo.toml index a9b8d41d32c3..8f107a44b94c 100644 --- a/utils/net_utils/Cargo.toml +++ b/utils/net_utils/Cargo.toml @@ -9,4 +9,7 @@ isahc = "0.9.11" url = "2.1.1" log = "0.4.8" thiserror = "1.0" -pbr = "1.0.3" \ No newline at end of file +pbr = "1.0.3" +pin-project-lite = "0.1" +async-std = { version = "1.6.3", features = ["attributes"] } +futures = "0.3.5" \ No newline at end of file diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index c68dd15070ab..ff0673a5cbf1 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -1,26 +1,63 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use async_std::io::BufRead; +use futures::prelude::*; use isahc::{Body, HttpClient}; use pbr::ProgressBar; -use std::io::Stdout; -use std::io::{Read, Result as IOResult}; +use pin_project_lite::pin_project; +use std::io::{self, Read, Result as IOResult, Stdout, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; use thiserror::Error; use url::Url; -/// Contains progress bar and reader. -pub struct DownloadProgress { - inner: R, - progress_bar: ProgressBar, -} - #[derive(Debug, Error)] enum DownloadError { #[error("Cannot read a file header")] HeaderError, } -impl Read for DownloadProgress { +pin_project! { + pub struct FetchProgress { + #[pin] + pub inner: R, + pub progress_bar: ProgressBar, + } +} + +impl FetchProgress { + pub fn finish(&mut self) { + self.progress_bar.finish(); + } +} + +impl AsyncRead for FetchProgress { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let r = Pin::new(&mut self.inner).poll_read(cx, buf); + if let Poll::Ready(Ok(size)) = r { + self.progress_bar.add(size as u64); + } + r + } +} + +impl BufRead for FetchProgress { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } +} + +impl Read for FetchProgress { fn read(&mut self, buf: &mut [u8]) -> IOResult { self.inner.read(buf).map(|n| { self.progress_bar.add(n as u64); @@ -30,7 +67,9 @@ impl Read for DownloadProgress { } /// Builds Reader for a provided URL. -pub fn make_reader(raw_url: String) -> Result, Box> { +pub fn make_reader( + raw_url: String, +) -> Result, Box> { let url = Url::parse(raw_url.as_str())?; let client = HttpClient::new()?; @@ -51,7 +90,7 @@ pub fn make_reader(raw_url: String) -> Result, Box { - #[pin] - inner: R, - progress_bar: ProgressBar, - } -} - -impl FetchProgress { - fn finish(&mut self) { - self.progress_bar.finish(); - } -} - -impl AsyncRead for FetchProgress { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let r = Pin::new(&mut self.inner).poll_read(cx, buf); - if let Poll::Ready(Ok(size)) = r { - self.progress_bar.add(size as u64); - } - r - } -} - -impl BufRead for FetchProgress { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - this.inner.poll_fill_buf(cx) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) - } -} - async fn fetch_params( path: &Path, info: &ParameterData, From 84ebc32af29eb425f257d42cc2d70a1467d9dd30 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Tue, 17 Nov 2020 20:34:18 -0500 Subject: [PATCH 5/6] addressing comments --- Cargo.lock | 1 + forest/src/daemon.rs | 6 +++--- utils/genesis/Cargo.toml | 1 + utils/genesis/src/lib.rs | 13 +++++++------ utils/net_utils/src/download.rs | 6 ++---- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4648fc5814f6..f66f0084af03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2655,6 +2655,7 @@ dependencies = [ "log", "net_utils", "state_manager", + "url", ] [[package]] diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 1d24b39b0c86..471ca9213cdf 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -73,7 +73,7 @@ pub(super) async fn start(config: Config) { // Sync from snapshot if let Some(path) = &config.snapshot_path { - import_chain::(&state_manager, path.to_string(), Some(0)) + import_chain::(&state_manager, path, Some(0)) .await .unwrap(); } @@ -193,7 +193,7 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - import_chain::(&sm, "test_files/chain4.car".to_string(), None) + import_chain::(&sm, "test_files/chain4.car", None) .await .expect("Failed to import chain"); } @@ -202,7 +202,7 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - import_chain::(&sm, "test_files/chain4.car".to_string(), Some(0)) + import_chain::(&sm, "test_files/chain4.car", Some(0)) .await .expect("Failed to import chain"); } diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index 5b3690a80c99..a7e3980e6d8d 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -19,3 +19,4 @@ chain = { path = "../../blockchain/chain" } fil_types = { path = "../../types" } encoding = { path = "../../encoding", package = "forest_encoding" } net_utils = { path = "../net_utils" } +url = "2.1.1" diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index bd880fc052c3..53c358ba777f 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -9,13 +9,14 @@ use fil_types::verifier::ProofVerifier; use forest_car::load_car; use ipld_blockstore::BlockStore; use log::{debug, info}; -use net_utils::make_reader; +use net_utils::make_http_reader; use state_manager::StateManager; use std::error::Error as StdError; use std::fs::File; use std::include_bytes; use std::io::{BufReader, Read}; use std::sync::Arc; +use url::Url; #[cfg(feature = "testing")] pub const EXPORT_SR_40: &[u8; 1226395] = include_bytes!("mainnet/export40.car"); @@ -92,7 +93,7 @@ where /// state and instead accept the largest height as genesis. pub async fn import_chain( sm: &Arc>, - path: String, + path: &str, validate_height: Option, ) -> Result<(), Box> where @@ -103,11 +104,13 @@ where info!("Importing chain from snapshot"); // start import let cids = if is_remote_file { - let reader = make_reader(path)?; + let url = Url::parse(path)?; + let reader = make_http_reader(url)?; info!("Downloading file..."); load_car(sm.blockstore(), reader)? } else { - let reader = File::open(&path).expect("Snapshot file path not found!"); + let file = File::open(&path).expect("Snapshot file path not found!"); + let reader = BufReader::new(file); load_car(sm.blockstore(), reader)? }; let ts = sm @@ -120,8 +123,6 @@ where .await? .unwrap(); - let ts = sm.chain_store().tipset_from_keys(&TipsetKeys::new(cids))?; - let gb = sm.chain_store().tipset_by_height(0, &ts, true)?.unwrap(); if let Some(height) = validate_height { info!("Validating imported chain"); sm.validate_chain::(ts.clone(), height).await?; diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index ff0673a5cbf1..4b203a3f8601 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -67,11 +67,9 @@ impl Read for FetchProgress { } /// Builds Reader for a provided URL. -pub fn make_reader( - raw_url: String, +pub fn make_http_reader( + url: Url, ) -> Result, Box> { - let url = Url::parse(raw_url.as_str())?; - let client = HttpClient::new()?; let total_size = { let resp = client.head(url.as_str())?; From aec408a3e3c304afd2c519da6367acc25508c916 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Wed, 18 Nov 2020 20:54:10 -0500 Subject: [PATCH 6/6] switching to TryFrom --- utils/genesis/src/lib.rs | 10 +++-- utils/net_utils/src/download.rs | 67 +++++++++++++++++++++------------ 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 53c358ba777f..fbd42a7c1eff 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -9,8 +9,9 @@ use fil_types::verifier::ProofVerifier; use forest_car::load_car; use ipld_blockstore::BlockStore; use log::{debug, info}; -use net_utils::make_http_reader; +use net_utils::FetchProgress; use state_manager::StateManager; +use std::convert::TryFrom; use std::error::Error as StdError; use std::fs::File; use std::include_bytes; @@ -104,13 +105,14 @@ where info!("Importing chain from snapshot"); // start import let cids = if is_remote_file { - let url = Url::parse(path)?; - let reader = make_http_reader(url)?; + let url = Url::parse(path).expect("URL is invalid"); info!("Downloading file..."); + let reader = FetchProgress::try_from(url)?; load_car(sm.blockstore(), reader)? } else { let file = File::open(&path).expect("Snapshot file path not found!"); - let reader = BufReader::new(file); + info!("Reading file..."); + let reader = FetchProgress::try_from(file)?; load_car(sm.blockstore(), reader)? }; let ts = sm diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs index 4b203a3f8601..298d65dd6d33 100644 --- a/utils/net_utils/src/download.rs +++ b/utils/net_utils/src/download.rs @@ -6,7 +6,9 @@ use futures::prelude::*; use isahc::{Body, HttpClient}; use pbr::ProgressBar; use pin_project_lite::pin_project; -use std::io::{self, Read, Result as IOResult, Stdout, Write}; +use std::convert::TryFrom; +use std::fs::File; +use std::io::{self, BufReader, Read, Result as IOResult, Stdout, Write}; use std::pin::Pin; use std::task::{Context, Poll}; use thiserror::Error; @@ -19,6 +21,7 @@ enum DownloadError { } pin_project! { + /// Holds a Reader, tracks read progress and draw a progress bar. pub struct FetchProgress { #[pin] pub inner: R, @@ -66,30 +69,46 @@ impl Read for FetchProgress { } } -/// Builds Reader for a provided URL. -pub fn make_http_reader( - url: Url, -) -> Result, Box> { - let client = HttpClient::new()?; - let total_size = { - let resp = client.head(url.as_str())?; - if resp.status().is_success() { - resp.headers() - .get("content-length") - .and_then(|ct_len| ct_len.to_str().ok()) - .and_then(|ct_len| ct_len.parse().ok()) - .unwrap_or(0) - } else { - return Err(Box::new(DownloadError::HeaderError)); - } - }; +impl TryFrom for FetchProgress { + type Error = Box; + + fn try_from(url: Url) -> Result { + let client = HttpClient::new()?; + let total_size = { + let resp = client.head(url.as_str())?; + if resp.status().is_success() { + resp.headers() + .get("content-length") + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) + .unwrap_or(0) + } else { + return Err(Box::new(DownloadError::HeaderError)); + } + }; + + let request = client.get(url.as_str())?; - let request = client.get(url.as_str())?; + let pb = ProgressBar::new(total_size); - let pb = ProgressBar::new(total_size); + Ok(FetchProgress { + progress_bar: pb, + inner: request.into_body(), + }) + } +} - Ok(FetchProgress { - progress_bar: pb, - inner: request.into_body(), - }) +impl TryFrom for FetchProgress, Stdout> { + type Error = Box; + + fn try_from(file: File) -> Result { + let total_size = file.metadata()?.len(); + + let pb = ProgressBar::new(total_size); + + Ok(FetchProgress { + progress_bar: pb, + inner: BufReader::new(file), + }) + } }