From bc2945cad7f672cc184cef9ac2eb2f4f16babcbb Mon Sep 17 00:00:00 2001 From: Guanghua Guo Date: Tue, 11 Sep 2018 15:50:17 +0800 Subject: [PATCH] Fix some network and consensus modules bug (#23) * Trim network module * Format code and add client import stream * Fix block import notify network * Fix validator AUTHORITY roles --- Cargo.lock | 1 - consensus/src/evaluation.rs | 4 +- consensus/src/lib.rs | 52 +++++++++++----------- consensus/src/service.rs | 13 +++--- network/Cargo.toml | 15 +++---- network/src/consensus.rs | 17 +++----- network/src/lib.rs | 87 ++++++++----------------------------- runtime/src/lib.rs | 1 - src/main.rs | 21 ++++++++- src/network.rs | 9 +++- 10 files changed, 92 insertions(+), 128 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d6e3b0107e28..7cfbd8905d255 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,7 +314,6 @@ dependencies = [ "ed25519 0.1.0 (git+https://github.com/chainx-org/substrate)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0 (git+https://github.com/chainx-org/substrate)", "substrate-codec 0.1.0 (git+https://github.com/chainx-org/substrate)", diff --git a/consensus/src/evaluation.rs b/consensus/src/evaluation.rs index 012426dfbd77a..c185b87e6f26d 100644 --- a/consensus/src/evaluation.rs +++ b/consensus/src/evaluation.rs @@ -1,12 +1,12 @@ // Copyright 2018 Chainpool. //! ChainX block evaluation and evaluation errors. +use chainx_runtime::{Block as ChainXGenericBlock, CheckedBlock}; +use chainx_primitives::{Block, Hash, BlockNumber, Timestamp}; use super::MAX_TRANSACTIONS_SIZE; use codec::{Decode, Encode}; -use chainx_runtime::{Block as ChainXGenericBlock, CheckedBlock}; -use chainx_primitives::{Block, Hash, BlockNumber, Timestamp}; error_chain! { links { diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 9549e8e9ea110..b7aa2bfd9dd90 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,59 +1,59 @@ // Copyright 2018 Chainpool. -extern crate substrate_bft as bft; -extern crate substrate_codec as codec; -extern crate substrate_primitives as primitives; -extern crate substrate_runtime_support as runtime_support; + extern crate substrate_runtime_primitives as runtime_primitives; +extern crate substrate_runtime_support as runtime_support; +extern crate substrate_primitives as primitives; extern crate substrate_client as client; -extern crate substrate_network; +extern crate substrate_codec as codec; extern crate substrate_extrinsic_pool; +extern crate substrate_bft as bft; +extern crate substrate_network; -extern crate chainx_runtime; extern crate chainx_primitives; -extern crate chainx_api; +extern crate chainx_runtime; extern crate chainx_pool; +extern crate chainx_api; -extern crate exit_future; -extern crate tokio; extern crate rhododendron; +extern crate exit_future; +extern crate parking_lot; #[macro_use] extern crate error_chain; #[macro_use] extern crate futures; +extern crate ed25519; +extern crate tokio; #[macro_use] extern crate log; -extern crate ed25519; -extern crate parking_lot; -mod evaluation; -mod error; +mod dynamic_inclusion; mod offline_tracker; +mod evaluation; mod service; -mod dynamic_inclusion; +mod error; -use std::sync::Arc; +use tokio::timer::{Delay, Interval}; use std::time::{Duration, Instant}; +use tokio::runtime::TaskExecutor; +use parking_lot::RwLock; +use futures::prelude::*; +use futures::future; +use std::sync::Arc; use codec::{Decode, Encode}; use primitives::AuthorityId; -use tokio::runtime::TaskExecutor; -use tokio::timer::{Delay, Interval}; -use chainx_primitives::{CandidateReceipt, BlockId, Hash, Block, Header, AccountId, BlockNumber, - Timestamp, SessionKey}; -use chainx_api::ChainXApi; -use futures::prelude::*; -use futures::future; -use parking_lot::RwLock; +use chainx_primitives::{CandidateReceipt, BlockId, Hash, Block, Header, AccountId, BlockNumber, Timestamp, SessionKey}; +use chainx_api::ChainXApi; -pub use self::error::{ErrorKind, Error}; pub use self::offline_tracker::OfflineTracker; -pub use service::Service; use dynamic_inclusion::DynamicInclusion; +pub use self::error::{ErrorKind, Error}; +pub use service::Service; +pub type TransactionPool = substrate_extrinsic_pool::Pool; /// Shared offline validator tracker. pub type SharedOfflineTracker = Arc>; -pub type TransactionPool = substrate_extrinsic_pool::Pool; // block size limit. const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; diff --git a/consensus/src/service.rs b/consensus/src/service.rs index 6bd9fc77e8d82..5a3065010d32f 100644 --- a/consensus/src/service.rs +++ b/consensus/src/service.rs @@ -1,30 +1,29 @@ // Copyright 2018 Chainpool. //! Consensus service. - /// Consensus service. A long running service that manages BFT agreement over the network. /// /// This uses a handle to an underlying thread pool to dispatch heavy work /// such as candidate verification while performing event-driven work /// on a local event loop. -use std::thread; use std::time::{Duration, Instant}; use std::sync::Arc; +use std::thread; -use bft::{self, BftService}; use client::{BlockchainEvents, ChainHead, BlockBody}; +use super::{Network, ProposerFactory}; use ed25519; -use futures::prelude::*; +use error; use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; -use tokio::runtime::TaskExecutor as ThreadPoolHandle; use tokio::runtime::current_thread::Runtime as LocalRuntime; +use tokio::runtime::TaskExecutor as ThreadPoolHandle; use tokio::timer::{Delay, Interval}; +use futures::prelude::*; -use super::{Network, ProposerFactory}; -use error; use chainx_primitives::{Block, Header}; +use bft::{self, BftService}; use chainx_api::ChainXApi; use TransactionPool; diff --git a/network/Cargo.toml b/network/Cargo.toml index 78c50a05ee2b4..ed9cc03114795 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -4,17 +4,16 @@ version = "0.1.0" authors = ["Chainpool "] [dependencies] -parking_lot = "0.4" -chainx-api = { path = "../api" } -chainx-consensus = { path = "../consensus" } -chainx-primitives = { path = "../primitives" } -substrate-bft = { git = "https://github.com/chainx-org/substrate" } -substrate-codec = { git = "https://github.com/chainx-org/substrate" } substrate-codec-derive = { git = "https://github.com/chainx-org/substrate" } -substrate-network = { git = "https://github.com/chainx-org/substrate" } substrate-primitives = { git = "https://github.com/chainx-org/substrate" } +substrate-network = { git = "https://github.com/chainx-org/substrate" } +substrate-codec = { git = "https://github.com/chainx-org/substrate" } +substrate-bft = { git = "https://github.com/chainx-org/substrate" } +chainx-primitives = { path = "../primitives" } +chainx-consensus = { path = "../consensus" } +chainx-api = { path = "../api" } ed25519 = { git = "https://github.com/chainx-org/substrate" } +rhododendron = "0.3" futures = "0.1" tokio = "0.1.7" log = "0.4" -rhododendron = "0.3" diff --git a/network/src/consensus.rs b/network/src/consensus.rs index 81ae11041cada..1edb463a9a2de 100644 --- a/network/src/consensus.rs +++ b/network/src/consensus.rs @@ -4,23 +4,21 @@ //! This fulfills the `chainx_consensus::Network` trait, providing a hook to be called //! each time consensus begins on a new chain head. -use bft; -use ed25519; use substrate_network::{self as net, generic_message as msg}; use substrate_network::consensus_gossip::ConsensusMessage; + +use chainx_primitives::{Block, Hash, SessionKey}; use chainx_api::ChainXApi; use chainx_consensus::Network; -use chainx_primitives::{Block, Hash, SessionKey}; +use tokio::runtime::TaskExecutor; use futures::prelude::*; use futures::sync::mpsc; - use std::sync::Arc; +use ed25519; +use bft; -use tokio::runtime::TaskExecutor; -use parking_lot::Mutex; - -use super::{NetworkService, Knowledge, CurrentConsensus}; +use super::{NetworkService, CurrentConsensus}; /// Sink for output BFT messages. pub struct BftSink { @@ -287,15 +285,12 @@ impl Network for ConsensusNetwork

{ let (bft_send, bft_recv) = mpsc::unbounded(); - let knowledge = Arc::new(Mutex::new(Knowledge::new())); - let local_session_key = key.public().into(); let local_id = key.public().into(); let process_task = self.network.with_spec(|spec, ctx| { spec.new_consensus( ctx, CurrentConsensus { - knowledge, parent_hash, local_session_key, }, diff --git a/network/src/lib.rs b/network/src/lib.rs index 629cc5b522865..00e8d71502584 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -2,42 +2,39 @@ //! chainx-specific network implementation. //! -//! This manages gossip of consensus messages for BFT, communication between validators, +//! This manages gossip of consensus messages for BFT, communication between validators //! and more. -extern crate substrate_bft as bft; extern crate substrate_codec as codec; -extern crate substrate_network; +#[macro_use] +extern crate substrate_codec_derive; extern crate substrate_primitives; +extern crate substrate_bft as bft; +extern crate substrate_network; -extern crate chainx_api; -extern crate chainx_consensus; extern crate chainx_primitives; +extern crate chainx_consensus; +extern crate chainx_api; +extern crate rhododendron; extern crate ed25519; extern crate futures; -extern crate parking_lot; extern crate tokio; -extern crate rhododendron; - #[macro_use] extern crate log; -#[macro_use] -extern crate substrate_codec_derive; pub mod consensus; -use codec::Decode; -use parking_lot::Mutex; -use chainx_primitives::{Block, SessionKey, Hash, Header}; -use substrate_network::{NodeIndex, Context, Severity}; +use substrate_network::StatusMessage as GenericFullStatus; use substrate_network::consensus_gossip::ConsensusGossip; -use substrate_network::{message, generic_message}; +use substrate_network::{NodeIndex, Context, Severity}; use substrate_network::specialization::Specialization; -use substrate_network::StatusMessage as GenericFullStatus; +use substrate_network::{message, generic_message}; +use codec::Decode; + +use chainx_primitives::{Block, SessionKey, Hash, Header}; use std::collections::HashMap; -use std::sync::Arc; /// ChainX protocol id. pub const CHAINX_PROTOCOL_ID: substrate_network::ProtocolId = *b"pcx"; @@ -52,30 +49,7 @@ struct PeerInfo { claimed_validator: bool, } -#[derive(Default)] -struct KnowledgeEntry { - _knows_block_data: Vec, -} - -/// Tracks knowledge of peers. -struct Knowledge { - _candidates: HashMap, -} - -impl Knowledge { - pub fn new() -> Self { - Knowledge { _candidates: HashMap::new() } - } - - /* - fn note_candidate(&mut self, hash: Hash) { - let _entry = self.candidates.entry(hash).or_insert_with(Default::default); - } -*/ -} - struct CurrentConsensus { - knowledge: Arc>, parent_hash: Hash, local_session_key: SessionKey, } @@ -89,21 +63,12 @@ pub enum Message { SessionKey(SessionKey), } -/* -fn send_chainx_message(ctx: &mut Context, to: NodeIndex, message: Message) { - trace!(target: "c_net", "Sending chainx message to {}: {:?}", to, message); - let encoded = message.encode(); - ctx.send_message(to, generic_message::Message::ChainSpecific(encoded)) -} -*/ - /// ChainX protocol attachment for substrate. pub struct ChainXProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, validators: HashMap, live_consensus: Option, - _next_req_id: u64, } impl ChainXProtocol { @@ -114,7 +79,6 @@ impl ChainXProtocol { consensus_gossip: ConsensusGossip::new(), validators: HashMap::new(), live_consensus: None, - _next_req_id: 1, } } @@ -132,15 +96,6 @@ impl ChainXProtocol { ); } - fn dispatch_pending_requests(&mut self, _ctx: &mut Context) { - let _consensus = match self.live_consensus { - Some(ref mut c) => c, - None => { - return; - } - }; - } - fn on_chainx_message( &mut self, ctx: &mut Context, @@ -177,8 +132,6 @@ impl ChainXProtocol { } self.validators.insert(key, who); } - - self.dispatch_pending_requests(ctx); } } @@ -189,16 +142,14 @@ impl Specialization for ChainXProtocol { fn on_connect(&mut self, ctx: &mut Context, who: NodeIndex, status: FullStatus) { let validator = status.roles.contains(substrate_network::Roles::AUTHORITY); - let _send_key = validator; - let mut peer_info = PeerInfo { + let peer_info = PeerInfo { validator_key: None, claimed_validator: validator, }; self.peers.insert(who, peer_info); self.consensus_gossip.new_peer(ctx, who, status.roles); - self.dispatch_pending_requests(ctx); } fn on_disconnect(&mut self, ctx: &mut Context, who: NodeIndex) { @@ -208,7 +159,6 @@ impl Specialization for ChainXProtocol { } self.consensus_gossip.peer_disconnected(ctx, who); - self.dispatch_pending_requests(ctx); } } @@ -244,10 +194,11 @@ impl Specialization for ChainXProtocol { self.consensus_gossip.abort(); } - fn maintain_peers(&mut self, ctx: &mut Context) { + fn maintain_peers(&mut self, _ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); - self.dispatch_pending_requests(ctx); } - fn on_block_imported(&mut self, _ctx: &mut Context, _hash: Hash, _header: &Header) {} + fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, header: &Header) { + info!("on_block_imported number:{:?}, hash:{:?}", header.number, hash); + } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index ed22db484633e..e924f38ba9009 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -271,6 +271,5 @@ pub mod api { random_seed => |()| super::System::random_seed(), account_nonce => |account| super::System::account_nonce(&account), lookup_address => |address| super::Balances::lookup_address(address) - ); } diff --git a/src/main.rs b/src/main.rs index 5067d9156fb4b..55a1879652119 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,12 +41,16 @@ mod client; mod cli; mod rpc; +use substrate_client::BlockchainEvents; + use chainx_network::consensus::ConsensusNetwork; use chainx_pool::{TransactionPool, PoolApi}; use chainx_primitives::{Block, Hash}; use cli::ChainSpec; use tokio::runtime::Runtime; +use tokio::prelude::Stream; +use tokio::prelude::Future; use std::sync::Arc; fn main() { @@ -86,9 +90,22 @@ fn main() { client.clone(), )); - let network = network::build_network(port, boot_nodes, client.clone(), extrinsic_pool.clone()); - let validator_mode = matches.subcommand_matches("validator").is_some(); + let network = network::build_network(port, boot_nodes, client.clone(), extrinsic_pool.clone(), validator_mode); + + { + // block notifications + let network = network.clone(); + let events = client.import_notification_stream() + .for_each(move |notification| { + network.on_block_imported(notification.hash, ¬ification.header); + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())); + task_executor.spawn(events); + } + let _consensus = if validator_mode { let key = match matches.subcommand_matches("validator").unwrap().value_of("auth").unwrap_or("alice") { "alice" => { info!("Auth is alice"); ed25519::Pair::from_seed(b"Alice ") }, diff --git a/src/network.rs b/src/network.rs index 22d4327f6ed7b..95c9019028e4d 100644 --- a/src/network.rs +++ b/src/network.rs @@ -4,7 +4,7 @@ use std::net::Ipv4Addr; use std::iter; use Arc; -use substrate_network::{Params, TransactionPool}; +use substrate_network::{Params, TransactionPool, Roles}; use substrate_network_libp2p::AddrComponent; use substrate_network_libp2p; use substrate_network; @@ -18,6 +18,7 @@ pub fn build_network( boot_nodes: Vec, client: Arc, tx_pool: Arc>, + is_validator: bool, ) -> Arc { let mut net_conf = substrate_network_libp2p::NetworkConfiguration::new(); net_conf.listen_addresses = vec![ @@ -26,8 +27,12 @@ pub fn build_network( .collect(), ]; net_conf.boot_nodes = boot_nodes; + let mut config = substrate_network::ProtocolConfig::default(); + if is_validator { + config.roles = Roles::AUTHORITY; + } let param = NetworkParam { - config: substrate_network::ProtocolConfig::default(), + config: config, network_config: net_conf, chain: client, on_demand: None,