Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch arbitrary state-trees #2979

Merged
merged 20 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions API_IMPLEMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

## Stats

- Forest method count: 45
- Forest method count: 46
- Lotus method count: 173
- API coverage: 26.01%
- API coverage: 26.59%

## Forest-only Methods

Expand All @@ -19,6 +19,7 @@ These methods exist in Forest only and cannot be compared:
- `Filecoin.ChainValidateTipSetCheckpoints`
- `Filecoin.NetAddrsListen`
- `Filecoin.NetPeers`
- `Filecoin.StateFetchRoot`
- `Filecoin.StateGetReceipt`
- `Filecoin.Version`

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
- [#2706](https://github.com/ChainSafe/forest/issues/2706): implement
`Filecoin.ChainSetHead` RPC endpoint and `forest-cli chain set-head`
subcommand.
- [#2979](https://github.com/ChainSafe/forest/pull/2979): implement command for
downloading an IPLD graph via bitswap.

### Changed

Expand Down
21 changes: 18 additions & 3 deletions forest/cli/src/cli/state_cmd.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use cid::Cid;
use clap::Subcommand;
use forest_json::cid::CidJson;
use forest_rpc_client::state_ops::state_fetch_root;
use fvm_shared::{clock::ChainEpoch, econ::TokenAmount};
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};

use super::handle_rpc_err;
use super::Config;

#[derive(Serialize_tuple, Deserialize_tuple, Clone, Debug)]
Expand All @@ -19,11 +23,22 @@ struct VestingScheduleEntry {
}

#[derive(Debug, Subcommand)]
pub enum StateCommands {}
pub enum StateCommands {
Fetch { root: Cid },
}

impl StateCommands {
pub fn run(&self, _config: Config) -> anyhow::Result<()> {
// match self {}
pub async fn run(self, config: Config) -> anyhow::Result<()> {
match self {
Self::Fetch { root } => {
println!(
"{}",
state_fetch_root((CidJson(root),), &config.client.rpc_token)
.await
.map_err(handle_rpc_err)?
);
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion forest/cli/src/subcommand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn process(
Subcommand::Wallet(cmd) => cmd.run(config).await,
Subcommand::Sync(cmd) => cmd.run(config).await,
Subcommand::Mpool(cmd) => cmd.run(config),
Subcommand::State(cmd) => cmd.run(config),
Subcommand::State(cmd) => cmd.run(config).await,
Subcommand::Config(cmd) => cmd.run(&config, &mut std::io::stdout()),
Subcommand::Send(cmd) => cmd.run(config).await,
Subcommand::DB(cmd) => cmd.run(&config).await,
Expand Down
5 changes: 5 additions & 0 deletions node/rpc-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub static ACCESS_MAP: Lazy<HashMap<&str, Access>> = Lazy::new(|| {
access.insert(state_api::STATE_WAIT_MSG, Access::Read);
access.insert(state_api::STATE_NETWORK_NAME, Access::Read);
access.insert(state_api::STATE_NETWORK_VERSION, Access::Read);
access.insert(state_api::STATE_FETCH_ROOT, Access::Read);

// Gas API
access.insert(gas_api::GAS_ESTIMATE_GAS_LIMIT, Access::Read);
Expand Down Expand Up @@ -369,6 +370,10 @@ pub mod state_api {
pub const STATE_WAIT_MSG: &str = "Filecoin.StateWaitMsg";
pub type StateWaitMsgParams = (CidJson, i64);
pub type StateWaitMsgResult = MessageLookup;

pub const STATE_FETCH_ROOT: &str = "Filecoin.StateFetchRoot";
pub type StateFetchRootParams = (CidJson,);
pub type StateFetchRootResult = String;
}

/// Gas API
Expand Down
12 changes: 12 additions & 0 deletions node/rpc-client/src/state_ops.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,14 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use forest_rpc_api::state_api::*;
use jsonrpc_v2::Error;

use crate::call;

pub async fn state_fetch_root(
params: StateFetchRootParams,
auth_token: &Option<String>,
) -> Result<StateFetchRootResult, Error> {
call(STATE_FETCH_ROOT, params, auth_token).await
}
1 change: 1 addition & 0 deletions node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ where
.with_method(STATE_MARKET_DEALS, state_market_deals::<DB, B>)
.with_method(STATE_GET_RECEIPT, state_get_receipt::<DB, B>)
.with_method(STATE_WAIT_MSG, state_wait_msg::<DB, B>)
.with_method(STATE_FETCH_ROOT, state_fetch_root::<DB, B>)
// Gas API
.with_method(GAS_ESTIMATE_FEE_CAP, gas_estimate_fee_cap::<DB, B>)
.with_method(GAS_ESTIMATE_GAS_LIMIT, gas_estimate_gas_limit::<DB, B>)
Expand Down
94 changes: 93 additions & 1 deletion node/rpc/src/state_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@
// SPDX-License-Identifier: Apache-2.0, MIT
#![allow(clippy::unused_async)]

use ahash::{HashMap, HashMapExt};
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use fil_actor_interface::market;
use forest_beacon::Beacon;
use forest_blocks::tipset_keys_json::TipsetKeysJson;
use forest_ipld::json::IpldJson;
use forest_json::cid::CidJson;
use forest_libp2p::NetworkMessage;
use forest_rpc_api::{
data_types::{MarketDeal, MessageLookup, RPCState},
state_api::*,
};
use forest_shim::address::Address;
use forest_state_manager::InvocResult;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::{CborStore, DAG_CBOR};
use jsonrpc_v2::{Data, Error as JsonRpcError, Params};
use libipld_core::ipld::Ipld;
use std::{sync::Arc, time::Duration};
use tokio::{sync::Semaphore, task::JoinSet, time::timeout};

// TODO handle using configurable verification implementation in RPC (all
// defaulting to Full).
Expand Down Expand Up @@ -186,3 +190,91 @@ pub(crate) async fn state_wait_msg<DB: Blockstore + Clone + Send + Sync + 'stati
return_dec: IpldJson(ipld),
})
}

// Sample CIDs (useful for testing):
// Mainnet:
// 1,594,681 bafy2bzaceaclaz3jvmbjg3piazaq5dcesoyv26cdpoozlkzdiwnsvdvm2qoqm OhSnap upgrade
// 1_960_320 bafy2bzacec43okhmihmnwmgqspyrkuivqtxv75rpymsdbulq6lgsdq2vkwkcg Skyr upgrade
// 2,833,266 bafy2bzacecaydufxqo5vtouuysmg3tqik6onyuezm6lyviycriohgfnzfslm2
// 2,933,266 bafy2bzacebyp6cmbshtzzuogzk7icf24pt6s5veyq5zkkqbn3sbbvswtptuuu
// Calibnet:
// 242,150 bafy2bzaceb522vvt3wo7xhleo2dvb7wb7pyydmzlahc4aqd7lmvg3afreejiw
// 630,932 bafy2bzacedidwdsd7ds73t3z76hcjfsaisoxrangkxsqlzih67ulqgtxnypqk
//
/// Traverse an IPLD directed acyclic graph and use libp2p-bitswap to request any missing nodes.
/// This function has two primary uses: (1) Downloading specific state-roots when Forest deviates
/// from the mainline blockchain, (2) fetching historical state-trees to verify past versions of the
/// consensus rules.
pub(crate) async fn state_fetch_root<DB: Blockstore + Clone + Sync + Send + 'static, B: Beacon>(
data: Data<RPCState<DB, B>>,
Params((CidJson(root_cid),)): Params<StateFetchRootParams>,
) -> Result<StateFetchRootResult, JsonRpcError> {
const MAX_CONCURRENT_REQUESTS: usize = 16;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
let mut seen: HashSet<Cid> = HashSet::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe CidHashSet here if collision is not critical

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use CidHashSet if insert didn't take an extra callback function. :)

let mut counter: usize = 0;
let mut failures: usize = 0;
let mut task_set = JoinSet::new();

let mut get_ipld_link = |ipld: &Ipld| match ipld {
Ipld::Link(cid) if cid.codec() == DAG_CBOR && seen.insert(*cid) => Some(*cid),
_ => None,
};

task_set.spawn(async move { Ok(Ipld::Link(root_cid)) });

// Iterate until there are no more ipld nodes to traverse
while let Some(result) = task_set.join_next().await {
match result? {
Ok(ipld) => {
for new_cid in ipld.iter().filter_map(&mut get_ipld_link) {
counter += 1;
if counter % 1_000 == 0 {
// set RUST_LOG=forest_rpc::state_api=debug to enable these printouts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe leave a TODO and refer to #2955 if it's meant to be replaced by the desired progress bar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Showing a progress bar requires knowing how much work is left. We should show some progress indicators in the client, though. Maybe as a separate PR.

log::debug!(
"Still downloading. Fetched: {counter}, Failures: {failures}, Concurrent: {}",
MAX_CONCURRENT_REQUESTS - sem.available_permits()
);
}
task_set.spawn({
let network_send = data.network_send.clone();
let db = data.chain_store.db.clone();
let sem = sem.clone();
async move {
if !db.has(&new_cid)? {
// If a CID isn't in our database, request it via bitswap (limited
// by MAX_CONCURRENT_REQUESTS)
let permit = sem.acquire_owned().await?;
let (tx, rx) = flume::bounded(1);
network_send
.send_async(NetworkMessage::BitswapRequest {
epoch: 0,
cid: new_cid,
response_channel: tx,
})
.await?;
// Bitswap requests do not fail. They are just ignored if no-one has
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI there is a send_dont_have option in BitswapRequest to ask peers to respond even if they don't have the requested block, if that could be beneficial here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not available through the NetworkMessage interface.

// the requested data. Here we arbitrary decide to only wait for
// REQUEST_TIMEOUT before deciding that the data is unavailable.
let _ignore = timeout(REQUEST_TIMEOUT, rx.recv_async()).await;
drop(permit);
}

db.get_cbor::<Ipld>(&new_cid)?
.ok_or_else(|| anyhow::anyhow!("Request failed: {new_cid}"))
}
});
}
}
Err(msg) => {
failures += 1;
log::debug!("Request failed: {msg}");
}
}
}
Ok(format!(
"IPLD graph traversed! CIDs: {counter}, failures: {failures}."
))
}
11 changes: 11 additions & 0 deletions scripts/tests/calibnet_other_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,14 @@ echo "Test subcommand: chain set-head"
$FOREST_CLI_PATH chain set-head --epoch -10 --force

$FOREST_CLI_PATH sync wait # allow the node to re-sync

echo "Test IPLD traversal by fetching the state of epoch 1"
# The IPLD graph for the state-root of epoch 1 contains 1197 CIDs
EXPECTED_WALK="IPLD graph traversed! CIDs: 1195, failures: 0."
# The state-root of epoch 1 can be found here: https://calibration.filscan.io/tipset/chain?hash=bafy2bzaced577h7b7wzib6tryq4w6mnzdwtrjpyii4srahqwfqxsfey5kyxos
ACTUAL_WALK=$($FOREST_CLI_PATH state fetch bafy2bzacedjq7lc42qhlk2iymcpjlanntyzdupc3ckg66gkca6plfjs5m7euo)
if [[ $EXPECTED_WALK != "$ACTUAL_WALK" ]]; then
printf "Invalid traversal:\n%s" "$ACTUAL_WALK"
printf "Expected:\n%s" "$EXPECTED_WALK"
exit 1
fi