Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ashanti/connect state transition #454

Merged
merged 35 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
dc95fb8
connect state transition
StaticallyTypedAnxiety May 26, 2020
950d2cc
fixed merge
StaticallyTypedAnxiety May 27, 2020
9c7a76c
changed error messages
StaticallyTypedAnxiety May 27, 2020
d0f6670
fixed comments
StaticallyTypedAnxiety May 27, 2020
d8568b8
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety May 27, 2020
b409ec5
to reference
StaticallyTypedAnxiety May 27, 2020
0ec6356
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety May 27, 2020
a0c92fe
reverted to some references
StaticallyTypedAnxiety May 27, 2020
b267f8b
few fixes
StaticallyTypedAnxiety May 27, 2020
1137a8f
moved rwlock
StaticallyTypedAnxiety May 28, 2020
75ebfa5
removed comment
StaticallyTypedAnxiety May 28, 2020
f6602fd
using spawn instead of spawn_blocking
StaticallyTypedAnxiety May 28, 2020
cbe6145
added spans
StaticallyTypedAnxiety May 28, 2020
91911f9
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety May 28, 2020
39cf3fc
updated with rand
StaticallyTypedAnxiety May 28, 2020
cf6432c
Update blockchain/state_manager/src/lib.rs
StaticallyTypedAnxiety May 29, 2020
4f5e513
Update blockchain/state_manager/src/lib.rs
StaticallyTypedAnxiety May 29, 2020
df1c0d4
Update blockchain/chain_sync/src/sync.rs
StaticallyTypedAnxiety May 29, 2020
f8fb735
fixed duplicate check
StaticallyTypedAnxiety May 29, 2020
2c44573
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety May 29, 2020
8d12e2c
use take high order function
StaticallyTypedAnxiety May 29, 2020
fef67a4
a few fixes
StaticallyTypedAnxiety Jun 1, 2020
748a6ec
cargo fmt
StaticallyTypedAnxiety Jun 1, 2020
3feb1e3
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 1, 2020
a4d1666
moved complex closure out of boolean
StaticallyTypedAnxiety Jun 1, 2020
c60af0e
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety Jun 1, 2020
27dc689
use spawn blocking
StaticallyTypedAnxiety Jun 1, 2020
a843f13
cargo fmt
StaticallyTypedAnxiety Jun 1, 2020
1136e9a
changed remote url
StaticallyTypedAnxiety Jun 1, 2020
4cc5b8d
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 1, 2020
3780cf9
renamed get_bs
StaticallyTypedAnxiety Jun 1, 2020
68a1aab
Merge branch 'ashanti/connect_state_transition' of https://github.com…
StaticallyTypedAnxiety Jun 1, 2020
ebecb26
used cids in blockheader
StaticallyTypedAnxiety Jun 2, 2020
d607c2a
change from len to epoch
StaticallyTypedAnxiety Jun 2, 2020
9b129f3
Merge branch 'master' into ashanti/connect_state_transition
StaticallyTypedAnxiety Jun 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion blockchain/blocks/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl BlockHeader {
.ok_or_else(|| Error::InvalidSignature("Signature is nil in header".to_owned()))?;

signature
.verify(&self.cid().to_bytes(), addr)
.verify(&self.cid().to_bytes(), &addr)
.map_err(|e| Error::InvalidSignature(format!("Block signature invalid: {}", e)))?;

Ok(())
Expand Down
1 change: 1 addition & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ lru = "0.4.3"
thiserror = "1.0"
num-traits = "0.2"


[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
71 changes: 46 additions & 25 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct ChainSyncer<DB, TBeacon> {
beacon: Arc<TBeacon>,

/// manages retrieving and updates state objects
state_manager: StateManager<DB>,
state_manager: Arc<StateManager<DB>>,

/// Bucket queue for incoming tipsets
sync_queue: SyncBucketSet,
Expand Down Expand Up @@ -98,9 +98,9 @@ struct MsgMetaData {
sequence: u64,
}

impl<DB, TBeacon> ChainSyncer<DB, TBeacon>
impl<DB, TBeacon: 'static> ChainSyncer<DB, TBeacon>
where
TBeacon: Beacon,
TBeacon: Beacon + Send,
DB: BlockStore + Sync + Send + 'static,
{
pub fn new(
Expand All @@ -110,7 +110,7 @@ where
network_rx: Receiver<NetworkEvent>,
genesis: Tipset,
) -> Result<Self, Error> {
let state_manager = StateManager::new(chain_store.db.clone());
let state_manager = Arc::new(StateManager::new(chain_store.db.clone()));

// Split incoming channel to handle blocksync requests
let (rpc_send, rpc_rx) = channel(20);
Expand Down Expand Up @@ -490,13 +490,21 @@ where
Ok(fts)
}
// Block message validation checks
fn check_block_msgs(db: Arc<DB>, block: Block, tip: &Tipset) -> Result<(), Error> {
async fn check_block_msgs(
state_manager: Arc<StateManager<DB>>,
block: Block,
tip: Tipset,
) -> Result<(), Error> {
//do the initial loop here
// Check Block Message and Signatures in them
let mut pub_keys = Vec::new();
let mut cids = Vec::new();
for m in block.bls_msgs() {
let pk = StateManager::get_bls_public_key(&db, m.from(), tip.parent_state())?;
let pk = StateManager::get_bls_public_key(
&state_manager.get_bs(),
m.from(),
tip.parent_state(),
)?;
pub_keys.push(pk);
cids.push(m.cid()?.to_bytes());
}
Expand All @@ -523,6 +531,7 @@ where
"No bls signature included in the block header".to_owned(),
));
}

// check msgs for validity
fn check_msg<M, DB: BlockStore>(
msg: &M,
Expand Down Expand Up @@ -572,9 +581,14 @@ where
Ok(())
}
let mut msg_meta_data: HashMap<Address, MsgMetaData> = HashMap::default();
// TODO retrieve tipset state and load state tree
// temporary
let tree = StateTree::new(db.as_ref());
let db = state_manager.get_bs();
let (state_root, _) = state_manager
.tipset_state(&tip)
.await
.map_err(|_| Error::Validation("Could not update state".to_owned()))?;
let tree = StateTree::new_from_root(db.as_ref(), &state_root).map_err(|_| {
Error::Validation("Could not load from new state root in state manager".to_owned())
})?;
// loop through bls messages and check msg validity
for m in block.bls_msgs() {
check_msg(m, &mut msg_meta_data, &tree)?;
Expand All @@ -600,7 +614,6 @@ where
async fn validate(&self, block: &Block) -> Result<(), Error> {
let mut error_vec: Vec<String> = Vec::new();
let mut validations = FuturesUnordered::new();

let header = block.header();

// check if block has been signed
Expand All @@ -617,28 +630,36 @@ where

let b = block.clone();

let db = Arc::clone(&self.chain_store.db);
let parent_clone = parent_tipset.clone();
// check messages to ensure valid state transitions
let x = task::spawn_blocking(move || Self::check_block_msgs(db, b, &parent_clone));
let sm = self.state_manager.clone();
let x = task::spawn(Self::check_block_msgs(sm, b, parent_clone));
validations.push(x);

// TODO use computed state_root instead of parent_tipset.parent_state()

// block signature check
let (state_root, _) = self
.state_manager
.tipset_state(&parent_tipset)
.await
.map_err(|_| Error::Validation("Could not update state".to_owned()))?;
let work_addr_result = self
.state_manager
.get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address());
.get_miner_work_addr(&state_root, header.miner_address());

// temp header needs to live long enough in static context returned by task::spawn
let signature = block.header().signature().clone();
let cid_bytes = block.header().cid().to_bytes().clone();
match work_addr_result {
Ok(work_addr) => {
let temp_header = header.clone();
let block_sig_task = task::spawn_blocking(move || {
temp_header
.check_block_signature(&work_addr)
.map_err(Error::Blockchain)
});
validations.push(block_sig_task)
}
Ok(_) => validations.push(task::spawn(async move {
signature
.ok_or_else(|| {
Error::Blockchain(blocks::Error::InvalidSignature(
"Signature is nil in header".to_owned(),
))
})?
.verify(&cid_bytes, &work_addr_result.unwrap())
.map_err(|e| Error::Blockchain(blocks::Error::InvalidSignature(e)))
})),
Err(err) => error_vec.push(err.to_string()),
}

Expand Down Expand Up @@ -697,7 +718,7 @@ where
}

for b in fts.blocks() {
if let Err(e) = self.validate(b).await {
if let Err(e) = self.validate(&b).await {
self.bad_blocks.put(b.cid().clone(), e.to_string());
return Err(Error::Other("Invalid blocks detected".to_string()));
}
Expand Down
5 changes: 5 additions & 0 deletions blockchain/state_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ forest_blocks = { path = "../../blockchain/blocks" }
thiserror = "1.0"
interpreter = { path = "../../vm/interpreter/" }
ipld_amt = { path = "../../ipld/amt/" }
clock = { path = "../../node/clock" }
chain = { path = "../chain" }
async-std = "1.5.0"
async-log = "2.0.0"
log = "0.4.8"
97 changes: 95 additions & 2 deletions blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,29 @@ mod errors;
pub use self::errors::*;
use actor::{init, miner, power, ActorState, INIT_ACTOR_ADDR, STORAGE_POWER_ACTOR_ADDR};
use address::{Address, Protocol};
use async_log::span;
use async_std::sync::RwLock;
use blockstore::BlockStore;
use blockstore::BufferedBlockStore;
use chain::ChainStore;
use cid::Cid;
use encoding::de::DeserializeOwned;
use forest_blocks::FullTipset;
use forest_blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys};
use interpreter::{resolve_to_key_addr, ChainRand, DefaultSyscalls, VM};
use ipld_amt::Amt;
use log::trace;
use num_bigint::BigUint;
use state_tree::StateTree;
use std::collections::HashMap;
use std::error::Error as StdError;
use std::sync::Arc;

/// Intermediary for retrieving state objects and updating actor states
pub type CidPair = (Cid, Cid);

pub struct StateManager<DB> {
bs: Arc<DB>,
cache: RwLock<HashMap<TipsetKeys, CidPair>>,
}

impl<DB> StateManager<DB>
Expand All @@ -29,7 +37,10 @@ where
{
/// constructor
pub fn new(bs: Arc<DB>) -> Self {
Self { bs }
Self {
bs,
cache: RwLock::new(HashMap::new()),
}
}
/// Loads actor state from IPLD Store
fn load_actor_state<D>(&self, addr: &Address, state_cid: &Cid) -> Result<D, Error>
Expand All @@ -50,6 +61,11 @@ where
let state = StateTree::new_from_root(self.bs.as_ref(), state_cid).map_err(Error::State)?;
state.get_actor(addr).map_err(Error::State)
}

pub fn get_bs(&self) -> Arc<DB> {
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
self.bs.clone()
}

/// Returns the network name from the init actor state
pub fn get_network_name(&self, st: &Cid) -> Result<String, Error> {
let state: init::State = self.load_actor_state(&*INIT_ACTOR_ADDR, st)?;
Expand Down Expand Up @@ -122,6 +138,83 @@ where
Ok((state_root, rect_root))
}

pub async fn tipset_state(&self, tipset: &Tipset) -> Result<(Cid, Cid), Box<dyn StdError>> {
span!("tipset_state", {
trace!("tipset {:?}", tipset.cids());
// if exists in cache return
if let Some(cid_pair) = self.cache.read().await.get(&tipset.key()) {
return Ok(cid_pair.clone());
}

if tipset.len() == 0 {
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
StaticallyTypedAnxiety marked this conversation as resolved.
Show resolved Hide resolved
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
let message_receipts = tipset
.blocks()
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;
let cid_pair = (
tipset.parent_state().clone(),
message_receipts.message_receipts().clone(),
);
self.cache
.write()
.await
.insert(tipset.key().clone(), cid_pair.clone());
return Ok(cid_pair);
}

let block_headers = tipset.blocks();
// generic constants are not implemented yet this is a lowcost method for now
let cid_pair = self.compute_tipset_state(&block_headers)?;
self.cache
.write()
.await
.insert(tipset.key().clone(), cid_pair.clone());
Ok(cid_pair)
})
}

pub fn compute_tipset_state<'a>(
&'a self,
blocks: &[BlockHeader],
) -> Result<(Cid, Cid), Box<dyn StdError>> {
span!("compute_tipset_state", {
let check_for_duplicates = |s: &BlockHeader| {
blocks
.iter()
.filter(|val| val.miner_address() == s.miner_address())
.take(2)
.count()
};
if blocks.iter().any(|s| check_for_duplicates(s) > 1) {
// Duplicate Miner found
return Err(Box::new(Error::Other(
"Could not get message receipts".to_string(),
)));
}
austinabell marked this conversation as resolved.
Show resolved Hide resolved

let chain_store = ChainStore::new(self.bs.clone());
let blocks = blocks
.iter()
.map::<Result<Block, Box<dyn StdError>>, _>(|s: &BlockHeader| {
let (bls_messages, secp_messages) = chain_store.messages(&s)?;
Ok(Block {
header: s.clone(),
bls_messages,
secp_messages,
})
})
.collect::<Result<Vec<Block>, _>>()?;
// convert tipset to fulltipset
let full_tipset = FullTipset::new(blocks)?;
let chain_rand = ChainRand::new(full_tipset.to_tipset().key().to_owned());
austinabell marked this conversation as resolved.
Show resolved Hide resolved
self.apply_blocks(&full_tipset, &chain_rand)
austinabell marked this conversation as resolved.
Show resolved Hide resolved
})
}

/// Returns a bls public key from provided address
pub fn get_bls_public_key(
db: &Arc<DB>,
Expand Down
2 changes: 1 addition & 1 deletion tests/serialization-vectors