-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: fetch arbitrary state-trees #2979
Changes from all commits
aa99920
4b99ae4
c817627
d7e58c8
a7746c2
1e7c44c
25506c7
2913ca8
b8d1f1f
bb6b6f1
01afa89
1e259e2
f70f808
7acc8a7
115176f
d5abfac
a63769b
a8a0c9e
3733e45
032aa7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,14 @@ | ||
// Copyright 2019-2023 ChainSafe Systems | ||
// SPDX-License-Identifier: Apache-2.0, MIT | ||
|
||
use forest_rpc_api::state_api::*; | ||
use jsonrpc_v2::Error; | ||
|
||
use crate::call; | ||
|
||
pub async fn state_fetch_root( | ||
params: StateFetchRootParams, | ||
auth_token: &Option<String>, | ||
) -> Result<StateFetchRootResult, Error> { | ||
call(STATE_FETCH_ROOT, params, auth_token).await | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,22 +2,26 @@ | |
// SPDX-License-Identifier: Apache-2.0, MIT | ||
#![allow(clippy::unused_async)] | ||
|
||
use ahash::{HashMap, HashMapExt}; | ||
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; | ||
use cid::Cid; | ||
use fil_actor_interface::market; | ||
use forest_beacon::Beacon; | ||
use forest_blocks::tipset_keys_json::TipsetKeysJson; | ||
use forest_ipld::json::IpldJson; | ||
use forest_json::cid::CidJson; | ||
use forest_libp2p::NetworkMessage; | ||
use forest_rpc_api::{ | ||
data_types::{MarketDeal, MessageLookup, RPCState}, | ||
state_api::*, | ||
}; | ||
use forest_shim::address::Address; | ||
use forest_state_manager::InvocResult; | ||
use fvm_ipld_blockstore::Blockstore; | ||
use fvm_ipld_encoding::{CborStore, DAG_CBOR}; | ||
use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; | ||
use libipld_core::ipld::Ipld; | ||
use std::{sync::Arc, time::Duration}; | ||
use tokio::{sync::Semaphore, task::JoinSet, time::timeout}; | ||
|
||
// TODO handle using configurable verification implementation in RPC (all | ||
// defaulting to Full). | ||
|
@@ -186,3 +190,91 @@ pub(crate) async fn state_wait_msg<DB: Blockstore + Clone + Send + Sync + 'stati | |
return_dec: IpldJson(ipld), | ||
}) | ||
} | ||
|
||
// Sample CIDs (useful for testing): | ||
// Mainnet: | ||
// 1,594,681 bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm OhSnap upgrade | ||
// 1_960_320 bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg Skyr upgrade | ||
// 2,833,266 bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2 | ||
// 2,933,266 bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu | ||
// Calibnet: | ||
// 242,150 bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw | ||
// 630,932 bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk | ||
// | ||
/// Traverse an IPLD directed acyclic graph and use libp2p-bitswap to request any missing nodes. | ||
/// This function has two primary uses: (1) Downloading specific state-roots when Forest deviates | ||
/// from the mainline blockchain, (2) fetching historical state-trees to verify past versions of the | ||
/// consensus rules. | ||
pub(crate) async fn state_fetch_root<DB: Blockstore + Clone + Sync + Send + 'static, B: Beacon>( | ||
data: Data<RPCState<DB, B>>, | ||
Params((CidJson(root_cid),)): Params<StateFetchRootParams>, | ||
) -> Result<StateFetchRootResult, JsonRpcError> { | ||
const MAX_CONCURRENT_REQUESTS: usize = 16; | ||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); | ||
|
||
let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); | ||
let mut seen: HashSet<Cid> = HashSet::new(); | ||
let mut counter: usize = 0; | ||
let mut failures: usize = 0; | ||
let mut task_set = JoinSet::new(); | ||
|
||
let mut get_ipld_link = |ipld: &Ipld| match ipld { | ||
Ipld::Link(cid) if cid.codec() == DAG_CBOR && seen.insert(*cid) => Some(*cid), | ||
_ => None, | ||
}; | ||
|
||
task_set.spawn(async move { Ok(Ipld::Link(root_cid)) }); | ||
|
||
// Iterate until there are no more ipld nodes to traverse | ||
while let Some(result) = task_set.join_next().await { | ||
match result? { | ||
Ok(ipld) => { | ||
for new_cid in ipld.iter().filter_map(&mut get_ipld_link) { | ||
counter += 1; | ||
if counter % 1_000 == 0 { | ||
// set RUST_LOG=forest_rpc::state_api=debug to enable these printouts. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe leave a TODO and refer to #2955 if it's meant to be replaced by the desired progress bar There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Showing a progress bar requires knowing how much work is left. We should show some progress indicators in the client, though. Maybe as a separate PR. |
||
log::debug!( | ||
"Still downloading. Fetched: {counter}, Failures: {failures}, Concurrent: {}", | ||
MAX_CONCURRENT_REQUESTS - sem.available_permits() | ||
); | ||
} | ||
task_set.spawn({ | ||
let network_send = data.network_send.clone(); | ||
let db = data.chain_store.db.clone(); | ||
let sem = sem.clone(); | ||
async move { | ||
if !db.has(&new_cid)? { | ||
// If a CID isn't in our database, request it via bitswap (limited | ||
// by MAX_CONCURRENT_REQUESTS) | ||
let permit = sem.acquire_owned().await?; | ||
let (tx, rx) = flume::bounded(1); | ||
network_send | ||
.send_async(NetworkMessage::BitswapRequest { | ||
epoch: 0, | ||
cid: new_cid, | ||
response_channel: tx, | ||
}) | ||
.await?; | ||
// Bitswap requests do not fail. They are just ignored if no-one has | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI there is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not available through the |
||
// the requested data. Here we arbitrary decide to only wait for | ||
// REQUEST_TIMEOUT before deciding that the data is unavailable. | ||
let _ignore = timeout(REQUEST_TIMEOUT, rx.recv_async()).await; | ||
drop(permit); | ||
} | ||
|
||
db.get_cbor::<Ipld>(&new_cid)? | ||
.ok_or_else(|| anyhow::anyhow!("Request failed: {new_cid}")) | ||
} | ||
}); | ||
} | ||
} | ||
Err(msg) => { | ||
failures += 1; | ||
log::debug!("Request failed: {msg}"); | ||
} | ||
} | ||
} | ||
Ok(format!( | ||
"IPLD graph traversed! CIDs: {counter}, failures: {failures}." | ||
)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe
CidHashSet
here if collision is not criticalThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use
CidHashSet
ifinsert
didn't take an extra callback function. :)