Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ashanti/connect state transition #454

Merged
merged 35 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
dc95fb8
connect state transition
StaticallyTypedAnxiety May 26, 2020
950d2cc
fixed merge
StaticallyTypedAnxiety May 27, 2020
9c7a76c
changed error messages
StaticallyTypedAnxiety May 27, 2020
d0f6670
fixed comments
StaticallyTypedAnxiety May 27, 2020
d8568b8
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety May 27, 2020
b409ec5
to reference
StaticallyTypedAnxiety May 27, 2020
0ec6356
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety May 27, 2020
a0c92fe
reverted to some references
StaticallyTypedAnxiety May 27, 2020
b267f8b
few fixes
StaticallyTypedAnxiety May 27, 2020
1137a8f
moved rwlock
StaticallyTypedAnxiety May 28, 2020
75ebfa5
removed comment
StaticallyTypedAnxiety May 28, 2020
f6602fd
using spawn instead of spawn_blocking
StaticallyTypedAnxiety May 28, 2020
cbe6145
added spans
StaticallyTypedAnxiety May 28, 2020
91911f9
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety May 28, 2020
39cf3fc
updated with rand
StaticallyTypedAnxiety May 28, 2020
cf6432c
Update blockchain/state_manager/src/lib.rs
StaticallyTypedAnxiety May 29, 2020
4f5e513
Update blockchain/state_manager/src/lib.rs
StaticallyTypedAnxiety May 29, 2020
df1c0d4
Update blockchain/chain_sync/src/sync.rs
StaticallyTypedAnxiety May 29, 2020
f8fb735
fixed duplicate check
StaticallyTypedAnxiety May 29, 2020
2c44573
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety May 29, 2020
8d12e2c
use take high order function
StaticallyTypedAnxiety May 29, 2020
fef67a4
a few fixes
StaticallyTypedAnxiety Jun 1, 2020
748a6ec
cargo fmt
StaticallyTypedAnxiety Jun 1, 2020
3feb1e3
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 1, 2020
a4d1666
moved complex closure out of boolean
StaticallyTypedAnxiety Jun 1, 2020
c60af0e
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety Jun 1, 2020
27dc689
use spawn blocking
StaticallyTypedAnxiety Jun 1, 2020
a843f13
cargo fmt
StaticallyTypedAnxiety Jun 1, 2020
1136e9a
changed remote url
StaticallyTypedAnxiety Jun 1, 2020
4cc5b8d
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 1, 2020
3780cf9
renamed get_bs
StaticallyTypedAnxiety Jun 1, 2020
68a1aab
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety Jun 1, 2020
ebecb26
used cids in blockheader
StaticallyTypedAnxiety Jun 2, 2020
d607c2a
change from len to epoch
StaticallyTypedAnxiety Jun 2, 2020
9b129f3
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blockchain/blocks/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,14 @@ impl BlockHeader {
Ok(())
}
/// Check to ensure block signature is valid
pub fn check_block_signature(&self, addr: &Address) -> Result<(), Error> {
pub fn check_block_signature(&self, addr: Address) -> Result<(), Error> {
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
let signature = self
.signature()
.as_ref()
.ok_or_else(|| Error::InvalidSignature("Signature is nil in header".to_owned()))?;

signature
.verify(&self.cid().to_bytes(), addr)
.verify(&self.cid().to_bytes(), &addr)
.map_err(|e| Error::InvalidSignature(format!("Block signature invalid: {}", e)))?;

Ok(())
Expand Down
1 change: 1 addition & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ lru = "0.4.3"
thiserror = "1.0"
num-traits = "0.2"


[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
76 changes: 52 additions & 24 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::peer_manager::PeerManager;
use super::{Error, SyncNetworkContext};
use address::Address;
use amt::Amt;
use async_std::sync::{channel, Receiver, Sender};
use async_std::sync::{channel, Receiver, RwLock, Sender};
use async_std::task;
use beacon::Beacon;
use blocks::{Block, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand All @@ -22,6 +22,7 @@ use encoding::{Cbor, Error as EncodingError};
use forest_libp2p::{
hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
};
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use ipld_blockstore::BlockStore;
use libp2p::core::PeerId;
Expand Down Expand Up @@ -65,7 +66,7 @@ pub struct ChainSyncer<DB, TBeacon> {
beacon: Arc<TBeacon>,

/// manages retrieving and updates state objects
state_manager: StateManager<DB>,
state_manager: Arc<RwLock<StateManager<DB>>>,
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved

/// Bucket queue for incoming tipsets
sync_queue: SyncBucketSet,
Expand Down Expand Up @@ -98,9 +99,9 @@ struct MsgMetaData {
sequence: u64,
}

impl<DB, TBeacon> ChainSyncer<DB, TBeacon>
impl<DB, TBeacon: 'static> ChainSyncer<DB, TBeacon>
where
TBeacon: Beacon,
TBeacon: Beacon + Send,
DB: BlockStore + Sync + Send + 'static,
{
pub fn new(
Expand All @@ -110,7 +111,7 @@ where
network_rx: Receiver<NetworkEvent>,
genesis: Tipset,
) -> Result<Self, Error> {
let state_manager = StateManager::new(chain_store.db.clone());
let state_manager = Arc::new(RwLock::new(StateManager::new(chain_store.db.clone())));

// Split incoming channel to handle blocksync requests
let (rpc_send, rpc_rx) = channel(20);
Expand Down Expand Up @@ -490,7 +491,12 @@ where
Ok(fts)
}
// Block message validation checks
fn check_block_msgs(db: Arc<DB>, block: Block, tip: &Tipset) -> Result<(), Error> {
async fn check_block_msgs(
state_manager: Arc<RwLock<StateManager<DB>>>,
db: Arc<DB>,
block: Block,
tip: Tipset,
) -> Result<(), Error> {
//do the initial loop here
// Check Block Message and Signatures in them
let mut pub_keys = Vec::new();
Expand Down Expand Up @@ -523,6 +529,7 @@ where
"No bls signature included in the block header".to_owned(),
));
}

// check msgs for validity
fn check_msg<M, DB: BlockStore>(
msg: &M,
Expand Down Expand Up @@ -572,9 +579,15 @@ where
Ok(())
}
let mut msg_meta_data: HashMap<Address, MsgMetaData> = HashMap::default();
// TODO retrieve tipset state and load state tree
// temporary
let tree = StateTree::new(db.as_ref());
let (state_root, _) = state_manager
.write()
.await
.tipset_state(&tip)
.map_err(|_| Error::Validation("Could not update state".to_owned()))?;
let database = &*db.clone();
let tree = StateTree::new_from_root(database, &state_root).map_err(|_| {
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
Error::Validation("Could not load from new state root in state manager".to_owned())
})?;
// loop through bls messages and check msg validity
for m in block.bls_msgs() {
check_msg(m, &mut msg_meta_data, &tree)?;
Expand All @@ -597,10 +610,9 @@ where
}

/// Validates block semantically according to https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md
async fn validate(&self, block: &Block) -> Result<(), Error> {
async fn validate(&self, block: Block) -> Result<(), Error> {
austinabell marked this conversation as resolved.
Show resolved Hide resolved
let mut error_vec: Vec<String> = Vec::new();
let mut validations = FuturesUnordered::new();

let header = block.header();

// check if block has been signed
Expand All @@ -620,30 +632,44 @@ where
let db = Arc::clone(&self.chain_store.db);
let parent_clone = parent_tipset.clone();
// check messages to ensure valid state transitions
let x = task::spawn_blocking(move || Self::check_block_msgs(db, b, &parent_clone));
let sm = self.state_manager.clone();
let x = Self::check_block_msgs(sm, db, b, parent_clone).boxed();
validations.push(x);

// TODO use computed state_root instead of parent_tipset.parent_state()

// block signature check
let (state_root, _) = self
.state_manager
.write()
.await
.tipset_state(&parent_tipset)
.map_err(|_| Error::Validation("Could not update state".to_owned()))?;
let work_addr_result = self
.state_manager
.get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address());
.read()
.await
.get_miner_work_addr(&state_root, header.miner_address());

// temp header needs to live long enough
match work_addr_result {
Ok(work_addr) => {
let temp_header = header.clone();
let block_sig_task = task::spawn_blocking(move || {
temp_header
.check_block_signature(&work_addr)
.map_err(Error::Blockchain)
});
validations.push(block_sig_task)
Ok(_) => {

validations.push(
async {
block
.header()
.check_block_signature(work_addr_result.unwrap().clone()) //work_addr_result lives longer than unwrapped_value in scope
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
.map_err(Error::Blockchain)
}
.boxed(),
)
}
Err(err) => error_vec.push(err.to_string()),
}

let slash = self
.state_manager
.read()
.await
.is_miner_slashed(header.miner_address(), &parent_tipset.parent_state())
.unwrap_or_else(|err| {
error_vec.push(err.to_string());
Expand All @@ -662,6 +688,8 @@ where

let power_result = self
.state_manager
.read()
.await
.get_power(&parent_tipset.parent_state(), header.miner_address());
// ticket winner check
match power_result {
Expand Down Expand Up @@ -697,7 +725,7 @@ where
}

for b in fts.blocks() {
if let Err(e) = self.validate(b).await {
if let Err(e) = self.validate(b.clone()).await {
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
austinabell marked this conversation as resolved.
Show resolved Hide resolved
self.bad_blocks.put(b.cid().clone(), e.to_string());
return Err(Error::Other("Invalid blocks detected".to_string()));
}
Expand Down
3 changes: 3 additions & 0 deletions blockchain/state_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ forest_blocks = { path = "../../blockchain/blocks" }
thiserror = "1.0"
interpreter = { path = "../../vm/interpreter/" }
ipld_amt = { path = "../../ipld/amt/" }
clock = { path = "../../node/clock" }
chain = { path = "../chain" }
log = "0.4.8"
110 changes: 108 additions & 2 deletions blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,30 @@ use actor::{init, miner, power, ActorState, INIT_ACTOR_ADDR, STORAGE_POWER_ACTOR
use address::{Address, Protocol};
use blockstore::BlockStore;
use blockstore::BufferedBlockStore;
use chain::ChainStore;
use cid::Cid;
use clock::ChainEpoch;
use encoding::de::DeserializeOwned;
use forest_blocks::FullTipset;
use forest_blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys};
use interpreter::{resolve_to_key_addr, DefaultSyscalls, VM};
use ipld_amt::Amt;
use log::trace;
use num_bigint::BigUint;
use state_tree::StateTree;
use std::collections::HashMap;
use std::error::Error as StdError;
use std::sync::Arc;

/// Intermediary for retrieving state objects and updating actor states

pub type CidPair = (Cid, Cid);

StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
pub type ForkFunctions<'a, DB> =
&'a dyn Fn(&'a StateManager<DB>, Cid) -> Result<Cid, Box<dyn StdError>>;

pub struct StateManager<DB> {
bs: Arc<DB>,
cache: HashMap<TipsetKeys, CidPair>,
}

impl<DB> StateManager<DB>
Expand All @@ -29,7 +40,10 @@ where
{
/// constructor
pub fn new(bs: Arc<DB>) -> Self {
Self { bs }
Self {
bs,
cache: HashMap::new(),
}
}
/// Loads actor state from IPLD Store
fn load_actor_state<D>(&self, addr: &Address, state_cid: &Cid) -> Result<D, Error>
Expand Down Expand Up @@ -117,6 +131,98 @@ where
Ok((state_root, rect_root))
}

pub fn tipset_state(&mut self, tipset: &Tipset) -> Result<(Cid, Cid), Box<dyn StdError>> {
trace!("tipSetState");
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved

// if exists in cache return
if let Some(cid_pair) = self.cache.get(&tipset.key()) {
return Ok(cid_pair.clone());
}

if tipset.len() == 0 {
ec2 marked this conversation as resolved.
Show resolved Hide resolved
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
let message_receipts = tipset
.blocks()
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
let cid_pair = (
tipset.parent_state().clone(),
message_receipts.message_receipts().clone(),
);
self.cache.insert(tipset.key().clone(), cid_pair.clone());
return Ok(cid_pair);
}

let block_headers = tipset.blocks().to_vec();
// generic constants are not implemented yet this is a lowcost method for now
let cid_pair = self.compute_tipset_state(&block_headers, &HashMap::new())?;
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
self.cache.insert(tipset.key().clone(), cid_pair.clone());
Ok(cid_pair)
}

pub fn compute_tipset_state<'a>(
&'a self,
blocks: &[BlockHeader],
forks_at_heights: &'a HashMap<ChainEpoch, ForkFunctions<'a, DB>>,
austinabell marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(Cid, Cid), Box<dyn StdError>> {
trace!("compute tipset state");
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
if blocks.len() > 2
&& blocks
.iter()
.zip(blocks.iter().skip(0))
.any(|(a, b)| a.miner_address() == b.miner_address())
{
// Duplicate Minor found
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
return Err(Box::new(Error::Other(
"Could not get message receipts".to_string(),
)));
}

let parents_cid = &blocks
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?
.parents()
.cids;

if !parents_cid.is_empty() {
let parents_first = parents_cid
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
self.bs
.get(parents_first)?
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
// handle state forks
let func_to_execute = forks_at_heights
.get(
&blocks
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?
.epoch(),
)
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
func_to_execute(&self, parents_first.clone())?;
};

let chain_store = ChainStore::new(self.bs.clone());
let blocks = blocks
.iter()
.map::<Result<Block, Box<dyn StdError>>, _>(|s: &BlockHeader| {
let (bls_messages, secp_messages) = chain_store.messages(&s)?;
Ok(Block {
header: s.clone(),
bls_messages,
secp_messages,
})
})
.collect::<Result<Vec<Block>, _>>()?;
let full_tipset = FullTipset::new(blocks)?;
// convert tipset to fulltipset
self.apply_blocks(&full_tipset)
}
austinabell marked this conversation as resolved.
Show resolved Hide resolved

/// Returns a bls public key from provided address
pub fn get_bls_public_key(
db: &Arc<DB>,
Expand Down
2 changes: 1 addition & 1 deletion tests/serialization-vectors
2 changes: 1 addition & 1 deletion vm/interpreter/src/default_syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ where
.ok_or_else(|| format!("actor state not found {:?}", actor.state.to_string()))?;

let work_address = resolve_to_key_addr(&state, self.store, &ms.info.worker)?;
bh.check_block_signature(&work_address)?;
bh.check_block_signature(work_address)?;
Ok(())
}
}