From aa999207b3e7eadc5a3c7dcf555395af46d9592f Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Fri, 9 Jun 2023 07:49:19 +0200 Subject: [PATCH 01/16] play around with bitswap --- Cargo.lock | 4 ++ forest/daemon/Cargo.toml | 4 ++ forest/daemon/src/daemon.rs | 89 ++++++++++++++++++++++++++++++++++++- 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6930055cfbc8..228216ecb61c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3046,10 +3046,12 @@ dependencies = [ name = "forest-daemon" version = "0.8.2" dependencies = [ + "ahash 0.8.3", "anyhow", "assert_cmd", "atty", "cfg-if 1.0.0", + "cid", "clap", "daemonize-me", "dialoguer", @@ -3064,6 +3066,7 @@ dependencies = [ "forest_fil_cns", "forest_genesis", "forest_interpreter", + "forest_ipld", "forest_key_management", "forest_libp2p", "forest_message_pool", @@ -3076,6 +3079,7 @@ dependencies = [ "forest_utils", "futures", "fvm_ipld_blockstore", + "fvm_ipld_encoding 0.2.3", "lazy_static", "log", "raw_sync", diff --git a/forest/daemon/Cargo.toml b/forest/daemon/Cargo.toml index f6ad6ad92f8a..795a77162149 100644 --- a/forest/daemon/Cargo.toml +++ b/forest/daemon/Cargo.toml @@ -12,6 +12,10 @@ name = "forest" path = "src/main.rs" [dependencies] +forest_ipld.workspace = true +fvm_ipld_encoding.workspace = true +cid.workspace = true +ahash.workspace = true anyhow.workspace = true atty.workspace = true cfg-if.workspace = true diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index c66988feb79c..da9ac99772ec 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -28,7 +28,7 @@ use forest_genesis::{ use forest_key_management::{ KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, }; -use forest_libp2p::{get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager}; +use forest_libp2p::{get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager, NetworkMessage}; use forest_message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; use forest_rpc::start_rpc; use forest_rpc_api::data_types::RPCState; @@ -358,6 +358,7 @@ pub(super) async fn start( let rpc_chain_store = Arc::clone(&chain_store); let gc_event_tx = db_garbage_collector.get_tx(); + let network_send_clone = network_send.clone(); services.spawn(async move { info!("JSON-RPC endpoint started at {}", config.client.rpc_address); // XXX: The JSON error message are a nightmare to print. @@ -368,7 +369,7 @@ pub(super) async fn start( mpool, bad_blocks, sync_state, - network_send, + network_send: network_send_clone, network_name, // TODO: the RPCState can fetch this itself from the StateManager beacon: rpc_state_manager.beacon_schedule(), @@ -435,11 +436,95 @@ pub(super) async fn start( ensure_params_downloaded().await?; services.spawn(p2p_service.run()); + services.spawn(async move { + use ahash::{HashSet, HashSetExt}; + use cid::Cid; + use fvm_ipld_encoding::CborStore; + use forest_ipld::Ipld; + + fn scan_for_links(ipld: forest_ipld::Ipld, seen: &mut HashSet, to_fetch: &mut Vec) { + match ipld { + Ipld::Null => {}, + Ipld::Bool(_) => {}, + Ipld::Integer(_) => {}, + Ipld::Float(_) => {}, + Ipld::String(_) => {}, + Ipld::Bytes(_) => {}, + Ipld::List(list) => { + for elt in list.into_iter() { + scan_for_links(elt, seen, to_fetch); + } + }, + Ipld::Map(map) => { + for (_key, elt) in map.into_iter() { + scan_for_links(elt, seen, to_fetch); + } + }, + Ipld::Link(cid) => { + if cid.codec() == 0x55 { + if seen.insert(cid) { + info!("Found WASM: {cid}"); + } + } + if cid.codec() == 0x71 { + if seen.insert(cid) { + info!("Found link: {cid}"); + to_fetch.push(cid); + } + } + }, + } + } + + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut seen = HashSet::new(); + let mut to_fetch = Vec::new(); + let mut failures = 0; + // mainnet: 1,594,681 + // let root_cid = "bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm".parse::().unwrap(); + // calibnet: 242,150, 21144 cids + let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); + // calibnet: 630,932, 88594 cids + // let root_cid = "bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk".parse::().unwrap(); + seen.insert(root_cid); + to_fetch.push(root_cid); + while let Some(required_cid) = to_fetch.pop() { + info!("Fetching new ipld block. Remaining: {}, Seen: {}", to_fetch.len(), seen.len()); + + let (tx, rx) = flume::bounded(1); + network_send.send_async(NetworkMessage::BitswapRequest{epoch: 0, cid: required_cid, response_channel: tx}).await?; + + let success = tokio::task::spawn_blocking(move || { + rx.recv_timeout(Duration::from_secs_f32(10.0)).unwrap_or_default() + }) + .await + .unwrap_or(false); + + match db.get_cbor::(&required_cid) { + Ok(Some(ipld)) => { + // info!("Request successful"); + scan_for_links(ipld, &mut seen, &mut to_fetch); + }, + Ok(None) => { + failures += 1; + info!("Request failed: {success}, failures: {failures}") + }, + Err(msg) => info!("Failed to decode data: {msg}") + } + + // tokio::task::yield_now().await; + } + info!("All fetches done. Failures: {failures}"); + Ok(()) + }); + // blocking until any of the services returns an error, let err = propagate_error(&mut services).await; anyhow::bail!("services failure: {}", err); } + /// Generates, prints and optionally writes to a file the administrator JWT /// token. fn handle_admin_token(opts: &CliOpts, config: &Config, keystore: &KeyStore) -> anyhow::Result<()> { From 4b99ae4674caf2a6c711918a87ec611ec1c9f4d0 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Fri, 9 Jun 2023 19:49:09 +0200 Subject: [PATCH 02/16] initial stab at a state fetch command --- forest/cli/src/cli/state_cmd.rs | 18 +++++++++++++++--- forest/cli/src/subcommand.rs | 2 +- forest/daemon/src/daemon.rs | 14 ++++++++++++-- node/rpc-api/src/lib.rs | 4 ++++ node/rpc-client/src/state_ops.rs | 12 ++++++++++++ utils/forest_utils/src/db/file_backed_obj.rs | 2 +- 6 files changed, 45 insertions(+), 7 deletions(-) diff --git a/forest/cli/src/cli/state_cmd.rs b/forest/cli/src/cli/state_cmd.rs index e8c92af2d9ac..8a47b3336202 100644 --- a/forest/cli/src/cli/state_cmd.rs +++ b/forest/cli/src/cli/state_cmd.rs @@ -1,10 +1,14 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use cid::Cid; use clap::Subcommand; +use forest_json::cid::CidJson; +use forest_rpc_client::state_ops::*; use fvm_shared::{clock::ChainEpoch, econ::TokenAmount}; use serde_tuple::{self, Deserialize_tuple, Serialize_tuple}; +use super::handle_rpc_err; use super::Config; #[derive(Serialize_tuple, Deserialize_tuple, Clone, Debug)] @@ -19,11 +23,19 @@ struct VestingScheduleEntry { } #[derive(Debug, Subcommand)] -pub enum StateCommands {} +pub enum StateCommands { + Fetch { root: Cid }, +} impl StateCommands { - pub fn run(&self, _config: Config) -> anyhow::Result<()> { - // match self {} + pub async fn run(self, config: Config) -> anyhow::Result<()> { + match self { + Self::Fetch { root } => { + state_fetch_root((CidJson(root),), &config.client.rpc_token) + .await + .map_err(handle_rpc_err)?; + } + } Ok(()) } } diff --git a/forest/cli/src/subcommand.rs b/forest/cli/src/subcommand.rs index 7d2c8f5df641..464e75201e11 100644 --- a/forest/cli/src/subcommand.rs +++ b/forest/cli/src/subcommand.rs @@ -34,7 +34,7 @@ pub async fn process( Subcommand::Wallet(cmd) => cmd.run(config).await, Subcommand::Sync(cmd) => cmd.run(config).await, Subcommand::Mpool(cmd) => cmd.run(config), - Subcommand::State(cmd) => cmd.run(config), + Subcommand::State(cmd) => cmd.run(config).await, Subcommand::Config(cmd) => cmd.run(&config, &mut std::io::stdout()), Subcommand::Send(cmd) => cmd.run(config).await, Subcommand::DB(cmd) => cmd.run(&config).await, diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index da9ac99772ec..bf17163ca408 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -483,14 +483,24 @@ pub(super) async fn start( let mut failures = 0; // mainnet: 1,594,681 // let root_cid = "bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm".parse::().unwrap(); + + // mainnet: 2,933,266 + // let root_cid = "bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu".parse::().unwrap(); + + // mainnet: 2,833,266 + // let root_cid = "bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2".parse::().unwrap(); + + // mainnet: 1_960_320 + let root_cid = "bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg".parse::().unwrap(); + // calibnet: 242,150, 21144 cids - let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); + // let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); // calibnet: 630,932, 88594 cids // let root_cid = "bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk".parse::().unwrap(); seen.insert(root_cid); to_fetch.push(root_cid); while let Some(required_cid) = to_fetch.pop() { - info!("Fetching new ipld block. Remaining: {}, Seen: {}", to_fetch.len(), seen.len()); + info!("Fetching new ipld block. Remaining: {}, Seen: {}, Failures: {}", to_fetch.len(), seen.len(), failures); let (tx, rx) = flume::bounded(1); network_send.send_async(NetworkMessage::BitswapRequest{epoch: 0, cid: required_cid, response_channel: tx}).await?; diff --git a/node/rpc-api/src/lib.rs b/node/rpc-api/src/lib.rs index 7200b2645549..8c3e4f9a2f4f 100644 --- a/node/rpc-api/src/lib.rs +++ b/node/rpc-api/src/lib.rs @@ -370,6 +370,10 @@ pub mod state_api { pub const STATE_WAIT_MSG: &str = "Filecoin.StateWaitMsg"; pub type StateWaitMsgParams = (CidJson, i64); pub type StateWaitMsgResult = MessageLookup; + + pub const STATE_FETCH_ROOT: &str = "Filecoin.StateFetchRoot"; + pub type StateFetchRootParams = (CidJson,); + pub type StateFetchRootResult = (); } /// Gas API diff --git a/node/rpc-client/src/state_ops.rs b/node/rpc-client/src/state_ops.rs index 3ec2696da862..9851d622bf6f 100644 --- a/node/rpc-client/src/state_ops.rs +++ b/node/rpc-client/src/state_ops.rs @@ -1,2 +1,14 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT + +use forest_rpc_api::state_api::*; +use jsonrpc_v2::Error; + +use crate::call; + +pub async fn state_fetch_root( + params: StateFetchRootParams, + auth_token: &Option, +) -> Result { + call(STATE_FETCH_ROOT, params, auth_token).await +} diff --git a/utils/forest_utils/src/db/file_backed_obj.rs b/utils/forest_utils/src/db/file_backed_obj.rs index 57ae2b45a77f..04c4dd801661 100644 --- a/utils/forest_utils/src/db/file_backed_obj.rs +++ b/utils/forest_utils/src/db/file_backed_obj.rs @@ -18,7 +18,7 @@ pub struct FileBacked { sync_period: Option, } -pub const SYNC_PERIOD: Duration = Duration::from_secs(600); +pub const SYNC_PERIOD: Duration = Duration::from_secs(30); impl FileBacked { /// Gets a borrow of the inner object From d7e58c8237767cbe61f15e77c000b8f21190301c Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 09:11:19 +0200 Subject: [PATCH 03/16] separate out fetch-root command --- Cargo.lock | 1 + forest/daemon/src/daemon.rs | 98 +------------------------- node/rpc-api/src/lib.rs | 1 + node/rpc/Cargo.toml | 1 + node/rpc/src/lib.rs | 1 + node/rpc/src/state_api.rs | 136 ++++++++++++++++++++++++++++++++++++ 6 files changed, 143 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 458a117d56ab..c2c9f1e5eeb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3695,6 +3695,7 @@ version = "0.8.2" dependencies = [ "ahash 0.8.3", "anyhow", + "async-recursion", "axum", "base64 0.21.2", "cid", diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index bf17163ca408..b91b15882868 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -28,7 +28,9 @@ use forest_genesis::{ use forest_key_management::{ KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, }; -use forest_libp2p::{get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager, NetworkMessage}; +use forest_libp2p::{ + get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager, +}; use forest_message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; use forest_rpc::start_rpc; use forest_rpc_api::data_types::RPCState; @@ -436,105 +438,11 @@ pub(super) async fn start( ensure_params_downloaded().await?; services.spawn(p2p_service.run()); - services.spawn(async move { - use ahash::{HashSet, HashSetExt}; - use cid::Cid; - use fvm_ipld_encoding::CborStore; - use forest_ipld::Ipld; - - fn scan_for_links(ipld: forest_ipld::Ipld, seen: &mut HashSet, to_fetch: &mut Vec) { - match ipld { - Ipld::Null => {}, - Ipld::Bool(_) => {}, - Ipld::Integer(_) => {}, - Ipld::Float(_) => {}, - Ipld::String(_) => {}, - Ipld::Bytes(_) => {}, - Ipld::List(list) => { - for elt in list.into_iter() { - scan_for_links(elt, seen, to_fetch); - } - }, - Ipld::Map(map) => { - for (_key, elt) in map.into_iter() { - scan_for_links(elt, seen, to_fetch); - } - }, - Ipld::Link(cid) => { - if cid.codec() == 0x55 { - if seen.insert(cid) { - info!("Found WASM: {cid}"); - } - } - if cid.codec() == 0x71 { - if seen.insert(cid) { - info!("Found link: {cid}"); - to_fetch.push(cid); - } - } - }, - } - } - - tokio::time::sleep(Duration::from_secs(10)).await; - - let mut seen = HashSet::new(); - let mut to_fetch = Vec::new(); - let mut failures = 0; - // mainnet: 1,594,681 - // let root_cid = "bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm".parse::().unwrap(); - - // mainnet: 2,933,266 - // let root_cid = "bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu".parse::().unwrap(); - - // mainnet: 2,833,266 - // let root_cid = "bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2".parse::().unwrap(); - - // mainnet: 1_960_320 - let root_cid = "bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg".parse::().unwrap(); - - // calibnet: 242,150, 21144 cids - // let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); - // calibnet: 630,932, 88594 cids - // let root_cid = "bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk".parse::().unwrap(); - seen.insert(root_cid); - to_fetch.push(root_cid); - while let Some(required_cid) = to_fetch.pop() { - info!("Fetching new ipld block. Remaining: {}, Seen: {}, Failures: {}", to_fetch.len(), seen.len(), failures); - - let (tx, rx) = flume::bounded(1); - network_send.send_async(NetworkMessage::BitswapRequest{epoch: 0, cid: required_cid, response_channel: tx}).await?; - - let success = tokio::task::spawn_blocking(move || { - rx.recv_timeout(Duration::from_secs_f32(10.0)).unwrap_or_default() - }) - .await - .unwrap_or(false); - - match db.get_cbor::(&required_cid) { - Ok(Some(ipld)) => { - // info!("Request successful"); - scan_for_links(ipld, &mut seen, &mut to_fetch); - }, - Ok(None) => { - failures += 1; - info!("Request failed: {success}, failures: {failures}") - }, - Err(msg) => info!("Failed to decode data: {msg}") - } - - // tokio::task::yield_now().await; - } - info!("All fetches done. Failures: {failures}"); - Ok(()) - }); - // blocking until any of the services returns an error, let err = propagate_error(&mut services).await; anyhow::bail!("services failure: {}", err); } - /// Generates, prints and optionally writes to a file the administrator JWT /// token. fn handle_admin_token(opts: &CliOpts, config: &Config, keystore: &KeyStore) -> anyhow::Result<()> { diff --git a/node/rpc-api/src/lib.rs b/node/rpc-api/src/lib.rs index 8c3e4f9a2f4f..372bb4c6ec54 100644 --- a/node/rpc-api/src/lib.rs +++ b/node/rpc-api/src/lib.rs @@ -79,6 +79,7 @@ pub static ACCESS_MAP: Lazy> = Lazy::new(|| { access.insert(state_api::STATE_WAIT_MSG, Access::Read); access.insert(state_api::STATE_NETWORK_NAME, Access::Read); access.insert(state_api::STATE_NETWORK_VERSION, Access::Read); + access.insert(state_api::STATE_FETCH_ROOT, Access::Read); // Gas API access.insert(gas_api::GAS_ESTIMATE_GAS_LIMIT, Access::Read); diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index a2ebd5f950f9..f40c1596b013 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -7,6 +7,7 @@ authors.workspace = true edition = "2021" [dependencies] +async-recursion = "1.0" ahash.workspace = true anyhow.workspace = true axum = { workspace = true, features = ["ws"] } diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 4451ae8b01e0..327038e1e573 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -119,6 +119,7 @@ where .with_method(STATE_MARKET_DEALS, state_market_deals::) .with_method(STATE_GET_RECEIPT, state_get_receipt::) .with_method(STATE_WAIT_MSG, state_wait_msg::) + .with_method(STATE_FETCH_ROOT,state_fetch_root::) // Gas API .with_method(GAS_ESTIMATE_FEE_CAP, gas_estimate_fee_cap::) .with_method(GAS_ESTIMATE_GAS_LIMIT, gas_estimate_gas_limit::) diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 9881c4e593e5..a190736e212f 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -18,6 +18,8 @@ use forest_state_manager::InvocResult; use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; use libipld_core::ipld::Ipld; +use log::info; +use std::time::Duration; // TODO handle using configurable verification implementation in RPC (all // defaulting to Full). @@ -186,3 +188,137 @@ pub(crate) async fn state_wait_msg( + data: Data>, + Params((CidJson(root_cid),)): Params, +) -> Result { + { + use ahash::{HashSet, HashSetExt}; + use forest_libp2p::NetworkMessage; + use fvm_ipld_encoding::CborStore; + use parking_lot::Mutex; + use std::ops::DerefMut; + use std::sync::Arc; + + fn scan_for_links(ipld: forest_ipld::Ipld, seen: &mut HashSet, links: &mut Vec) { + match ipld { + Ipld::Null => {} + Ipld::Bool(_) => {} + Ipld::Integer(_) => {} + Ipld::Float(_) => {} + Ipld::String(_) => {} + Ipld::Bytes(_) => {} + Ipld::List(list) => { + for elt in list.into_iter() { + scan_for_links(elt, seen, links); + } + } + Ipld::Map(map) => { + for (_key, elt) in map.into_iter() { + scan_for_links(elt, seen, links); + } + } + Ipld::Link(cid) => { + if cid.codec() == 0x55 { + if seen.insert(cid) { + // info!("Found WASM: {cid}"); + } + } + if cid.codec() == 0x71 { + if seen.insert(cid) { + // info!("Found link: {cid}"); + links.push(cid); + } + } + } + } + } + + // tokio::time::sleep(Duration::from_secs(10)).await; + + let seen = Arc::new(Mutex::new(HashSet::new())); + let sem = Arc::new(tokio::sync::Semaphore::new(16)); + let (work_send, mut work_recv) = tokio::sync::mpsc::channel(1024); + let failures = Arc::new(Mutex::new(0_usize)); + let task_set = tokio::task::JoinSet::new(); + // mainnet: 1,594,681 + // let root_cid = "bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm".parse::().unwrap(); + + // mainnet: 2,933,266 + // let root_cid = "bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu".parse::().unwrap(); + + // mainnet: 2,833,266 + // let root_cid = "bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2".parse::().unwrap(); + + // mainnet: 1_960_320 + // let root_cid = "bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg".parse::().unwrap(); + + // calibnet: 242,150, 21144 cids + // let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); + // calibnet: 630,932, 88594 cids + // let root_cid = "bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk".parse::().unwrap(); + seen.lock().insert(root_cid); + work_send.send(root_cid).await?; + // to_fetch.push(root_cid); + while let Some(required_cid) = work_recv.recv().await { + info!( + "Fetching new ipld block. Seen: {}, Failures: {}, Concurrent: {}", + seen.lock().len(), + failures.lock(), + 16 - sem.available_permits() + ); + + let permit = sem.clone().acquire_owned().await; + tokio::task::spawn({ + let network_send = data.network_send.clone(); + let chain_store = data.chain_store.clone(); + let work_send = work_send.clone(); + let seen = seen.clone(); + let failures = failures.clone(); + async move { + let (tx, rx) = flume::bounded(1); + let _ignore = network_send + .send_async(NetworkMessage::BitswapRequest { + epoch: 0, + cid: required_cid, + response_channel: tx, + }) + .await; + + if !chain_store.db.has(&required_cid).unwrap_or(false) { + let _success = tokio::task::spawn_blocking(move || { + rx.recv_timeout(Duration::from_secs_f32(10.0)) + .unwrap_or_default() + }) + .await + .unwrap_or(false); + } + drop(permit); + + match chain_store.db.get_cbor::(&required_cid) { + Ok(Some(ipld)) => { + // info!("Request successful"); + let mut new_links = Vec::new(); + scan_for_links(ipld, seen.lock().deref_mut(), &mut new_links); + for cid in new_links.into_iter() { + let _ignore_channel_close_errors = work_send.send(cid).await; + } + // forest_ipld::traverse_ipld_links_hash(seen, load_block, ipld, |_|); + } + Ok(None) => { + *failures.lock() += 1; + info!("Request failed: failures: {}", failures.lock()) + } + Err(msg) => info!("Failed to decode data: {msg}"), + } + drop(work_send); + } + }); + + // tokio::task::yield_now().await; + } + info!("All fetches done. Failures: {}", failures.lock()); + Ok(()) + } +} From a7746c23213c0c336dc268a10eca61d07118a392 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 11:21:40 +0200 Subject: [PATCH 04/16] clean up state-root-fetch implementation --- node/rpc/src/state_api.rs | 170 +++++++++++++------------------------- 1 file changed, 59 insertions(+), 111 deletions(-) diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index a190736e212f..3db20eaa10c7 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -189,7 +189,16 @@ pub(crate) async fn state_wait_msg( +// Sample CIDs (useful for testing): +// Mainnet: +// 1,594,681 bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm OhSnap upgrade +// 1_960_320 bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg Skyr upgrade +// 2,833,266 bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2 +// 2,933,266 bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu +// Calibnet: +// 242,150 bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw +// 630,932 bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk +pub(crate) async fn state_fetch_root( data: Data>, Params((CidJson(root_cid),)): Params, ) -> Result { @@ -197,128 +206,67 @@ pub(crate) async fn state_fetch_root, links: &mut Vec) { - match ipld { - Ipld::Null => {} - Ipld::Bool(_) => {} - Ipld::Integer(_) => {} - Ipld::Float(_) => {} - Ipld::String(_) => {} - Ipld::Bytes(_) => {} - Ipld::List(list) => { - for elt in list.into_iter() { - scan_for_links(elt, seen, links); - } - } - Ipld::Map(map) => { - for (_key, elt) in map.into_iter() { - scan_for_links(elt, seen, links); - } - } - Ipld::Link(cid) => { - if cid.codec() == 0x55 { - if seen.insert(cid) { - // info!("Found WASM: {cid}"); - } - } - if cid.codec() == 0x71 { - if seen.insert(cid) { - // info!("Found link: {cid}"); - links.push(cid); - } - } - } - } - } + const MAX_CONCURRENT_REQUESTS: usize = 16; + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); - // tokio::time::sleep(Duration::from_secs(10)).await; + let sem = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_REQUESTS)); + let mut seen: HashSet = HashSet::new(); + let mut counter: usize = 0; + let mut failures: usize = 0; + let mut task_set = tokio::task::JoinSet::new(); - let seen = Arc::new(Mutex::new(HashSet::new())); - let sem = Arc::new(tokio::sync::Semaphore::new(16)); - let (work_send, mut work_recv) = tokio::sync::mpsc::channel(1024); - let failures = Arc::new(Mutex::new(0_usize)); - let task_set = tokio::task::JoinSet::new(); - // mainnet: 1,594,681 - // let root_cid = "bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm".parse::().unwrap(); + let mut get_ipld_link = |ipld: &forest_ipld::Ipld| match ipld { + Ipld::Link(cid) if cid.codec() == 0x71 && seen.insert(*cid) => Some(*cid), + _ => None, + }; - // mainnet: 2,933,266 - // let root_cid = "bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu".parse::().unwrap(); - - // mainnet: 2,833,266 - // let root_cid = "bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2".parse::().unwrap(); - - // mainnet: 1_960_320 - // let root_cid = "bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg".parse::().unwrap(); - - // calibnet: 242,150, 21144 cids - // let root_cid = "bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw".parse::().unwrap(); - // calibnet: 630,932, 88594 cids - // let root_cid = "bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk".parse::().unwrap(); - seen.lock().insert(root_cid); - work_send.send(root_cid).await?; - // to_fetch.push(root_cid); - while let Some(required_cid) = work_recv.recv().await { + task_set.spawn(async move { Ok(Ipld::Link(root_cid)) }); + while let Some(result) = task_set.join_next().await { info!( - "Fetching new ipld block. Seen: {}, Failures: {}, Concurrent: {}", - seen.lock().len(), - failures.lock(), - 16 - sem.available_permits() + "Got new ipld block. Fetched: {counter}, Failures: {failures}, Concurrent: {}", + MAX_CONCURRENT_REQUESTS - sem.available_permits() ); + match result? { + Ok(ipld) => { + for new_cid in ipld.iter().filter_map(&mut get_ipld_link) { + counter += 1; + task_set.spawn({ + let network_send = data.network_send.clone(); + let db = data.chain_store.db.clone(); + let sem = sem.clone(); + async move { + if !db.has(&new_cid).unwrap_or(false) { + let permit = sem.acquire_owned().await?; + let (tx, rx) = flume::bounded(1); + network_send + .send_async(NetworkMessage::BitswapRequest { + epoch: 0, + cid: new_cid, + response_channel: tx, + }) + .await?; + let _ignore = + tokio::time::timeout(REQUEST_TIMEOUT, rx.recv_async()) + .await; + drop(permit); + } - let permit = sem.clone().acquire_owned().await; - tokio::task::spawn({ - let network_send = data.network_send.clone(); - let chain_store = data.chain_store.clone(); - let work_send = work_send.clone(); - let seen = seen.clone(); - let failures = failures.clone(); - async move { - let (tx, rx) = flume::bounded(1); - let _ignore = network_send - .send_async(NetworkMessage::BitswapRequest { - epoch: 0, - cid: required_cid, - response_channel: tx, - }) - .await; - - if !chain_store.db.has(&required_cid).unwrap_or(false) { - let _success = tokio::task::spawn_blocking(move || { - rx.recv_timeout(Duration::from_secs_f32(10.0)) - .unwrap_or_default() - }) - .await - .unwrap_or(false); - } - drop(permit); - - match chain_store.db.get_cbor::(&required_cid) { - Ok(Some(ipld)) => { - // info!("Request successful"); - let mut new_links = Vec::new(); - scan_for_links(ipld, seen.lock().deref_mut(), &mut new_links); - for cid in new_links.into_iter() { - let _ignore_channel_close_errors = work_send.send(cid).await; + db.get_cbor::(&new_cid)?.ok_or_else(|| { + anyhow::anyhow!("Request failed: {new_cid}") + }) } - // forest_ipld::traverse_ipld_links_hash(seen, load_block, ipld, |_|); - } - Ok(None) => { - *failures.lock() += 1; - info!("Request failed: failures: {}", failures.lock()) - } - Err(msg) => info!("Failed to decode data: {msg}"), + }); } - drop(work_send); } - }); - - // tokio::task::yield_now().await; + Err(msg) => { + failures += 1; + info!("Request failed: {msg}"); + } + } } - info!("All fetches done. Failures: {}", failures.lock()); + info!("All fetches done. Failures: {}", failures); Ok(()) } } From 25506c7de5477dabbd391889404bd9b2f6710c6f Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 13:20:08 +0200 Subject: [PATCH 05/16] document state_fetch_root --- API_IMPLEMENTATION.md | 5 +- forest/cli/src/cli/state_cmd.rs | 9 ++- node/rpc-api/src/lib.rs | 2 +- node/rpc/src/state_api.rs | 121 ++++++++++++++++---------------- 4 files changed, 70 insertions(+), 67 deletions(-) diff --git a/API_IMPLEMENTATION.md b/API_IMPLEMENTATION.md index 31a37670c848..20ed3fd237a6 100644 --- a/API_IMPLEMENTATION.md +++ b/API_IMPLEMENTATION.md @@ -2,9 +2,9 @@ ## Stats -- Forest method count: 45 +- Forest method count: 46 - Lotus method count: 173 -- API coverage: 26.01% +- API coverage: 26.59% ## Forest-only Methods @@ -19,6 +19,7 @@ These methods exist in Forest only and cannot be compared: - `Filecoin.ChainValidateTipSetCheckpoints` - `Filecoin.NetAddrsListen` - `Filecoin.NetPeers` +- `Filecoin.StateFetchRoot` - `Filecoin.StateGetReceipt` - `Filecoin.Version` diff --git a/forest/cli/src/cli/state_cmd.rs b/forest/cli/src/cli/state_cmd.rs index 8a47b3336202..943d0b7c135d 100644 --- a/forest/cli/src/cli/state_cmd.rs +++ b/forest/cli/src/cli/state_cmd.rs @@ -31,9 +31,12 @@ impl StateCommands { pub async fn run(self, config: Config) -> anyhow::Result<()> { match self { Self::Fetch { root } => { - state_fetch_root((CidJson(root),), &config.client.rpc_token) - .await - .map_err(handle_rpc_err)?; + println!( + "{}", + state_fetch_root((CidJson(root),), &config.client.rpc_token) + .await + .map_err(handle_rpc_err)? + ); } } Ok(()) diff --git a/node/rpc-api/src/lib.rs b/node/rpc-api/src/lib.rs index 83c1e7b27b82..268c265b033f 100644 --- a/node/rpc-api/src/lib.rs +++ b/node/rpc-api/src/lib.rs @@ -373,7 +373,7 @@ pub mod state_api { pub const STATE_FETCH_ROOT: &str = "Filecoin.StateFetchRoot"; pub type StateFetchRootParams = (CidJson,); - pub type StateFetchRootResult = (); + pub type StateFetchRootResult = String; } /// Gas API diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 3db20eaa10c7..73824837c8fb 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -2,13 +2,14 @@ // SPDX-License-Identifier: Apache-2.0, MIT #![allow(clippy::unused_async)] -use ahash::{HashMap, HashMapExt}; +use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use cid::Cid; use fil_actor_interface::market; use forest_beacon::Beacon; use forest_blocks::tipset_keys_json::TipsetKeysJson; use forest_ipld::json::IpldJson; use forest_json::cid::CidJson; +use forest_libp2p::NetworkMessage; use forest_rpc_api::{ data_types::{MarketDeal, MessageLookup, RPCState}, state_api::*, @@ -16,10 +17,11 @@ use forest_rpc_api::{ use forest_shim::address::Address; use forest_state_manager::InvocResult; use fvm_ipld_blockstore::Blockstore; +use fvm_ipld_encoding::CborStore; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; use libipld_core::ipld::Ipld; -use log::info; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; +use tokio::{sync::Semaphore, task::JoinSet, time::timeout}; // TODO handle using configurable verification implementation in RPC (all // defaulting to Full). @@ -198,75 +200,72 @@ pub(crate) async fn state_wait_msg( data: Data>, Params((CidJson(root_cid),)): Params, ) -> Result { - { - use ahash::{HashSet, HashSetExt}; - use forest_libp2p::NetworkMessage; - use fvm_ipld_encoding::CborStore; - use std::sync::Arc; + const MAX_CONCURRENT_REQUESTS: usize = 16; + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); - const MAX_CONCURRENT_REQUESTS: usize = 16; - const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); + let mut seen: HashSet = HashSet::new(); + let mut counter: usize = 0; + let mut failures: usize = 0; + let mut task_set = JoinSet::new(); - let sem = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_REQUESTS)); - let mut seen: HashSet = HashSet::new(); - let mut counter: usize = 0; - let mut failures: usize = 0; - let mut task_set = tokio::task::JoinSet::new(); - - let mut get_ipld_link = |ipld: &forest_ipld::Ipld| match ipld { - Ipld::Link(cid) if cid.codec() == 0x71 && seen.insert(*cid) => Some(*cid), - _ => None, - }; + let mut get_ipld_link = |ipld: &Ipld| match ipld { + Ipld::Link(cid) if cid.codec() == 0x71 && seen.insert(*cid) => Some(*cid), + _ => None, + }; - task_set.spawn(async move { Ok(Ipld::Link(root_cid)) }); - while let Some(result) = task_set.join_next().await { - info!( - "Got new ipld block. Fetched: {counter}, Failures: {failures}, Concurrent: {}", - MAX_CONCURRENT_REQUESTS - sem.available_permits() - ); - match result? { - Ok(ipld) => { - for new_cid in ipld.iter().filter_map(&mut get_ipld_link) { - counter += 1; - task_set.spawn({ - let network_send = data.network_send.clone(); - let db = data.chain_store.db.clone(); - let sem = sem.clone(); - async move { - if !db.has(&new_cid).unwrap_or(false) { - let permit = sem.acquire_owned().await?; - let (tx, rx) = flume::bounded(1); - network_send - .send_async(NetworkMessage::BitswapRequest { - epoch: 0, - cid: new_cid, - response_channel: tx, - }) - .await?; - let _ignore = - tokio::time::timeout(REQUEST_TIMEOUT, rx.recv_async()) - .await; - drop(permit); - } + task_set.spawn(async move { Ok(Ipld::Link(root_cid)) }); - db.get_cbor::(&new_cid)?.ok_or_else(|| { - anyhow::anyhow!("Request failed: {new_cid}") - }) - } - }); + while let Some(result) = task_set.join_next().await { + match result? { + Ok(ipld) => { + for new_cid in ipld.iter().filter_map(&mut get_ipld_link) { + counter += 1; + if counter % 1_000 == 0 { + log::debug!( + "Still downloading. Fetched: {counter}, Failures: {failures}, Concurrent: {}", + MAX_CONCURRENT_REQUESTS - sem.available_permits() + ); } - } - Err(msg) => { - failures += 1; - info!("Request failed: {msg}"); + task_set.spawn({ + let network_send = data.network_send.clone(); + let db = data.chain_store.db.clone(); + let sem = sem.clone(); + async move { + if !db.has(&new_cid)? { + let permit = sem.acquire_owned().await?; + let (tx, rx) = flume::bounded(1); + network_send + .send_async(NetworkMessage::BitswapRequest { + epoch: 0, + cid: new_cid, + response_channel: tx, + }) + .await?; + let _ignore = timeout(REQUEST_TIMEOUT, rx.recv_async()).await; + drop(permit); + } + + db.get_cbor::(&new_cid)? + .ok_or_else(|| anyhow::anyhow!("Request failed: {new_cid}")) + } + }); } } + Err(msg) => { + failures += 1; + log::debug!("Request failed: {msg}"); + } } - info!("All fetches done. Failures: {}", failures); - Ok(()) } + Ok(format!("IPLD graph traversed! CIDs: {counter}, failures: {failures}.")) } From b8d1f1f604c7d0b431e255df46ca014da0c137a4 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 13:27:36 +0200 Subject: [PATCH 06/16] remove unused dep, format --- Cargo.lock | 1 - forest/daemon/Cargo.toml | 6 +++--- forest/daemon/src/daemon.rs | 4 +--- node/rpc/Cargo.toml | 1 - node/rpc/src/lib.rs | 2 +- node/rpc/src/state_api.rs | 4 +++- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eea2a78bb19f..cb33bac4245c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3694,7 +3694,6 @@ version = "0.8.2" dependencies = [ "ahash 0.8.3", "anyhow", - "async-recursion", "axum", "base64 0.21.2", "cid", diff --git a/forest/daemon/Cargo.toml b/forest/daemon/Cargo.toml index 795a77162149..fe5c03cf014d 100644 --- a/forest/daemon/Cargo.toml +++ b/forest/daemon/Cargo.toml @@ -12,13 +12,11 @@ name = "forest" path = "src/main.rs" [dependencies] -forest_ipld.workspace = true -fvm_ipld_encoding.workspace = true -cid.workspace = true ahash.workspace = true anyhow.workspace = true atty.workspace = true cfg-if.workspace = true +cid.workspace = true clap.workspace = true daemonize-me = "2.0" dialoguer.workspace = true @@ -33,6 +31,7 @@ forest_deleg_cns = { workspace = true, optional = true } forest_fil_cns = { workspace = true, optional = true } forest_genesis.workspace = true forest_interpreter.workspace = true +forest_ipld.workspace = true forest_key_management.workspace = true forest_libp2p.workspace = true forest_message_pool.workspace = true @@ -45,6 +44,7 @@ forest_state_manager.workspace = true forest_utils.workspace = true futures.workspace = true fvm_ipld_blockstore.workspace = true +fvm_ipld_encoding.workspace = true lazy_static.workspace = true log.workspace = true raw_sync = "0.1" diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index b91b15882868..7b9befe17772 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -28,9 +28,7 @@ use forest_genesis::{ use forest_key_management::{ KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, }; -use forest_libp2p::{ - get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager, -}; +use forest_libp2p::{get_keypair, Libp2pConfig, Libp2pService, PeerId, PeerManager}; use forest_message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; use forest_rpc::start_rpc; use forest_rpc_api::data_types::RPCState; diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index f40c1596b013..a2ebd5f950f9 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -7,7 +7,6 @@ authors.workspace = true edition = "2021" [dependencies] -async-recursion = "1.0" ahash.workspace = true anyhow.workspace = true axum = { workspace = true, features = ["ws"] } diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 327038e1e573..43c7836b887b 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -119,7 +119,7 @@ where .with_method(STATE_MARKET_DEALS, state_market_deals::) .with_method(STATE_GET_RECEIPT, state_get_receipt::) .with_method(STATE_WAIT_MSG, state_wait_msg::) - .with_method(STATE_FETCH_ROOT,state_fetch_root::) + .with_method(STATE_FETCH_ROOT, state_fetch_root::) // Gas API .with_method(GAS_ESTIMATE_FEE_CAP, gas_estimate_fee_cap::) .with_method(GAS_ESTIMATE_GAS_LIMIT, gas_estimate_gas_limit::) diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 73824837c8fb..291ebd80293e 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -267,5 +267,7 @@ pub(crate) async fn state_fetch_root Date: Tue, 13 Jun 2023 13:37:09 +0200 Subject: [PATCH 07/16] remove more unused dependencies --- Cargo.lock | 4 ---- forest/daemon/Cargo.toml | 4 ---- 2 files changed, 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb33bac4245c..5fc115014ccb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3046,12 +3046,10 @@ dependencies = [ name = "forest-daemon" version = "0.8.2" dependencies = [ - "ahash 0.8.3", "anyhow", "assert_cmd", "atty", "cfg-if 1.0.0", - "cid", "clap", "daemonize-me", "dialoguer", @@ -3066,7 +3064,6 @@ dependencies = [ "forest_fil_cns", "forest_genesis", "forest_interpreter", - "forest_ipld", "forest_key_management", "forest_libp2p", "forest_message_pool", @@ -3079,7 +3076,6 @@ dependencies = [ "forest_utils", "futures", "fvm_ipld_blockstore", - "fvm_ipld_encoding 0.2.3", "lazy_static", "log", "raw_sync", diff --git a/forest/daemon/Cargo.toml b/forest/daemon/Cargo.toml index fe5c03cf014d..f6ad6ad92f8a 100644 --- a/forest/daemon/Cargo.toml +++ b/forest/daemon/Cargo.toml @@ -12,11 +12,9 @@ name = "forest" path = "src/main.rs" [dependencies] -ahash.workspace = true anyhow.workspace = true atty.workspace = true cfg-if.workspace = true -cid.workspace = true clap.workspace = true daemonize-me = "2.0" dialoguer.workspace = true @@ -31,7 +29,6 @@ forest_deleg_cns = { workspace = true, optional = true } forest_fil_cns = { workspace = true, optional = true } forest_genesis.workspace = true forest_interpreter.workspace = true -forest_ipld.workspace = true forest_key_management.workspace = true forest_libp2p.workspace = true forest_message_pool.workspace = true @@ -44,7 +41,6 @@ forest_state_manager.workspace = true forest_utils.workspace = true futures.workspace = true fvm_ipld_blockstore.workspace = true -fvm_ipld_encoding.workspace = true lazy_static.workspace = true log.workspace = true raw_sync = "0.1" From 01afa8982701bb72fb0b55b3da1d6fa56c436787 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 14:50:27 +0200 Subject: [PATCH 08/16] Remove unused code. --- forest/daemon/src/daemon.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index 7b9befe17772..c66988feb79c 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -358,7 +358,6 @@ pub(super) async fn start( let rpc_chain_store = Arc::clone(&chain_store); let gc_event_tx = db_garbage_collector.get_tx(); - let network_send_clone = network_send.clone(); services.spawn(async move { info!("JSON-RPC endpoint started at {}", config.client.rpc_address); // XXX: The JSON error message are a nightmare to print. @@ -369,7 +368,7 @@ pub(super) async fn start( mpool, bad_blocks, sync_state, - network_send: network_send_clone, + network_send, network_name, // TODO: the RPCState can fetch this itself from the StateManager beacon: rpc_state_manager.beacon_schedule(), From 1e259e2ff65705b7134b39d3548dd2c054651238 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 15:26:23 +0200 Subject: [PATCH 09/16] improve documentation --- node/rpc/src/state_api.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 291ebd80293e..e0bf11769e0b 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -200,11 +200,11 @@ pub(crate) async fn state_wait_msg( data: Data>, Params((CidJson(root_cid),)): Params, @@ -225,16 +225,18 @@ pub(crate) async fn state_fetch_root { for new_cid in ipld.iter().filter_map(&mut get_ipld_link) { counter += 1; if counter % 1_000 == 0 { + // set RUST_LOG=forest_rpc::state_api=debug to enable these printouts. log::debug!( - "Still downloading. Fetched: {counter}, Failures: {failures}, Concurrent: {}", - MAX_CONCURRENT_REQUESTS - sem.available_permits() - ); + "Still downloading. Fetched: {counter}, Failures: {failures}, Concurrent: {}", + MAX_CONCURRENT_REQUESTS - sem.available_permits() + ); } task_set.spawn({ let network_send = data.network_send.clone(); @@ -242,6 +244,8 @@ pub(crate) async fn state_fetch_root Date: Tue, 13 Jun 2023 15:28:59 +0200 Subject: [PATCH 10/16] enumerate imports --- forest/cli/src/cli/state_cmd.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/forest/cli/src/cli/state_cmd.rs b/forest/cli/src/cli/state_cmd.rs index 943d0b7c135d..889c4f6715f7 100644 --- a/forest/cli/src/cli/state_cmd.rs +++ b/forest/cli/src/cli/state_cmd.rs @@ -4,7 +4,7 @@ use cid::Cid; use clap::Subcommand; use forest_json::cid::CidJson; -use forest_rpc_client::state_ops::*; +use forest_rpc_client::state_ops::state_fetch_root; use fvm_shared::{clock::ChainEpoch, econ::TokenAmount}; use serde_tuple::{self, Deserialize_tuple, Serialize_tuple}; From 7acc8a73e9d394d5c0264b6b49c37a7305b7eb4b Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 15:33:04 +0200 Subject: [PATCH 11/16] Revert changes to the file backed objects --- utils/forest_utils/src/db/file_backed_obj.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/forest_utils/src/db/file_backed_obj.rs b/utils/forest_utils/src/db/file_backed_obj.rs index 04c4dd801661..57ae2b45a77f 100644 --- a/utils/forest_utils/src/db/file_backed_obj.rs +++ b/utils/forest_utils/src/db/file_backed_obj.rs @@ -18,7 +18,7 @@ pub struct FileBacked { sync_period: Option, } -pub const SYNC_PERIOD: Duration = Duration::from_secs(30); +pub const SYNC_PERIOD: Duration = Duration::from_secs(600); impl FileBacked { /// Gets a borrow of the inner object From 115176fe3968716e7c73c9d9facf9f53b0f4a620 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 15:35:06 +0200 Subject: [PATCH 12/16] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d76155706aa9..832f6f59e2da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,8 @@ - [#2706](https://github.com/ChainSafe/forest/issues/2706): implement `Filecoin.ChainSetHead` RPC endpoint and `forest-cli chain set-head` subcommand. +- [#2979](https://github.com/ChainSafe/forest/pull/2979): implement command + for downloading an IPLD graph via bitswap. ### Changed From d5abfacf4bd8e817e7f32f649f02107d4edf5bed Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Tue, 13 Jun 2023 16:27:59 +0200 Subject: [PATCH 13/16] format changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 832f6f59e2da..4b41862b2c33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,8 +41,8 @@ - [#2706](https://github.com/ChainSafe/forest/issues/2706): implement `Filecoin.ChainSetHead` RPC endpoint and `forest-cli chain set-head` subcommand. -- [#2979](https://github.com/ChainSafe/forest/pull/2979): implement command - for downloading an IPLD graph via bitswap. +- [#2979](https://github.com/ChainSafe/forest/pull/2979): implement command for + downloading an IPLD graph via bitswap. ### Changed From a63769bbbc6e49ed9ccc1b5f2bfcca78a3a6b0af Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Wed, 14 Jun 2023 11:23:50 +0200 Subject: [PATCH 14/16] replace magic value with DAG_CBOR constant --- node/rpc/src/state_api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index e0bf11769e0b..04b12c6944e0 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -17,7 +17,7 @@ use forest_rpc_api::{ use forest_shim::address::Address; use forest_state_manager::InvocResult; use fvm_ipld_blockstore::Blockstore; -use fvm_ipld_encoding::CborStore; +use fvm_ipld_encoding::{CborStore, DAG_CBOR}; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; use libipld_core::ipld::Ipld; use std::{sync::Arc, time::Duration}; @@ -219,7 +219,7 @@ pub(crate) async fn state_fetch_root Some(*cid), + Ipld::Link(cid) if cid.codec() == DAG_CBOR && seen.insert(*cid) => Some(*cid), _ => None, }; From a8a0c9efece0115d0777fc1f75a18e430e446e53 Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Wed, 14 Jun 2023 11:29:17 +0200 Subject: [PATCH 15/16] Write basic smoke test for `forest-cli state fetch` --- scripts/tests/calibnet_other_check.sh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/scripts/tests/calibnet_other_check.sh b/scripts/tests/calibnet_other_check.sh index 376e26258ae1..56d7d9c205fc 100755 --- a/scripts/tests/calibnet_other_check.sh +++ b/scripts/tests/calibnet_other_check.sh @@ -40,3 +40,13 @@ echo "Test subcommand: chain set-head" $FOREST_CLI_PATH chain set-head --epoch -10 --force $FOREST_CLI_PATH sync wait # allow the node to re-sync + +echo "Test IPLD traversal by walking the genesis block" +# The IPLD graph for the calibnet genesis block contains 1197 CIDs +EXPECTED_WALK="IPLD graph traversed! CIDs: 1197, failures: 0." +ACTUAL_WALK=$($FOREST_CLI_PATH state fetch bafy2bzacecyaggy24wol5ruvs6qm73gjibs2l2iyhcqmvi7r7a4ph7zx3yqd4) +if [[ $EXPECTED_WALK != "$ACTUAL_WALK" ]]; then + printf "Invalid traversal of genesis block:\n%s" "$ACTUAL_WALK" + printf "Expected:\n%s" "$EXPECTED_WALK" + exit 1 +fi From 3733e4524fb9a990761b07a616d39d06b2bda80b Mon Sep 17 00:00:00 2001 From: David Himmelstrup Date: Wed, 14 Jun 2023 11:43:18 +0200 Subject: [PATCH 16/16] Switch from genesis to epoch 1. --- scripts/tests/calibnet_other_check.sh | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scripts/tests/calibnet_other_check.sh b/scripts/tests/calibnet_other_check.sh index 56d7d9c205fc..208c71467c07 100755 --- a/scripts/tests/calibnet_other_check.sh +++ b/scripts/tests/calibnet_other_check.sh @@ -41,12 +41,13 @@ $FOREST_CLI_PATH chain set-head --epoch -10 --force $FOREST_CLI_PATH sync wait # allow the node to re-sync -echo "Test IPLD traversal by walking the genesis block" -# The IPLD graph for the calibnet genesis block contains 1197 CIDs -EXPECTED_WALK="IPLD graph traversed! CIDs: 1197, failures: 0." -ACTUAL_WALK=$($FOREST_CLI_PATH state fetch bafy2bzacecyaggy24wol5ruvs6qm73gjibs2l2iyhcqmvi7r7a4ph7zx3yqd4) +echo "Test IPLD traversal by fetching the state of epoch 1" +# The IPLD graph for the state-root of epoch 1 contains 1197 CIDs +EXPECTED_WALK="IPLD graph traversed! CIDs: 1195, failures: 0." +# The state-root of epoch 1 can be found here: https://calibration.filscan.io/tipset/chain?hash=bafy2bzaced577h7b7wzib6tryq4w6mnzdwtrjpyii4srahqwfqxsfey5kyxos +ACTUAL_WALK=$($FOREST_CLI_PATH state fetch bafy2bzacedjq7lc42qhlk2iymcpjlanntyzdupc3ckg66gkca6plfjs5m7euo) if [[ $EXPECTED_WALK != "$ACTUAL_WALK" ]]; then - printf "Invalid traversal of genesis block:\n%s" "$ACTUAL_WALK" + printf "Invalid traversal:\n%s" "$ACTUAL_WALK" printf "Expected:\n%s" "$EXPECTED_WALK" exit 1 fi