diff --git a/Cargo.lock b/Cargo.lock index 7b45912e531ba..b796a298c0359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -922,6 +922,11 @@ name = "libc" version = "0.2.36" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "linked-hash-map" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "local-encoding" version = "0.2.0" @@ -1398,6 +1403,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-runtime-io 0.1.0", + "substrate-state-machine 0.1.0", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1952,6 +1958,7 @@ dependencies = [ "ethcore-network 1.12.0 (git+https://github.com/paritytech/parity.git)", "ethcore-network-devp2p 1.12.0 (git+https://github.com/paritytech/parity.git)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2779,6 +2786,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "1e5d97d6708edaa407429faa671b942dc0f2727222fb6b6539bf1db936e4b121" +"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" "checksum local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1ceb20f39ff7ae42f3ff9795f3986b1daad821caaa1e1732a0944103a5a1a66" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 0330192b5816a..306f26e8885c5 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -82,56 +82,61 @@ pub fn run(args: I) -> error::Result<()> where // Create client let executor = demo_executor::Executor::new(); - let mut storage = Default::default(); - let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"]; - - let genesis_config = GenesisConfig { - consensus: Some(ConsensusConfig { - code: vec![], // TODO - authorities: vec![god_key.clone()], - }), - system: None, -// block_time: 5, // 5 second block time. - session: Some(SessionConfig { - validators: vec![god_key.clone()], - session_length: 720, // that's 1 hour per session. - }), - staking: Some(StakingConfig { - current_era: 0, - intentions: vec![], - transaction_fee: 100, - balances: vec![(god_key.clone(), 1u64 << 63)].into_iter().collect(), - validator_count: 12, - sessions_per_era: 24, // 24 hours per era. - bonding_duration: 90, // 90 days per bond. - }), - democracy: Some(DemocracyConfig { - launch_period: 120 * 24 * 14, // 2 weeks per public referendum - voting_period: 120 * 24 * 28, // 4 weeks to discuss & vote on an active referendum - minimum_deposit: 1000, // 1000 as the minimum deposit for a referendum - }), - council: Some(CouncilConfig { - active_council: vec![], - candidacy_bond: 1000, // 1000 to become a council candidate - voter_bond: 100, // 100 down to vote for a candidate - present_slash_per_voter: 1, // slash by 1 per voter for an invalid presentation. - carry_count: 24, // carry over the 24 runners-up to the next council election - presentation_duration: 120 * 24, // one day for presenting winners. - approval_voting_period: 7 * 120 * 24, // one week period between possible council elections. - term_duration: 180 * 120 * 24, // 180 day term duration for the council. - desired_seats: 0, // start with no council: we'll raise this once the stake has been dispersed a bit. - inactive_grace_period: 1, // one addition vote should go by before an inactive voter can be reaped. - - cooloff_period: 90 * 120 * 24, // 90 day cooling off period if council member vetoes a proposal. - voting_period: 7 * 120 * 24, // 7 day voting period for council members. - }), - }; - let prepare_genesis = || { - storage = genesis_config.build_externalities(); - let block = genesis::construct_genesis_block(&storage); - (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) - }; - let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); + + struct GenesisBuilder; + + impl client::GenesisBuilder for GenesisBuilder { + fn build(self) -> (primitives::Header, Vec<(Vec, Vec)>) { + let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"]; + let genesis_config = GenesisConfig { + consensus: Some(ConsensusConfig { + code: vec![], // TODO + authorities: vec![god_key.clone()], + }), + system: None, + // block_time: 5, // 5 second block time. + session: Some(SessionConfig { + validators: vec![god_key.clone()], + session_length: 720, // that's 1 hour per session. + }), + staking: Some(StakingConfig { + current_era: 0, + intentions: vec![], + transaction_fee: 100, + balances: vec![(god_key.clone(), 1u64 << 63)].into_iter().collect(), + validator_count: 12, + sessions_per_era: 24, // 24 hours per era. + bonding_duration: 90, // 90 days per bond. + }), + democracy: Some(DemocracyConfig { + launch_period: 120 * 24 * 14, // 2 weeks per public referendum + voting_period: 120 * 24 * 28, // 4 weeks to discuss & vote on an active referendum + minimum_deposit: 1000, // 1000 as the minimum deposit for a referendum + }), + council: Some(CouncilConfig { + active_council: vec![], + candidacy_bond: 1000, // 1000 to become a council candidate + voter_bond: 100, // 100 down to vote for a candidate + present_slash_per_voter: 1, // slash by 1 per voter for an invalid presentation. + carry_count: 24, // carry over the 24 runners-up to the next council election + presentation_duration: 120 * 24, // one day for presenting winners. + approval_voting_period: 7 * 120 * 24, // one week period between possible council elections. + term_duration: 180 * 120 * 24, // 180 day term duration for the council. + desired_seats: 0, // start with no council: we'll raise this once the stake has been dispersed a bit. + inactive_grace_period: 1, // one addition vote should go by before an inactive voter can be reaped. + + cooloff_period: 90 * 120 * 24, // 90 day cooling off period if council member vetoes a proposal. + voting_period: 7 * 120 * 24, // 7 day voting period for council members. + }), + }; + + let storage = genesis_config.build_externalities(); + let block = genesis::construct_genesis_block(&storage); + (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) + } + } + + let client = Arc::new(client::new_in_mem(executor, GenesisBuilder)?); let mut core = ::tokio_core::reactor::Core::new().expect("Unable to spawn event loop."); let _rpc_servers = { diff --git a/polkadot/api/src/full.rs b/polkadot/api/src/full.rs new file mode 100644 index 0000000000000..68c79819e5e89 --- /dev/null +++ b/polkadot/api/src/full.rs @@ -0,0 +1,352 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Strongly typed API for full Polkadot client. + +use client::backend::{Backend, LocalBackend}; +use client::{self, Client, LocalCallExecutor}; +use polkadot_executor::Executor as LocalDispatch; +use substrate_executor::{NativeExecutionDispatch, NativeExecutor}; +use state_machine::{self, OverlayedChanges}; + +use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp}; +use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId}; +use runtime::{self, Block, Header, UncheckedExtrinsic, Extrinsic, Call, TimestampCall, ParachainsCall}; + +use {CheckedBlockId, BlockBuilder, PolkadotApi, LocalPolkadotApi, ErrorKind, Error, Result}; + +/// A checked block ID used for the substrate-client implementation of CheckedBlockId; +#[derive(Debug, Clone, Copy)] +pub struct CheckedId(pub BlockId); + +impl CheckedBlockId for CheckedId { + fn block_id(&self) -> &BlockId { + &self.0 + } +} + +// set up the necessary scaffolding to execute a set of calls to the runtime. +// this creates a new block on top of the given ID and initialises it. +macro_rules! with_runtime { + ($client: ident, $at: expr, $exec: expr) => {{ + let parent = $at.block_id(); + let header = Header { + parent_hash: $client.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?, + number: $client.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + $client.state_at(parent).map_err(Error::from).and_then(|state| { + let mut changes = Default::default(); + let mut ext = state_machine::Ext::new(&mut changes, &state); + + ::substrate_executor::with_native_environment(&mut ext, || { + ::runtime::Executive::initialise_block(&header); + ($exec)() + }).map_err(Into::into) + }) + }} +} + +/// A polkadot block builder. +#[derive(Debug, Clone)] +pub struct ClientBlockBuilder { + parent: BlockId, + changes: OverlayedChanges, + state: S, + header: Header, + timestamp: Timestamp, + extrinsics: Vec, +} + +impl ClientBlockBuilder + where S::Error: Into +{ + // initialises a block, ready to allow extrinsics to be applied. + fn initialise_block(&mut self) -> Result<()> { + let result = { + let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); + let h = self.header.clone(); + + ::substrate_executor::with_native_environment( + &mut ext, + || runtime::Executive::initialise_block(&h), + ).map_err(Into::into) + }; + + match result { + Ok(_) => { + self.changes.commit_prospective(); + Ok(()) + } + Err(e) => { + self.changes.discard_prospective(); + Err(e) + } + } + } + + // executes a extrinsic, inherent or otherwise, without appending to the list. + fn apply_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> { + let result = { + let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); + + ::substrate_executor::with_native_environment( + &mut ext, + move || runtime::Executive::apply_extrinsic(extrinsic), + ).map_err(Into::into) + }; + + match result { + Ok(_) => { + self.changes.commit_prospective(); + Ok(()) + } + Err(e) => { + self.changes.discard_prospective(); + Err(e) + } + } + } +} + +impl BlockBuilder for ClientBlockBuilder + where S::Error: Into +{ + fn push_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> { + // Check that this is not an "inherent" extrinsic. + if extrinsic.signature == Default::default() { + bail!(ErrorKind::PushedInherentTransaction(extrinsic)); + } else { + self.apply_extrinsic(extrinsic.clone())?; + self.extrinsics.push(extrinsic); + Ok(()) + } + } + + fn bake(mut self) -> Block { + let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); + + let final_header = ::substrate_executor::with_native_environment( + &mut ext, + move || runtime::Executive::finalise_block() + ).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed"); + Block { + header: final_header, + extrinsics: self.extrinsics, + } + } +} + +impl PolkadotApi for Client>> + where ::client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ + type CheckedBlockId = CheckedId; + type BlockBuilder = ClientBlockBuilder; + + fn check_id(&self, id: BlockId) -> Result { + // bail if the code is not the same as the natively linked. + if self.code_at(&id)? != LocalDispatch::native_equivalent() { + bail!("This node is out of date. Block authoring may not work correctly. Bailing.") + } + + Ok(CheckedId(id)) + } + + fn session_keys(&self, at: &CheckedId) -> Result> { + with_runtime!(self, at, ::runtime::Consensus::authorities) + } + + fn validators(&self, at: &CheckedId) -> Result> { + with_runtime!(self, at, ::runtime::Session::validators) + } + + fn random_seed(&self, at: &CheckedId) -> Result { + with_runtime!(self, at, ::runtime::System::random_seed) + } + + fn duty_roster(&self, at: &CheckedId) -> Result { + with_runtime!(self, at, ::runtime::Parachains::calculate_duty_roster) + } + + fn timestamp(&self, at: &CheckedId) -> Result { + with_runtime!(self, at, ::runtime::Timestamp::now) + } + + fn evaluate_block(&self, at: &CheckedId, block: Block) -> Result { + use substrate_executor::error::ErrorKind as ExecErrorKind; + + let res = with_runtime!(self, at, || ::runtime::Executive::execute_block(block)); + match res { + Ok(()) => Ok(true), + Err(err) => match err.kind() { + &ErrorKind::Executor(ExecErrorKind::Runtime) => Ok(false), + _ => Err(err) + } + } + } + + fn index(&self, at: &CheckedId, account: AccountId) -> Result { + with_runtime!(self, at, || ::runtime::System::account_index(account)) + } + + fn active_parachains(&self, at: &CheckedId) -> Result> { + with_runtime!(self, at, ::runtime::Parachains::active_parachains) + } + + fn parachain_code(&self, at: &CheckedId, parachain: ParaId) -> Result>> { + with_runtime!(self, at, || ::runtime::Parachains::parachain_code(parachain)) + } + + fn parachain_head(&self, at: &CheckedId, parachain: ParaId) -> Result>> { + with_runtime!(self, at, || ::runtime::Parachains::parachain_head(parachain)) + } + + fn build_block(&self, parent: &CheckedId, timestamp: Timestamp, parachains: Vec) -> Result { + let parent = parent.block_id(); + let header = Header { + parent_hash: self.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?, + number: self.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + let extrinsics = vec![ + UncheckedExtrinsic { + extrinsic: Extrinsic { + signed: Default::default(), + index: Default::default(), + function: Call::Timestamp(TimestampCall::set(timestamp)), + }, + signature: Default::default(), + }, + UncheckedExtrinsic { + extrinsic: Extrinsic { + signed: Default::default(), + index: Default::default(), + function: Call::Parachains(ParachainsCall::set_heads(parachains)), + }, + signature: Default::default(), + } + ]; + + let mut builder = ClientBlockBuilder { + parent: *parent, + changes: OverlayedChanges::default(), + state: self.state_at(parent)?, + header, + timestamp, + extrinsics: extrinsics.clone(), + }; + + builder.initialise_block()?; + + for inherent in extrinsics { + builder.apply_extrinsic(inherent)?; + } + + Ok(builder) + } +} + +impl LocalPolkadotApi for Client>> + where ::client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{} + +#[cfg(test)] +mod tests { + use super::*; + use keyring::Keyring; + use codec::Slicable; + use client::{self, LocalCallExecutor}; + use client::in_mem::Backend as InMemory; + use substrate_executor::NativeExecutionDispatch; + use substrate_primitives::{self, Header}; + use runtime::{GenesisConfig, ConsensusConfig, SessionConfig, BuildExternalities}; + + fn validators() -> Vec { + vec![ + Keyring::One.to_raw_public(), + Keyring::Two.to_raw_public(), + ] + } + + fn client() -> Client>> { + struct GenesisBuilder; + + impl client::GenesisBuilder for GenesisBuilder { + fn build(self) -> (Header, Vec<(Vec, Vec)>) { + let genesis_config = GenesisConfig { + consensus: Some(ConsensusConfig { + code: LocalDispatch::native_equivalent().to_vec(), + authorities: validators(), + }), + system: None, + session: Some(SessionConfig { + validators: validators(), + session_length: 100, + }), + council: Some(Default::default()), + democracy: Some(Default::default()), + parachains: Some(Default::default()), + staking: Some(Default::default()), + }; + + let storage = genesis_config.build_externalities(); + let block = ::client::genesis::construct_genesis_block(&storage); + (substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) + } + } + + ::client::new_in_mem(LocalDispatch::new(), GenesisBuilder).unwrap() + } + + #[test] + fn gets_session_and_validator_keys() { + let client = client(); + let id = client.check_id(BlockId::Number(0)).unwrap(); + assert_eq!(client.session_keys(&id).unwrap(), validators()); + assert_eq!(client.validators(&id).unwrap(), validators()); + } + + #[test] + fn build_block() { + let client = client(); + + let id = client.check_id(BlockId::Number(0)).unwrap(); + let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap(); + let block = block_builder.bake(); + + assert_eq!(block.header.number, 1); + assert!(block.header.extrinsics_root != Default::default()); + } + + #[test] + fn fails_to_check_id_for_unknown_block() { + assert!(client().check_id(BlockId::Number(100)).is_err()); + } + + #[test] + fn gets_random_seed_with_genesis() { + let client = client(); + + let id = client.check_id(BlockId::Number(0)).unwrap(); + assert!(client.random_seed(&id).is_ok()); + } +} diff --git a/polkadot/api/src/lib.rs b/polkadot/api/src/lib.rs index fd24f52766eb8..bef92d4d9f509 100644 --- a/polkadot/api/src/lib.rs +++ b/polkadot/api/src/lib.rs @@ -34,14 +34,12 @@ extern crate error_chain; #[cfg(test)] extern crate substrate_keyring as keyring; -use client::backend::Backend; -use client::Client; -use polkadot_executor::Executor as LocalDispatch; -use substrate_executor::{NativeExecutionDispatch, NativeExecutor}; -use state_machine::OverlayedChanges; +pub mod full; +pub mod light; + use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp}; use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId}; -use runtime::{Block, Header, UncheckedExtrinsic, Extrinsic, Call, TimestampCall, ParachainsCall}; +use runtime::{Block, UncheckedExtrinsic}; error_chain! { errors { @@ -152,315 +150,8 @@ pub trait PolkadotApi { fn build_block(&self, parent: &Self::CheckedBlockId, timestamp: Timestamp, parachains: Vec) -> Result; } -/// A checked block ID used for the substrate-client implementation of CheckedBlockId; -#[derive(Debug, Clone, Copy)] -pub struct CheckedId(BlockId); - -impl CheckedBlockId for CheckedId { - fn block_id(&self) -> &BlockId { - &self.0 - } -} - -// set up the necessary scaffolding to execute a set of calls to the runtime. -// this creates a new block on top of the given ID and initialises it. -macro_rules! with_runtime { - ($client: ident, $at: expr, $exec: expr) => {{ - let parent = $at.block_id(); - let header = Header { - parent_hash: $client.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?, - number: $client.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1, - state_root: Default::default(), - extrinsics_root: Default::default(), - digest: Default::default(), - }; - - $client.state_at(parent).map_err(Error::from).and_then(|state| { - let mut changes = Default::default(); - let mut ext = state_machine::Ext::new(&mut changes, &state); - - ::substrate_executor::with_native_environment(&mut ext, || { - ::runtime::Executive::initialise_block(&header); - ($exec)() - }).map_err(Into::into) - }) - }} -} - -impl PolkadotApi for Client> - where ::client::error::Error: From<<::State as state_machine::backend::Backend>::Error> -{ - type CheckedBlockId = CheckedId; - type BlockBuilder = ClientBlockBuilder; - - fn check_id(&self, id: BlockId) -> Result { - // bail if the code is not the same as the natively linked. - if self.code_at(&id)? != LocalDispatch::native_equivalent() { - bail!("This node is out of date. Block authoring may not work correctly. Bailing.") - } - - Ok(CheckedId(id)) - } - - fn session_keys(&self, at: &CheckedId) -> Result> { - with_runtime!(self, at, ::runtime::Consensus::authorities) - } - - fn validators(&self, at: &CheckedId) -> Result> { - with_runtime!(self, at, ::runtime::Session::validators) - } - - fn random_seed(&self, at: &CheckedId) -> Result { - with_runtime!(self, at, ::runtime::System::random_seed) - } - - fn duty_roster(&self, at: &CheckedId) -> Result { - with_runtime!(self, at, ::runtime::Parachains::calculate_duty_roster) - } - - fn timestamp(&self, at: &CheckedId) -> Result { - with_runtime!(self, at, ::runtime::Timestamp::now) - } - - fn evaluate_block(&self, at: &CheckedId, block: Block) -> Result { - use substrate_executor::error::ErrorKind as ExecErrorKind; - - let res = with_runtime!(self, at, || ::runtime::Executive::execute_block(block)); - match res { - Ok(()) => Ok(true), - Err(err) => match err.kind() { - &ErrorKind::Executor(ExecErrorKind::Runtime) => Ok(false), - _ => Err(err) - } - } - } - - fn index(&self, at: &CheckedId, account: AccountId) -> Result { - with_runtime!(self, at, || ::runtime::System::account_index(account)) - } - - fn active_parachains(&self, at: &CheckedId) -> Result> { - with_runtime!(self, at, ::runtime::Parachains::active_parachains) - } - - fn parachain_code(&self, at: &CheckedId, parachain: ParaId) -> Result>> { - with_runtime!(self, at, || ::runtime::Parachains::parachain_code(parachain)) - } - - fn parachain_head(&self, at: &CheckedId, parachain: ParaId) -> Result>> { - with_runtime!(self, at, || ::runtime::Parachains::parachain_head(parachain)) - } - - fn build_block(&self, parent: &CheckedId, timestamp: Timestamp, parachains: Vec) -> Result { - let parent = parent.block_id(); - let header = Header { - parent_hash: self.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?, - number: self.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1, - state_root: Default::default(), - extrinsics_root: Default::default(), - digest: Default::default(), - }; - - let extrinsics = vec![ - UncheckedExtrinsic { - extrinsic: Extrinsic { - signed: Default::default(), - index: Default::default(), - function: Call::Timestamp(TimestampCall::set(timestamp)), - }, - signature: Default::default(), - }, - UncheckedExtrinsic { - extrinsic: Extrinsic { - signed: Default::default(), - index: Default::default(), - function: Call::Parachains(ParachainsCall::set_heads(parachains)), - }, - signature: Default::default(), - } - ]; - - let mut builder = ClientBlockBuilder { - parent: *parent, - changes: OverlayedChanges::default(), - state: self.state_at(parent)?, - header, - timestamp, - extrinsics: extrinsics.clone(), - }; - - builder.initialise_block()?; - - for inherent in extrinsics { - builder.apply_extrinsic(inherent)?; - } - - Ok(builder) - } -} - -/// A polkadot block builder. -#[derive(Debug, Clone)] -pub struct ClientBlockBuilder { - parent: BlockId, - changes: OverlayedChanges, - state: S, - header: Header, - timestamp: Timestamp, - extrinsics: Vec, -} - -impl ClientBlockBuilder - where S::Error: Into -{ - // initialises a block, ready to allow extrinsics to be applied. - fn initialise_block(&mut self) -> Result<()> { - let result = { - let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); - let h = self.header.clone(); - - ::substrate_executor::with_native_environment( - &mut ext, - || runtime::Executive::initialise_block(&h), - ).map_err(Into::into) - }; - - match result { - Ok(_) => { - self.changes.commit_prospective(); - Ok(()) - } - Err(e) => { - self.changes.discard_prospective(); - Err(e) - } - } - } - - // executes a extrinsic, inherent or otherwise, without appending to the list. - fn apply_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> { - let result = { - let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); +/// Mark for all Polkadot API implementations, that are making use of state data, stored locally. +pub trait LocalPolkadotApi: PolkadotApi {} - ::substrate_executor::with_native_environment( - &mut ext, - move || runtime::Executive::apply_extrinsic(extrinsic), - ).map_err(Into::into) - }; - - match result { - Ok(_) => { - self.changes.commit_prospective(); - Ok(()) - } - Err(e) => { - self.changes.discard_prospective(); - Err(e) - } - } - } -} - -impl BlockBuilder for ClientBlockBuilder - where S::Error: Into -{ - fn push_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> { - // Check that this is not an "inherent" extrinsic. - if extrinsic.signature == Default::default() { - bail!(ErrorKind::PushedInherentTransaction(extrinsic)); - } else { - self.apply_extrinsic(extrinsic.clone())?; - self.extrinsics.push(extrinsic); - Ok(()) - } - } - - fn bake(mut self) -> Block { - let mut ext = state_machine::Ext::new(&mut self.changes, &self.state); - - let final_header = ::substrate_executor::with_native_environment( - &mut ext, - move || runtime::Executive::finalise_block() - ).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed"); - Block { - header: final_header, - extrinsics: self.extrinsics, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use keyring::Keyring; - use codec::Slicable; - use client::in_mem::Backend as InMemory; - use substrate_executor::NativeExecutionDispatch; - use runtime::{GenesisConfig, ConsensusConfig, SessionConfig, BuildExternalities}; - - fn validators() -> Vec { - vec![ - Keyring::One.to_raw_public(), - Keyring::Two.to_raw_public(), - ] - } - - fn client() -> Client> { - let genesis_config = GenesisConfig { - consensus: Some(ConsensusConfig { - code: LocalDispatch::native_equivalent().to_vec(), - authorities: validators(), - }), - system: None, - session: Some(SessionConfig { - validators: validators(), - session_length: 100, - }), - council: Some(Default::default()), - democracy: Some(Default::default()), - parachains: Some(Default::default()), - staking: Some(Default::default()), - }; - ::client::new_in_mem( - LocalDispatch::new(), - || { - let storage = genesis_config.build_externalities(); - let block = ::client::genesis::construct_genesis_block(&storage); - (substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) - } - ).unwrap() - } - - #[test] - fn gets_session_and_validator_keys() { - let client = client(); - let id = client.check_id(BlockId::Number(0)).unwrap(); - assert_eq!(client.session_keys(&id).unwrap(), validators()); - assert_eq!(client.validators(&id).unwrap(), validators()); - } - - #[test] - fn build_block() { - let client = client(); - - let id = client.check_id(BlockId::Number(0)).unwrap(); - let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap(); - let block = block_builder.bake(); - - assert_eq!(block.header.number, 1); - assert!(block.header.extrinsics_root != Default::default()); - } - - #[test] - fn fails_to_check_id_for_unknown_block() { - assert!(client().check_id(BlockId::Number(100)).is_err()); - } - - #[test] - fn gets_random_seed_with_genesis() { - let client = client(); - - let id = client.check_id(BlockId::Number(0)).unwrap(); - assert!(client.random_seed(&id).is_ok()); - } -} +/// Mark for all Polkadot API implementations, that are fetching required state data from remote nodes. +pub trait RemotePolkadotApi: PolkadotApi {} diff --git a/polkadot/api/src/light.rs b/polkadot/api/src/light.rs new file mode 100644 index 0000000000000..6038a8ec1fbcd --- /dev/null +++ b/polkadot/api/src/light.rs @@ -0,0 +1,106 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Strongly typed API for light Polkadot client. + +use std::sync::Arc; +use client::backend::{Backend, RemoteBackend}; +use client::{Client, CallExecutor}; +use codec::Slicable; +use state_machine; +use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp}; +use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId}; +use runtime::{Block, UncheckedExtrinsic}; +use full::CheckedId; +use {PolkadotApi, RemotePolkadotApi, BlockBuilder, CheckedBlockId, Result, ErrorKind}; + +/// Remote polkadot API implementation. +pub struct RemotePolkadotApiWrapper(pub Arc>); + +/// Block builder for light client. +pub struct LightBlockBuilder; + +impl PolkadotApi for RemotePolkadotApiWrapper + where ::client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ + type CheckedBlockId = CheckedId; + type BlockBuilder = LightBlockBuilder; + + fn check_id(&self, id: BlockId) -> Result { + Ok(CheckedId(id)) + } + + fn session_keys(&self, at: &CheckedId) -> Result> { + self.0.executor().call(at.block_id(), "authorities", &[]) + .and_then(|r| Vec::::decode(&mut &r.return_data[..]) + .ok_or("error decoding session keys".into())) + .map_err(Into::into) + } + + fn validators(&self, _at: &CheckedId) -> Result> { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn random_seed(&self, _at: &Self::CheckedBlockId) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn duty_roster(&self, _at: &CheckedId) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn timestamp(&self, _at: &CheckedId) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn evaluate_block(&self, _at: &CheckedId, _block: Block) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn index(&self, _at: &CheckedId, _account: AccountId) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn active_parachains(&self, _at: &Self::CheckedBlockId) -> Result> { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn parachain_code(&self, _at: &Self::CheckedBlockId, _parachain: ParaId) -> Result>> { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn parachain_head(&self, _at: &Self::CheckedBlockId, _parachain: ParaId) -> Result>> { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn build_block(&self, _parent: &CheckedId, _timestamp: Timestamp, _parachains: Vec) -> Result { + Err(ErrorKind::UnknownRuntime.into()) + } +} + +impl RemotePolkadotApi for RemotePolkadotApiWrapper + where ::client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{} + +impl BlockBuilder for LightBlockBuilder { + fn push_extrinsic(&mut self, _extrinsic: UncheckedExtrinsic) -> Result<()> { + Err(ErrorKind::UnknownRuntime.into()) + } + + fn bake(self) -> Block { + unimplemented!() + } +} diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index c839ba94f7238..81ea904959b09 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -37,6 +37,10 @@ args: long: validator help: Enable validator mode takes_value: false + - light: + long: light + help: Run in light client mode + takes_value: false - port: long: port value_name: PORT diff --git a/polkadot/cli/src/informant.rs b/polkadot/cli/src/informant.rs index 70c49f9c448f4..54affa839ee61 100644 --- a/polkadot/cli/src/informant.rs +++ b/polkadot/cli/src/informant.rs @@ -23,12 +23,18 @@ use tokio_core::reactor; use network::{SyncState, SyncProvider}; use runtime_support::Hashable; use primitives::block::HeaderHash; -use client::BlockchainEvents; +use state_machine; +use client::{self, BlockchainEvents}; const TIMER_INTERVAL_MS: u64 = 5000; /// Spawn informant on the event loop -pub fn start(service: &Service, handle: reactor::Handle) { +pub fn start(service: &Service, handle: reactor::Handle) + where + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle) .expect("Error creating informant timer"); diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 604c665910230..0934755f20d5d 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -107,16 +107,7 @@ pub fn run(args: I) -> error::Result<()> where I: IntoIterator, T: Into + Clone, { - let mut core = reactor::Core::new().expect("tokio::Core could not be created"); - let exit = { - // can't use signal directly here because CtrlC takes only `Fn`. - let (exit_send, exit) = mpsc::channel(1); - ctrlc::CtrlC::set_handler(move || { - exit_send.clone().send(()).wait().expect("Error sending exit notification"); - }); - - exit - }; + let core = reactor::Core::new().expect("tokio::Core could not be created"); let yaml = load_yaml!("./cli.yml"); let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) { @@ -152,10 +143,12 @@ pub fn run(args: I) -> error::Result<()> where if matches.is_present("collator") { info!("Starting collator."); role = service::Role::COLLATOR; - } - else if matches.is_present("validator") { + } else if matches.is_present("validator") { info!("Starting validator."); role = service::Role::VALIDATOR; + } else if matches.is_present("light") { + info!("Starting light."); + role = service::Role::LIGHT; } match matches.value_of("chain") { @@ -195,13 +188,33 @@ pub fn run(args: I) -> error::Result<()> where config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); - let service = service::Service::new(config)?; + match role == service::Role::LIGHT { + true => run_until_exit(core, service::new_light(config)?, &matches), + false => run_until_exit(core, service::new_full(config)?, &matches), + } +} + +fn run_until_exit(mut core: reactor::Core, service: service::Service, matches: &clap::ArgMatches) -> error::Result<()> + where + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ + let exit = { + // can't use signal directly here because CtrlC takes only `Fn`. + let (exit_send, exit) = mpsc::channel(1); + ctrlc::CtrlC::set_handler(move || { + exit_send.clone().send(()).wait().expect("Error sending exit notification"); + }); + + exit + }; informant::start(&service, core.handle()); let _rpc_servers = { - let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?; - let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?; + let http_address = parse_address("127.0.0.1:9933", "rpc-port", matches)?; + let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?; let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index cae27b16fd9b3..2177e874125bb 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -29,7 +29,7 @@ use ed25519; use futures::prelude::*; use futures::{future, Canceled}; use parking_lot::Mutex; -use polkadot_api::PolkadotApi; +use polkadot_api::LocalPolkadotApi; use polkadot_primitives::AccountId; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; use primitives::{Hash, AuthorityId}; @@ -233,15 +233,17 @@ pub struct Service { impl Service { /// Create and start a new instance. - pub fn new( + pub fn new( client: Arc, + api: Arc, network: Arc, transaction_pool: Arc>, parachain_empty_duration: Duration, key: ed25519::Pair, ) -> Service where - C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static, + A: LocalPolkadotApi + Send + Sync + 'static, + C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + Send + Sync + 'static, { let (signal, exit) = ::exit_future::signal(); let thread = thread::spawn(move || { @@ -249,7 +251,7 @@ impl Service { let key = Arc::new(key); let factory = ProposerFactory { - client: client.clone(), + client: api.clone(), transaction_pool: transaction_pool.clone(), network: Network(network.clone()), collators: NoCollators, diff --git a/polkadot/service/Cargo.toml b/polkadot/service/Cargo.toml index 469dc2e2018db..bb9fb170bd2d4 100644 --- a/polkadot/service/Cargo.toml +++ b/polkadot/service/Cargo.toml @@ -27,3 +27,4 @@ substrate-client = { path = "../../substrate/client" } substrate-client-db = { path = "../../substrate/client/db" } substrate-codec = { path = "../../substrate/codec" } substrate-executor = { path = "../../substrate/executor" } +substrate-state-machine = { path = "../../substrate/state-machine" } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 7ed649b672d4c..3dde4242bc520 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -35,6 +35,7 @@ extern crate substrate_network as network; extern crate substrate_codec as codec; extern crate substrate_client_db as client_db; extern crate substrate_executor; +extern crate substrate_state_machine as state_machine; extern crate exit_future; extern crate tokio_core; @@ -55,7 +56,7 @@ use futures::prelude::*; use parking_lot::Mutex; use tokio_core::reactor::Core; use codec::Slicable; -use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; +use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header}; use primitives::{AuthorityId, hashing}; use transaction_pool::TransactionPool; use substrate_executor::NativeExecutor; @@ -64,31 +65,39 @@ use keystore::Store as Keystore; use polkadot_api::PolkadotApi; use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, SessionConfig, StakingConfig, BuildExternalities}; -use client::{genesis, BlockchainEvents}; +use client::backend::Backend; +use client::{genesis, Client, BlockchainEvents, CallExecutor}; use network::ManageNetwork; use exit_future::Signal; pub use self::error::{ErrorKind, Error}; pub use config::{Configuration, Role, ChainSpec}; -type Client = client::Client>; +type CodeExecutor = NativeExecutor; /// Polkadot service. -pub struct Service { +pub struct Service { thread: Option>, - client: Arc, + client: Arc>, network: Arc, transaction_pool: Arc>, signal: Option, _consensus: Option, } -struct TransactionPoolAdapter { +struct TransactionPoolAdapter where A: Send + Sync, E: Send + Sync { pool: Arc>, - client: Arc, + client: Arc>, + api: Arc, } -impl network::TransactionPool for TransactionPoolAdapter { +impl network::TransactionPool for TransactionPoolAdapter + where + B: Backend + Send + Sync, + E: client::CallExecutor + Send + Sync, + client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, + A: PolkadotApi + Send + Sync, +{ fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)> { let best_block = match self.client.info() { Ok(info) => info.chain.best_hash, @@ -97,10 +106,11 @@ impl network::TransactionPool for TransactionPoolAdapter { return Vec::new(); } }; - let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); + + let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); let mut pool = self.pool.lock(); - pool.cull(None, transaction_pool::Ready::create(id, &*self.client)); - pool.pending(transaction_pool::Ready::create(id, &*self.client)).map(|t| { + pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api)); + pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| { let hash = ::primitives::Hash::from(&t.hash()[..]); let tx = codec::Slicable::encode(t.as_transaction()); (hash, tx) @@ -257,16 +267,89 @@ fn local_testnet_config() -> ChainConfig { ]) } -impl Service { +struct GenesisBuilder { + config: GenesisConfig, +} + +impl client::GenesisBuilder for GenesisBuilder { + fn build(self) -> (Header, Vec<(Vec, Vec)>) { + let storage = self.config.build_externalities(); + let block = genesis::construct_genesis_block(&storage); + (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) + } +} + +/// Creates light client and register protocol with the network service +pub fn new_light(config: Configuration) -> Result>>, error::Error> { + Service::new(move |_, executor, genesis_builder: GenesisBuilder| { + let client_backend = client::light::new_light_backend(); + let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor)); + let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); + let client = client::light::new_light(client_backend, fetcher.clone(), genesis_builder)?; + Ok((Arc::new(client), Some(fetcher))) + }, + |client| Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())), + |_client, _network, _tx_pool, _keystore| Ok(None), + config) +} + +/// Creates full client and register protocol with the network service +pub fn new_full(config: Configuration) -> Result>, error::Error> { + let is_validator = (config.roles & Role::VALIDATOR) == Role::VALIDATOR; + Service::new(|db_settings, executor, genesis_builder: GenesisBuilder| + Ok((Arc::new(client_db::new_client(db_settings, executor, genesis_builder)?), None)), + |client| client, + |client, network, tx_pool, keystore| { + if !is_validator { + return Ok(None); + } + + // Load the first available key. Code above makes sure it exisis. + let key = keystore.load(&keystore.contents()?[0], "")?; + info!("Using authority key {:?}", key.public()); + Ok(Some(consensus::Service::new( + client.clone(), + client.clone(), + network.clone(), + tx_pool.clone(), + ::std::time::Duration::from_millis(4000), // TODO: dynamic + key, + ))) + }, + config) +} + +impl Service + where + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static, + client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ /// Creates and register protocol with the network service - pub fn new(mut config: Configuration) -> Result { + fn new(client_creator: F, api_creator: G, consensus_creator: C, mut config: Configuration) -> Result + where + F: FnOnce( + client_db::DatabaseSettings, + CodeExecutor, + GenesisBuilder, + ) -> Result<(Arc>, Option>>), error::Error>, + G: Fn( + Arc>, + ) -> Arc, + C: Fn( + Arc>, + Arc, + Arc>, + &Keystore + ) -> Result, error::Error>, + A: PolkadotApi + Send + Sync + 'static, + { use std::sync::Barrier; let (signal, exit) = ::exit_future::signal(); // Create client let executor = polkadot_executor::Executor::new(); - let mut storage = Default::default(); let mut keystore = Keystore::open(config.keystore_path.into())?; for seed in &config.keys { @@ -285,10 +368,8 @@ impl Service { }; config.network.boot_nodes.extend(boot_nodes); - let prepare_genesis = || { - storage = genesis_config.build_externalities(); - let block = genesis::construct_genesis_block(&storage); - (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) + let genesis_builder = GenesisBuilder { + config: genesis_config, }; let db_settings = client_db::DatabaseSettings { @@ -296,13 +377,15 @@ impl Service { path: config.database_path.into(), }; - let client = Arc::new(client_db::new_client(db_settings, executor, prepare_genesis)?); + let (client, on_demand) = client_creator(db_settings, executor, genesis_builder)?; + let api = api_creator(client.clone()); let best_header = client.best_block_header()?; info!("Starting Polkadot. Best block is #{}", best_header.number); let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { pool: transaction_pool.clone(), client: client.clone(), + api: api.clone(), }); let network_params = network::Params { config: network::ProtocolConfig { @@ -310,11 +393,13 @@ impl Service { }, network_config: config.network, chain: client.clone(), + on_demand: on_demand.clone().map(|d| d as Arc), transaction_pool: transaction_pool_adapter, }; let network = network::Service::new(network_params)?; let barrier = ::std::sync::Arc::new(Barrier::new(2)); + on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network))); let thread = { let client = client.clone(); @@ -347,20 +432,7 @@ impl Service { barrier.wait(); // Spin consensus service if configured - let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { - // Load the first available key. Code above makes sure it exisis. - let key = keystore.load(&keystore.contents()?[0], "")?; - info!("Using authority key {:?}", key.public()); - Some(consensus::Service::new( - client.clone(), - network.clone(), - transaction_pool.clone(), - ::std::time::Duration::from_millis(4000), // TODO: dynamic - key, - )) - } else { - None - }; + let consensus_service = consensus_creator(client.clone(), network.clone(), transaction_pool.clone(), &keystore)?; Ok(Service { thread: Some(thread), @@ -373,7 +445,7 @@ impl Service { } /// Get shared client instance. - pub fn client(&self) -> Arc { + pub fn client(&self) -> Arc> { self.client.clone() } @@ -396,7 +468,12 @@ fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { } /// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) { +pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) + where + B: Backend + Send + Sync, + E: CallExecutor + Send + Sync, + client::error::Error: From<<::State as state_machine::backend::Backend>::Error> +{ let id = BlockId::Hash(hash); match client.body(&id) { Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), @@ -405,7 +482,7 @@ pub fn prune_imported(client: &Client, pool: &Mutex, hash: Head } } -impl Drop for Service { +impl Drop for Service { fn drop(&mut self) { self.network.stop_network(); diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 5a21021862944..6ba67621d8ecc 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -65,14 +65,15 @@ pub struct DatabaseSettings { pub fn new_client( settings: DatabaseSettings, executor: E, - build_genesis: F -) -> Result, client::error::Error> + genesis_builder: F, +) -> Result>, client::error::Error> where E: CodeExecutor, - F: FnOnce() -> (block::Header, Vec<(Vec, Vec)>) + F: client::GenesisBuilder, { - let backend = Backend::new(&settings)?; - Ok(client::Client::new(backend, executor, build_genesis)?) + let backend = Arc::new(Backend::new(&settings)?); + let executor = client::LocalCallExecutor::new(backend.clone(), executor); + Ok(client::Client::new(backend, executor, genesis_builder)?) } mod columns { @@ -265,8 +266,8 @@ pub struct BlockImportOperation { impl client::backend::BlockImportOperation for BlockImportOperation { type State = DbState; - fn state(&self) -> Result<&Self::State, client::error::Error> { - Ok(&self.old_state) + fn state(&self) -> Result, client::error::Error> { + Ok(Some(&self.old_state)) } fn set_block_data(&mut self, header: block::Header, body: Option, justification: Option, is_best: bool) -> Result<(), client::error::Error> { @@ -534,6 +535,8 @@ impl client::backend::Backend for Backend { } } +impl client::backend::LocalBackend for Backend {} + #[cfg(test)] mod tests { use super::*; diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index dad95dc0fdbee..f6c44275f8aed 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -26,8 +26,8 @@ pub trait BlockImportOperation { /// Associated state backend type. type State: StateBackend; - /// Returns pending state. - fn state(&self) -> error::Result<&Self::State>; + /// Returns pending state. Returns None for backends with locally-unavailable state data. + fn state(&self) -> error::Result>; /// Append block data to the transaction. fn set_block_data(&mut self, header: block::Header, body: Option, justification: Option, is_new_best: bool) -> error::Result<()>; /// Inject storage data into the database. @@ -44,7 +44,7 @@ pub trait BlockImportOperation { /// /// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P` /// is alive, the state for `P` should not be pruned. -pub trait Backend { +pub trait Backend: Send + Sync { /// Associated block insertion operation type. type BlockImportOperation: BlockImportOperation; /// Associated blockchain backend type. @@ -62,3 +62,9 @@ pub trait Backend { /// Returns state backend with post-state of given block. fn state_at(&self, block: BlockId) -> error::Result; } + +/// Mark for all Backend implementations, that are making use of state data, stored locally. +pub trait LocalBackend: Backend {} + +/// Mark for all Backend implementations, that are fetching required state data from remote nodes. +pub trait RemoteBackend: Backend {} diff --git a/substrate/client/src/block_builder.rs b/substrate/client/src/block_builder.rs index 5190a01634677..37e26a6a27e3b 100644 --- a/substrate/client/src/block_builder.rs +++ b/substrate/client/src/block_builder.rs @@ -18,16 +18,16 @@ use std::vec::Vec; use codec::{Joiner, Slicable}; -use state_machine::{self, CodeExecutor}; +use state_machine; use primitives::{Header, Block}; use primitives::block::{Id as BlockId, Extrinsic}; -use {backend, error, Client}; +use {backend, error, Client, CallExecutor}; use triehash::ordered_trie_root; /// Utility for building new (valid) blocks from a stream of transactions. pub struct BlockBuilder where B: backend::Backend, - E: CodeExecutor + Clone, + E: CallExecutor + Clone, error::Error: From<<::State as state_machine::backend::Backend>::Error>, { header: Header, @@ -39,7 +39,7 @@ pub struct BlockBuilder where impl BlockBuilder where B: backend::Backend, - E: CodeExecutor + Clone, + E: CallExecutor + Clone, error::Error: From<<::State as state_machine::backend::Backend>::Error>, { /// Create a new instance of builder from the given client, building on the latest block. @@ -59,7 +59,7 @@ impl BlockBuilder where digest: Default::default(), }, transactions: Default::default(), - executor: client.clone_executor(), + executor: client.executor().clone(), state: client.state_at(block_id)?, changes: Default::default(), }) @@ -69,7 +69,10 @@ impl BlockBuilder where /// can be validly executed (by executing it); if it is invalid, it'll be returned along with /// the error. Otherwise, it will return a mutable reference to self (in order to chain). pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> { - let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction", + let (output, _) = self.executor.call_at_state( + &self.state, + &mut self.changes, + "execute_transaction", &vec![].and(&self.header).and(&tx))?; self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid"); self.transactions.push(tx); @@ -79,7 +82,10 @@ impl BlockBuilder where /// Consume the builder to return a valid `Block` containing all pushed transactions. pub fn bake(mut self) -> error::Result { self.header.extrinsics_root = ordered_trie_root(self.transactions.iter().map(Slicable::encode)).0.into(); - let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "finalise_block", + let (output, _) = self.executor.call_at_state( + &self.state, + &mut self.changes, + "finalise_block", &self.header.encode())?; self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid"); Ok(Block { diff --git a/substrate/client/src/call_executor.rs b/substrate/client/src/call_executor.rs new file mode 100644 index 0000000000000..b1b4e2a15653c --- /dev/null +++ b/substrate/client/src/call_executor.rs @@ -0,0 +1,203 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::sync::Arc; +use futures::{IntoFuture, Future}; +use primitives::block::Id as BlockId; +use state_machine::{self, OverlayedChanges, Backend as StateBackend, CodeExecutor}; +use state_machine::backend::InMemory as InMemoryStateBackend; +use triehash::trie_root; + +use backend; +use blockchain::Backend as ChainBackend; +use error; +use light::{Fetcher, RemoteCallRequest}; + +/// Information regarding the result of a call. +#[derive(Debug)] +pub struct CallResult { + /// The data that was returned from the call. + pub return_data: Vec, + /// The changes made to the state by the call. + pub changes: OverlayedChanges, +} + +/// Method call executor. +pub trait CallExecutor { + /// Externalities error type. + type Error: state_machine::Error; + + /// Execute a call to a contract on top of state in a block of given hash. + /// + /// No changes are made. + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> Result; + + /// Execute a call to a contract on top of given state. + /// + /// No changes are made. + fn call_at_state(&self, state: &S, overlay: &mut OverlayedChanges, method: &str, call_data: &[u8]) -> Result<(Vec, S::Transaction), error::Error>; +} + +/// Call executor that executes methods locally, querying all required +/// data from local backend. +pub struct LocalCallExecutor { + backend: Arc, + executor: E, +} + +/// Call executor that executes methods on remote node, querying execution proof +/// and checking proof by re-executing locally. +pub struct RemoteCallExecutor { + backend: Arc, + fetcher: Arc, +} + +impl LocalCallExecutor { + /// Creates new instance of local call executor. + pub fn new(backend: Arc, executor: E) -> Self { + LocalCallExecutor { backend, executor } + } +} + +impl Clone for LocalCallExecutor where E: Clone { + fn clone(&self) -> Self { + LocalCallExecutor { + backend: self.backend.clone(), + executor: self.executor.clone(), + } + } +} + +impl CallExecutor for LocalCallExecutor + where + B: backend::LocalBackend, + E: CodeExecutor, + error::Error: From<<::State as StateBackend>::Error>, +{ + type Error = E::Error; + + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result { + let mut changes = OverlayedChanges::default(); + let (return_data, _) = self.call_at_state(&self.backend.state_at(*id)?, &mut changes, method, call_data)?; + Ok(CallResult{ return_data, changes }) + } + + fn call_at_state(&self, state: &S, changes: &mut OverlayedChanges, method: &str, call_data: &[u8]) -> error::Result<(Vec, S::Transaction)> { + state_machine::execute( + state, + changes, + &self.executor, + method, + call_data, + ).map_err(Into::into) + } +} + +impl RemoteCallExecutor { + /// Creates new instance of remote call executor. + pub fn new(backend: Arc, fetcher: Arc) -> Self { + RemoteCallExecutor { backend, fetcher } + } +} + +impl CallExecutor for RemoteCallExecutor + where + B: backend::RemoteBackend, + F: Fetcher, + error::Error: From<<::State as StateBackend>::Error>, +{ + type Error = error::Error; + + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result { + let block_hash = match *id { + BlockId::Hash(hash) => hash, + BlockId::Number(number) => self.backend.blockchain().hash(number)? + .ok_or_else(|| error::ErrorKind::UnknownBlock(BlockId::Number(number)))?, + }; + + self.fetcher.remote_call(RemoteCallRequest { + block: block_hash, + method: method.into(), + call_data: call_data.to_vec(), + }).into_future().wait() + } + + fn call_at_state(&self, _state: &S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> error::Result<(Vec, S::Transaction)> { + Err(error::ErrorKind::NotAvailableOnLightClient.into()) + } +} + +/// Check remote execution proof. +pub fn check_execution_proof(backend: &B, executor: &E, request: &RemoteCallRequest, remote_proof: (Vec, Vec>)) -> Result + where + B: backend::RemoteBackend, + E: CodeExecutor, + error::Error: From<<::State as StateBackend>::Error>, +{ + let (remote_result, remote_proof) = remote_proof; + + let remote_state = state_from_execution_proof(remote_proof); + let remote_state_root = trie_root(remote_state.pairs().into_iter()).0; + + let local_header = backend.blockchain().header(BlockId::Hash(request.block))?; + let local_header = local_header.ok_or_else(|| error::ErrorKind::UnknownBlock(BlockId::Hash(request.block)))?; + let local_state_root = local_header.state_root; + + if remote_state_root != *local_state_root { + return Err(error::ErrorKind::InvalidExecutionProof.into()); + } + + let mut changes = OverlayedChanges::default(); + let (local_result, _) = state_machine::execute( + &remote_state, + &mut changes, + executor, + &request.method, + &request.call_data, + )?; + + if local_result != remote_result { + return Err(error::ErrorKind::InvalidExecutionProof.into()); + } + + Ok(CallResult { return_data: local_result, changes }) +} + +/// Convert state to execution proof. Proof is simple the whole state (temporary). +// TODO [light]: this method must be removed after trie-based proofs are landed. +pub fn state_to_execution_proof(state: &B) -> Vec> { + state.pairs().into_iter() + .flat_map(|(k, v)| ::std::iter::once(k).chain(::std::iter::once(v))) + .collect() +} + +/// Convert execution proof to in-memory state for check. Reverse function for state_to_execution_proof. +// TODO [light]: this method must be removed after trie-based proofs are landed. +fn state_from_execution_proof(proof: Vec>) -> InMemoryStateBackend { + let mut changes = Vec::new(); + let mut proof_iter = proof.into_iter(); + loop { + let key = proof_iter.next(); + let value = proof_iter.next(); + if let (Some(key), Some(value)) = (key, value) { + changes.push((key, Some(value))); + } else { + break; + } + } + + InMemoryStateBackend::default().update(changes) +} diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 30ae5a261134a..2e298b6563e7c 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -16,25 +16,33 @@ //! Substrate Client +use std::sync::Arc; use futures::sync::mpsc; use parking_lot::Mutex; use primitives::{self, block, AuthorityId}; use primitives::block::Id as BlockId; use primitives::storage::{StorageKey, StorageData}; use runtime_support::Hashable; -use codec::{KeyedVec, Slicable}; +use codec::Slicable; use state_machine::{self, Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor}; use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; +use call_executor::{CallExecutor, LocalCallExecutor}; use {error, in_mem, block_builder, runtime_io, bft}; /// Type that implements `futures::Stream` of block import events. pub type BlockchainEventStream = mpsc::UnboundedReceiver; +/// Polkadot Client genesis block builder. +pub trait GenesisBuilder { + /// Build genesis block. + fn build(self) -> (block::Header, Vec<(Vec, Vec)>); +} + /// Polkadot Client pub struct Client { - backend: B, + backend: Arc, executor: E, import_notification_sinks: Mutex>>, } @@ -63,14 +71,6 @@ pub struct ClientInfo { pub best_queued_hash: Option, } -/// Information regarding the result of a call. -pub struct CallResult { - /// The data that was returned from the call. - pub return_data: Vec, - /// The changes made to the state by the call. - pub changes: OverlayedChanges, -} - /// Block import result. #[derive(Debug)] pub enum ImportResult { @@ -146,32 +146,34 @@ impl JustifiedHeader { /// Create an instance of in-memory client. pub fn new_in_mem( executor: E, - build_genesis: F -) -> error::Result> + genesis_builder: F +) -> error::Result>> where E: CodeExecutor, - F: FnOnce() -> (block::Header, Vec<(Vec, Vec)>) + F: GenesisBuilder, { - Client::new(in_mem::Backend::new(), executor, build_genesis) + let backend = Arc::new(in_mem::Backend::new()); + let executor = LocalCallExecutor::new(backend.clone(), executor); + Client::new(backend, executor, genesis_builder) } impl Client where B: backend::Backend, - E: CodeExecutor, + E: CallExecutor, error::Error: From<<::State as StateBackend>::Error>, { /// Creates new Polkadot Client with given blockchain and code executor. pub fn new( - backend: B, + backend: Arc, executor: E, - build_genesis: F + genesis_builder: F, ) -> error::Result where - F: FnOnce() -> (block::Header, Vec<(Vec, Vec)>) + F: GenesisBuilder { if backend.blockchain().header(BlockId::Number(0))?.is_none() { trace!("Empty database, writing genesis block"); - let (genesis_header, genesis_store) = build_genesis(); + let (genesis_header, genesis_store) = genesis_builder.build(); let mut op = backend.begin_operation(BlockId::Hash(block::HeaderHash::default()))?; op.reset_storage(genesis_store.into_iter())?; op.set_block_data(genesis_header, Some(vec![]), None, true)?; @@ -190,7 +192,7 @@ impl Client where } /// Expose backend reference. To be used in tests only - pub fn backend(&self) -> &B { + pub fn backend(&self) -> &Arc { &self.backend } @@ -207,36 +209,29 @@ impl Client where self.storage(id, &StorageKey(b":code".to_vec())).map(|data| data.0) } - /// Clone a new instance of Executor. - pub fn clone_executor(&self) -> E where E: Clone { - self.executor.clone() + /// Get the set of authorities at a given block. + pub fn authorities_at(&self, id: &BlockId) -> error::Result> { + self.executor.call(id, "authorities",&[]) + .and_then(|r| Vec::::decode(&mut &r.return_data[..]) + .ok_or(error::ErrorKind::AuthLenInvalid.into())) } - /// Get the current set of authorities from storage. - pub fn authorities_at(&self, id: &BlockId) -> error::Result> { - let state = self.state_at(id)?; - (0..u32::decode(&mut state.storage(b":auth:len")?.ok_or(error::ErrorKind::AuthLenEmpty)?.as_slice()).ok_or(error::ErrorKind::AuthLenInvalid)?) - .map(|i| state.storage(&i.to_keyed_vec(b":auth:")) - .map_err(|e| error::Error::from(e).into()) - .and_then(|v| v.ok_or(error::ErrorKind::AuthEmpty(i))) - .and_then(|s| AuthorityId::decode(&mut s.as_slice()).ok_or(error::ErrorKind::AuthInvalid(i))) - .map_err(Into::into) - ).collect() + /// Get call executor reference. + pub fn executor(&self) -> &E { + &self.executor } - /// Execute a call to a contract on top of state in a block of given hash. + /// Execute a call to a contract on top of state in a block of given hash + /// AND returning execution proof. /// /// No changes are made. - pub fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result { - let mut changes = OverlayedChanges::default(); - let (return_data, _) = state_machine::execute( - &self.state_at(id)?, - &mut changes, - &self.executor, - method, - call_data, - )?; - Ok(CallResult { return_data, changes }) + pub fn execution_proof(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<(Vec, Vec>)> { + use call_executor::state_to_execution_proof; + + let result = self.executor.call(id, method, call_data); + let result = result?.return_data; + let proof = self.backend.state_at(*id).map(|state| state_to_execution_proof(&state))?; + Ok((result, proof)) } /// Set up the native execution environment to call into a native runtime code. @@ -297,21 +292,28 @@ impl Client where } let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?; - let mut overlay = OverlayedChanges::default(); - - let (_out, storage_update) = state_machine::execute( - transaction.state()?, - &mut overlay, - &self.executor, - "execute_block", - &block::Block { header: header.clone(), transactions: body.clone().unwrap_or_default().clone() }.encode() - )?; + let storage_update = match transaction.state()? { + Some(transaction_state) => { + let mut overlay = Default::default(); + let (_, storage_update) = self.executor.call_at_state( + transaction_state, + &mut overlay, + "execute_block", + &block::Block { header: header.clone(), transactions: body.clone().unwrap_or_default().clone() }.encode(), + )?; + + Some(storage_update) + }, + None => None, + }; let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; let hash: block::HeaderHash = header.blake2_256().into(); trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin); transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; - transaction.update_storage(storage_update)?; + if let Some(storage_update) = storage_update { + transaction.update_storage(storage_update)?; + } self.backend.commit_operation(transaction)?; if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { @@ -392,7 +394,7 @@ impl Client where impl bft::BlockImport for Client where B: backend::Backend, - E: state_machine::CodeExecutor, + E: CallExecutor, error::Error: From<::Error> { fn import_block(&self, block: block::Block, justification: bft::Justification) { @@ -408,7 +410,7 @@ impl bft::BlockImport for Client impl bft::Authorities for Client where B: backend::Backend, - E: state_machine::CodeExecutor, + E: CallExecutor, error::Error: From<::Error> { fn authorities(&self, at: &BlockId) -> Result, bft::Error> { @@ -419,7 +421,7 @@ impl bft::Authorities for Client impl BlockchainEvents for Client where B: backend::Backend, - E: state_machine::CodeExecutor, + E: CallExecutor, error::Error: From<::Error> { /// Get block import event stream. @@ -433,7 +435,7 @@ impl BlockchainEvents for Client impl ChainHead for Client where B: backend::Backend, - E: state_machine::CodeExecutor, + E: CallExecutor, error::Error: From<::Error> { fn best_block_header(&self) -> error::Result { diff --git a/substrate/client/src/error.rs b/substrate/client/src/error.rs index 6b34ac6eb6a07..9f0e0ac1f8f8e 100644 --- a/substrate/client/src/error.rs +++ b/substrate/client/src/error.rs @@ -81,6 +81,24 @@ error_chain! { description("bad justification for header"), display("bad justification for header: {}", h), } + + /// Not available on light client. + NotAvailableOnLightClient { + description("not available on light client"), + display("This method is not currently available when running in light client mode"), + } + + /// Invalid remote proof. + InvalidExecutionProof { + description("invalid execution proof"), + display("Remote node has responded with invalid execution proof"), + } + + /// Invalid remote proof. + RemoteFetchCancelled { + description("remote fetch cancelled"), + display("Remote data fetch has been cancelled"), + } } } diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index 138b6f27a24f8..cb9102fb7829c 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -26,10 +26,6 @@ use primitives::block::{self, Id as BlockId, HeaderHash}; use blockchain::{self, BlockStatus}; use state_machine::backend::{Backend as StateBackend, InMemory}; -fn header_hash(header: &block::Header) -> block::HeaderHash { - header.blake2_256().into() -} - struct PendingBlock { block: Block, is_best: bool, @@ -65,14 +61,8 @@ impl Clone for Blockchain { } impl Blockchain { - fn id(&self, id: BlockId) -> Option { - match id { - BlockId::Hash(h) => Some(h), - BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(), - } - } - - fn new() -> Blockchain { + /// Create new in-memory blockchain storage. + pub fn new() -> Blockchain { Blockchain { storage: RwLock::new( BlockchainStorage { @@ -85,7 +75,16 @@ impl Blockchain { } } - fn insert(&self, hash: HeaderHash, header: block::Header, justification: Option, body: Option, is_new_best: bool) { + /// Get header hash of given block. + pub fn id(&self, id: BlockId) -> Option { + match id { + BlockId::Hash(h) => Some(h), + BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(), + } + } + + /// Insert block. + pub fn insert(&self, hash: HeaderHash, header: block::Header, justification: Option, body: Option, is_new_best: bool) { let number = header.number; let mut storage = self.storage.write(); storage.blocks.insert(hash, Block { @@ -113,7 +112,7 @@ impl Blockchain { let this = self.storage.read(); let other = other.storage.read(); this.hashes == other.hashes - && this.best_hash == other.best_hash + && this.best_hash == other.best_hash && this.best_number == other.best_number && this.genesis_hash == other.genesis_hash } @@ -163,8 +162,8 @@ pub struct BlockImportOperation { impl backend::BlockImportOperation for BlockImportOperation { type State = InMemory; - fn state(&self) -> error::Result<&Self::State> { - Ok(&self.old_state) + fn state(&self) -> error::Result> { + Ok(Some(&self.old_state)) } fn set_block_data(&mut self, header: block::Header, body: Option, justification: Option, is_new_best: bool) -> error::Result<()> { @@ -227,7 +226,7 @@ impl backend::Backend for Backend { fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { if let Some(pending_block) = operation.pending_block { - let hash = header_hash(&pending_block.block.header); + let hash = pending_block.block.header.blake2_256().into(); let old_state = &operation.old_state; self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone())); self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best); @@ -246,3 +245,5 @@ impl backend::Backend for Backend { } } } + +impl backend::LocalBackend for Backend {} diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 7ef6b65d94314..8045480fbd5c5 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -17,6 +17,7 @@ //! Substrate Client and associated logic. #![warn(missing_docs)] +#![recursion_limit="128"] extern crate substrate_bft as bft; extern crate substrate_codec as codec; @@ -43,12 +44,17 @@ pub mod backend; pub mod in_mem; pub mod genesis; pub mod block_builder; +pub mod light; +mod call_executor; mod client; pub use client::{ new_in_mem, BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents, - Client, ClientInfo, CallResult, ChainHead, - ImportResult, + Client, ClientInfo, ChainHead, + ImportResult, GenesisBuilder, }; pub use blockchain::Info as ChainInfo; +pub use call_executor::{ + CallResult, CallExecutor, LocalCallExecutor, RemoteCallExecutor, +}; diff --git a/substrate/client/src/light.rs b/substrate/client/src/light.rs new file mode 100644 index 0000000000000..0e9b735be8460 --- /dev/null +++ b/substrate/client/src/light.rs @@ -0,0 +1,243 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client backend. Only stores headers and justifications of blocks. +//! Everything else is requested from full nodes on demand. + +use std::sync::Arc; +use futures::future::IntoFuture; +use primitives; +use primitives::block::{self, Id as BlockId, HeaderHash}; +use runtime_support::Hashable; +use state_machine::CodeExecutor; +use state_machine::backend::Backend as StateBackend; +use blockchain::{self, BlockStatus}; +use backend; +use call_executor::{CallResult, RemoteCallExecutor, check_execution_proof}; +use client::{Client, GenesisBuilder}; +use error; +use in_mem::Blockchain as InMemBlockchain; + +/// Remote call request. +pub struct RemoteCallRequest { + /// Call at state of given block. + pub block: HeaderHash, + /// Method to call. + pub method: String, + /// Call data. + pub call_data: Vec, +} + +/// Light client data fetcher. Implementations of this trait must check if remote data +/// is correct (see FetchedDataChecker) and return already checked data. +pub trait Fetcher: Send + Sync { + /// Remote call result future. + type RemoteCallResult: IntoFuture; + + /// Fetch remote call result. + fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult; +} + +/// Light client remote data checker. +pub trait FetchChecker: Send + Sync { + /// Check remote method execution proof. + fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: (Vec, Vec>)) -> error::Result; +} + +/// Light client backend. +pub struct Backend { + blockchain: Blockchain, +} + +/// Light client blockchain. +pub struct Blockchain { + storage: InMemBlockchain, +} + +/// Block (header and justification) import operation. +pub struct BlockImportOperation { + pending_block: Option, +} + +/// On-demand state. +#[derive(Clone)] +pub struct OnDemandState { + /// Hash of the block, state is valid for. + _block: HeaderHash, +} + +/// Remote data checker. +pub struct LightDataChecker { + /// Backend reference. + backend: Arc, + /// Executor. + executor: E, +} + +struct PendingBlock { + header: block::Header, + justification: Option, + is_best: bool, +} + +impl backend::Backend for Backend { + type BlockImportOperation = BlockImportOperation; + type Blockchain = Blockchain; + type State = OnDemandState; + + fn begin_operation(&self, _block: BlockId) -> error::Result { + Ok(BlockImportOperation { + pending_block: None, + }) + } + + fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { + if let Some(pending_block) = operation.pending_block { + let hash = pending_block.header.blake2_256().into(); + self.blockchain.storage.insert(hash, pending_block.header, pending_block.justification, None, pending_block.is_best); + } + Ok(()) + } + + fn blockchain(&self) -> &Blockchain { + &self.blockchain + } + + fn state_at(&self, block: BlockId) -> error::Result { + Ok(OnDemandState { + _block: self.blockchain.storage.id(block).ok_or(error::ErrorKind::UnknownBlock(block))?, + }) + } +} + +impl backend::RemoteBackend for Backend {} + +impl backend::BlockImportOperation for BlockImportOperation { + type State = OnDemandState; + + fn state(&self) -> error::Result> { + // None means 'locally-stateless' backend + Ok(None) + } + + fn set_block_data(&mut self, header: block::Header, _body: Option, justification: Option, is_new_best: bool) -> error::Result<()> { + assert!(self.pending_block.is_none(), "Only one block per operation is allowed"); + self.pending_block = Some(PendingBlock { + header, + justification, + is_best: is_new_best, + }); + Ok(()) + } + + fn update_storage(&mut self, _update: ::Transaction) -> error::Result<()> { + // we're not storing anything locally => ignore changes + Ok(()) + } + + fn reset_storage, Vec)>>(&mut self, _iter: I) -> error::Result<()> { + // we're not storing anything locally => ignore changes + Ok(()) + } +} + +impl blockchain::Backend for Blockchain { + fn header(&self, id: BlockId) -> error::Result> { + self.storage.header(id) + } + + fn body(&self, _id: BlockId) -> error::Result> { + // TODO [light]: fetch from remote node + Ok(None) + } + + fn justification(&self, id: BlockId) -> error::Result> { + self.storage.justification(id) + } + + fn info(&self) -> error::Result { + self.storage.info() + } + + fn status(&self, id: BlockId) -> error::Result { + self.storage.status(id) + } + + fn hash(&self, number: block::Number) -> error::Result> { + self.storage.hash(number) + } +} + +impl StateBackend for OnDemandState { + type Error = error::Error; + type Transaction = (); + + fn storage(&self, _key: &[u8]) -> Result>, Self::Error> { + // TODO [light]: fetch from remote node + Err(error::ErrorKind::NotAvailableOnLightClient.into()) + } + + fn storage_root(&self, _delta: I) -> ([u8; 32], Self::Transaction) + where I: IntoIterator, Option>)> { + ([0; 32], ()) + } + + fn pairs(&self) -> Vec<(Vec, Vec)> { + // whole state is not available on light node + Vec::new() + } +} + +impl FetchChecker for LightDataChecker + where + E: CodeExecutor, +{ + fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: (Vec, Vec>)) -> error::Result { + check_execution_proof(&*self.backend, &self.executor, request, remote_proof) + } +} + +/// Create an instance of light client backend. +pub fn new_light_backend() -> Arc { + let storage = InMemBlockchain::new(); + let blockchain = Blockchain { storage }; + Arc::new(Backend { blockchain }) +} + +/// Create an instance of light client. +pub fn new_light( + backend: Arc, + fetcher: Arc, + genesis_builder: B, +) -> error::Result>> + where + F: Fetcher, + B: GenesisBuilder, +{ + let executor = RemoteCallExecutor::new(backend.clone(), fetcher); + Client::new(backend, executor, genesis_builder) +} + +/// Create an instance of fetch data checker. +pub fn new_fetch_checker( + backend: Arc, + executor: E, +) -> LightDataChecker + where + E: CodeExecutor, +{ + LightDataChecker { backend, executor } +} diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index 6a540d4b76a66..ca715c551c1b1 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -17,6 +17,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" futures = "0.1.17" +linked-hash-map = "0.5" ethcore-network = { git = "https://github.com/paritytech/parity.git" } ethcore-network-devp2p = { git = "https://github.com/paritytech/parity.git" } ethcore-io = { git = "https://github.com/paritytech/parity.git" } diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index 10d7a0c7d5a79..cb09f88eada54 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -16,7 +16,7 @@ //! Blockchain access trait -use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin}; +use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor}; use client::error::Error; use state_machine; use primitives::block::{self, Id as BlockId}; @@ -43,11 +43,14 @@ pub trait Client: Send + Sync { /// Get block justification. fn justification(&self, id: &BlockId) -> Result, Error>; + + /// Get method execution proof. + fn execution_proof(&self, block: &block::HeaderHash, method: &str, data: &[u8]) -> Result<(Vec, Vec>), Error>; } impl Client for PolkadotClient where B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static, Error: From<<::State as state_machine::backend::Backend>::Error>, { fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option) -> Result { @@ -80,4 +83,8 @@ impl Client for PolkadotClient where fn justification(&self, id: &BlockId) -> Result, Error> { (self as &PolkadotClient).justification(id) } + + fn execution_proof(&self, block: &block::HeaderHash, method: &str, data: &[u8]) -> Result<(Vec, Vec>), Error> { + (self as &PolkadotClient).execution_proof(&BlockId::Hash(block.clone()), method, data) + } } diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 3cc9d994eee91..f894def9c4f6c 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -22,6 +22,7 @@ extern crate ethcore_network_devp2p as network_devp2p; extern crate ethcore_network as network; extern crate ethcore_io as core_io; +extern crate linked_hash_map; extern crate rand; extern crate parking_lot; extern crate substrate_primitives as primitives; @@ -53,6 +54,7 @@ mod config; mod chain; mod blocks; mod consensus; +mod on_demand; pub mod error; #[cfg(test)] mod test; @@ -66,6 +68,7 @@ pub use network_devp2p::{ConnectionFilter, ConnectionDirection}; pub use message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; pub use error::Error; pub use config::{Role, ProtocolConfig}; +pub use on_demand::{OnDemand, OnDemandService, Response as OnDemandResponse}; // TODO: move it elsewhere fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index 8e1fe804dda05..032cd375f0901 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -247,6 +247,10 @@ pub enum Message { CandidateResponse(CandidateResponse), /// BFT Consensus statement. BftMessage(LocalizedBftMessage), + /// Remote method call request. + RemoteCallRequest(RemoteCallRequest), + /// Remote method call response. + RemoteCallResponse(RemoteCallResponse), } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] @@ -319,3 +323,27 @@ pub struct BlockAnnounce { /// New block header. pub header: Header, } + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +/// Remote call request. +pub struct RemoteCallRequest { + /// Unique request id. + pub id: RequestId, + /// Block at which to perform call. + pub block: HeaderHash, + /// Method name. + pub method: String, + /// Call data. + pub data: Vec, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +/// Remote call response. +pub struct RemoteCallResponse { + /// Id of a request this response was made for. + pub id: RequestId, + /// Method return value. + pub value: Vec, + /// Execution proof. + pub proof: Vec>, +} diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs new file mode 100644 index 0000000000000..76180f616d0bc --- /dev/null +++ b/substrate/network/src/on_demand.rs @@ -0,0 +1,423 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! On-demand requests service. + +use std::collections::VecDeque; +use std::sync::{Arc, Weak}; +use std::time::{Instant, Duration}; +use futures::{Future, Poll}; +use futures::sync::oneshot::{channel, Receiver, Sender}; +use linked_hash_map::LinkedHashMap; +use linked_hash_map::Entry; +use parking_lot::Mutex; +use client; +use client::light::{Fetcher, FetchChecker, RemoteCallRequest}; +use io::SyncIo; +use message; +use network::PeerId; +use service; + +/// Remote request timeout. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); + +/// On-demand service API. +pub trait OnDemandService: Send + Sync { + /// When new node is connected. + fn on_connect(&self, peer: PeerId, role: service::Role); + + /// When node is disconnected. + fn on_disconnect(&self, peer: PeerId); + + /// Maintain peers requests. + fn maintain_peers(&self, io: &mut SyncIo); + + /// When response is received from remote node. + fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse); +} + +/// On-demand requests service. Dispatches requests to appropriate peers. +pub struct OnDemand { + core: Mutex>, + checker: Arc, +} + +/// On-demand response. +pub struct Response { + receiver: Receiver, +} + +#[derive(Default)] +struct OnDemandCore { + service: Weak, + next_request_id: u64, + pending_requests: VecDeque, + active_peers: LinkedHashMap, + idle_peers: VecDeque, +} + +struct Request { + id: u64, + timestamp: Instant, + sender: Sender, + request: RemoteCallRequest, +} + +impl Future for Response { + type Item = client::CallResult; + type Error = client::error::Error; + + fn poll(&mut self) -> Poll { + self.receiver.poll() + .map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into()) + } +} + +impl OnDemand where E: service::ExecuteInContext { + /// Creates new on-demand service. + pub fn new(checker: Arc) -> Self { + OnDemand { + checker, + core: Mutex::new(OnDemandCore { + service: Weak::new(), + next_request_id: 0, + pending_requests: VecDeque::new(), + active_peers: LinkedHashMap::new(), + idle_peers: VecDeque::new(), + }) + } + } + + /// Sets weak reference to network service. + pub fn set_service_link(&self, service: Weak) { + self.core.lock().service = service; + } + + /// Execute method call on remote node, returning execution result and proof. + pub fn remote_call(&self, request: RemoteCallRequest) -> Response { + let (sender, receiver) = channel(); + let result = Response { + receiver: receiver, + }; + + { + let mut core = self.core.lock(); + core.insert(sender, request); + core.dispatch(); + } + + result + } +} + +impl OnDemandService for OnDemand where E: service::ExecuteInContext { + fn on_connect(&self, peer: PeerId, role: service::Role) { + if !role.intersects(service::Role::FULL | service::Role::COLLATOR | service::Role::VALIDATOR) { // TODO: correct? + return; + } + + let mut core = self.core.lock(); + core.add_peer(peer); + core.dispatch(); + } + + fn on_disconnect(&self, peer: PeerId) { + let mut core = self.core.lock(); + core.remove_peer(peer); + core.dispatch(); + } + + fn maintain_peers(&self, io: &mut SyncIo) { + let mut core = self.core.lock(); + for bad_peer in core.maintain_peers() { + trace!(target: "sync", "Remote request timeout for peer {}", bad_peer); + io.disconnect_peer(bad_peer); + } + core.dispatch(); + } + + fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) { + let mut core = self.core.lock(); + match core.remove(peer, response.id) { + Some(request) => match self.checker.check_execution_proof(&request.request, (response.value, response.proof)) { + Ok(response) => { + // we do not bother if receiver has been dropped already + let _ = request.sender.send(response); + }, + Err(error) => { + trace!(target: "sync", "Failed to check remote response from peer {}: {}", peer, error); + io.disconnect_peer(peer); + core.remove_peer(peer); + core.insert(request.sender, request.request); + }, + }, + None => { + trace!(target: "sync", "Invalid remote response from peer {}", peer); + io.disconnect_peer(peer); + core.remove_peer(peer); + }, + } + + core.dispatch(); + } +} + +impl Fetcher for OnDemand where E: service::ExecuteInContext { + type RemoteCallResult = Response; + + fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { + self.remote_call(request) + } +} + +impl OnDemandCore where E: service::ExecuteInContext { + pub fn add_peer(&mut self, peer: PeerId) { + self.idle_peers.push_back(peer); + } + + pub fn remove_peer(&mut self, peer: PeerId) { + if let Some(request) = self.active_peers.remove(&peer) { + self.pending_requests.push_front(request); + return; + } + + if let Some(idle_index) = self.idle_peers.iter().position(|i| *i == peer) { + self.idle_peers.swap_remove_back(idle_index); + } + } + + pub fn maintain_peers(&mut self) -> Vec { + let now = Instant::now(); + let mut bad_peers = Vec::new(); + loop { + match self.active_peers.front() { + Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (), + _ => return bad_peers, + } + + let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above"); + self.pending_requests.push_front(request); + bad_peers.push(bad_peer); + } + } + + pub fn insert(&mut self, sender: Sender, request: RemoteCallRequest) { + let request_id = self.next_request_id; + self.next_request_id += 1; + + self.pending_requests.push_back(Request { + id: request_id, + timestamp: Instant::now(), + sender, + request, + }); + } + + pub fn remove(&mut self, peer: PeerId, id: u64) -> Option { + match self.active_peers.entry(peer) { + Entry::Occupied(entry) => match entry.get().id == id { + true => { + self.idle_peers.push_back(peer); + Some(entry.remove()) + }, + false => None, + }, + Entry::Vacant(_) => None, + } + } + + pub fn dispatch(&mut self) { + let service = match self.service.upgrade() { + Some(service) => service, + None => return, + }; + + while !self.pending_requests.is_empty() { + let peer = match self.idle_peers.pop_front() { + Some(peer) => peer, + None => return, + }; + + let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); + request.timestamp = Instant::now(); + trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); + + service.execute_in_context(|ctx, protocol| { + let message = message::RemoteCallRequest { + id: request.id, + block: request.request.block, + method: request.request.method.clone(), + data: request.request.call_data.clone(), + }; + + protocol.send_message(ctx, peer, message::Message::RemoteCallRequest(message)) + }); + self.active_peers.insert(peer, request); + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + use std::sync::Arc; + use std::time::Instant; + use futures::Future; + use parking_lot::RwLock; + use client; + use client::light::{FetchChecker, RemoteCallRequest}; + use io::NetSyncIo; + use message; + use network::PeerId; + use protocol::Protocol; + use service::{Role, ExecuteInContext}; + use test::TestIo; + use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; + + struct DummyExecutor; + struct DummyFetchChecker { ok: bool } + + impl ExecuteInContext for DummyExecutor { + fn execute_in_context(&self, _closure: F) {} + } + + impl FetchChecker for DummyFetchChecker { + fn check_execution_proof(&self, _request: &RemoteCallRequest, remote_proof: (Vec, Vec>)) -> client::error::Result { + match self.ok { + true => Ok(client::CallResult { + return_data: remote_proof.0, + changes: Default::default(), + }), + false => Err(client::error::ErrorKind::Backend("Test error".into()).into()), + } + } + } + + fn dummy(ok: bool) -> (Arc, Arc>) { + let executor = Arc::new(DummyExecutor); + let service = Arc::new(OnDemand::new(Arc::new(DummyFetchChecker { ok }))); + service.set_service_link(Arc::downgrade(&executor)); + (executor, service) + } + + fn total_peers(on_demand: &OnDemand) -> usize { + let core = on_demand.core.lock(); + core.idle_peers.len() + core.active_peers.len() + } + + fn receive_response(on_demand: &OnDemand, network: &mut TestIo, peer: PeerId, id: message::RequestId) { + on_demand.on_remote_response(network, peer, message::RemoteCallResponse { + id: id, + value: vec![1], + proof: vec![vec![2]], + }); + } + + #[test] + fn knows_about_peers_roles() { + let (_, on_demand) = dummy(true); + on_demand.on_connect(0, Role::LIGHT); + on_demand.on_connect(1, Role::FULL); + on_demand.on_connect(2, Role::COLLATOR); + on_demand.on_connect(3, Role::VALIDATOR); + assert_eq!(vec![1, 2, 3], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + } + + #[test] + fn disconnects_from_idle_peer() { + let (_, on_demand) = dummy(true); + on_demand.on_connect(0, Role::FULL); + assert_eq!(1, total_peers(&*on_demand)); + on_demand.on_disconnect(0); + assert_eq!(0, total_peers(&*on_demand)); + } + + #[test] + fn disconnects_from_timeouted_peer() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + + on_demand.on_connect(0, Role::FULL); + on_demand.on_connect(1, Role::FULL); + assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert!(on_demand.core.lock().active_peers.is_empty()); + + on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::>()); + + on_demand.core.lock().active_peers[&0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; + on_demand.maintain_peers(&mut network); + assert!(on_demand.core.lock().idle_peers.is_empty()); + assert_eq!(vec![1], on_demand.core.lock().active_peers.keys().cloned().collect::>()); + assert!(network.to_disconnect.contains(&0)); + } + + #[test] + fn disconnects_from_peer_on_response_with_wrong_id() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Role::FULL); + + on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + receive_response(&*on_demand, &mut network, 0, 1); + assert!(network.to_disconnect.contains(&0)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } + + #[test] + fn disconnects_from_peer_on_incorrect_response() { + let (_x, on_demand) = dummy(false); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Role::FULL); + + on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + receive_response(&*on_demand, &mut network, 0, 0); + assert!(network.to_disconnect.contains(&0)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } + + #[test] + fn disconnects_from_peer_on_unexpected_response() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Role::FULL); + + receive_response(&*on_demand, &mut network, 0, 0); + assert!(network.to_disconnect.contains(&0)); + } + + #[test] + fn receives_remote_response() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + on_demand.on_connect(0, Role::FULL); + + let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); + let thread = ::std::thread::spawn(move || { + let result = response.wait().unwrap(); + assert_eq!(result.return_data, vec![1]); + }); + + receive_response(&*on_demand, &mut network, 0, 0); + thread.join().unwrap(); + } +} diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index b2b87e80ff553..8e9b3ace68902 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -32,6 +32,7 @@ use consensus::Consensus; use service::{Role, TransactionPool, StatementStream, BftMessageStream}; use config::ProtocolConfig; use chain::Client; +use on_demand::OnDemandService; use io::SyncIo; use error; use super::header_hash; @@ -46,6 +47,7 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128; pub struct Protocol { config: ProtocolConfig, chain: Arc, + on_demand: Option>, genesis_hash: HeaderHash, sync: RwLock, consensus: Mutex, @@ -112,14 +114,16 @@ pub struct TransactionStats { impl Protocol { /// Create a new instance. - pub fn new(config: ProtocolConfig, chain: Arc, transaction_pool: Arc) -> error::Result { + pub fn new(config: ProtocolConfig, chain: Arc, on_demand: Option>, transaction_pool: Arc) -> error::Result { let info = chain.info()?; let best_hash = info.chain.best_hash; + let sync = ChainSync::new(config.roles, &info); let protocol = Protocol { config: config, chain: chain, + on_demand: on_demand, genesis_hash: info.chain.genesis_hash, - sync: RwLock::new(ChainSync::new(&info)), + sync: RwLock::new(sync), consensus: Mutex::new(Consensus::new(best_hash)), peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()), @@ -185,6 +189,8 @@ impl Protocol { Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()), Message::Transactions(m) => self.on_transactions(io, peer_id, m), + Message::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request), + Message::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response) } } @@ -232,6 +238,7 @@ impl Protocol { if removed { self.consensus.lock().peer_disconnected(io, self, peer); self.sync.write().peer_disconnected(io, self, peer); + self.on_demand.as_ref().map(|s| s.on_disconnect(peer)); } } @@ -345,6 +352,7 @@ impl Protocol { /// Perform time based maintenance. pub fn tick(&self, io: &mut SyncIo) { self.maintain_peers(io); + self.on_demand.as_ref().map(|s| s.maintain_peers(io)); self.consensus.lock().collect_garbage(None); } @@ -388,8 +396,6 @@ impl Protocol { return; } - let mut sync = self.sync.write(); - let mut consensus = self.consensus.lock(); { let mut peers = self.peers.write(); let mut handshaking_peers = self.handshaking_peers.write(); @@ -423,8 +429,10 @@ impl Protocol { handshaking_peers.remove(&peer_id); debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); } - sync.new_peer(io, self, peer_id); - consensus.new_peer(io, self, peer_id, &status.roles); + + self.sync.write().new_peer(io, self, peer_id); + self.consensus.lock().new_peer(io, self, peer_id, &status.roles); + self.on_demand.as_ref().map(|s| s.on_connect(peer_id, message::Role::as_flags(&status.roles))); } /// Called when peer sends us new transactions @@ -523,6 +531,27 @@ impl Protocol { self.consensus.lock().collect_garbage(Some((hash, &header))); } + fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) { + trace!(target: "sync", "Remote request {} from {} ({} at {})", request.id, peer_id, request.method, request.block); + let (value, proof) = match self.chain.execution_proof(&request.block, &request.method, &request.data) { + Ok((value, proof)) => (value, proof), + Err(error) => { + trace!(target: "sync", "Remote request {} from {} ({} at {}) failed with: {}", + request.id, peer_id, request.method, request.block, error); + (Default::default(), Default::default()) + }, + }; + + self.send_message(io, peer_id, message::Message::RemoteCallResponse(message::RemoteCallResponse { + id: request.id, value, proof, + })); + } + + fn on_remote_call_response(&self, io: &mut SyncIo, peer_id: PeerId, response: message::RemoteCallResponse) { + trace!(target: "sync", "Remote response {} from {}", response.id, peer_id); + self.on_demand.as_ref().map(|s| s.on_remote_response(io, peer_id, response)); + } + pub fn transactions_stats(&self) -> BTreeMap { BTreeMap::new() } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index f32c88a26339b..c664d0bd06129 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -31,6 +31,7 @@ use config::{ProtocolConfig}; use error::Error; use chain::Client; use message::{Statement, LocalizedBftMessage}; +use on_demand::OnDemandService; /// Polkadot devp2p protocol id pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; @@ -107,6 +108,12 @@ pub trait ConsensusService: Send + Sync { fn send_bft_message(&self, message: LocalizedBftMessage); } +/// Service able to execute closure in the network context. +pub trait ExecuteInContext: Send + Sync { + /// Execute closure in network context. + fn execute_in_context(&self, closure: F); +} + /// devp2p Protocol handler struct ProtocolHandler { protocol: Protocol, @@ -137,6 +144,8 @@ pub struct Params { pub network_config: NetworkConfiguration, /// Polkadot relay chain access point. pub chain: Arc, + /// On-demand service reference. + pub on_demand: Option>, /// Transaction pool. pub transaction_pool: Arc, } @@ -156,7 +165,7 @@ impl Service { let sync = Arc::new(Service { network: service, handler: Arc::new(ProtocolHandler { - protocol: Protocol::new(params.config, params.chain.clone(), params.transaction_pool)?, + protocol: Protocol::new(params.config, params.chain, params.on_demand, params.transaction_pool)?, }), }); @@ -200,6 +209,14 @@ impl Drop for Service { } } +impl ExecuteInContext for Service { + fn execute_in_context(&self, closure: F) { + self.network.with_context(DOT_PROTOCOL_ID, |context| { + closure(&mut NetSyncIo::new(context), &self.handler.protocol) + }); + } +} + impl SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index 9cef0cd021631..4bf6891aa2d70 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -22,6 +22,7 @@ use client::{ImportResult, BlockStatus, ClientInfo}; use primitives::block::{HeaderHash, Number as BlockNumber, Header, Id as BlockId}; use blocks::{self, BlockCollection}; use message::{self, Message}; +use service::Role; use super::header_hash; // Maximum blocks to request in a single packet. @@ -37,10 +38,10 @@ struct PeerSync { #[derive(Copy, Clone, Eq, PartialEq, Debug)] enum PeerSyncState { - AncestorSearch(BlockNumber), - Available, - DownloadingNew(BlockNumber), - DownloadingStale(HeaderHash), + AncestorSearch(BlockNumber), + Available, + DownloadingNew(BlockNumber), + DownloadingStale(HeaderHash), } /// Relay chain sync strategy. @@ -73,14 +74,22 @@ pub struct Status { impl ChainSync { /// Create a new instance. - pub fn new(info: &ClientInfo) -> ChainSync { + pub fn new(role: Role, info: &ClientInfo) -> ChainSync { + let mut required_block_attributes = vec![ + message::BlockAttribute::Header, + message::BlockAttribute::Justification + ]; + if role.intersects(Role::FULL | Role::VALIDATOR | Role::COLLATOR) { + required_block_attributes.push(message::BlockAttribute::Body); + } + ChainSync { genesis_hash: info.chain.genesis_hash, peers: HashMap::new(), blocks: BlockCollection::new(), best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), - required_block_attributes: vec![message::BlockAttribute::Header, message::BlockAttribute::Body, message::BlockAttribute::Justification], + required_block_attributes: required_block_attributes, } } diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index a62bf5177ca0f..3699802daf9e4 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -227,7 +227,7 @@ impl TestNet { for _ in 0..n { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); - let sync = Protocol::new(config.clone(), client.clone(), tx_pool).unwrap(); + let sync = Protocol::new(config.clone(), client.clone(), None, tx_pool).unwrap(); net.peers.push(Arc::new(Peer { sync: sync, client: client, diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index c5c179837c3ed..f6e767ef66270 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -81,7 +81,7 @@ impl Chain { impl ChainApi for Chain where B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { type Metadata = ::metadata::Metadata; diff --git a/substrate/rpc/src/chain/tests.rs b/substrate/rpc/src/chain/tests.rs index 6d44ba7a3f708..0bfb29fd5d5c5 100644 --- a/substrate/rpc/src/chain/tests.rs +++ b/substrate/rpc/src/chain/tests.rs @@ -28,13 +28,12 @@ fn should_return_header() { client: Arc::new(test_client::new()), subscriptions: Subscriptions::new(remote), }; - assert_matches!( client.header(client.client.genesis_hash()), Ok(Some(ref x)) if x == &block::Header { parent_hash: 0.into(), number: 0, - state_root: "6da331d07a82d99f4debaafb0110a2e36244ed34162f9a7f6312a23fd52989ed".into(), + state_root: "0c81ab6cfac8c8d7201d78cb699b6b79d714462a4ba00abcacce22444babe315".into(), extrinsics_root: "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".into(), digest: Default::default(), } @@ -70,7 +69,7 @@ fn should_notify_about_latest_block() { // assert notification send to transport let (notification, next) = core.run(transport.into_future()).unwrap(); assert_eq!(notification, Some( - r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","number":1,"parentHash":"0x4c4ab196ed07bbd5b8c901ae5092d9d3990cbb4d44421af8e988af7d3c2a4226","stateRoot":"0x75b634da2a0d272e8a5145ab704406d3b50676c7739f977f2ccb2d0e5a0cdbd0"},"subscription":0}}"#.to_owned() + r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","number":1,"parentHash":"0x72ae67388233893fb4594f13df56d4e654aa8721763bcd0bd4e187fee7b2f349","stateRoot":"0x2e1f2f1c53ffb1767fe1abf4fe5953cc87c7650d4af2d4393d1f72324f2cc5d7"},"subscription":0}}"#.to_owned() )); // no more notifications on this channel assert_eq!(core.run(next.into_future()).unwrap().0, None); diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index 29d372aa8f022..9fa41d38a14bb 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -22,7 +22,7 @@ mod error; mod tests; use std::sync::Arc; -use client::{self, Client}; +use client::{self, Client, CallExecutor}; use primitives::{block, Hash, blake2_256}; use primitives::storage::{StorageKey, StorageData}; use primitives::hexdisplay::HexDisplay; @@ -69,7 +69,7 @@ build_rpc_trait! { impl StateApi for Arc> where B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static, client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { fn storage_at(&self, key: StorageKey, block: block::HeaderHash) -> Result { @@ -79,7 +79,7 @@ impl StateApi for Arc> where fn call_at(&self, method: String, data: Vec, block: block::HeaderHash) -> Result> { trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data)); - Ok(self.as_ref().call(&block::Id::Hash(block), &method, &data)?.return_data) + Ok(self.as_ref().executor().call(&block::Id::Hash(block), &method, &data)?.return_data) } fn storage_hash_at(&self, key: StorageKey, block: block::HeaderHash) -> Result { diff --git a/substrate/test-client/src/client_ext.rs b/substrate/test-client/src/client_ext.rs index 3af87c4db4fcc..071a889d1efd8 100644 --- a/substrate/test-client/src/client_ext.rs +++ b/substrate/test-client/src/client_ext.rs @@ -39,7 +39,7 @@ pub trait TestClient { impl TestClient for Client { fn new_for_tests() -> Self { - client::new_in_mem(NativeExecutor::new(), prepare_genesis).unwrap() + client::new_in_mem(NativeExecutor::new(), GenesisBuilder).unwrap() } fn justify_and_import(&self, origin: client::BlockOrigin, block: block::Block) -> client::error::Result<()> { @@ -95,14 +95,18 @@ fn genesis_config() -> GenesisConfig { ], 1000) } -fn prepare_genesis() -> (block::Header, Vec<(Vec, Vec)>) { - let mut storage = genesis_config().genesis_map(); - let block = client::genesis::construct_genesis_block(&storage); - storage.extend(additional_storage_with_genesis(&block)); +struct GenesisBuilder; - ( - block::Header::decode(&mut block.header.encode().as_ref()) - .expect("to_vec() always gives a valid serialisation; qed"), - storage.into_iter().collect() - ) +impl client::GenesisBuilder for GenesisBuilder { + fn build(self) -> (block::Header, Vec<(Vec, Vec)>) { + let mut storage = genesis_config().genesis_map(); + let block = client::genesis::construct_genesis_block(&storage); + storage.extend(additional_storage_with_genesis(&block)); + + ( + block::Header::decode(&mut block.header.encode().as_ref()) + .expect("to_vec() always gives a valid serialisation; qed"), + storage.into_iter().collect() + ) + } } diff --git a/substrate/test-client/src/lib.rs b/substrate/test-client/src/lib.rs index 123a8a6398699..dacf41e851993 100644 --- a/substrate/test-client/src/lib.rs +++ b/substrate/test-client/src/lib.rs @@ -46,7 +46,7 @@ pub use self::native_executor::NativeExecutor; pub type Backend = client::in_mem::Backend; /// Test client executor. -pub type Executor = executor::NativeExecutor; +pub type Executor = client::LocalCallExecutor>; /// Creates new client instance used for tests. pub fn new() -> client::Client { diff --git a/substrate/test-runtime/src/lib.rs b/substrate/test-runtime/src/lib.rs index fb09970820225..d264417b547ca 100644 --- a/substrate/test-runtime/src/lib.rs +++ b/substrate/test-runtime/src/lib.rs @@ -68,6 +68,7 @@ pub mod api { use system; impl_stubs!( + authorities => |()| system::authorities(), execute_block => |block| system::execute_block(block), execute_transaction => |(header, utx)| system::execute_transaction(utx, header), finalise_block => |header| system::finalise_block(header) diff --git a/substrate/test-runtime/src/system.rs b/substrate/test-runtime/src/system.rs index eaa741fa3387e..7f57cbc28f6b2 100644 --- a/substrate/test-runtime/src/system.rs +++ b/substrate/test-runtime/src/system.rs @@ -18,6 +18,7 @@ //! and depositing logs. use rstd::prelude::*; +use primitives::AuthorityId; use runtime_io::{storage_root, enumerated_trie_root, ed25519_verify}; use runtime_support::{Hashable, storage}; use codec::{KeyedVec, Slicable}; @@ -26,6 +27,8 @@ use super::{AccountId, UncheckedTransaction, H256 as Hash, Block, Header}; const NONCE_OF: &[u8] = b"nonce:"; const BALANCE_OF: &[u8] = b"balance:"; const LATEST_BLOCK_HASH: &[u8] = b"latest"; +const AUTHORITY_AT: &'static[u8] = b":auth:"; +const AUTHORITY_COUNT: &'static[u8] = b":auth:len"; pub fn latest_block_hash() -> Hash { storage::get(LATEST_BLOCK_HASH).expect("There must always be a latest block") @@ -39,6 +42,14 @@ pub fn nonce_of(who: AccountId) -> u64 { storage::get_or(&who.to_keyed_vec(NONCE_OF), 0) } +/// Get authorities ar given block. +pub fn authorities() -> Vec { + let len: u32 = storage::unhashed::get(AUTHORITY_COUNT).expect("There are always authorities in test-runtime"); + (0..len) + .map(|i| storage::unhashed::get(&i.to_keyed_vec(AUTHORITY_AT)).expect("Authority is properly encoded in test-runtime")) + .collect() +} + /// Actually execute all transitioning for `block`. pub fn execute_block(block: Block) { let ref header = block.header; diff --git a/substrate/test-runtime/wasm/genesis.wasm b/substrate/test-runtime/wasm/genesis.wasm index 7daba8a15c586..c182bcf42f3e9 100644 Binary files a/substrate/test-runtime/wasm/genesis.wasm and b/substrate/test-runtime/wasm/genesis.wasm differ diff --git a/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm b/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm index d8d0d5eb193bf..c182bcf42f3e9 100644 Binary files a/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm and b/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm differ diff --git a/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.wasm b/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.wasm index 222585017ca5b..7ef117e9c5db8 100755 Binary files a/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.wasm and b/substrate/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.wasm differ