Skip to content

Commit

Permalink
Apply Blocks and refactor (#374)
Browse files Browse the repository at this point in the history
* Fix syncing logic

* Refactor state tree and setup buffered blockstore

* Implement apply tipset messages in vm and fix logic

* Update based on comments

* Fix bitvec test

* Typo and redundant trait definition
  • Loading branch information
austinabell authored Apr 26, 2020
1 parent 8bc201a commit 5355acd
Show file tree
Hide file tree
Showing 25 changed files with 529 additions and 233 deletions.
8 changes: 4 additions & 4 deletions blockchain/blocks/src/header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{EPostProof, Error, FullTipset, Ticket, TipSetKeys};
use super::{EPostProof, Error, Ticket, TipSetKeys, Tipset};
use address::Address;
use cid::{multihash::Blake2b256, Cid};
use clock::ChainEpoch;
Expand Down Expand Up @@ -281,7 +281,7 @@ impl BlockHeader {
Ok(())
}
/// Validates timestamps to ensure BlockHeader was generated at the correct time
pub fn validate_timestamps(&self, base_tipset: &FullTipset) -> Result<(), Error> {
pub fn validate_timestamps(&self, base_tipset: &Tipset) -> Result<(), Error> {
// first check that it is not in the future; see https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md
// allowing for some small grace period to deal with small asynchrony
// using ALLOWABLE_CLOCK_DRIFT from Lotus; see https://github.com/filecoin-project/lotus/blob/master/build/params_shared.go#L34:7
Expand All @@ -295,8 +295,8 @@ impl BlockHeader {
const FIXED_BLOCK_DELAY: u64 = 45;
// check that it is appropriately delayed from its parents including null blocks
if self.timestamp()
< base_tipset.tipset()?.min_timestamp()?
+ FIXED_BLOCK_DELAY * (self.epoch() - base_tipset.tipset()?.epoch())
< base_tipset.min_timestamp()?
+ FIXED_BLOCK_DELAY * (self.epoch() - base_tipset.epoch())
{
return Err(Error::Validation(
"Header was generated too soon".to_string(),
Expand Down
32 changes: 30 additions & 2 deletions blockchain/blocks/src/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ impl Tipset {
pub fn blocks(&self) -> &[BlockHeader] {
&self.blocks
}
/// Returns all blocks in tipset
pub fn into_blocks(self) -> Vec<BlockHeader> {
self.blocks
}
/// Returns the smallest ticket of all blocks in the tipset
pub fn min_ticket(&self) -> Result<Ticket, Error> {
if self.blocks.is_empty() {
Expand Down Expand Up @@ -218,12 +222,28 @@ impl FullTipset {
pub fn new(blks: Vec<Block>) -> Self {
Self { blocks: blks }
}
/// Returns all blocks in a full tipset
/// Returns reference to all blocks in a full tipset
pub fn blocks(&self) -> &[Block] {
&self.blocks
}
/// Returns all blocks in a full tipset
pub fn into_blocks(self) -> Vec<Block> {
self.blocks
}
// TODO: conversions from full to regular tipset should not return a result
// and should be validated on creation instead
/// Returns a Tipset
pub fn into_tipset(self) -> Result<Tipset, Error> {
let mut headers = Vec::new();

for block in self.into_blocks() {
headers.push(block.header)
}
let tip: Tipset = Tipset::new(headers)?;
Ok(tip)
}
/// Returns a Tipset
pub fn tipset(&self) -> Result<Tipset, Error> {
pub fn to_tipset(&self) -> Result<Tipset, Error> {
let mut headers = Vec::new();

for block in self.blocks() {
Expand All @@ -232,4 +252,12 @@ impl FullTipset {
let tip: Tipset = Tipset::new(headers)?;
Ok(tip)
}
/// Returns the state root for the tipset parent.
pub fn parent_state(&self) -> &Cid {
self.blocks[0].header().state_root()
}
/// Returns epoch of the tipset
pub fn epoch(&self) -> ChainEpoch {
self.blocks[0].header().epoch()
}
}
10 changes: 5 additions & 5 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use log::{info, warn};
use message::{SignedMessage, UnsignedMessage};
use num_bigint::BigUint;
use num_traits::Zero;
use state_tree::{HamtStateTree, StateTree};
use state_tree::StateTree;
use std::sync::Arc;

const GENESIS_KEY: &str = "gen_block";
Expand Down Expand Up @@ -230,10 +230,10 @@ where
pub fn fill_tipsets(&self, ts: Tipset) -> Result<FullTipset, Error> {
let mut blocks: Vec<Block> = Vec::with_capacity(ts.blocks().len());

for header in ts.blocks() {
let (bls_messages, secp_messages) = self.messages(header)?;
for header in ts.into_blocks() {
let (bls_messages, secp_messages) = self.messages(&header)?;
blocks.push(Block {
header: header.clone(),
header,
bls_messages,
secp_messages,
});
Expand Down Expand Up @@ -263,7 +263,7 @@ where
}

let mut tpow = BigUint::zero();
let state = HamtStateTree::new_from_root(self.db.as_ref(), ts.parent_state())?;
let state = StateTree::new_from_root(self.db.as_ref(), ts.parent_state())?;
if let Some(act) = state.get_actor(&*STORAGE_POWER_ACTOR_ADDR)? {
if let Some(state) = self
.db
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl SyncNetworkContext {

let ts = bs_res.into_result()?;
ts.iter()
.map(|fts| fts.tipset().map_err(|e| e.to_string()))
.map(|fts| fts.to_tipset().map_err(|e| e.to_string()))
.collect::<Result<_, _>>()
}
/// Send a blocksync request for full tipsets (includes messages)
Expand Down
3 changes: 2 additions & 1 deletion blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl NetworkHandler {
Some(event) => {
// Update peer on this thread before sending hello
if let NetworkEvent::Hello { source, .. } = &event {
peer_manager.add_peer(source.clone()).await;
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(source.clone(), None).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
Expand Down
31 changes: 24 additions & 7 deletions blockchain/chain_sync/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use async_std::sync::RwLock;
use blocks::Tipset;
use libp2p::core::PeerId;
use log::debug;
use std::collections::HashSet;
use std::collections::HashMap;
use std::sync::Arc;

/// Thread safe peer manager
#[derive(Default)]
pub struct PeerManager {
// TODO potentially separate or expand to handle blocksync peers/ peers that haven't sent hello
/// Hash set of full peers available
full_peers: RwLock<HashSet<PeerId>>,
full_peers: RwLock<HashMap<PeerId, Option<Arc<Tipset>>>>,
}

impl PeerManager {
/// Adds a PeerId to the set of managed peers
pub async fn add_peer(&self, peer_id: PeerId) {
pub async fn add_peer(&self, peer_id: PeerId, ts: Option<Arc<Tipset>>) {
debug!("Added PeerId to full peers list: {}", &peer_id);
self.full_peers.write().await.insert(peer_id);
self.full_peers.write().await.insert(peer_id, ts);
}

/// Returns true if peer set is empty
Expand All @@ -28,13 +31,27 @@ impl PeerManager {
/// Retrieves a cloned PeerId to be used to send network request
pub async fn get_peer(&self) -> Option<PeerId> {
// TODO replace this with a shuffled or more random sample
self.full_peers.read().await.iter().next().cloned()
self.full_peers
.read()
.await
.iter()
.next()
.map(|(k, _)| k.clone())
}

/// Retrieves all tipsets from current peer set
pub async fn get_peer_heads(&self) -> Vec<Arc<Tipset>> {
self.full_peers
.read()
.await
.iter()
.filter_map(|(_, v)| v.clone())
.collect()
}

/// Removes a peer from the set and returns true if the value was present previously
pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
// TODO replace this with a shuffled or more random sample
self.full_peers.write().await.remove(peer_id)
self.full_peers.write().await.remove(peer_id).is_some()
}

/// Gets count of full peers managed
Expand Down
88 changes: 35 additions & 53 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use log::{debug, info, warn};
use lru::LruCache;
use message::{Message, SignedMessage, UnsignedMessage};
use state_manager::StateManager;
use state_tree::{HamtStateTree, StateTree};
use state_tree::StateTree;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryFrom;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
)
.await
{
if self.inform_new_head(&fts).await.is_err() {
if self.inform_new_head(source.clone(), &fts).await.is_err() {
warn!("Failed to sync with provided tipset",);
};
} else {
Expand Down Expand Up @@ -353,7 +353,7 @@ where
/// informs the syncer about a new potential tipset
/// This should be called when connecting to new peers, and additionally
/// when receiving new blocks from the network
pub async fn inform_new_head(&mut self, fts: &FullTipset) -> Result<(), Error> {
pub async fn inform_new_head(&mut self, peer: PeerId, fts: &FullTipset) -> Result<(), Error> {
// check if full block is nil and if so return error
if fts.blocks().is_empty() {
return Err(Error::NoBlocks);
Expand All @@ -374,29 +374,39 @@ where
let target_weight = fts.blocks()[0].header().weight();

if target_weight.gt(&best_weight) {
// initial sync
if self.get_state() == &SyncState::Init {
if let Some(best_target) = self.select_sync_target(fts.tipset()?.clone()).await {
self.sync(&best_target).await?;
return Ok(());
}
}
self.schedule_tipset(Arc::new(fts.tipset()?)).await?;
self.set_peer_head(peer, Arc::new(fts.to_tipset()?)).await?;
}
// incoming tipset from miners does not appear to be better than our best chain, ignoring for now
Ok(())
}
/// Retrieves the heaviest tipset in the sync queue; considered best target head
async fn select_sync_target(&mut self, ts: Tipset) -> Option<Arc<Tipset>> {
let mut heads = Vec::new();
heads.push(ts);

// sort tipsets by epoch
heads.sort_by_key(|header| (header.epoch()));
async fn set_peer_head(&mut self, peer: PeerId, ts: Arc<Tipset>) -> Result<(), Error> {
self.peer_manager
.add_peer(peer, Some(Arc::clone(&ts)))
.await;

// Only update target on initial sync
if self.get_state() == &SyncState::Init {
if let Some(best_target) = self.select_sync_target().await {
// TODO revisit this if using for full node, shouldn't start syncing on first update
self.sync(&best_target).await?;
return Ok(());
}
}
self.schedule_tipset(ts).await?;

Ok(())
}

/// Retrieves the heaviest tipset in the sync queue; considered best target head
async fn select_sync_target(&mut self) -> Option<Arc<Tipset>> {
// Retrieve all peer heads from peer manager
let mut heads = self.peer_manager.get_peer_heads().await;
heads.sort_by_key(|h| h.epoch());

// insert tipsets into sync queue
for tip in heads {
self.sync_queue.insert(Arc::new(tip));
self.sync_queue.insert(tip);
}

if self.sync_queue.buckets().len() > 1 {
Expand Down Expand Up @@ -549,14 +559,13 @@ where
));
}
// check msgs for validity
fn check_msg<M, ST>(
fn check_msg<M, DB: BlockStore>(
msg: &M,
msg_meta_data: &mut HashMap<Address, MsgMetaData>,
tree: &ST,
tree: &StateTree<DB>,
) -> Result<(), Error>
where
M: Message,
ST: StateTree,
{
let updated_state: MsgMetaData = match msg_meta_data.get(msg.from()) {
// address is present begin validity checks
Expand Down Expand Up @@ -600,7 +609,7 @@ where
let mut msg_meta_data: HashMap<Address, MsgMetaData> = HashMap::default();
// TODO retrieve tipset state and load state tree
// temporary
let tree = HamtStateTree::new(self.chain_store.db.as_ref());
let tree = StateTree::new(self.chain_store.db.as_ref());
// loop through bls messages and check msg validity
for m in block.bls_msgs() {
check_msg(m, &mut msg_meta_data, &tree)?;
Expand Down Expand Up @@ -631,11 +640,10 @@ where
return Err(Error::Validation("Signature is nil in header".to_owned()));
}

let base_tipset = self.load_fts(&TipSetKeys::new(header.parents().cids.clone()))?;
let parent_tipset = base_tipset.tipset()?;
let parent_tipset = self.chain_store.tipset_from_keys(header.parents())?;

// time stamp checks
header.validate_timestamps(&base_tipset)?;
header.validate_timestamps(&parent_tipset)?;

// check messages to ensure valid state transitions
self.check_block_msgs(block.clone(), &parent_tipset)?;
Expand Down Expand Up @@ -671,7 +679,7 @@ where
}
/// validates tipsets and adds header data to tipset tracker
fn validate_tipsets(&mut self, fts: FullTipset) -> Result<(), Error> {
if fts.tipset()? == self.genesis {
if fts.to_tipset()? == self.genesis {
return Ok(());
}

Expand Down Expand Up @@ -940,40 +948,14 @@ mod tests {
let to = construct_tipset(1, 10);

task::block_on(async move {
cs.peer_manager.add_peer(source.clone()).await;
cs.peer_manager.add_peer(source.clone(), None).await;
assert_eq!(cs.peer_manager.len().await, 1);

let return_set = cs.sync_headers_reverse(head, &to).await;
assert_eq!(return_set.unwrap().len(), 4);
});
}

#[test]
fn select_target_test() {
let ts_1 = construct_tipset(1, 5);
let ts_2 = construct_tipset(2, 10);
let ts_3 = construct_tipset(3, 7);

let db = Arc::new(MemoryDB::default());
let mut chain_store = ChainStore::new(db);
let gen_header = dummy_header();
chain_store.set_genesis(gen_header).unwrap();

let mut cs = chain_syncer_setup(chain_store);

task::spawn(async move {
assert_eq!(
cs.select_sync_target(ts_1.clone()).await.unwrap(),
Arc::new(ts_1)
);
assert_eq!(
cs.select_sync_target(ts_2.clone()).await.unwrap(),
Arc::new(ts_2.clone())
);
assert_eq!(cs.select_sync_target(ts_3).await.unwrap(), Arc::new(ts_2));
});
}

#[test]
fn compute_msg_data_given_msgs_test() {
let (bls, secp) = construct_messages();
Expand Down
5 changes: 4 additions & 1 deletion blockchain/state_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ state_tree = { path = "../../vm/state_tree/" }
default_runtime = { path = "../../vm/default_runtime/" }
blockstore = { package = "ipld_blockstore", path = "../../ipld/blockstore/" }
forest_blocks = { path = "../../blockchain/blocks" }
thiserror = "1.0"
thiserror = "1.0"
interpreter = { path = "../../vm/interpreter/" }
runtime = { path = "../../vm/runtime/" }
ipld_amt = { path = "../../ipld/amt/" }
3 changes: 3 additions & 0 deletions blockchain/state_manager/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ pub enum Error {
/// Error orginating from state
#[error("{0}")]
State(String),
/// Error from VM execution
#[error("{0}")]
VM(String),
/// Actor for given address not found
#[error("Actor for address: {0} does not exist")]
ActorNotFound(String),
Expand Down
Loading

0 comments on commit 5355acd

Please sign in to comment.