diff --git a/Cargo.lock b/Cargo.lock index a345a467c695..35c9415c5a98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4415,6 +4415,32 @@ dependencies = [ "sp-runtime", ] +[[package]] +name = "polkadot-node-core-proposer" +version = "0.1.0" +dependencies = [ + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "polkadot-node-subsystem", + "polkadot-overseer", + "polkadot-primitives", + "sc-basic-authorship", + "sc-block-builder", + "sc-client-api", + "sc-telemetry", + "sp-api", + "sp-blockchain", + "sp-consensus", + "sp-core", + "sp-inherents", + "sp-runtime", + "sp-transaction-pool", + "tokio-executor 0.2.0-alpha.6", + "wasm-timer", +] + [[package]] name = "polkadot-node-primitives" version = "0.1.0" @@ -4762,6 +4788,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.9.0", "polkadot-network", + "polkadot-node-core-proposer", "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", @@ -4769,7 +4796,6 @@ dependencies = [ "polkadot-runtime", "polkadot-test-runtime-client", "sc-authority-discovery", - "sc-basic-authorship", "sc-block-builder", "sc-chain-spec", "sc-client-api", diff --git a/Cargo.toml b/Cargo.toml index 2723f0fa7901..5bf037cfc816 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "service", "validation", + "node/core/proposer", "node/network/bridge", "node/overseer", "node/primitives", diff --git a/node/core/proposer/Cargo.toml b/node/core/proposer/Cargo.toml new file mode 100644 index 000000000000..3723b80bfea4 --- /dev/null +++ b/node/core/proposer/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "polkadot-node-core-proposer" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.4" +futures-timer = "3.0.1" +log = "0.4.8" +parity-scale-codec = "1.3.0" +polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-overseer = { path = "../../overseer" } +polkadot-primitives = { path = "../../../primitives" } +sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" } +tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] } +wasm-timer = "0.2.4" diff --git a/node/core/proposer/src/lib.rs b/node/core/proposer/src/lib.rs new file mode 100644 index 000000000000..b53ac5729fa9 --- /dev/null +++ b/node/core/proposer/src/lib.rs @@ -0,0 +1,262 @@ +use futures::prelude::*; +use futures::select; +use polkadot_node_subsystem::{messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError}; +use polkadot_overseer::OverseerHandler; +use polkadot_primitives::{ + inclusion_inherent, + parachain::ParachainHost, + Block, Hash, Header, +}; +use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider}; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_blockchain::HeaderBackend; +use sp_consensus::{Proposal, RecordProof}; +use sp_inherents::InherentData; +use sp_runtime::traits::{DigestFor, HashFor}; +use sp_transaction_pool::TransactionPool; +use std::{fmt, pin::Pin, sync::Arc, time}; + +/// How long proposal can take before we give up and err out +const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_secs(2); + +/// Custom Proposer factory for Polkadot +pub struct ProposerFactory { + inner: sc_basic_authorship::ProposerFactory, + overseer: OverseerHandler, +} + +impl ProposerFactory { + pub fn new( + client: Arc, + transaction_pool: Arc, + overseer: OverseerHandler, + ) -> Self { + ProposerFactory { + inner: sc_basic_authorship::ProposerFactory::new( + client, + transaction_pool, + None, + ), + overseer, + } + } +} + +impl sp_consensus::Environment + for ProposerFactory +where + TxPool: 'static + TransactionPool, + Client: 'static + + BlockBuilderProvider + + ProvideRuntimeApi + + HeaderBackend + + Send + + Sync, + Client::Api: + ParachainHost + BlockBuilderApi + ApiExt, + Backend: + 'static + sc_client_api::Backend>, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor: sp_api::StateBackend> + Send, +{ + type CreateProposer = Pin> + Send + 'static, + >>; + type Proposer = Proposer; + type Error = Error; + + fn init(&mut self, parent_header: &Header) -> Self::CreateProposer { + // create the inner proposer + let proposer = self.inner.init(parent_header).into_inner(); + + // data to be moved into the future + let overseer = self.overseer.clone(); + let parent_header_hash = parent_header.hash(); + + async move { + Ok(Proposer { + inner: proposer?, + overseer, + parent_header_hash, + }) + }.boxed() + } +} + +/// Custom Proposer for Polkadot. +/// +/// This proposer gets the ProvisionerInherentData and injects it into the wrapped +/// proposer's inherent data, then delegates the actual proposal generation. +pub struct Proposer, Backend, Client> { + inner: sc_basic_authorship::Proposer, + overseer: OverseerHandler, + parent_header_hash: Hash, +} + +// This impl has the same generic bounds as the Proposer impl. +impl Proposer +where + TxPool: 'static + TransactionPool, + Client: 'static + + BlockBuilderProvider + + ProvideRuntimeApi + + HeaderBackend + + Send + + Sync, + Client::Api: + ParachainHost + BlockBuilderApi + ApiExt, + Backend: + 'static + sc_client_api::Backend>, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor: sp_api::StateBackend> + Send, +{ + /// Get provisioner inherent data + /// + /// This function has a constant timeout: `PROPOSE_TIMEOUT`. + fn get_provisioner_data(&self) -> impl Future> { + // clone this (lightweight) data because we're going to move it into the future + let mut overseer = self.overseer.clone(); + let parent_header_hash = self.parent_header_hash.clone(); + + let mut provisioner_inherent_data = async move { + let (sender, receiver) = futures::channel::oneshot::channel(); + + // strictly speaking, we don't _have_ to .await this send_msg before opening the + // receiver; it's possible that the response there would be ready slightly before + // this call completes. IMO it's not worth the hassle or overhead of spawning a + // distinct task for that kind of miniscule efficiency improvement. + overseer.send_msg(AllMessages::Provisioner( + ProvisionerMessage::RequestInherentData(parent_header_hash, sender), + )).await?; + + receiver.await.map_err(Error::ClosedChannelFromProvisioner) + } + .boxed() + .fuse(); + + let mut timeout = wasm_timer::Delay::new(PROPOSE_TIMEOUT).fuse(); + + async move { + select! { + pid = provisioner_inherent_data => pid, + _ = timeout => Err(Error::Timeout), + } + } + } +} + +impl sp_consensus::Proposer for Proposer +where + TxPool: 'static + TransactionPool, + Client: 'static + + BlockBuilderProvider + + ProvideRuntimeApi + + HeaderBackend + + Send + + Sync, + Client::Api: + ParachainHost + BlockBuilderApi + ApiExt, + Backend: + 'static + sc_client_api::Backend>, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor: sp_api::StateBackend> + Send, +{ + type Transaction = sc_client_api::TransactionFor; + type Proposal = Pin>, Error>> + Send, + >>; + type Error = Error; + + fn propose( + self, + mut inherent_data: InherentData, + inherent_digests: DigestFor, + max_duration: time::Duration, + record_proof: RecordProof, + ) -> Self::Proposal { + let provisioner_data = self.get_provisioner_data(); + + async move { + let provisioner_data = match provisioner_data.await { + Ok(pd) => pd, + Err(err) => { + log::warn!("could not get provisioner inherent data; injecting default data: {}", err); + Default::default() + } + }; + + inherent_data.put_data( + inclusion_inherent::INHERENT_IDENTIFIER, + &provisioner_data, + )?; + + self.inner + .propose(inherent_data, inherent_digests, max_duration, record_proof) + .await + .map_err(Into::into) + } + .boxed() + } +} + +// It would have been more ergonomic to use thiserror to derive the +// From implementations, Display, and std::error::Error, but unfortunately +// two of the wrapped errors (sp_inherents::Error, SubsystemError) also +// don't impl std::error::Error, which breaks the thiserror derive. +#[derive(Debug)] +pub enum Error { + Consensus(sp_consensus::Error), + Blockchain(sp_blockchain::Error), + Inherent(sp_inherents::Error), + Timeout, + ClosedChannelFromProvisioner(futures::channel::oneshot::Canceled), + Subsystem(SubsystemError) +} + +impl From for Error { + fn from(e: sp_consensus::Error) -> Error { + Error::Consensus(e) + } +} + +impl From for Error { + fn from(e: sp_blockchain::Error) -> Error { + Error::Blockchain(e) + } +} + +impl From for Error { + fn from(e: sp_inherents::Error) -> Error { + Error::Inherent(e) + } +} + +impl From for Error { + fn from(e: SubsystemError) -> Error { + Error::Subsystem(e) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Consensus(err) => write!(f, "consensus error: {}", err), + Self::Blockchain(err) => write!(f, "blockchain error: {}", err), + Self::Inherent(err) => write!(f, "inherent error: {:?}", err), + Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT), + Self::ClosedChannelFromProvisioner(err) => write!(f, "provisioner closed inherent data channel before sending: {}", err), + Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err), + } + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Consensus(err) => Some(err), + Self::Blockchain(err) => Some(err), + Self::ClosedChannelFromProvisioner(err) => Some(err), + _ => None + } + } +} diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 1036cff494a2..a9552b56ad2b 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -20,6 +20,7 @@ kusama-runtime = { path = "../../runtime/kusama" } westend-runtime = { path = "../../runtime/westend" } polkadot-network = { path = "../../network", optional = true } polkadot-rpc = { path = "../../rpc" } +polkadot-node-core-proposer = { path = "../core/proposer" } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -57,7 +58,6 @@ sp-session = { git = "https://github.com/paritytech/substrate", branch = "master sp-offchain = { package = "sp-offchain", git = "https://github.com/paritytech/substrate", branch = "master" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" } frame-benchmarking = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" } [dev-dependencies] polkadot-test-runtime-client = { path = "../../runtime/test-runtime/client" } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 69cdc1b22915..4ea8ddd50b5c 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -34,6 +34,7 @@ use polkadot_subsystem::{ Subsystem, SubsystemContext, SpawnedSubsystem, messages::{CandidateValidationMessage, CandidateBackingMessage}, }; +use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; pub use service::{ Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, @@ -362,6 +363,7 @@ macro_rules! new_full { .collect(); let (overseer, handler) = real_overseer(leaves, spawner)?; + let handler_clone = handler.clone(); task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move { use futures::{pin_mut, select, FutureExt}; @@ -388,11 +390,10 @@ macro_rules! new_full { let can_author_with = consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone()); - // TODO: custom proposer (https://github.com/paritytech/polkadot/issues/1248) - let proposer = sc_basic_authorship::ProposerFactory::new( + let proposer = ProposerFactory::new( client.clone(), transaction_pool, - None, + handler_clone, ); let babe_config = babe::BabeParams { diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index c22581349078..b8732d5aa5ae 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -28,7 +28,7 @@ use sc_network::{ObservedRole, ReputationChange, PeerId}; use polkadot_primitives::{BlockNumber, Hash, Signature}; use polkadot_primitives::parachain::{ AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId, - SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, + SignedAvailabilityBitfield, SignedAvailabilityBitfields, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, }; use polkadot_node_primitives::{ MisbehaviorReport, SignedFullStatement, View, ProtocolId, @@ -190,6 +190,11 @@ pub enum ProvisionableData { Dispute(Hash, Signature), } +/// This data needs to make its way from the provisioner into the InherentData. +/// +/// There, it is used to construct the InclusionInherent. +pub type ProvisionerInherentData = (SignedAvailabilityBitfields, Vec); + /// Message to the Provisioner. /// /// In all cases, the Hash is that of the relay parent. @@ -198,6 +203,12 @@ pub enum ProvisionerMessage { /// This message allows potential block authors to be kept updated with all new authorship data /// as it becomes available. RequestBlockAuthorshipData(Hash, mpsc::Sender), + /// This message allows external subsystems to request the set of bitfields and backed candidates + /// associated with a particular potential block hash. + /// + /// This is expected to be used by a proposer, to inject that information into the InherentData + /// where it can be assembled into the InclusionInherent. + RequestInherentData(Hash, oneshot::Sender), /// This data should become part of a relay chain block ProvisionableData(ProvisionableData), } diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index 33a0f0363d46..03448a29fa70 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -724,7 +724,7 @@ impl From> for AvailabilityBitfield { pub type SignedAvailabilityBitfield = Signed; /// A set of signed availability bitfields. Should be sorted by validator index, ascending. -#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] +#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug, Default)] pub struct SignedAvailabilityBitfields(pub Vec); impl From> for SignedAvailabilityBitfields { diff --git a/roadmap/implementors-guide/src/node/utility/provisioner.md b/roadmap/implementors-guide/src/node/utility/provisioner.md index 33fb394f1b19..f646e738c467 100644 --- a/roadmap/implementors-guide/src/node/utility/provisioner.md +++ b/roadmap/implementors-guide/src/node/utility/provisioner.md @@ -38,6 +38,12 @@ At initialization, this subsystem has no outputs. Block authors can send a `Prov Note that block authors must re-send a `ProvisionerMessage::RequestBlockAuthorshipData` for each relay parent they are interested in receiving provisionable data for. +## Block Production + +When a validator is selected by BABE to author a block, it becomes a block producer. The provisioner is the subsystem best suited to choosing which specific backed candidates and availability bitfields should be assembled into the block. To engage this functionality, a `ProvisionerMessage::RequestInherentData` is sent; the response is a set of non-conflicting candidates and the appropriate bitfields. Non-conflicting generally means that there are never two distinct parachain candidates included for the same parachain. + +One might ask: given `ProvisionerMessage::RequestInherentData`, what's the point of `ProvisionerMessage::RequestBlockAuthorshipData`? The answer is that the block authorship data includes more information than is present in the inherent data; disputes, for example. + ## Functionality The subsystem should maintain a set of handles to Block Authorship Provisioning Jobs that are currently live. diff --git a/roadmap/implementors-guide/src/types/overseer-protocol.md b/roadmap/implementors-guide/src/types/overseer-protocol.md index 7b211bd14f6c..7908def9409c 100644 --- a/roadmap/implementors-guide/src/types/overseer-protocol.md +++ b/roadmap/implementors-guide/src/types/overseer-protocol.md @@ -201,6 +201,11 @@ enum ProvisionableData { Dispute(Hash, Signature), } +/// This data needs to make its way from the provisioner into the InherentData. +/// +/// There, it is used to construct the InclusionInherent. +type ProvisionerInherentData = (SignedAvailabilityBitfields, Vec); + /// Message to the Provisioner. /// /// In all cases, the Hash is that of the relay parent. @@ -208,6 +213,12 @@ enum ProvisionerMessage { /// This message allows potential block authors to be kept updated with all new authorship data /// as it becomes available. RequestBlockAuthorshipData(Hash, Sender), + /// This message allows external subsystems to request the set of bitfields and backed candidates + /// associated with a particular potential block hash. + /// + /// This is expected to be used by a proposer, to inject that information into the InherentData + /// where it can be assembled into the InclusionInherent. + RequestInherentData(Hash, oneshot::Sender), /// This data should become part of a relay chain block ProvisionableData(ProvisionableData), }