diff --git a/Cargo.lock b/Cargo.lock index c9fad00392d0..276a0f7f7fa3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2327,6 +2327,7 @@ dependencies = [ "key_management", "libp2p 0.21.1", "log", + "message_pool", "pbr", "pin-project-lite", "pretty_env_logger", @@ -5507,14 +5508,17 @@ dependencies = [ "forest_libp2p", "forest_message", "futures 0.3.5", + "hex", "ipld_blockstore", "jsonrpc-v2", "key_management", + "message_pool", "num-traits 0.2.12", "serde", "serde_json", "state_manager", "state_tree", + "test_utils", "thiserror", "tide", ] diff --git a/blockchain/message_pool/Cargo.toml b/blockchain/message_pool/Cargo.toml index 23bfad93bdf2..00a132d3391a 100644 --- a/blockchain/message_pool/Cargo.toml +++ b/blockchain/message_pool/Cargo.toml @@ -26,6 +26,7 @@ libsecp256k1 = "0.3.4" blake2b_simd = "0.5.10" log = "0.4.8" async-std = "1.6.0" +key_management = { path = "../../key_management"} [dev-dependencies] interpreter = { path = "../../vm/interpreter/" } diff --git a/blockchain/message_pool/src/msgpool.rs b/blockchain/message_pool/src/msgpool.rs index 677bdc901299..594b99a7b1fc 100644 --- a/blockchain/message_pool/src/msgpool.rs +++ b/blockchain/message_pool/src/msgpool.rs @@ -156,9 +156,74 @@ where } } +/// This is the Provider implementation that will be used for the mpool RPC +pub struct MpoolRpcProvider { + subscriber: Subscriber, + db: Arc, +} + +impl MpoolRpcProvider +where + DB: BlockStore, +{ + pub fn new(subscriber: Subscriber, db: Arc) -> Self + where + DB: BlockStore, + { + MpoolRpcProvider { subscriber, db } + } +} + +impl Provider for MpoolRpcProvider +where + DB: BlockStore, +{ + fn subscribe_head_changes(&mut self) -> Subscriber { + self.subscriber.clone() + } + + fn get_heaviest_tipset(&mut self) -> Option { + chain::get_heaviest_tipset(self.db.as_ref()) + .ok() + .unwrap_or(None) + } + + fn put_message(&self, msg: &SignedMessage) -> Result { + let cid = self + .db + .as_ref() + .put(msg, Blake2b256) + .map_err(|err| Error::Other(err.to_string()))?; + Ok(cid) + } + + fn state_get_actor(&self, addr: &Address, ts: &Tipset) -> Result { + let state = + StateTree::new_from_root(self.db.as_ref(), ts.parent_state()).map_err(Error::Other)?; + let actor = state.get_actor(addr).map_err(Error::Other)?; + actor.ok_or_else(|| Error::Other("No actor state".to_owned())) + } + + fn messages_for_block( + &self, + h: &BlockHeader, + ) -> Result<(Vec, Vec), Error> { + chain::block_messages(self.db.as_ref(), h).map_err(|err| err.into()) + } + + fn messages_for_tipset(&self, h: &Tipset) -> Result, Error> { + chain::unsigned_messages_for_tipset(self.db.as_ref(), h).map_err(|err| err.into()) + } + + fn load_tipset(&self, tsk: &TipsetKeys) -> Result { + let ts = chain::tipset_from_keys(self.db.as_ref(), tsk)?; + Ok(ts) + } +} + /// This is the main MessagePool struct pub struct MessagePool { - local_addrs: Vec
, + local_addrs: Arc>>, pending: Arc>>, pub cur_tipset: Arc>, api: Arc>, @@ -166,9 +231,9 @@ pub struct MessagePool { pub max_tx_pool_size: i64, pub network_name: String, bls_sig_cache: Arc>>, - sig_val_cache: LruCache, + sig_val_cache: Arc>>, // TODO look into adding a cap to local_msgs - local_msgs: HashSet, + local_msgs: Arc>>, } impl MessagePool @@ -180,17 +245,19 @@ where where T: Provider, { + let local_addrs = Arc::new(RwLock::new(Vec::new())); // LruCache sizes have been taken from the lotus implementation let pending = Arc::new(RwLock::new(HashMap::new())); let tipset = Arc::new(RwLock::new(api.get_heaviest_tipset().ok_or_else(|| { Error::Other("No ts in api to set as cur_tipset".to_owned()) })?)); let bls_sig_cache = Arc::new(RwLock::new(LruCache::new(40000))); - let sig_val_cache = LruCache::new(32000); + let sig_val_cache = Arc::new(RwLock::new(LruCache::new(32000))); let api_mutex = Arc::new(RwLock::new(api)); + let local_msgs = Arc::new(RwLock::new(HashSet::new())); let mut mp = MessagePool { - local_addrs: Vec::new(), + local_addrs, pending, cur_tipset: tipset, api: api_mutex, @@ -199,7 +266,7 @@ where network_name, bls_sig_cache, sig_val_cache, - local_msgs: HashSet::new(), + local_msgs, }; mp.load_local().await?; @@ -236,23 +303,23 @@ where } /// Add a signed message to local_addrs and local_msgs - fn add_local(&mut self, m: SignedMessage) -> Result<(), Error> { - self.local_addrs.push(*m.from()); - self.local_msgs.insert(m); + async fn add_local(&self, m: SignedMessage) -> Result<(), Error> { + self.local_addrs.write().await.push(*m.from()); + self.local_msgs.write().await.insert(m); Ok(()) } /// Push a signed message to the MessagePool - pub async fn push(&mut self, msg: SignedMessage) -> Result { + pub async fn push(&self, msg: SignedMessage) -> Result { let cid = msg.cid().map_err(|err| Error::Other(err.to_string()))?; self.add(&msg).await?; - self.add_local(msg)?; + self.add_local(msg).await?; Ok(cid) } /// This is a helper to push that will help to make sure that the message fits the parameters /// to be pushed to the MessagePool - pub async fn add(&mut self, msg: &SignedMessage) -> Result<(), Error> { + pub async fn add(&self, msg: &SignedMessage) -> Result<(), Error> { let size = msg.marshal_cbor()?.len(); if size > 32 * 1024 { return Err(Error::MessageTooBig); @@ -261,7 +328,7 @@ where return Err(Error::MessageValueTooHigh); } - self.verify_msg_sig(msg)?; + self.verify_msg_sig(msg).await?; let tmp = msg.clone(); let tip = self.cur_tipset.read().await.clone(); @@ -276,10 +343,10 @@ where /// Verify the message signature. first check if it has already been verified and put into /// cache. If it has not, then manually verify it then put it into cache for future use - fn verify_msg_sig(&mut self, msg: &SignedMessage) -> Result<(), Error> { + async fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> { let cid = msg.cid()?; - if let Some(()) = self.sig_val_cache.get(&cid) { + if let Some(()) = self.sig_val_cache.write().await.get(&cid) { return Ok(()); } @@ -288,14 +355,14 @@ where .verify(umsg.as_slice(), msg.from()) .map_err(Error::Other)?; - self.sig_val_cache.put(cid, ()); + self.sig_val_cache.write().await.put(cid, ()); Ok(()) } /// Verify the state_sequence and balance for the sender of the message given then call add_locked /// to finish adding the signed_message to pending - async fn add_tipset(&mut self, msg: SignedMessage, cur_ts: &Tipset) -> Result<(), Error> { + async fn add_tipset(&self, msg: SignedMessage, cur_ts: &Tipset) -> Result<(), Error> { let sequence = self.get_state_sequence(msg.from(), cur_ts).await?; if sequence > msg.message().sequence() { @@ -314,7 +381,7 @@ where /// Finish verifying signed message before adding it to the pending mset hashmap. If an entry /// in the hashmap does not yet exist, create a new mset that will correspond to the from message /// and push it to the pending phashmap - async fn add_helper(&mut self, msg: SignedMessage) -> Result<(), Error> { + async fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> { add_helper( self.api.as_ref(), self.bls_sig_cache.as_ref(), @@ -368,7 +435,7 @@ where /// Get the state balance for the actor that corresponds to the supplied address and tipset, /// if this actor does not exist, return an error - async fn get_state_balance(&mut self, addr: &Address, ts: &Tipset) -> Result { + async fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result { let actor = self.api.read().await.state_get_actor(&addr, &ts)?; Ok(actor.balance) } @@ -417,7 +484,7 @@ where /// Return Vector of signed messages given a block header for self pub async fn messages_for_blocks( - &mut self, + &self, blks: &[BlockHeader], ) -> Result, Error> { let mut msg_vec: Vec = Vec::new(); @@ -437,7 +504,13 @@ where /// Return gas price estimate this has been translated from lotus, a more smart implementation will /// most likely need to be implemented - pub fn estimate_gas_price(&self, nblocksincl: u64) -> Result { + pub fn estimate_gas_price( + &self, + nblocksincl: u64, + _sender: Address, + _gas_limit: u64, + _tsk: TipsetKeys, + ) -> Result { // TODO possibly come up with a smarter way to estimate the gas price let min_gas_price = 0; match nblocksincl { @@ -450,10 +523,11 @@ where /// Load local messages into pending. As of right now messages are not deleted from self's /// local_message field, possibly implement this in the future? pub async fn load_local(&mut self) -> Result<(), Error> { + let mut local_msgs = self.local_msgs.write().await; let mut rm_vec = Vec::new(); - let thing: Vec = self.local_msgs.iter().cloned().collect(); + let msg_vec: Vec = local_msgs.iter().cloned().collect(); - for k in thing { + for k in msg_vec { self.add(&k).await.unwrap_or_else(|err| { if err == Error::SequenceTooLow { warn!("error adding message: {:?}", err); @@ -463,7 +537,7 @@ where } for item in rm_vec { - self.local_msgs.remove(&item); + local_msgs.remove(&item); } Ok(()) @@ -641,34 +715,27 @@ fn add(m: SignedMessage, rmsgs: &mut HashMap>, state_sequence: HashMap, tipsets: Vec, publisher: Publisher, } - impl TestApi { - pub fn new() -> Self { + impl Default for TestApi { + /// Create a new TestApi + fn default() -> Self { TestApi { bmsgs: HashMap::new(), state_sequence: HashMap::new(), @@ -676,17 +743,22 @@ mod tests { publisher: Publisher::new(1), } } + } + impl TestApi { + /// Set the state sequence for an Address for TestApi pub fn set_state_sequence(&mut self, addr: &Address, sequence: u64) { - self.state_sequence.insert(addr.clone(), sequence); + self.state_sequence.insert(*addr, sequence); } + /// Set the block messages for TestApi pub fn set_block_messages(&mut self, h: &BlockHeader, msgs: Vec) { self.bmsgs.insert(h.cid().clone(), msgs); self.tipsets.push(Tipset::new(vec![h.clone()]).unwrap()) } - pub async fn set_heaviest_tipset(&mut self, ts: Arc) -> () { + /// Set the heaviest tipset for TestApi + pub async fn set_heaviest_tipset(&mut self, ts: Arc) { self.publisher.publish(HeadChange::Current(ts)).await } } @@ -707,8 +779,8 @@ mod tests { fn state_get_actor(&self, addr: &Address, _ts: &Tipset) -> Result { let s = self.state_sequence.get(addr); let mut sequence = 0; - if s.is_some() { - sequence = s.unwrap().clone(); + if let Some(sq) = s { + sequence = *sq; } let actor = ActorState::new( Cid::default(), @@ -758,16 +830,34 @@ mod tests { } } - fn create_header(weight: u64, parent_bz: &[u8], cached_bytes: &[u8]) -> BlockHeader { - let header = BlockHeader::builder() + pub fn create_header(weight: u64, parent_bz: &[u8], cached_bytes: &[u8]) -> BlockHeader { + BlockHeader::builder() .weight(BigUint::from(weight)) .cached_bytes(cached_bytes.to_vec()) .cached_cid(Cid::new_from_cbor(parent_bz, Blake2b256)) .miner_address(Address::new_id(0)) .build() - .unwrap(); - header + .unwrap() } +} + +#[cfg(test)] +pub mod tests { + use super::test_provider::*; + use super::*; + use crate::MessagePool; + use address::Address; + use async_std::task; + use blocks::{BlockHeader, Ticket, Tipset}; + use cid::Cid; + use crypto::{election_proof::ElectionProof, SignatureType, VRFProof}; + use key_management::{MemKeyStore, Wallet}; + use message::{SignedMessage, UnsignedMessage}; + use num_bigint::BigUint; + use std::borrow::BorrowMut; + use std::convert::TryFrom; + use std::thread::sleep; + use std::time::Duration; fn create_smsg( to: &Address, @@ -844,11 +934,11 @@ mod tests { let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap(); let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap(); - let mut tma = TestApi::new(); + let mut tma = TestApi::default(); tma.set_state_sequence(&sender, 0); task::block_on(async move { - let mut mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); + let mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); let mut smsg_vec = Vec::new(); for i in 0..4 { @@ -893,7 +983,7 @@ mod tests { #[test] fn test_revert_messages() { - let tma = TestApi::new(); + let tma = TestApi::default(); let mut wallet = Wallet::new(MemKeyStore::new()); let a = mock_block(1, 1); @@ -911,7 +1001,7 @@ mod tests { } task::block_on(async move { - let mut mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); + let mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); let mut api_temp = mpool.api.write().await; api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]); @@ -991,11 +1081,11 @@ mod tests { let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap(); let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap(); - let mut tma = TestApi::new(); + let mut tma = TestApi::default(); tma.set_state_sequence(&sender, 0); task::block_on(async move { - let mut mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); + let mpool = MessagePool::new(tma, "mptest".to_string()).await.unwrap(); let mut smsg_vec = Vec::new(); for i in 0..3 { diff --git a/forest/Cargo.toml b/forest/Cargo.toml index 2fd6ffa32fe3..6ca547d9ed3f 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -33,4 +33,5 @@ blake2b_simd = "0.5.9" surf = "2.0.0-alpha.4" pbr = "1.0.3" pin-project-lite = "0.1" +message_pool = { package = "message_pool", path = "../blockchain/message_pool" } wallet = {package = "key_management", path = "../key_management"} \ No newline at end of file diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index bce42893bf6f..b83b4693de43 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -11,6 +11,7 @@ use db::RocksDb; use forest_libp2p::{get_keypair, Libp2pService}; use libp2p::identity::{ed25519, Keypair}; use log::{debug, info, trace}; +use message_pool::{MessagePool, MpoolRpcProvider}; use rpc::{start_rpc, RpcState}; use std::sync::Arc; use utils::write_to_file; @@ -53,6 +54,15 @@ pub(super) async fn start(config: Config) { let network_rx = p2p_service.network_receiver(); let network_send = p2p_service.network_sender(); + // Initialize mpool + let subscriber = chain_store.subscribe(); + let provider = MpoolRpcProvider::new(subscriber, Arc::clone(&db)); + let mpool = Arc::new( + MessagePool::new(provider, network_name.clone()) + .await + .unwrap(), + ); + // Get Drand Coefficients let coeff = config.drand_dist_public; @@ -91,6 +101,7 @@ pub(super) async fn start(config: Config) { RpcState { store: db_rpc, keystore: keystore_rpc, + mpool, bad_blocks, sync_state, network_send, diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index 005a055c1671..7768a484a7fa 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -17,6 +17,7 @@ blocks = { package = "forest_blocks", path = "../../blockchain/blocks", features clock = { path = "../clock" } message = { package = "forest_message", path = "../../vm/message", features = ["json"] } jsonrpc-v2 = { version = "0.5.2", features = ["easy-errors", "macros"] } +message_pool = { path = "../../blockchain/message_pool" } crypto = { package = "forest_crypto", path = "../../crypto", features = ["json"] } num-traits = "0.2.11" wallet = {package = "key_management", path = "../../key_management", features = ["json"] } @@ -31,3 +32,5 @@ forest_libp2p = { path = "../forest_libp2p" } [dev-dependencies] db = { path = "../db" } futures = "0.3.5" +test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] } +hex = "0.4.2" \ No newline at end of file diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 385e96914613..6088acf63f87 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod chain_api; +mod mpool_api; mod sync_api; mod wallet_api; @@ -10,12 +11,12 @@ use blockstore::BlockStore; use chain_sync::{BadBlockCache, SyncState}; use forest_libp2p::NetworkMessage; use jsonrpc_v2::{Data, MapRouter, RequestObject, Server}; +use message_pool::{MessagePool, MpoolRpcProvider}; use std::sync::Arc; use tide::{Request, Response, StatusCode}; use wallet::KeyStore; /// This is where you store persistant data, or at least access to stateful data. - pub struct RpcState where DB: BlockStore + Send + Sync + 'static, @@ -23,6 +24,7 @@ where { pub store: Arc, pub keystore: Arc>, + pub mpool: Arc>>, pub bad_blocks: Arc, pub sync_state: Arc>, pub network_send: Sender, @@ -41,6 +43,7 @@ where KS: KeyStore + Send + Sync + 'static, { use chain_api::*; + use mpool_api::*; use sync_api::*; use wallet_api::*; @@ -69,6 +72,15 @@ where chain_api::chain_get_block::, ) .with_method("Filecoin.ChainHead", chain_head::) + // Message Pool API + .with_method( + "Filecoin.MpoolEstimateGasPrice", + mpool_estimate_gas_price::, + ) + .with_method("Filecoin.MpoolGetNonce", mpool_get_sequence::) + .with_method("Filecoin.MpoolPending", mpool_pending::) + .with_method("Filecoin.MpoolPush", mpool_push::) + .with_method("Filecoin.MpoolPushMessage", mpool_push_message::) // Sync API .with_method("Filecoin.SyncCheckBad", sync_check_bad::) .with_method("Filecoin.SyncMarkBad", sync_mark_bad::) diff --git a/node/rpc/src/mpool_api.rs b/node/rpc/src/mpool_api.rs new file mode 100644 index 000000000000..e3ec749e195f --- /dev/null +++ b/node/rpc/src/mpool_api.rs @@ -0,0 +1,154 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::RpcState; + +use address::Address; +use blocks::TipsetKeys; +use blockstore::BlockStore; +use cid::json::{vec::CidJsonVec, CidJson}; +use encoding::Cbor; +use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; +use message::Message; +use message::{ + signed_message::json::SignedMessageJson, unsigned_message::json::UnsignedMessageJson, + SignedMessage, +}; +use std::collections::HashSet; +use std::str::FromStr; +use wallet::KeyStore; + +/// Estimate the gas price for an Address +pub(crate) async fn mpool_estimate_gas_price( + data: Data>, + Params(params): Params<(u64, String, u64, TipsetKeys)>, +) -> Result +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + let (nblocks, sender_str, gas_limit, tsk) = params; + let sender = Address::from_str(&sender_str)?; + let price = data + .mpool + .estimate_gas_price(nblocks, sender, gas_limit, tsk)?; + Ok(price.to_string()) +} + +/// get the sequence of given address in mpool +pub(crate) async fn mpool_get_sequence( + data: Data>, + Params(params): Params<(String,)>, +) -> Result +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + let (addr_str,) = params; + let address = Address::from_str(&addr_str)?; + let sequence = data.mpool.get_sequence(&address).await?; + Ok(sequence) +} + +/// Return Vec of pending messages in mpool +pub(crate) async fn mpool_pending( + data: Data>, + Params(params): Params<(CidJsonVec,)>, +) -> Result, JsonRpcError> +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + let (CidJsonVec(cid_vec),) = params; + let tsk = TipsetKeys::new(cid_vec); + let mut ts = chain::tipset_from_keys(data.store.as_ref(), &tsk)?; + + let (mut pending, mpts) = data.mpool.pending().await?; + + let mut have_cids = HashSet::new(); + for item in pending.iter() { + have_cids.insert(item.cid()?); + } + + if mpts.epoch() > ts.epoch() { + return Ok(pending); + } + + loop { + if mpts.epoch() == ts.epoch() { + if mpts == ts { + return Ok(pending); + } + + // mpts has different blocks than ts + let have = data.mpool.as_ref().messages_for_blocks(ts.blocks()).await?; + + for sm in have { + have_cids.insert(sm.cid()?); + } + } + + let msgs = data.mpool.as_ref().messages_for_blocks(ts.blocks()).await?; + + for m in msgs { + if have_cids.contains(&m.cid()?) { + continue; + } + + have_cids.insert(m.cid()?); + pending.push(m); + } + + if mpts.epoch() >= ts.epoch() { + return Ok(pending); + } + + ts = chain::tipset_from_keys(data.store.as_ref(), ts.parents())?; + } +} + +/// Add SignedMessage to mpool, return msg CID +pub(crate) async fn mpool_push( + data: Data>, + Params(params): Params<(SignedMessageJson,)>, +) -> Result +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + let (SignedMessageJson(smsg),) = params; + + let cid = data.mpool.as_ref().push(smsg).await?; + + Ok(CidJson(cid)) +} + +/// Sign given UnsignedMessage and add it to mpool, return SignedMessage +pub(crate) async fn mpool_push_message( + data: Data>, + Params(params): Params<(UnsignedMessageJson,)>, +) -> Result +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + // TODO handle defaults for sequence, gas limit and gas price + let (UnsignedMessageJson(umsg),) = params; + + let from = umsg.from(); + let msg_cid = umsg.cid()?; + + let keystore = data.keystore.as_ref().write().await; + let key = wallet::find_key(&from, &*keystore)?; + let sig = wallet::sign( + *key.key_info.key_type(), + key.key_info.private_key(), + msg_cid.to_bytes().as_slice(), + )?; + + let smsg = SignedMessage::new_from_parts(umsg, sig)?; + + data.mpool.as_ref().push(smsg.clone()).await?; + + Ok(SignedMessageJson(smsg)) +} diff --git a/node/rpc/src/sync_api.rs b/node/rpc/src/sync_api.rs index 3f867fed1154..47b404daca8d 100644 --- a/node/rpc/src/sync_api.rs +++ b/node/rpc/src/sync_api.rs @@ -87,10 +87,14 @@ where mod tests { use super::*; use async_std::sync::{channel, Receiver, RwLock}; + use async_std::task; + use blocks::{BlockHeader, Tipset}; + use chain::ChainStore; use chain_sync::SyncStage; - use db::MemoryDB; + use db::{MemoryDB, Store}; use forest_libp2p::NetworkMessage; use futures::StreamExt; + use message_pool::{MessagePool, MpoolRpcProvider}; use serde_json::from_str; use std::sync::Arc; use wallet::MemKeyStore; @@ -102,9 +106,31 @@ mod tests { Receiver, ) { let (network_send, network_rx) = channel(5); + + let pool = task::block_on(async { + let mut cs = ChainStore::new(Arc::new(MemoryDB::default())); + let bz = hex::decode("8f4200008158207672662070726f6f66303030303030307672662070726f6f663030303030303081408182005820000000000000000000000000000000000000000000000000000000000000000080804000d82a5827000171a0e402206b5f2a7a2c2be076e0635b908016ddca0de082e14d9c8d776a017660628b5bfdd82a5827000171a0e4022001cd927fdccd7938faba323e32e70c44541b8a83f5dc941d90866565ef5af14ad82a5827000171a0e402208d6f0e09e0453685b8816895cd56a7ee2fce600026ee23ac445d78f020c1ca40f61a5ebdc1b8f600").unwrap(); + let header = BlockHeader::unmarshal_cbor(&bz).unwrap(); + let ts = Tipset::new(vec![header]).unwrap(); + let subscriber = cs.subscribe(); + let db = cs.db.clone(); + let tsk = ts.key().cids.clone(); + cs.set_heaviest_tipset(Arc::new(ts)).await.unwrap(); + + for i in tsk { + let bz2 = bz.clone(); + db.as_ref().write(i.key(), bz2).unwrap(); + } + let provider = MpoolRpcProvider::new(subscriber, cs.db); + MessagePool::new(provider, "test".to_string()) + .await + .unwrap() + }); + let state = Arc::new(RpcState { store: Arc::new(MemoryDB::default()), keystore: Arc::new(RwLock::new(wallet::MemKeyStore::new())), + mpool: Arc::new(pool), bad_blocks: Default::default(), sync_state: Default::default(), network_send, diff --git a/vm/address/src/lib.rs b/vm/address/src/lib.rs index 4a14e8da83e7..0f08bb58a884 100644 --- a/vm/address/src/lib.rs +++ b/vm/address/src/lib.rs @@ -44,6 +44,7 @@ const NETWORK_DEFAULT: Network = Network::Testnet; /// Address is the struct that defines the protocol and data payload conversion from either /// a public key or value +/// TODO add Address JSON implementation #[derive(PartialEq, Eq, Clone, Debug, Hash, Copy)] pub struct Address { network: Network,