diff --git a/Cargo.lock b/Cargo.lock index c3ba4216c64d..f66f0084af03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2199,6 +2199,7 @@ dependencies = [ "libp2p", "log", "message_pool", + "net_utils", "paramfetch", "pretty_env_logger", "rpc", @@ -2652,7 +2653,9 @@ dependencies = [ "forest_encoding", "ipld_blockstore", "log", + "net_utils", "state_manager", + "url", ] [[package]] @@ -3188,11 +3191,13 @@ dependencies = [ "crossbeam-utils 0.8.0", "curl", "curl-sys", + "encoding_rs", "futures-channel", "futures-io", "futures-util", "http", "log", + "mime", "once_cell", "slab", "sluice", @@ -4156,6 +4161,20 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "net_utils" +version = "0.1.0" +dependencies = [ + "async-std", + "futures 0.3.8", + "isahc", + "log", + "pbr", + "pin-project-lite", + "thiserror", + "url", +] + [[package]] name = "nix" version = "0.18.0" @@ -4475,12 +4494,15 @@ dependencies = [ "blake2b_simd", "fil_types", "futures 0.3.8", + "isahc", "log", + "net_utils", "pbr", "pin-project-lite", "serde", "serde_json", "surf", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 153b1d7e09c9..d50026a32877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "utils/commcid", "utils/json_utils", "utils/genesis", + "utils/net_utils", "utils/statediff", "types", "key_management", diff --git a/forest/Cargo.toml b/forest/Cargo.toml index 601f1d8d7055..3b19d6a2e6b9 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -38,7 +38,8 @@ wallet = { package = "key_management", path = "../key_management" } jsonrpc-v2 = { version = "0.5.2", git = "https://github.com/ChainSafe/jsonrpc-v2", features = ["easy-errors", "macros"], default-features = false } uuid = { version = "0.8.1", features = ["v4"] } auth = { path = "../utils/auth"} +net_utils = { path = "../utils/net_utils" } actor = { path = "../vm/actor/" } genesis = { path = "../utils/genesis" } paramfetch = { path = "../utils/paramfetch" } -encoding = { package = "forest_encoding", 
path = "../encoding" } +encoding = { package = "forest_encoding", path = "../encoding" } \ No newline at end of file diff --git a/forest/src/cli/mod.rs b/forest/src/cli/mod.rs index d67557fda810..92f0b51fff8b 100644 --- a/forest/src/cli/mod.rs +++ b/forest/src/cli/mod.rs @@ -72,9 +72,9 @@ pub struct DaemonOpts { pub kademlia: Option, #[structopt(short, long, help = "Allow MDNS (default = true)")] pub mdns: Option, - #[structopt(long, help = "Import a snapshot from a CAR file")] + #[structopt(long, help = "Import a snapshot from a local CAR file or url")] pub import_snapshot: Option, - #[structopt(long, help = "Import a chain from CAR file")] + #[structopt(long, help = "Import a chain from a local CAR file or url")] pub import_chain: Option, } diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 7471143dc552..471ca9213cdf 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -3,8 +3,7 @@ use super::cli::{block_until_sigint, Config}; use actor::EPOCH_DURATION_SECONDS; -use async_std::sync::RwLock; -use async_std::task; +use async_std::{sync::RwLock, task}; use auth::{generate_priv_key, JWT_IDENTIFIER}; use beacon::{DrandBeacon, DEFAULT_DRAND_URL}; use chain::ChainStore; @@ -20,8 +19,6 @@ use message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; use paramfetch::{get_params_default, SectorSizeOpt}; use rpc::{start_rpc, RpcState}; use state_manager::StateManager; -use std::fs::File; -use std::io::BufReader; use std::sync::Arc; use utils::write_to_file; use wallet::{KeyStore, PersistentKeyStore}; @@ -76,9 +73,7 @@ pub(super) async fn start(config: Config) { // Sync from snapshot if let Some(path) = &config.snapshot_path { - let file = File::open(path).expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&state_manager, reader, Some(0)) + import_chain::(&state_manager, path, Some(0)) .await .unwrap(); } @@ -192,17 +187,13 @@ pub(super) async fn start(config: Config) { mod test { use super::*; use 
db::MemoryDB; - use std::fs::File; - use std::io::BufReader; #[async_std::test] async fn import_snapshot_from_file() { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&sm, reader, None) + import_chain::(&sm, "test_files/chain4.car", None) .await .expect("Failed to import chain"); } @@ -211,9 +202,7 @@ mod test { let db = Arc::new(MemoryDB::default()); let cs = Arc::new(ChainStore::new(db)); let sm = Arc::new(StateManager::new(cs)); - let file = File::open("test_files/chain4.car").expect("Snapshot file path not found!"); - let reader = BufReader::new(file); - import_chain::(&sm, reader, Some(0)) + import_chain::(&sm, "test_files/chain4.car", Some(0)) .await .expect("Failed to import chain"); } diff --git a/utils/genesis/Cargo.toml b/utils/genesis/Cargo.toml index bb34de3f93ea..a7e3980e6d8d 100644 --- a/utils/genesis/Cargo.toml +++ b/utils/genesis/Cargo.toml @@ -18,3 +18,5 @@ blocks = { package = "forest_blocks", path = "../../blockchain/blocks" } chain = { path = "../../blockchain/chain" } fil_types = { path = "../../types" } encoding = { path = "../../encoding", package = "forest_encoding" } +net_utils = { path = "../net_utils" } +url = "2.1.1" diff --git a/utils/genesis/src/lib.rs b/utils/genesis/src/lib.rs index 839c28e9d985..fbd42a7c1eff 100644 --- a/utils/genesis/src/lib.rs +++ b/utils/genesis/src/lib.rs @@ -9,12 +9,15 @@ use fil_types::verifier::ProofVerifier; use forest_car::load_car; use ipld_blockstore::BlockStore; use log::{debug, info}; +use net_utils::FetchProgress; use state_manager::StateManager; +use std::convert::TryFrom; use std::error::Error as StdError; use std::fs::File; use std::include_bytes; use std::io::{BufReader, Read}; use std::sync::Arc; +use url::Url; #[cfg(feature = "testing")] pub const EXPORT_SR_40: &[u8; 1226395] = 
include_bytes!("mainnet/export40.car"); @@ -89,17 +92,29 @@ where /// Import a chain from a CAR file. If the snapshot boolean is set, it will not verify the chain /// state and instead accept the largest height as genesis. -pub async fn import_chain( +pub async fn import_chain( sm: &Arc>, - reader: R, + path: &str, validate_height: Option, ) -> Result<(), Box> where DB: BlockStore + Send + Sync + 'static, { + let is_remote_file: bool = path.starts_with("http://") || path.starts_with("https://"); + info!("Importing chain from snapshot"); // start import - let cids = load_car(sm.blockstore(), reader)?; + let cids = if is_remote_file { + let url = Url::parse(path).expect("URL is invalid"); + info!("Downloading file..."); + let reader = FetchProgress::try_from(url)?; + load_car(sm.blockstore(), reader)? + } else { + let file = File::open(&path).expect("Snapshot file path not found!"); + info!("Reading file..."); + let reader = FetchProgress::try_from(file)?; + load_car(sm.blockstore(), reader)? + }; let ts = sm .chain_store() .tipset_from_keys(&TipsetKeys::new(cids)) @@ -109,6 +124,7 @@ where .tipset_by_height(0, &ts, true) .await? 
.unwrap(); + if let Some(height) = validate_height { info!("Validating imported chain"); sm.validate_chain::(ts.clone(), height).await?; diff --git a/utils/net_utils/Cargo.toml b/utils/net_utils/Cargo.toml new file mode 100644 index 000000000000..8f107a44b94c --- /dev/null +++ b/utils/net_utils/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "net_utils" +version = "0.1.0" +authors = ["ChainSafe Systems "] +edition = "2018" + +[dependencies] +isahc = "0.9.11" +url = "2.1.1" +log = "0.4.8" +thiserror = "1.0" +pbr = "1.0.3" +pin-project-lite = "0.1" +async-std = { version = "1.6.3", features = ["attributes"] } +futures = "0.3.5" \ No newline at end of file diff --git a/utils/net_utils/src/download.rs b/utils/net_utils/src/download.rs new file mode 100644 index 000000000000..298d65dd6d33 --- /dev/null +++ b/utils/net_utils/src/download.rs @@ -0,0 +1,114 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_std::io::BufRead; +use futures::prelude::*; +use isahc::{Body, HttpClient}; +use pbr::ProgressBar; +use pin_project_lite::pin_project; +use std::convert::TryFrom; +use std::fs::File; +use std::io::{self, BufReader, Read, Result as IOResult, Stdout, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use thiserror::Error; +use url::Url; + +#[derive(Debug, Error)] +enum DownloadError { + #[error("Cannot read a file header")] + HeaderError, +} + +pin_project! { + /// Holds a Reader, tracks read progress and draws a progress bar.
+ pub struct FetchProgress { + #[pin] + pub inner: R, + pub progress_bar: ProgressBar, + } +} + +impl FetchProgress { + pub fn finish(&mut self) { + self.progress_bar.finish(); + } +} + +impl AsyncRead for FetchProgress { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let r = Pin::new(&mut self.inner).poll_read(cx, buf); + if let Poll::Ready(Ok(size)) = r { + self.progress_bar.add(size as u64); + } + r + } +} + +impl BufRead for FetchProgress { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } +} + +impl Read for FetchProgress { + fn read(&mut self, buf: &mut [u8]) -> IOResult { + self.inner.read(buf).map(|n| { + self.progress_bar.add(n as u64); + n + }) + } +} + +impl TryFrom for FetchProgress { + type Error = Box; + + fn try_from(url: Url) -> Result { + let client = HttpClient::new()?; + let total_size = { + let resp = client.head(url.as_str())?; + if resp.status().is_success() { + resp.headers() + .get("content-length") + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) + .unwrap_or(0) + } else { + return Err(Box::new(DownloadError::HeaderError)); + } + }; + + let request = client.get(url.as_str())?; + + let pb = ProgressBar::new(total_size); + + Ok(FetchProgress { + progress_bar: pb, + inner: request.into_body(), + }) + } +} + +impl TryFrom for FetchProgress, Stdout> { + type Error = Box; + + fn try_from(file: File) -> Result { + let total_size = file.metadata()?.len(); + + let pb = ProgressBar::new(total_size); + + Ok(FetchProgress { + progress_bar: pb, + inner: BufReader::new(file), + }) + } +} diff --git a/utils/net_utils/src/lib.rs b/utils/net_utils/src/lib.rs new file mode 100644 index 000000000000..9762280cc49e --- /dev/null +++ b/utils/net_utils/src/lib.rs @@ -0,0 +1,6 @@ +// 
Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod download; + +pub use self::download::*; diff --git a/utils/paramfetch/Cargo.toml b/utils/paramfetch/Cargo.toml index 50bdf6305bf5..705ea715e9e7 100644 --- a/utils/paramfetch/Cargo.toml +++ b/utils/paramfetch/Cargo.toml @@ -15,3 +15,6 @@ log = "0.4.8" blake2b_simd = "0.5.9" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +isahc = "0.9.11" +url = "2.1.1" +net_utils = { path = "../net_utils" } diff --git a/utils/paramfetch/src/lib.rs b/utils/paramfetch/src/lib.rs index 1bad444bc15b..239a7e9608da 100644 --- a/utils/paramfetch/src/lib.rs +++ b/utils/paramfetch/src/lib.rs @@ -3,25 +3,22 @@ use async_std::{ fs::{self, File}, - io::{copy, BufRead, BufWriter}, + io::{copy, BufWriter}, sync::{channel, Arc}, task, }; use blake2b_simd::{Hash, State as Blake2b}; use core::time::Duration; use fil_types::SectorSize; -use futures::prelude::*; use log::{info, warn}; -use pbr::{MultiBar, ProgressBar, Units}; -use pin_project_lite::pin_project; +use net_utils::FetchProgress; +use pbr::{MultiBar, Units}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::error::Error as StdError; use std::fs::File as SyncFile; use std::io::{self, copy as sync_copy, BufReader as SyncBufReader, ErrorKind, Stdout}; use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::task::{Context, Poll}; use surf::Client; const GATEWAY: &str = "https://proofs.filecoin.io/ipfs/"; @@ -149,45 +146,6 @@ async fn fetch_verify_params( }) } -pin_project! 
{ - struct FetchProgress { - #[pin] - inner: R, - progress_bar: ProgressBar, - } -} - -impl FetchProgress { - fn finish(&mut self) { - self.progress_bar.finish(); - } -} - -impl AsyncRead for FetchProgress { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let r = Pin::new(&mut self.inner).poll_read(cx, buf); - if let Poll::Ready(Ok(size)) = r { - self.progress_bar.add(size as u64); - } - r - } -} - -impl BufRead for FetchProgress { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - this.inner.poll_fill_buf(cx) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) - } -} - async fn fetch_params( path: &Path, info: &ParameterData,