Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Net API Module + Some other RPC methods #884

Merged
merged 9 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/forest_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ pub use self::behaviour::*;
pub use self::chain_exchange::{ChainExchangeRequest, MESSAGES};
pub use self::config::*;
pub use self::service::*;

// Re-export some libp2p types
pub use libp2p::core::PeerId;
pub use libp2p::multiaddr::Multiaddr;
23 changes: 21 additions & 2 deletions node/forest_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::channel::oneshot::Sender as OneShotSender;
use futures::select;
use futures_util::stream::StreamExt;
use ipld_blockstore::BlockStore;
use libp2p::core::Multiaddr;
pub use libp2p::gossipsub::Topic;
use libp2p::{
core,
Expand Down Expand Up @@ -104,6 +105,13 @@ pub enum NetworkMessage {
cid: Cid,
response_channel: OneShotSender<()>,
},
JSONRPCRequest {
method: NetRPCMethods,
},
}
#[derive(Debug)]
pub enum NetRPCMethods {
NetAddrsListen(OneShotSender<(PeerId, Vec<Multiaddr>)>),
}
/// The Libp2pService listens to events from the Libp2p swarm.
pub struct Libp2pService<DB> {
Expand Down Expand Up @@ -324,9 +332,20 @@ where
warn!("Failed to send a bitswap want_block: {}", e.to_string());
} else if let Some(chans) = self.bitswap_response_channels.get_mut(&cid) {
chans.push(response_channel);
} else {
self.bitswap_response_channels.insert(cid, vec![response_channel]);
} else {
self.bitswap_response_channels.insert(cid, vec![response_channel]);
}
}
NetworkMessage::JSONRPCRequest { method } => {
match method {
NetRPCMethods::NetAddrsListen(response_channel) => {
let listeners: Vec<_> = Swarm::listeners( swarm_stream.get_mut()).cloned().collect();
let peer_id = Swarm::local_peer_id(swarm_stream.get_mut());
if response_channel.send((peer_id.clone(), listeners)).is_err() {
warn!("Failed to get Libp2p listeners");
}
}
}
}
}
None => { break; }
Expand Down
3 changes: 2 additions & 1 deletion node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ rand_distr = "0.3"
rand = "0.7"
interpreter = { path = "../../vm/interpreter/" }
fil_types = "0.1"
bitfield = { package = "forest_bitfield", version = "0.1", features = ["json"] }
futures = "0.3.5"
async-tungstenite = "0.9.1"
async-log = "2.0.0"
Expand All @@ -48,9 +47,11 @@ libp2p = { version = "0.24", default-features = false }
vm = { package = "forest_vm", version = "0.3.1" }
base64 = "0.13"
ipld = { package = "forest_ipld", path = "../../ipld", features = ["json"] }
bitfield = { package = "forest_bitfield", version = "0.1", features = ["json"] }

[dev-dependencies]
db = { package = "forest_db", version = "0.1" }
futures = "0.3.5"
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
hex = "0.4.2"

4 changes: 2 additions & 2 deletions node/rpc/src/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ where

pub(crate) async fn chain_get_tipset<DB, KS, B>(
data: Data<RpcState<DB, KS, B>>,
Params(params): Params<(TipsetKeys,)>,
Params(params): Params<(TipsetKeysJson,)>,
) -> Result<TipsetJson, JsonRpcError>
where
DB: BlockStore + Send + Sync + 'static,
KS: KeyStore + Send + Sync + 'static,
B: Beacon + Send + Sync + 'static,
{
let (tsk,) = params;
let (TipsetKeysJson(tsk),) = params;
let ts = data
.state_manager
.chain_store()
Expand Down
14 changes: 13 additions & 1 deletion node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod chain_api;
mod common_api;
mod gas_api;
mod mpool_api;
mod net_api;
mod state_api;
mod sync_api;
mod wallet_api;
Expand Down Expand Up @@ -122,7 +123,7 @@ where
false,
)
.with_method(
"Filecoin.ChainGetTipset",
"Filecoin.ChainGetTipSet",
chain_get_tipset::<DB, KS, B>,
false,
)
Expand Down Expand Up @@ -274,6 +275,11 @@ where
false,
)
.with_method("Filecoin.StateWaitMsg", state_wait_msg::<DB, KS, B>, false)
.with_method(
"Filecoin.StateMinerSectorAllocated",
state_miner_sector_allocated::<DB, KS, B>,
false,
)
.with_method(
"Filecoin.StateNetworkName",
state_network_name::<DB, KS, B>,
Expand Down Expand Up @@ -319,6 +325,12 @@ where
beacon_get_entry::<DB, KS, B>,
false,
)
// Net
.with_method(
"Filecoin.NetAddrsListen",
net_api::net_addrs_listen::<DB, KS, B>,
false,
)
.finish_unwrapped();

let try_socket = TcpListener::bind(rpc_endpoint).await;
Expand Down
37 changes: 37 additions & 0 deletions node/rpc/src/net_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::RpcState;
use beacon::Beacon;
use blockstore::BlockStore;
use forest_libp2p::{Multiaddr, NetRPCMethods, NetworkMessage};
use futures::channel::oneshot;
use jsonrpc_v2::{Data, Error as JsonRpcError};
use serde::Serialize;
use wallet::KeyStore;

#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct AddrInfo {
#[serde(rename = "ID")]
id: String,
addrs: Vec<Multiaddr>,
}
pub(crate) async fn net_addrs_listen<
DB: BlockStore + Send + Sync + 'static,
KS: KeyStore + Send + Sync + 'static,
B: Beacon + Send + Sync + 'static,
>(
data: Data<RpcState<DB, KS, B>>,
) -> Result<AddrInfo, JsonRpcError> {
let (tx, rx) = oneshot::channel();
let req = NetworkMessage::JSONRPCRequest {
method: NetRPCMethods::NetAddrsListen(tx),
};
data.network_send.send(req).await;
let (id, addrs) = rx.await?;
Ok(AddrInfo {
id: id.to_string(),
addrs,
})
}
33 changes: 33 additions & 0 deletions node/rpc/src/state_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,39 @@ pub(crate) async fn state_market_deals<
Ok(out)
}

pub(crate) async fn state_miner_sector_allocated<
DB: BlockStore + Send + Sync + 'static,
KS: KeyStore + Send + Sync + 'static,
B: Beacon + Send + Sync + 'static,
>(
data: Data<RpcState<DB, KS, B>>,
Params(params): Params<(AddressJson, u64, TipsetKeysJson)>,
) -> Result<bool, JsonRpcError> {
let (AddressJson(maddr), sector_num, TipsetKeysJson(tsk)) = params;
let ts = data.chain_store.tipset_from_keys(&tsk).await?;

let actor = data
.state_manager
.get_actor(&maddr, ts.parent_state())?
.ok_or(format!("Miner actor {} could not be resolved", maddr))?;
let allocated_sectors = match miner::State::load(data.state_manager.blockstore(), &actor)? {
miner::State::V0(m) => data
.chain_store
.db
.get::<bitfield::BitField>(&m.allocated_sectors)?
.ok_or("allocated sectors bitfield not found")?
.get(sector_num as usize),
miner::State::V2(m) => data
.chain_store
.db
.get::<bitfield::BitField>(&m.allocated_sectors)?
.ok_or("allocated sectors bitfield not found")?
.get(sector_num as usize),
};

Ok(allocated_sectors)
}

/// returns a state tree given a tipset
async fn state_for_ts<DB>(
state_manager: &Arc<StateManager<DB>>,
Expand Down