diff --git a/Cargo.lock b/Cargo.lock index a16f19fe8169e..52c3e1b875e95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,7 +1374,9 @@ dependencies = [ name = "polkadot" version = "0.2.0" dependencies = [ + "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-cli 0.2.0", ] @@ -1405,7 +1407,6 @@ dependencies = [ "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1441,10 +1442,14 @@ dependencies = [ name = "polkadot-collator" version = "0.1.0" dependencies = [ + "ed25519 0.1.0", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "polkadot-parachain 0.1.0", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "polkadot-api 0.1.0", + "polkadot-cli 0.2.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", + "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-primitives 0.1.0", ] @@ -1460,7 +1465,6 @@ dependencies = [ "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-api 0.1.0", - "polkadot-collator 0.1.0", "polkadot-parachain 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", diff --git a/Cargo.toml b/Cargo.toml index 79317f7e22c16..95abadcc0809c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" polkadot-cli = { path = "polkadot/cli" } +futures = "0.1" +ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } [workspace] members = [ diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 6c1ecd2b5827f..0d47f2eea76c2 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" } app_dirs = "1.2" tokio = "0.1.7" futures = "0.1.17" -ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } fdlimit = "0.1" parking_lot = "0.4" serde_json = "1.0" diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 6dcddfc2435bb..9d2846b8e524a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -24,10 +24,9 @@ extern crate atty; extern crate ansi_term; extern crate regex; extern crate time; +extern crate fdlimit; extern crate futures; extern crate tokio; -extern crate ctrlc; -extern crate fdlimit; extern crate ed25519; extern crate triehash; extern crate parking_lot; @@ -66,6 +65,11 @@ mod informant; mod chain_spec; pub use chain_spec::ChainSpec; +pub use client::error::Error as ClientError; +pub use client::backend::Backend as ClientBackend; +pub use state_machine::Backend as StateMachineBackend; +pub use polkadot_primitives::Block as PolkadotBlock; +pub use service::{Components as ServiceComponents, Service}; use std::io::{self, Write, Read, stdin, stdout}; use std::fs::File; @@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { .unwrap_or_else(default_base_path) } +/// Additional worker making use of the node, to run asynchronously before shutdown. +/// +/// This will be invoked with the service and spawn a future that resolves +/// when complete. +pub trait Worker { + /// A future that resolves when the work is done or the node should exit. + /// This will be run on a tokio runtime. + type Work: Future; + + /// An exit scheduled for the future. + type Exit: Future + Send + 'static; + + /// Don't work, but schedule an exit. + fn exit_only(self) -> Self::Exit; + + /// Do work and schedule exit. + fn work(self, service: &Service) -> Self::Work + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>; +} + /// Parse command line arguments and start the node. /// /// IANA unassigned port ranges that we could use: @@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { /// 9556-9591 Unassigned /// 9803-9874 Unassigned /// 9926-9949 Unassigned -pub fn run(args: I) -> error::Result<()> where +pub fn run(args: I, worker: W) -> error::Result<()> where I: IntoIterator, T: Into + Clone, + W: Worker, { let yaml = load_yaml!("./cli.yml"); let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) { @@ -154,11 +179,11 @@ pub fn run(args: I) -> error::Result<()> where } if let Some(matches) = matches.subcommand_matches("export-blocks") { - return export_blocks(matches); + return export_blocks(matches, worker.exit_only()); } if let Some(matches) = matches.subcommand_matches("import-blocks") { - return import_blocks(matches); + return import_blocks(matches, worker.exit_only()); } let spec = load_spec(&matches)?; @@ -255,8 +280,8 @@ pub fn run(args: I) -> error::Result<()> where }; match role == service::Role::LIGHT { - true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?, - false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?, + true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?, + false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?, } // TODO: hard exit if this stalls? @@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> { Ok(()) } -fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { +fn export_blocks(matches: &clap::ArgMatches, exit: E) -> error::Result<()> + where E: Future + Send + 'static +{ let base_path = base_path(matches); let spec = load_spec(&matches)?; let mut config = service::Configuration::default_with_spec(spec); config.database_path = db_path(&base_path).to_string_lossy().into(); info!("DB path: {}", config.database_path); let client = service::new_client(config)?; - let (exit_send, exit) = std::sync::mpsc::channel(); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).expect("Error sending exit notification"); + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + ::std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); }); info!("Exporting blocks"); let mut block: u32 = match matches.value_of("from") { @@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { } loop { - if exit.try_recv().is_ok() { + if exit_recv.try_recv().is_ok() { break; } match client.block(&BlockId::number(block as u64))? { @@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> { Ok(()) } -fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> { +fn import_blocks(matches: &clap::ArgMatches, exit: E) -> error::Result<()> + where E: Future + Send + 'static +{ let spec = load_spec(&matches)?; let base_path = base_path(matches); let mut config = service::Configuration::default_with_spec(spec); config.database_path = db_path(&base_path).to_string_lossy().into(); let client = service::new_client(config)?; - let (exit_send, exit) = std::sync::mpsc::channel(); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).expect("Error sending exit notification"); + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + + ::std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); }); let mut file: Box = match matches.value_of("INPUT") { @@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> { let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?; let mut block = 0; for _ in 0 .. count { - if exit.try_recv().is_ok() { + if exit_recv.try_recv().is_ok() { break; } match SignedBlock::decode(&mut file) { @@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> { Ok(()) } -fn run_until_exit(runtime: &mut Runtime, service: service::Service, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()> +fn run_until_exit( + runtime: &mut Runtime, + service: service::Service, + matches: &clap::ArgMatches, + sys_conf: SystemConfiguration, + worker: W, +) -> error::Result<()> where C: service::Components, + W: Worker, client::error::Error: From<<<::Backend as client::backend::Backend>::State as state_machine::Backend>::Error>, { - let exit = { - let (exit_send, exit) = exit_future::signal(); - let exit_send = ::std::cell::RefCell::new(Some(exit_send)); - ctrlc::CtrlC::set_handler(move || { - let exit_send = exit_send - .try_borrow_mut() - .expect("only borrowed in non-reetrant signal handler; qed") - .take(); - - if let Some(signal) = exit_send { - signal.fire(); - } - }); - - exit - }; + let (exit_send, exit) = exit_future::signal(); let executor = runtime.executor(); informant::start(&service, exit.clone(), executor.clone()); @@ -422,7 +446,8 @@ fn run_until_exit(runtime: &mut Runtime, service: service::Service, matche ) }; - let _ = exit.wait(); + let _ = worker.work(&service).wait(); + exit_send.fire(); Ok(()) } diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index 78e4823d20942..c91b6a5d24166 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -2,12 +2,16 @@ name = "polkadot-collator" version = "0.1.0" authors = ["Parity Technologies "] -description = "Abstract collation logic" +description = "Collator node implementation" [dependencies] futures = "0.1.17" +substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec", version = "0.1" } substrate-primitives = { path = "../../substrate/primitives", version = "0.1" } +polkadot-api = { path = "../api" } polkadot-runtime = { path = "../runtime", version = "0.1" } polkadot-primitives = { path = "../primitives", version = "0.1" } -polkadot-parachain = { path = "../parachain", version = "0.1" } +polkadot-cli = { path = "../cli" } +log = "0.4" +ed25519 = { path = "../../substrate/ed25519" } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 94fda9ceda3ec..f7557f353e195 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Collation Logic. +//! Collation node logic. //! //! A collator node lives on a distinct parachain and submits a proposal for //! a state transition, along with a proof for its validity @@ -28,7 +28,7 @@ //! destination B as egress(X)[A -> B] //! //! On every block, each parachain will be intended to route messages from some -//! subset of all the other parachains. +//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3) //! //! Since the egress information is unique to every block, when routing from a //! parachain a collator must gather all egress posts from that parachain @@ -45,25 +45,41 @@ //! to be performed, as the collation logic itself. extern crate futures; +extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; +extern crate ed25519; + +extern crate polkadot_api; +extern crate polkadot_cli; extern crate polkadot_runtime; extern crate polkadot_primitives; +#[macro_use] +extern crate log; + use std::collections::{BTreeSet, BTreeMap}; +use std::sync::Arc; -use futures::{stream, Stream, Future, IntoFuture}; -use polkadot_primitives::parachain::{self, CandidateSignature, ConsolidatedIngress, Message, Id as ParaId}; +use futures::{future, stream, Stream, Future, IntoFuture}; +use client::BlockchainEvents; +use polkadot_api::PolkadotApi; +use polkadot_primitives::BlockId; +use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId}; +use polkadot_cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service}; +use polkadot_cli::Worker; /// Parachain context needed for collation. /// /// This can be implemented through an externally attached service or a stub. -pub trait ParachainContext { - /// Produce a candidate, given the latest ingress queue information. +/// This is expected to be a lightweight, shared type like an Arc. +pub trait ParachainContext: Clone { + /// Produce a candidate, given the latest ingress queue information and the last parachain head. fn produce_candidate>( &self, + last_head: HeadData, ingress: I, - ) -> (parachain::BlockData, polkadot_primitives::AccountId, CandidateSignature); + ) -> (BlockData, HeadData); } /// Relay chain context needed to collate. @@ -120,29 +136,145 @@ pub fn collate_ingress<'a, R>(relay_context: R) .map(ConsolidatedIngress) } -/// Produce a candidate for the parachain. -pub fn collate<'a, R: 'a, P>(local_id: ParaId, relay_context: R, para_context: P) - -> impl Future + 'a +/// Produce a candidate for the parachain, with given contexts, parent head, and signing key. +pub fn collate<'a, R, P>( + local_id: ParaId, + last_head: HeadData, + relay_context: R, + para_context: P, + key: Arc, +) + -> impl Future + 'a where - R: RelayChainContext, - R::Error: 'a, + R: RelayChainContext + 'a, + R::Error: 'a, R::FutureEgress: 'a, P: ParachainContext + 'a, { collate_ingress(relay_context).map(move |ingress| { - let (block_data, _, signature) = para_context.produce_candidate( + let (block_data, head_data) = para_context.produce_candidate( + last_head, ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ); - parachain::Candidate { + let signature = key.sign(&block_data.0[..]).into(); + let pubkey_bytes: [u8; 32] = key.public().into(); + + let receipt = parachain::CandidateReceipt { parachain_index: local_id, - collator_signature: signature, - block: block_data, - unprocessed_ingress: ingress, + collator: pubkey_bytes.into(), + signature, + head_data, + balance_uploads: Vec::new(), + egress_queue_roots: Vec::new(), + fees: 0, + block_data_hash: block_data.hash(), + }; + + parachain::Collation { + receipt, + block_data, } }) } +/// Polkadot-api context. +struct ApiContext; + +impl RelayChainContext for ApiContext { + type Error = (); + type FutureEgress = Result>, Self::Error>; + + fn routing_parachains(&self) -> BTreeSet { + BTreeSet::new() + } + + fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { + Ok(Vec::new()) + } +} + +struct CollationNode { + parachain_context: P, + exit: E, + para_id: ParaId, + key: Arc, +} + +impl Worker for CollationNode where + P: ParachainContext + 'static, + E: Future + Send + 'static +{ + type Work = Box>; + type Exit = E; + + fn exit_only(self) -> Self::Exit { + self.exit + } + + fn work(self, service: &Service) -> Self::Work + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, + { + let CollationNode { parachain_context, exit, para_id, key } = self; + let client = service.client(); + let api = service.api(); + + let work = client.import_notification_stream() + .and_then(move |notification| { + let id = BlockId::hash(notification.hash); + + match api.parachain_head(&id, para_id) { + Ok(Some(last_head)) => { + let collation_work = collate( + para_id, + HeadData(last_head), + ApiContext, + parachain_context.clone(), + key.clone(), + ).map(Some); + + future::Either::A(collation_work) + } + Ok(None) => { + info!("Parachain {:?} appears to be inactive. Cannot collate.", id); + future::Either::B(future::ok(None)) + } + Err(e) => { + warn!("Could not collate for parachain {:?}: {:?}", id, e); + future::Either::B(future::ok(None)) // returning error would shut down the collation node + } + } + }) + .for_each(|_collation: Option| { + // TODO: import into network. + Ok(()) + }); + + let work_and_exit = work.select(exit).then(|_| Ok(())); + Box::new(work_and_exit) as Box<_> + } +} + +/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and +/// arguments to the underlying polkadot node. +/// +/// Provide a future which resolves when the node should exit. +/// This function blocks until done. +pub fn run_collator( + parachain_context: P, + para_id: ParaId, + exit: E, + key: Arc, + args: Vec<::std::ffi::OsString> +) -> polkadot_cli::error::Result<()> where + P: ParachainContext + 'static, + E: IntoFuture, + E::Future: Send + 'static, +{ + let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key }; + polkadot_cli::run(args, node_logic) +} + #[cfg(test)] mod tests { use super::*; @@ -170,7 +302,7 @@ mod tests { } } - #[test] + #[test] fn collates_ingress() { let route_from = |x: &[ParaId]| { let mut set = BTreeSet::new(); diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml index 91501f2692ee9..5af40035e2e50 100644 --- a/polkadot/consensus/Cargo.toml +++ b/polkadot/consensus/Cargo.toml @@ -12,7 +12,6 @@ error-chain = "0.12" log = "0.3" exit-future = "0.1" polkadot-api = { path = "../api" } -polkadot-collator = { path = "../collator" } polkadot-parachain = { path = "../parachain" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/polkadot/consensus/src/collation.rs b/polkadot/consensus/src/collation.rs index d2cd297ce3ac8..db490a0eb17d8 100644 --- a/polkadot/consensus/src/collation.rs +++ b/polkadot/consensus/src/collation.rs @@ -23,18 +23,10 @@ use std::sync::Arc; use polkadot_api::PolkadotApi; use polkadot_primitives::{Hash, AccountId, BlockId}; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic}; use futures::prelude::*; -/// A full collation. -pub struct Collation { - /// Block data. - pub block_data: BlockData, - /// The candidate receipt itself. - pub receipt: CandidateReceipt, -} - /// Encapsulates connections to collators and allows collation on any parachain. /// /// This is expected to be a lightweight, shared type like an `Arc`. diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 2fabf48088fb7..9db60b3797b56 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -32,7 +32,6 @@ extern crate ed25519; extern crate parking_lot; extern crate polkadot_api; -extern crate polkadot_collator as collator; extern crate polkadot_statement_table as table; extern crate polkadot_parachain as parachain; extern crate polkadot_transaction_pool as transaction_pool; @@ -79,7 +78,7 @@ use futures::future; use collation::CollationFetch; use dynamic_inclusion::DynamicInclusion; -pub use self::collation::{validate_collation, Collators, Collation}; +pub use self::collation::{validate_collation, Collators}; pub use self::error::{ErrorKind, Error}; pub use self::shared_table::{SharedTable, StatementProducer, ProducedStatements, Statement, SignedStatement, GenericStatement}; pub use service::Service; diff --git a/polkadot/consensus/src/shared_table/mod.rs b/polkadot/consensus/src/shared_table/mod.rs index 15116c5cfb562..21919dd4a4869 100644 --- a/polkadot/consensus/src/shared_table/mod.rs +++ b/polkadot/consensus/src/shared_table/mod.rs @@ -21,9 +21,8 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use table::{self, Table, Context as TableContextTrait}; -use collation::Collation; use polkadot_primitives::{Hash, SessionKey}; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::parachain::{Id as ParaId, BlockData, Collation, Extrinsic, CandidateReceipt}; use parking_lot::Mutex; use futures::{future, prelude::*}; @@ -470,6 +469,7 @@ mod tests { let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), + signature: Default::default(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), @@ -519,6 +519,7 @@ mod tests { let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), + signature: Default::default(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index b575856739c6a..a2a4950258140 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -23,9 +23,9 @@ use ed25519; use substrate_network::{self as net, generic_message as msg}; use substrate_network::consensus_gossip::ConsensusMessage; use polkadot_api::{PolkadotApi, LocalPolkadotApi}; -use polkadot_consensus::{Network, SharedTable, Collators, Collation}; +use polkadot_consensus::{Network, SharedTable, Collators}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; -use polkadot_primitives::parachain::Id as ParaId; +use polkadot_primitives::parachain::{Id as ParaId, Collation}; use futures::{future, prelude::*}; use futures::sync::mpsc; diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index feb83a416e53d..5c5dcf508ec4c 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -22,6 +22,7 @@ use parking_lot::Mutex; use polkadot_consensus::GenericStatement; use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData}; +use substrate_primitives::H512; use codec::Slicable; use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage}; @@ -144,6 +145,7 @@ fn fetches_from_those_with_knowledge() { parachain_index: 5.into(), collator: [255; 32].into(), head_data: HeadData(vec![9, 9, 9]), + signature: H512::from([1; 64]).into(), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), fees: 1_000_000, diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 79b12aeecd428..dc77669785d04 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -134,26 +134,6 @@ impl Slicable for DutyRoster { #[cfg_attr(feature = "std", serde(deny_unknown_fields))] pub struct Extrinsic; -/// Candidate parachain block. -/// -/// https://github.com/w3f/polkadot-spec/blob/master/spec.md#candidate-para-chain-block -#[derive(PartialEq, Eq, Clone)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] -#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] -#[cfg_attr(feature = "std", serde(deny_unknown_fields))] -pub struct Candidate { - /// The ID of the parachain this is a proposal for. - pub parachain_index: Id, - /// Collator's signature - pub collator_signature: CandidateSignature, - /// Unprocessed ingress queue. - /// - /// Ordered by parachain ID and block number. - pub unprocessed_ingress: ConsolidatedIngress, - /// Block data - pub block: BlockData, -} - /// Candidate receipt type. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] @@ -164,6 +144,8 @@ pub struct CandidateReceipt { pub parachain_index: Id, /// The collator's relay-chain account ID pub collator: super::AccountId, + /// Signature on block data by collator. + pub signature: CandidateSignature, /// The head-data pub head_data: HeadData, /// Balance uploads to the relay chain. @@ -182,6 +164,7 @@ impl Slicable for CandidateReceipt { self.parachain_index.using_encoded(|s| v.extend(s)); self.collator.using_encoded(|s| v.extend(s)); + self.signature.using_encoded(|s| v.extend(s)); self.head_data.0.using_encoded(|s| v.extend(s)); self.balance_uploads.using_encoded(|s| v.extend(s)); self.egress_queue_roots.using_encoded(|s| v.extend(s)); @@ -195,6 +178,7 @@ impl Slicable for CandidateReceipt { Some(CandidateReceipt { parachain_index: Slicable::decode(input)?, collator: Slicable::decode(input)?, + signature: Slicable::decode(input)?, head_data: Slicable::decode(input).map(HeadData)?, balance_uploads: Slicable::decode(input)?, egress_queue_roots: Slicable::decode(input)?, @@ -227,6 +211,18 @@ impl Ord for CandidateReceipt { } } +/// A full collation. +#[derive(PartialEq, Eq, Clone)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] +#[cfg_attr(feature = "std", serde(deny_unknown_fields))] +pub struct Collation { + /// Block data. + pub block_data: BlockData, + /// Candidate receipt itself. + pub receipt: CandidateReceipt, +} + /// Parachain ingress queue message. #[derive(PartialEq, Eq, Clone)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 6a613f99f4eac..7ac4f714f2f8d 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -79,6 +79,7 @@ pub use chain_spec::ChainSpec; /// Polkadot service. pub struct Service { client: Arc>, + api: Arc, network: Arc, transaction_pool: Arc>, signal: Option, @@ -213,6 +214,7 @@ impl Service network: network, transaction_pool: transaction_pool, signal: Some(signal), + api: api, _consensus: consensus_service, }) } @@ -222,6 +224,11 @@ impl Service self.client.clone() } + /// Get shared polkadot-api instance. usually the same as the client. + pub fn api(&self) -> Arc { + self.api.clone() + } + /// Get shared network instance. pub fn network(&self) -> Arc { self.network.clone() diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index 50ff18462e29a..65c87d85f0a2e 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -19,12 +19,47 @@ #![warn(missing_docs)] extern crate polkadot_cli as cli; +extern crate ctrlc; +extern crate futures; #[macro_use] extern crate error_chain; +use cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service}; +use futures::sync::oneshot; +use futures::{future, Future}; + +use std::cell::RefCell; + +// the regular polkadot worker simply does nothing until ctrl-c +struct Worker; +impl cli::Worker for Worker { + type Work = Self::Exit; + type Exit = future::MapErr, fn(oneshot::Canceled) -> ()>; + + fn exit_only(self) -> Self::Exit { + // can't use signal directly here because CtrlC takes only `Fn`. + let (exit_send, exit) = oneshot::channel(); + + let exit_send_cell = RefCell::new(Some(exit_send)); + ctrlc::CtrlC::set_handler(move || { + if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() { + exit_send.send(()).expect("Error sending exit notification"); + } + }); + + exit.map_err(drop) + } + + fn work(self, _service: &Service) -> Self::Exit + where ClientError: From<<<::Backend as ClientBackend>::State as StateMachineBackend>::Error>, + { + self.exit_only() + } +} + quick_main!(run); fn run() -> cli::error::Result<()> { - cli::run(::std::env::args()) + cli::run(::std::env::args(), Worker) }