diff --git a/Cargo.lock b/Cargo.lock index d5eeea2ab489..a717b6dc8d68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6457,7 +6457,6 @@ dependencies = [ "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem-types", - "polkadot-overseer-all-subsystems-gen", "polkadot-overseer-gen", "polkadot-primitives", "sc-client-api", @@ -6466,16 +6465,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "polkadot-overseer-all-subsystems-gen" -version = "0.9.9" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "trybuild", -] - [[package]] name = "polkadot-overseer-gen" version = "0.9.9" @@ -6768,6 +6757,7 @@ dependencies = [ "kvdb", "kvdb-rocksdb", "log", + "lru", "pallet-babe", "pallet-im-online", "pallet-mmr-primitives", diff --git a/Cargo.toml b/Cargo.toml index 2b5cc259d429..74c9205a836e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,6 @@ members = [ "node/overseer", "node/overseer/overseer-gen", "node/overseer/overseer-gen/proc-macro", - "node/overseer/all-subsystems-gen", "node/malus", "node/primitives", "node/service", diff --git a/cli/src/lib.rs b/cli/src/lib.rs index b7b45ed66a56..c07722d97540 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -29,7 +29,7 @@ pub use service::RuntimeApiCollection; pub use service::{self, Block, CoreApi, IdentifyVariant, ProvideRuntimeApi, TFullClient}; #[cfg(feature = "malus")] -pub use service::create_default_subsystems; +pub use service::overseer::prepared_overseer_builder; #[cfg(feature = "cli")] pub use cli::*; diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 1a3429f7ab11..dfe08afe1a70 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -150,7 +150,7 @@ impl Artifacts { /// Inform the table about the artifact with the given ID. The state will be set to "preparing". /// - /// This function must be used only for brand new artifacts and should never be used for + /// This function must be used only for brand-new artifacts and should never be used for /// replacing existing ones. pub fn insert_preparing(&mut self, artifact_id: ArtifactId) { // See the precondition. @@ -159,7 +159,7 @@ impl Artifacts { /// Insert an artifact with the given ID as "prepared". /// - /// This function must be used only for brand new artifacts and should never be used for + /// This function must be used only for brand-new artifacts and should never be used for /// replacing existing ones. #[cfg(test)] pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) { diff --git a/node/malus/src/variant-a.rs b/node/malus/src/variant-a.rs index a545bd360faf..ed395a9d884e 100644 --- a/node/malus/src/variant-a.rs +++ b/node/malus/src/variant-a.rs @@ -24,10 +24,10 @@ use color_eyre::eyre; use polkadot_cli::{ - create_default_subsystems, + prepared_overseer_builder, service::{ - AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer, - OverseerGen, OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed, + AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, OverseerGen, + OverseerGenArgs, ParachainHost, ProvideRuntimeApi, SpawnNamed, }, Cli, }; @@ -37,7 +37,7 @@ use polkadot_cli::{ use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateValidationMessage}, - overseer::{self, OverseerHandle}, + overseer::{self, Overseer, OverseerConnector, OverseerHandle}, FromOverseer, }; @@ -86,6 +86,7 @@ struct BehaveMaleficient; impl OverseerGen for BehaveMaleficient { fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -93,15 +94,10 @@ impl OverseerGen for BehaveMaleficient { RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { - let spawner = args.spawner.clone(); - let leaves = args.leaves.clone(); - let runtime_client = args.runtime_client.clone(); - let registry = args.registry.clone(); let candidate_validation_config = args.candidate_validation_config.clone(); - // modify the subsystem(s) as needed: - let all_subsystems = create_default_subsystems(args)?.replace_candidate_validation( - // create the filtered subsystem - |orig: CandidateValidationSubsystem| { + + prepared_overseer_builder(args)? + .replace_candidate_validation(|orig: CandidateValidationSubsystem| { InterceptedSubsystem::new( CandidateValidationSubsystem::with_config( candidate_validation_config, @@ -110,10 +106,8 @@ impl OverseerGen for BehaveMaleficient { ), Skippy::default(), ) - }, - ); - - Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + }) + .build_with_connector(connector) .map_err(|e| e.into()) } } diff --git a/node/network/availability-distribution/src/requester/session_cache.rs b/node/network/availability-distribution/src/requester/session_cache.rs index 25b53d25aaa7..5b87d1dcf9ce 100644 --- a/node/network/availability-distribution/src/requester/session_cache.rs +++ b/node/network/availability-distribution/src/requester/session_cache.rs @@ -56,7 +56,7 @@ pub struct SessionInfo { /// validators. pub validator_groups: Vec>, - /// Information about ourself: + /// Information about ourselves: pub our_index: ValidatorIndex, /// Remember to which group we belong, so we won't start fetching chunks for candidates with diff --git a/node/network/availability-recovery/src/futures_undead.rs b/node/network/availability-recovery/src/futures_undead.rs index 9715916590a6..550f41a9b3cf 100644 --- a/node/network/availability-recovery/src/futures_undead.rs +++ b/node/network/availability-recovery/src/futures_undead.rs @@ -18,7 +18,7 @@ //! futures will still get polled, but will not count towards length. So length will only count //! futures, which are still considered live. //! -//! Usecase: If futures take longer than we would like them too, we maybe able to request the data +//! Usecase: If futures take longer than we would like them too, we may be able to request the data //! from somewhere else as well. We don't really want to cancel the old future, because maybe it //! was almost done, thus we would have wasted time with our impatience. By simply making them //! not count towards length, we can make sure to have enough "live" requests ongoing, while at the diff --git a/node/network/dispute-distribution/src/error.rs b/node/network/dispute-distribution/src/error.rs index 7b7d7a64238f..00ac06310e8d 100644 --- a/node/network/dispute-distribution/src/error.rs +++ b/node/network/dispute-distribution/src/error.rs @@ -29,7 +29,7 @@ use crate::{sender, LOG_TARGET}; pub enum Error { /// Fatal errors of dispute distribution. Fatal(Fatal), - /// Non fatal errors of dispute distribution. + /// Non-fatal errors of dispute distribution. NonFatal(NonFatal), } diff --git a/node/network/statement-distribution/src/error.rs b/node/network/statement-distribution/src/error.rs index 4eb28eedc0ed..819440e6f29c 100644 --- a/node/network/statement-distribution/src/error.rs +++ b/node/network/statement-distribution/src/error.rs @@ -39,7 +39,7 @@ pub type FatalResult = std::result::Result; pub enum Error { /// Fatal errors of dispute distribution. Fatal(Fatal), - /// Non fatal errors of dispute distribution. + /// Non-fatal errors of dispute distribution. NonFatal(NonFatal), } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index ffe54342a13b..b3ce65be25b7 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -105,7 +105,7 @@ const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20; /// The statement distribution subsystem. pub struct StatementDistribution { - /// Pointer to a keystore, which is required for determining this nodes validator index. + /// Pointer to a keystore, which is required for determining the nodes validator index. keystore: SyncCryptoStorePtr, /// Receiver for incoming large statement requests. req_receiver: Option>, diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index b22d57975aa1..4685c8c0258b 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -16,7 +16,6 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-metrics = { path = "../metrics" } polkadot-primitives = { path = "../../primitives" } polkadot-overseer-gen = { path = "./overseer-gen" } -polkadot-overseer-all-subsystems-gen = { path = "./all-subsystems-gen" } tracing = "0.1.27" lru = "0.6" diff --git a/node/overseer/all-subsystems-gen/Cargo.toml b/node/overseer/all-subsystems-gen/Cargo.toml deleted file mode 100644 index e33377db3df8..000000000000 --- a/node/overseer/all-subsystems-gen/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "polkadot-overseer-all-subsystems-gen" -version = "0.9.9" -authors = ["Parity Technologies "] -edition = "2018" -description = "Small proc macro to create mocking level iface type helpers" - -[lib] -proc-macro = true - -[dependencies] -syn = { version = "1.0.76", features = ["full", "extra-traits"] } -quote = "1.0.9" -proc-macro2 = "1.0.24" - -[dev-dependencies] -trybuild = "1.0.45" diff --git a/node/overseer/all-subsystems-gen/src/lib.rs b/node/overseer/all-subsystems-gen/src/lib.rs deleted file mode 100644 index e524985f4543..000000000000 --- a/node/overseer/all-subsystems-gen/src/lib.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -use std::collections::HashSet; - -use proc_macro2::TokenStream; -use quote::quote; - -use syn::{parse2, Error, GenericParam, Ident, Result, Type}; - -#[proc_macro_derive(AllSubsystemsGen)] -pub fn subsystems_gen(item: proc_macro::TokenStream) -> proc_macro::TokenStream { - let item: TokenStream = item.into(); - impl_subsystems_gen(item).unwrap_or_else(|err| err.to_compile_error()).into() -} - -fn impl_subsystems_gen(item: TokenStream) -> Result { - let span = proc_macro2::Span::call_site(); - let ds = parse2::(item.clone())?; - - match ds.fields { - syn::Fields::Named(named) => { - #[derive(Clone)] - struct NameTyTup { - field: Ident, - ty: Type, - } - let mut orig_generics = ds.generics; - // remove default types - orig_generics.params = orig_generics - .params - .into_iter() - .map(|mut generic| { - match generic { - GenericParam::Type(ref mut param) => { - param.eq_token = None; - param.default = None; - }, - _ => {}, - } - generic - }) - .collect(); - - // prepare a hashmap of generic type to member that uses it - let generic_types = orig_generics - .params - .iter() - .filter_map(|generic| { - if let GenericParam::Type(param) = generic { - Some(param.ident.clone()) - } else { - None - } - }) - .collect::>(); - - let strukt_ty = ds.ident; - - if generic_types.is_empty() { - return Err(Error::new( - strukt_ty.span(), - "struct must have at least one generic parameter.", - )) - } - - // collect all fields that exist, and all fields that are replaceable - let mut replacable_items = Vec::::with_capacity(64); - let mut all_fields = replacable_items.clone(); - - let mut duplicate_generic_detection = HashSet::::with_capacity(64); - - for field in named.named { - let field_ident = field - .ident - .clone() - .ok_or_else(|| Error::new(span, "Member field must have a name."))?; - let ty = field.ty.clone(); - let ntt = NameTyTup { field: field_ident, ty }; - - replacable_items.push(ntt.clone()); - - // assure every generic is used exactly once - let ty_ident = match field.ty { - Type::Path(path) => path.path.get_ident().cloned().ok_or_else(|| { - Error::new( - proc_macro2::Span::call_site(), - "Expected an identifier, but got a path.", - ) - }), - _ => return Err(Error::new(proc_macro2::Span::call_site(), "Must be path.")), - }?; - - if generic_types.contains(&ty_ident) { - if let Some(previous) = duplicate_generic_detection.replace(ty_ident) { - return Err(Error::new(previous.span(), "Generic type parameters may only be used for exactly one field, but is used more than once.")); - } - } - - all_fields.push(ntt); - } - - let msg = "Generated by #[derive(AllSubsystemsGen)] derive proc-macro."; - let mut additive = TokenStream::new(); - - // generate an impl of `fn replace_#name` - for NameTyTup { field: replacable_item, ty: replacable_item_ty } in replacable_items { - let keeper = &all_fields - .iter() - .filter(|ntt| ntt.field != replacable_item) - .map(|ntt| ntt.field.clone()) - .collect::>(); - let strukt_ty = strukt_ty.clone(); - let fname = Ident::new(&format!("replace_{}", replacable_item), span); - // adjust the generics such that the appropriate member type is replaced - let mut modified_generics = orig_generics.clone(); - modified_generics.params = modified_generics - .params - .into_iter() - .map(|mut generic| { - match generic { - GenericParam::Type(ref mut param) => { - param.eq_token = None; - param.default = None; - if match &replacable_item_ty { - Type::Path(path) => path - .path - .get_ident() - .filter(|&ident| ident == ¶m.ident) - .is_some(), - _ => false, - } { - param.ident = Ident::new("NEW", span); - } - }, - _ => {}, - } - generic - }) - .collect(); - - additive.extend(quote! { - impl #orig_generics #strukt_ty #orig_generics { - #[doc = #msg] - pub fn #fname < NEW, F > (self, gen_replacement_fn: F) -> #strukt_ty #modified_generics - where - F: FnOnce(#replacable_item_ty) -> NEW, - { - let Self { - // To be replaced field: - #replacable_item, - // Fields to keep: - #( - #keeper, - )* - } = self; - - // Some cases require that parts of the original are copied - // over, since they include a one time initialization. - let replacement = gen_replacement_fn(#replacable_item); - - #strukt_ty :: #modified_generics { - #replacable_item: replacement, - #( - #keeper, - )* - } - } - } - }); - } - - Ok(additive) - }, - syn::Fields::Unit => - Err(Error::new(span, "Must be a struct with named fields. Not an unit struct.")), - syn::Fields::Unnamed(_) => Err(Error::new( - span, - "Must be a struct with named fields. Not an unnamed fields struct.", - )), - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn basic() { - let item = quote! { - pub struct AllSubsystems { - pub a: A, - pub beee: B, - pub dj: CD, - } - }; - - let output = impl_subsystems_gen(item).expect("Simple example always works. qed"); - println!("//generated:"); - println!("{}", output); - } - - #[test] - fn ui() { - let t = trybuild::TestCases::new(); - t.compile_fail("tests/ui/err-*.rs"); - t.pass("tests/ui/ok-*.rs"); - } -} diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.rs b/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.rs deleted file mode 100644 index ffcbecd0b3f4..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.rs +++ /dev/null @@ -1,13 +0,0 @@ -#![allow(dead_code)] - -use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; - -#[derive(Clone, AllSubsystemsGen)] -enum AllSubsystems { - A(A), - B(B), -} - -fn main() { - let all = AllSubsystems::::A(0u8); -} diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.stderr b/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.stderr deleted file mode 100644 index 5f61df1057cb..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-enum.stderr +++ /dev/null @@ -1,5 +0,0 @@ -error: expected `struct` - --> $DIR/err-01-enum.rs:6:1 - | -6 | enum AllSubsystems { - | ^^^^ diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.rs b/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.rs deleted file mode 100644 index 7c26eedf875f..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.rs +++ /dev/null @@ -1,16 +0,0 @@ -#![allow(dead_code)] - -use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; -#[derive(Clone, AllSubsystemsGen)] -struct AllSubsystems { - a: X, - b: X, -} - -fn main() { - let all = AllSubsystems:: { - a: 0_u16, - b: 1_u16, - }; - let _all = all.replace_a(|_| 77u8); -} diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.stderr b/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.stderr deleted file mode 100644 index b089e8efdb42..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-generic-used-twice.stderr +++ /dev/null @@ -1,14 +0,0 @@ -error: Generic type parameters may only be used for exactly one field, but is used more than once. - --> $DIR/err-01-generic-used-twice.rs:6:5 - | -6 | a: X, - | ^ - -error[E0599]: no method named `replace_a` found for struct `AllSubsystems` in the current scope - --> $DIR/err-01-generic-used-twice.rs:15:17 - | -5 | struct AllSubsystems { - | ----------------------- method `replace_a` not found for this -... -15 | let _all = all.replace_a(|_| 77u8); - | ^^^^^^^^^ method not found in `AllSubsystems` diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.rs b/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.rs deleted file mode 100644 index d95e0ad3182d..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.rs +++ /dev/null @@ -1,17 +0,0 @@ -#![allow(dead_code)] - -use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; - -#[derive(Clone, AllSubsystemsGen)] -struct AllSubsystems { - a: f32, - b: u16, -} - -fn main() { - let all = AllSubsystems { - a: 0_f32, - b: 1_u16, - }; - let _all = all.replace_a(|_| 77u8); -} diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.stderr b/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.stderr deleted file mode 100644 index 1bbb7a5d51ba..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generic.stderr +++ /dev/null @@ -1,14 +0,0 @@ -error: struct must have at least one generic parameter. - --> $DIR/err-01-no-generic.rs:6:8 - | -6 | struct AllSubsystems { - | ^^^^^^^^^^^^^ - -error[E0599]: no method named `replace_a` found for struct `AllSubsystems` in the current scope - --> $DIR/err-01-no-generic.rs:16:17 - | -6 | struct AllSubsystems { - | -------------------- method `replace_a` not found for this -... -16 | let _all = all.replace_a(|_| 77u8); - | ^^^^^^^^^ method not found in `AllSubsystems` diff --git a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generics.stderr b/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generics.stderr deleted file mode 100644 index 5ca7ec6c2385..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/err-01-no-generics.stderr +++ /dev/null @@ -1,14 +0,0 @@ -error: Generic type parameters may only be used once have at least one generic parameter. - --> $DIR/err-01-no-generics.rs:7:5 - | -7 | a: X, - | ^ - -error[E0599]: no method named `replace_a` found for struct `AllSubsystems` in the current scope - --> $DIR/err-01-no-generics.rs:16:17 - | -6 | struct AllSubsystems { - | ----------------------- method `replace_a` not found for this -... -16 | let _all = all.replace_a(|_| 77u8); - | ^^^^^^^^^ method not found in `AllSubsystems` diff --git a/node/overseer/all-subsystems-gen/tests/ui/ok-01-w-generics.rs b/node/overseer/all-subsystems-gen/tests/ui/ok-01-w-generics.rs deleted file mode 100644 index 879cb6770fa8..000000000000 --- a/node/overseer/all-subsystems-gen/tests/ui/ok-01-w-generics.rs +++ /dev/null @@ -1,17 +0,0 @@ -#![allow(dead_code)] - -use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; - -#[derive(Clone, AllSubsystemsGen)] -struct AllSubsystems { - a: A, - b: B, -} - -fn main() { - let all = AllSubsystems:: { - a: 0u8, - b: 1u16, - }; - let _all: AllSubsystems<_,_> = all.replace_a::(|_| 777_777u32); -} diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 6970054a3013..0ff8201594fb 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -28,8 +28,9 @@ use polkadot_node_subsystem_types::messages::{ }; use polkadot_overseer::{ self as overseer, + dummy::dummy_overseer_builder, gen::{FromOverseer, SpawnedSubsystem}, - AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError, + AllMessages, HeadSupportsParachains, OverseerSignal, SubsystemError, }; use polkadot_primitives::v1::Hash; @@ -169,12 +170,13 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); - let all_subsystems = AllSubsystems::<()>::dummy() + let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None) + .unwrap() .replace_candidate_validation(|_| Subsystem2) - .replace_candidate_backing(|orig| orig); + .replace_candidate_backing(|orig| orig) + .build() + .unwrap(); - let (overseer, _handle) = - Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap(); let overseer_fut = overseer.run().fuse(); let timer_stream = timer_stream; diff --git a/node/overseer/overseer-gen/proc-macro/Cargo.toml b/node/overseer/overseer-gen/proc-macro/Cargo.toml index 2940499c011a..2f5088d0ed2e 100644 --- a/node/overseer/overseer-gen/proc-macro/Cargo.toml +++ b/node/overseer/overseer-gen/proc-macro/Cargo.toml @@ -19,3 +19,9 @@ proc-macro-crate = "1.0.0" [dev-dependencies] assert_matches = "1.5.0" + +[features] +default = [] +# write the expanded version to a `overlord-expansion.rs` +# in the `cwd` +expansion = [] diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs index 832e193fd4d1..a3afb8f29460 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs @@ -19,6 +19,39 @@ use syn::Ident; use super::*; +/// Returns all combinations for a single replacement: +/// 1. generic args with `NEW` in place +/// 2. subsystem type to be replaced +/// 3. the subsystem name to be replaced by a new type and value +/// 4. all other subsystems that are supposed to be kept +fn derive_replacable_generic_lists( + info: &OverseerInfo, +) -> Vec<(TokenStream, Ident, Ident, Vec)> { + // subsystem generic types + let builder_generic_ty = info.builder_generic_types(); + + let to_be_replaced_name = info.subsystem_names_without_wip(); + let baggage_generic_ty = &info.baggage_generic_types(); + + builder_generic_ty + .iter() + .enumerate() + .map(|(idx, to_be_replaced_ty)| { + let mut to_keep_name = to_be_replaced_name.clone(); + let to_be_replaced_name: Ident = to_keep_name.remove(idx); + + let mut builder_generic_ty = builder_generic_ty.clone(); + builder_generic_ty[idx] = format_ident!("NEW"); + + let generics_ts = quote! { + + }; + + (generics_ts, to_be_replaced_ty.clone(), to_be_replaced_name, to_keep_name) + }) + .collect::>() +} + /// Implement a builder pattern for the `Overseer`-type, /// which acts as the gateway to constructing the overseer. /// @@ -35,6 +68,12 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { .iter() .map(|subsystem_name| format_ident!("{}_with", subsystem_name)) .collect::>(); + let subsystem_name_replace_with = &info + .subsystem_names_without_wip() + .iter() + .map(|subsystem_name| format_ident!("replace_{}", subsystem_name)) + .collect::>(); + let builder_generic_ty = &info.builder_generic_types(); let channel_name = &info.channel_names_without_wip(""); @@ -50,6 +89,8 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { let baggage_name = &info.baggage_names(); let baggage_ty = &info.baggage_types(); + let subsystem_ctx_name = format_ident!("{}SubsystemContext", overseer_name); + let error_ty = &info.extern_error_ty; let support_crate = info.support_crate_name(); @@ -130,7 +171,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { &mut self.handle } /// Obtain access to the overseer handle. - pub fn as_handle(&mut self) -> &#handle { + pub fn as_handle(&self) -> &#handle { &self.handle } } @@ -151,7 +192,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { /// Convenience alias. type SubsystemInitFn = Box ::std::result::Result >; - /// Init kind of a field of the overseer. + /// Initialization type to be used for a field of the overseer. enum FieldInitMethod { /// Defer initialization to a point where the `handle` is available. Fn(SubsystemInitFn), @@ -238,13 +279,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { )* /// Complete the construction and create the overseer type. - pub fn build(mut self) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> { + pub fn build(self) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> { let connector = #connector ::default(); self.build_with_connector(connector) } /// Complete the construction and create the overseer type based on an existing `connector`. - pub fn build_with_connector(mut self, connector: #connector) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> + pub fn build_with_connector(self, connector: #connector) -> ::std::result::Result<(#overseer_name #generics, #handle), #error_ty> { let #connector { handle: events_tx, @@ -317,7 +358,6 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { #channel_name_tx, signal_tx, unbounded_meter, - channels_out.clone(), ctx, #subsystem_name, &mut running_subsystems, @@ -326,9 +366,9 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { #( let #baggage_name = self. #baggage_name .expect( - &format!("Baggage variable `{1}` of `{0}` ", - stringify!(#overseer_name), - stringify!( #baggage_name ) + &format!("Baggage variable `{0}` of `{1}` must be set by the user!", + stringify!(#baggage_name), + stringify!(#overseer_name) ) ); )* @@ -355,6 +395,83 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { } } }; + + let mut acc = TokenStream::new(); + + for ( + ( + ( + ref modified_generics, + ref to_be_replaced_ty, + ref to_be_replaced_name, + ref to_keep_name, + ), + subsystem_name_replace_with, + ), + consumes, + ) in derive_replacable_generic_lists(info) + .into_iter() + .zip(subsystem_name_replace_with.iter()) + .zip(consumes.iter()) + { + let replace1 = quote! { + /// Replace a subsystem by another implementation for the + /// consumable message type. + pub fn #subsystem_name_replace_with < NEW, F > + (self, gen_replacement_fn: F) -> #builder #modified_generics + where + #to_be_replaced_ty: 'static, + F: 'static + FnOnce(#to_be_replaced_ty) -> NEW, + NEW: #support_crate ::Subsystem<#subsystem_ctx_name< #consumes >, #error_ty>, + { + + let Self { + #to_be_replaced_name, + #( + #to_keep_name, + )* + #( + #baggage_name, + )* + spawner, + } = self; + + // Some cases require that parts of the original are copied + // over, since they include a one time initialization. + let replacement: FieldInitMethod = match #to_be_replaced_name { + FieldInitMethod::Fn(fx) => FieldInitMethod::Fn( + Box::new(move |handle: #handle| { + let orig = fx(handle)?; + Ok(gen_replacement_fn(orig)) + }) + ), + FieldInitMethod::Value(val) => FieldInitMethod::Value(gen_replacement_fn(val)), + FieldInitMethod::Uninitialized => panic!("Must have a value before it can be replaced. qed"), + }; + + #builder :: #modified_generics { + #to_be_replaced_name: replacement, + #( + #to_keep_name, + )* + #( + #baggage_name, + )* + spawner, + } + } + }; + acc.extend(replace1); + } + + ts.extend(quote! { + impl #builder_generics #builder #builder_generics + #builder_where_clause + { + #acc + } + }); + ts.extend(impl_task_kind(info)); ts } @@ -365,9 +482,6 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream { let support_crate = info.support_crate_name(); let ts = quote! { - - use #support_crate ::FutureExt as _; - /// Task kind to launch. pub trait TaskKind { /// Spawn a task, it depends on the implementer if this is blocking or not. @@ -397,8 +511,6 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream { signal_tx: #support_crate ::metered::MeteredSender< #signal >, // meter for the unbounded channel unbounded_meter: #support_crate ::metered::Meter, - // connection to the subsystems - channels_out: ChannelsOut, ctx: Ctx, s: SubSys, futures: &mut #support_crate ::FuturesUnordered >>, diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_channels_out.rs b/node/overseer/overseer-gen/proc-macro/src/impl_channels_out.rs index f2d6e88b360b..08744e6c520d 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_channels_out.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_channels_out.rs @@ -94,8 +94,6 @@ pub(crate) fn impl_channels_out_struct(info: &OverseerInfo) -> Result = match message { #( #message_wrapper :: #consumes_variant (inner) => { diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_overseer.rs b/node/overseer/overseer-gen/proc-macro/src/impl_overseer.rs index cddd0534c6ea..54b207eb1960 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_overseer.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_overseer.rs @@ -56,8 +56,6 @@ pub(crate) fn impl_overseer_struct(info: &OverseerInfo) -> proc_macro2::TokenStr syn::LitStr::new(overseer_name.to_string().to_lowercase().as_str(), overseer_name.span()); let ts = quote! { - const STOP_DELAY: ::std::time::Duration = ::std::time::Duration::from_secs(1); - /// Capacity of a bounded message channel between overseer and subsystem /// but also for bounded channels between two subsystems. const CHANNEL_CAPACITY: usize = #message_channel_capacity; @@ -116,10 +114,9 @@ pub(crate) fn impl_overseer_struct(info: &OverseerInfo) -> proc_macro2::TokenStr loop { select! { - _ = self.running_subsystems.next() => { - if self.running_subsystems.is_empty() { - break; - } + _ = self.running_subsystems.next() => + if self.running_subsystems.is_empty() { + break; }, _ = timeout_fut => break, complete => break, @@ -187,9 +184,6 @@ pub(crate) fn impl_overseen_subsystem(info: &OverseerInfo) -> proc_macro2::Token let support_crate = info.support_crate_name(); let ts = quote::quote! { - - use #support_crate ::futures::SinkExt as _; - /// A subsystem that the overseer oversees. /// /// Ties together the [`Subsystem`] itself and it's running instance diff --git a/node/overseer/overseer-gen/proc-macro/src/lib.rs b/node/overseer/overseer-gen/proc-macro/src/lib.rs index b967acfe8b65..1a0634b29720 100644 --- a/node/overseer/overseer-gen/proc-macro/src/lib.rs +++ b/node/overseer/overseer-gen/proc-macro/src/lib.rs @@ -98,5 +98,28 @@ pub(crate) fn impl_overseer_gen( additive.extend(impl_message_wrapper_enum(&info)?); additive.extend(impl_dispatch(&info)); + #[cfg(feature = "expansion")] + { + use std::io::Write; + + let cwd = std::env::current_dir().unwrap(); + let path: std::path::PathBuf = cwd.join("overlord-expansion.rs"); + let mut f = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&path) + .expect("File exists. qed"); + f.write_all( + &mut format!("// {:?} \n{}", std::time::SystemTime::now(), additive).as_bytes(), + ) + .expect("Got permissions to write to file. qed"); + std::process::Command::new("rustfmt") + .arg("--edition=2018") + .arg(&path) + .current_dir(cwd) + .spawn() + .expect("Running rustfmt works. qed"); + } Ok(additive) } diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs new file mode 100644 index 000000000000..1a2e29bb8721 --- /dev/null +++ b/node/overseer/src/dummy.rs @@ -0,0 +1,137 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Legacy way of defining subsystems. +//! +//! In the future, everything should be set up using the generated +//! overseer builder pattern instead. + +use crate::{ + prometheus::Registry, AllMessages, HeadSupportsParachains, Metrics, MetricsTrait, Overseer, + OverseerBuilder, OverseerSignal, SpawnNamed, KNOWN_LEAVES_CACHE_SIZE, +}; +use lru::LruCache; +use polkadot_node_subsystem_types::errors::SubsystemError; +use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext}; + +/// A dummy subsystem that implements [`Subsystem`] for all +/// types of messages. Used for tests or as a placeholder. +#[derive(Clone, Copy, Debug)] +pub struct DummySubsystem; + +impl Subsystem for DummySubsystem +where + Context: SubsystemContext< + Signal = OverseerSignal, + Error = SubsystemError, + AllMessages = AllMessages, + >, +{ + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + let future = Box::pin(async move { + loop { + match ctx.recv().await { + Err(_) => return Ok(()), + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(overseer_msg) => { + tracing::debug!( + target: "dummy-subsystem", + "Discarding a message sent from overseer {:?}", + overseer_msg + ); + continue + }, + } + } + }); + + SpawnedSubsystem { name: "dummy-subsystem", future } + } +} + +/// Create an overseer with all subsystem being `DummySubsystem`. +/// +/// Preferred way of initializing a dummy overseer for subsystem tests. +pub fn dummy_overseer_builder< + 'a, + Spawner: SpawnNamed + Send + Sync + 'static, + SupportsParachains: HeadSupportsParachains, +>( + spawner: Spawner, + supports_parachains: SupportsParachains, + registry: Option<&'a Registry>, +) -> Result< + OverseerBuilder< + Spawner, + SupportsParachains, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + >, + SubsystemError, +> { + let metrics: Metrics = ::register(registry)?; + + let builder = Overseer::builder() + .availability_distribution(DummySubsystem) + .availability_recovery(DummySubsystem) + .availability_store(DummySubsystem) + .bitfield_distribution(DummySubsystem) + .bitfield_signing(DummySubsystem) + .candidate_backing(DummySubsystem) + .candidate_validation(DummySubsystem) + .chain_api(DummySubsystem) + .collation_generation(DummySubsystem) + .collator_protocol(DummySubsystem) + .network_bridge(DummySubsystem) + .provisioner(DummySubsystem) + .runtime_api(DummySubsystem) + .statement_distribution(DummySubsystem) + .approval_distribution(DummySubsystem) + .approval_voting(DummySubsystem) + .gossip_support(DummySubsystem) + .dispute_coordinator(DummySubsystem) + .dispute_participation(DummySubsystem) + .dispute_distribution(DummySubsystem) + .chain_selection(DummySubsystem) + .activation_external_listeners(Default::default()) + .span_per_active_leaf(Default::default()) + .active_leaves(Default::default()) + .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) + .leaves(Default::default()) + .spawner(spawner) + .metrics(metrics) + .supports_parachains(supports_parachains); + Ok(builder) +} diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 935cf6ebbef8..cf509c80ac40 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -62,7 +62,6 @@ use std::{ collections::{hash_map, HashMap}, fmt::{self, Debug}, - iter::FromIterator, pin::Pin, sync::Arc, time::Duration, @@ -70,7 +69,6 @@ use std::{ use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt}; use lru::LruCache; -use parking_lot::RwLock; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost}; @@ -91,15 +89,13 @@ pub use polkadot_node_subsystem_types::{ jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal, }; -// TODO legacy, to be deleted, left for easier integration -// TODO https://github.com/paritytech/polkadot/issues/3427 -mod subsystems; -pub use self::subsystems::{AllSubsystems, DummySubsystem}; - mod metrics; -use self::metrics::Metrics; +pub use self::metrics::Metrics; + +/// A dummy subsystem, mostly useful for placeholders and tests. +pub mod dummy; -use polkadot_node_metrics::{ +pub use polkadot_node_metrics::{ metrics::{prometheus, Metrics as MetricsTrait}, Metronome, }; @@ -116,7 +112,7 @@ pub use polkadot_overseer_gen::{ /// Store 2 days worth of blocks, not accounting for forks, /// in the LRU cache. Assumes a 6-second block time. -const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6; +pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6; #[cfg(test)] mod tests; @@ -142,18 +138,12 @@ where /// /// [`Overseer`]: struct.Overseer.html #[derive(Clone)] -pub enum Handle { - /// Used only at initialization to break the cyclic dependency. - // TODO: refactor in https://github.com/paritytech/polkadot/issues/3427 - Disconnected(Arc>>), - /// A handle to the overseer. - Connected(OverseerHandle), -} +pub struct Handle(pub OverseerHandle); impl Handle { - /// Create a new disconnected [`Handle`]. - pub fn new_disconnected() -> Self { - Self::Disconnected(Arc::new(RwLock::new(None))) + /// Create a new [`Handle`]. + pub fn new(raw: OverseerHandle) -> Self { + Self(raw) } /// Inform the `Overseer` that that some block was imported. @@ -202,58 +192,8 @@ impl Handle { /// Most basic operation, to stop a server. async fn send_and_log_error(&mut self, event: Event) { - self.try_connect(); - if let Self::Connected(ref mut handle) = self { - if handle.send(event).await.is_err() { - tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); - } - } else { - tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer"); - } - } - - /// Whether the handle is disconnected. - pub fn is_disconnected(&self) -> bool { - match self { - Self::Disconnected(ref x) => x.read().is_none(), - _ => false, - } - } - - /// Connect this handle and all disconnected clones of it to the overseer. - pub fn connect_to_overseer(&mut self, handle: OverseerHandle) { - match self { - Self::Disconnected(ref mut x) => { - let mut maybe_handle = x.write(); - if maybe_handle.is_none() { - tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer"); - *maybe_handle = Some(handle); - } else { - tracing::warn!( - target: LOG_TARGET, - "Attempting to connect a clone of a connected Handle", - ); - } - }, - _ => { - tracing::warn!( - target: LOG_TARGET, - "Attempting to connect an already connected Handle", - ); - }, - } - } - - /// Try upgrading from `Self::Disconnected` to `Self::Connected` state - /// after calling `connect_to_overseer` on `self` or a clone of `self`. - fn try_connect(&mut self) { - if let Self::Disconnected(ref mut x) = self { - let guard = x.write(); - if let Some(ref h) = *guard { - let handle = h.clone(); - drop(guard); - *self = Self::Connected(handle); - } + if self.0.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); } } } @@ -347,7 +287,119 @@ pub async fn forward_events>(client: Arc

, mut hand } } -/// The `Overseer` itself. +/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s. +/// +/// This returns the overseer along with an [`OverseerHandle`] which can +/// be used to send messages from external parts of the codebase. +/// +/// The [`OverseerHandle`] returned from this function is connected to +/// the returned [`Overseer`]. +/// +/// ```text +/// +------------------------------------+ +/// | Overseer | +/// +------------------------------------+ +/// / | | \ +/// ................. subsystems................................... +/// . +-----------+ +-----------+ +----------+ +---------+ . +/// . | | | | | | | | . +/// . +-----------+ +-----------+ +----------+ +---------+ . +/// ............................................................... +/// | +/// probably `spawn` +/// a `job` +/// | +/// V +/// +-----------+ +/// | | +/// +-----------+ +/// +/// ``` +/// +/// [`Subsystem`]: trait.Subsystem.html +/// +/// # Example +/// +/// The [`Subsystems`] may be any type as long as they implement an expected interface. +/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them. +/// For the sake of simplicity the termination of the example is done with a timeout. +/// ``` +/// # use std::time::Duration; +/// # use futures::{executor, pin_mut, select, FutureExt}; +/// # use futures_timer::Delay; +/// # use polkadot_primitives::v1::Hash; +/// # use polkadot_overseer::{ +/// # self as overseer, +/// # OverseerSignal, +/// # SubsystemSender as _, +/// # AllMessages, +/// # HeadSupportsParachains, +/// # Overseer, +/// # SubsystemError, +/// # gen::{ +/// # SubsystemContext, +/// # FromOverseer, +/// # SpawnedSubsystem, +/// # }, +/// # }; +/// # use polkadot_node_subsystem_types::messages::{ +/// # CandidateValidationMessage, CandidateBackingMessage, +/// # NetworkBridgeMessage, +/// # }; +/// +/// struct ValidationSubsystem; +/// +/// impl overseer::Subsystem for ValidationSubsystem +/// where +/// Ctx: overseer::SubsystemContext< +/// Message=CandidateValidationMessage, +/// AllMessages=AllMessages, +/// Signal=OverseerSignal, +/// Error=SubsystemError, +/// >, +/// { +/// fn start( +/// self, +/// mut ctx: Ctx, +/// ) -> SpawnedSubsystem { +/// SpawnedSubsystem { +/// name: "validation-subsystem", +/// future: Box::pin(async move { +/// loop { +/// Delay::new(Duration::from_secs(1)).await; +/// } +/// }), +/// } +/// } +/// } +/// +/// # fn main() { executor::block_on(async move { +/// +/// struct AlwaysSupportsParachains; +/// impl HeadSupportsParachains for AlwaysSupportsParachains { +/// fn head_supports_parachains(&self, _head: &Hash) -> bool { true } +/// } +/// let spawner = sp_core::testing::TaskExecutor::new(); +/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None) +/// .unwrap() +/// .replace_candidate_validation(|_| ValidationSubsystem) +/// .build() +/// .unwrap(); +/// +/// let timer = Delay::new(Duration::from_millis(50)).fuse(); +/// +/// let overseer_fut = overseer.run().fuse(); +/// pin_mut!(timer); +/// pin_mut!(overseer_fut); +/// +/// select! { +/// _ = overseer_fut => (), +/// _ = timer => (), +/// } +/// # +/// # }); +/// # } +/// ``` #[overlord( gen=AllMessages, event=Event, @@ -443,304 +495,65 @@ pub struct Overseer { pub metrics: Metrics, } -impl Overseer +/// Spawn the metrics metronome task. +pub fn spawn_metronome_metrics( + overseer: &mut Overseer, + metronome_metrics: Metrics, +) -> Result<(), SubsystemError> where - SupportsParachains: HeadSupportsParachains, S: SpawnNamed, + SupportsParachains: HeadSupportsParachains, { - /// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s. - /// - /// This returns the overseer along with an [`OverseerHandle`] which can - /// be used to send messages from external parts of the codebase. - /// - /// The [`OverseerHandle`] returned from this function is connected to - /// the returned [`Overseer`]. - /// - /// ```text - /// +------------------------------------+ - /// | Overseer | - /// +------------------------------------+ - /// / | | \ - /// ................. subsystems................................... - /// . +-----------+ +-----------+ +----------+ +---------+ . - /// . | | | | | | | | . - /// . +-----------+ +-----------+ +----------+ +---------+ . - /// ............................................................... - /// | - /// probably `spawn` - /// a `job` - /// | - /// V - /// +-----------+ - /// | | - /// +-----------+ - /// - /// ``` - /// - /// [`Subsystem`]: trait.Subsystem.html - /// - /// # Example - /// - /// The [`Subsystems`] may be any type as long as they implement an expected interface. - /// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them. - /// For the sake of simplicity the termination of the example is done with a timeout. - /// ``` - /// # use std::time::Duration; - /// # use futures::{executor, pin_mut, select, FutureExt}; - /// # use futures_timer::Delay; - /// # use polkadot_primitives::v1::Hash; - /// # use polkadot_overseer::{ - /// # self as overseer, - /// # OverseerSignal, - /// # SubsystemSender as _, - /// # AllMessages, - /// # AllSubsystems, - /// # HeadSupportsParachains, - /// # Overseer, - /// # SubsystemError, - /// # gen::{ - /// # SubsystemContext, - /// # FromOverseer, - /// # SpawnedSubsystem, - /// # }, - /// # }; - /// # use polkadot_node_subsystem_types::messages::{ - /// # CandidateValidationMessage, CandidateBackingMessage, - /// # NetworkBridgeMessage, - /// # }; - /// - /// struct ValidationSubsystem; - /// - /// impl overseer::Subsystem for ValidationSubsystem - /// where - /// Ctx: overseer::SubsystemContext< - /// Message=CandidateValidationMessage, - /// AllMessages=AllMessages, - /// Signal=OverseerSignal, - /// Error=SubsystemError, - /// >, - /// { - /// fn start( - /// self, - /// mut ctx: Ctx, - /// ) -> SpawnedSubsystem { - /// SpawnedSubsystem { - /// name: "validation-subsystem", - /// future: Box::pin(async move { - /// loop { - /// Delay::new(Duration::from_secs(1)).await; - /// } - /// }), - /// } - /// } - /// } - /// - /// # fn main() { executor::block_on(async move { - /// - /// struct AlwaysSupportsParachains; - /// impl HeadSupportsParachains for AlwaysSupportsParachains { - /// fn head_supports_parachains(&self, _head: &Hash) -> bool { true } - /// } - /// let spawner = sp_core::testing::TaskExecutor::new(); - /// let all_subsystems = AllSubsystems::<()>::dummy() - /// .replace_candidate_validation(|_| ValidationSubsystem); - /// let (overseer, _handle) = Overseer::new( - /// vec![], - /// all_subsystems, - /// None, - /// AlwaysSupportsParachains, - /// spawner, - /// ).unwrap(); - /// - /// let timer = Delay::new(Duration::from_millis(50)).fuse(); - /// - /// let overseer_fut = overseer.run().fuse(); - /// pin_mut!(timer); - /// pin_mut!(overseer_fut); - /// - /// select! { - /// _ = overseer_fut => (), - /// _ = timer => (), - /// } - /// # - /// # }); - /// # } - /// ``` - pub fn new< - CV, - CB, - SD, - AD, - AR, - BS, - BD, - P, - RA, - AS, - NB, - CA, - CG, - CP, - ApD, - ApV, - GS, - DC, - DP, - DD, - CS, - >( - leaves: impl IntoIterator, - all_subsystems: AllSubsystems< - CV, - CB, - SD, - AD, - AR, - BS, - BD, - P, - RA, - AS, - NB, - CA, - CG, - CP, - ApD, - ApV, - GS, - DC, - DP, - DD, - CS, - >, - prometheus_registry: Option<&prometheus::Registry>, - supports_parachains: SupportsParachains, - s: S, - ) -> SubsystemResult<(Self, OverseerHandle)> - where - CV: Subsystem, SubsystemError> + Send, - CB: Subsystem, SubsystemError> + Send, - SD: Subsystem, SubsystemError> - + Send, - AD: Subsystem, SubsystemError> - + Send, - AR: Subsystem, SubsystemError> + Send, - BS: Subsystem, SubsystemError> + Send, - BD: Subsystem, SubsystemError> + Send, - P: Subsystem, SubsystemError> + Send, - RA: Subsystem, SubsystemError> + Send, - AS: Subsystem, SubsystemError> + Send, - NB: Subsystem, SubsystemError> + Send, - CA: Subsystem, SubsystemError> + Send, - CG: Subsystem, SubsystemError> + Send, - CP: Subsystem, SubsystemError> + Send, - ApD: - Subsystem, SubsystemError> + Send, - ApV: Subsystem, SubsystemError> + Send, - GS: Subsystem, SubsystemError> + Send, - DC: Subsystem, SubsystemError> + Send, - DP: Subsystem, SubsystemError> + Send, - DD: Subsystem, SubsystemError> + Send, - CS: Subsystem, SubsystemError> + Send, - S: SpawnNamed, - { - let metrics: Metrics = ::register(prometheus_registry)?; - - let (mut overseer, handle) = Self::builder() - .candidate_validation(all_subsystems.candidate_validation) - .candidate_backing(all_subsystems.candidate_backing) - .statement_distribution(all_subsystems.statement_distribution) - .availability_distribution(all_subsystems.availability_distribution) - .availability_recovery(all_subsystems.availability_recovery) - .bitfield_signing(all_subsystems.bitfield_signing) - .bitfield_distribution(all_subsystems.bitfield_distribution) - .provisioner(all_subsystems.provisioner) - .runtime_api(all_subsystems.runtime_api) - .availability_store(all_subsystems.availability_store) - .network_bridge(all_subsystems.network_bridge) - .chain_api(all_subsystems.chain_api) - .collation_generation(all_subsystems.collation_generation) - .collator_protocol(all_subsystems.collator_protocol) - .approval_distribution(all_subsystems.approval_distribution) - .approval_voting(all_subsystems.approval_voting) - .gossip_support(all_subsystems.gossip_support) - .dispute_coordinator(all_subsystems.dispute_coordinator) - .dispute_participation(all_subsystems.dispute_participation) - .dispute_distribution(all_subsystems.dispute_distribution) - .chain_selection(all_subsystems.chain_selection) - .leaves(Vec::from_iter( - leaves - .into_iter() - .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)), - )) - .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) - .active_leaves(Default::default()) - .span_per_active_leaf(Default::default()) - .activation_external_listeners(Default::default()) - .supports_parachains(supports_parachains) - .metrics(metrics.clone()) - .spawner(s) - .build()?; - - // spawn the metrics metronome task - { - struct ExtractNameAndMeters; - - impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem> for ExtractNameAndMeters { - type Output = Option<(&'static str, SubsystemMeters)>; - - fn map_subsystem(&self, subsystem: &'a OverseenSubsystem) -> Self::Output { - subsystem - .instance - .as_ref() - .map(|instance| (instance.name, instance.meters.clone())) - } - } - let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters); - - #[cfg(feature = "memory-stats")] - let memory_stats = MemoryAllocationTracker::new().expect("Jemalloc is the default allocator. qed"); - - let metronome_metrics = metrics.clone(); - let metronome = - Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| { - #[cfg(feature = "memory-stats")] - match memory_stats.snapshot() { - Ok(memory_stats_snapshot) => { - tracing::trace!( - target: LOG_TARGET, - "memory_stats: {:?}", - &memory_stats_snapshot - ); - metronome_metrics.memory_stats_snapshot(memory_stats_snapshot); - }, - - Err(e) => tracing::debug!( - target: LOG_TARGET, - "Failed to obtain memory stats: {:?}", - e - ), - } + struct ExtractNameAndMeters; - // We combine the amount of messages from subsystems to the overseer - // as well as the amount of messages from external sources to the overseer - // into one `to_overseer` value. - metronome_metrics.channel_fill_level_snapshot( - subsystem_meters - .iter() - .cloned() - .filter_map(|x| x) - .map(|(name, ref meters)| (name, meters.read())), - ); + impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem> for ExtractNameAndMeters { + type Output = Option<(&'static str, SubsystemMeters)>; - async move { () } - }); - overseer.spawner().spawn("metrics_metronome", Box::pin(metronome)); + fn map_subsystem(&self, subsystem: &'a OverseenSubsystem) -> Self::Output { + subsystem + .instance + .as_ref() + .map(|instance| (instance.name, instance.meters.clone())) } - - Ok((overseer, handle)) } + let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters); + + #[cfg(feature = "memory-stats")] + let memory_stats = MemoryAllocationTracker::new().expect("Jemalloc is the default allocator. qed"); + + let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| { + #[cfg(feature = "memory-stats")] + match memory_stats.snapshot() { + Ok(memory_stats_snapshot) => { + tracing::trace!(target: LOG_TARGET, "memory_stats: {:?}", &memory_stats_snapshot); + metronome_metrics.memory_stats_snapshot(memory_stats_snapshot); + }, + + Err(e) => tracing::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e), + } + // We combine the amount of messages from subsystems to the overseer + // as well as the amount of messages from external sources to the overseer + // into one `to_overseer` value. + metronome_metrics.channel_fill_level_snapshot( + subsystem_meters + .iter() + .cloned() + .filter_map(|x| x) + .map(|(name, ref meters)| (name, meters.read())), + ); + + async move { () } + }); + overseer.spawner().spawn("metrics_metronome", Box::pin(metronome)); + Ok(()) +} + +impl Overseer +where + SupportsParachains: HeadSupportsParachains, + S: SpawnNamed, +{ /// Stop the overseer. async fn stop(mut self) { let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await; @@ -748,6 +561,9 @@ where /// Run the `Overseer`. pub async fn run(mut self) -> SubsystemResult<()> { + let metrics = self.metrics.clone(); + spawn_metronome_metrics(&mut self, metrics)?; + // Notify about active leaves on startup before starting the loop for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); diff --git a/node/overseer/src/subsystems.rs b/node/overseer/src/subsystems.rs deleted file mode 100644 index 648528730d67..000000000000 --- a/node/overseer/src/subsystems.rs +++ /dev/null @@ -1,382 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Legacy way of defining subsystems. -//! -//! In the future, everything should be set up using the generated -//! overseer builder pattern instead. - -use crate::{AllMessages, OverseerSignal}; -use polkadot_node_subsystem_types::errors::SubsystemError; -use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; -use polkadot_overseer_gen::{ - FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext, -}; - -/// A dummy subsystem that implements [`Subsystem`] for all -/// types of messages. Used for tests or as a placeholder. -#[derive(Clone, Copy, Debug)] -pub struct DummySubsystem; - -impl Subsystem for DummySubsystem -where - Context: SubsystemContext< - Signal = OverseerSignal, - Error = SubsystemError, - AllMessages = AllMessages, - >, -{ - fn start(self, mut ctx: Context) -> SpawnedSubsystem { - let future = Box::pin(async move { - loop { - match ctx.recv().await { - Err(_) => return Ok(()), - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()), - Ok(overseer_msg) => { - tracing::debug!( - target: "dummy-subsystem", - "Discarding a message sent from overseer {:?}", - overseer_msg - ); - continue - }, - } - } - }); - - SpawnedSubsystem { name: "dummy-subsystem", future } - } -} - -/// This struct is passed as an argument to create a new instance of an [`Overseer`]. -/// -/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows -/// mocking in the test code: -/// -/// Each [`Subsystem`] is supposed to implement some interface that is generic over -/// message type that is specific to this [`Subsystem`]. At the moment not all -/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. -#[derive(Debug, Clone, AllSubsystemsGen)] -pub struct AllSubsystems< - CV = (), - CB = (), - SD = (), - AD = (), - AR = (), - BS = (), - BD = (), - P = (), - RA = (), - AS = (), - NB = (), - CA = (), - CG = (), - CP = (), - ApD = (), - ApV = (), - GS = (), - DC = (), - DP = (), - DD = (), - CS = (), -> { - /// A candidate validation subsystem. - pub candidate_validation: CV, - /// A candidate backing subsystem. - pub candidate_backing: CB, - /// A statement distribution subsystem. - pub statement_distribution: SD, - /// An availability distribution subsystem. - pub availability_distribution: AD, - /// An availability recovery subsystem. - pub availability_recovery: AR, - /// A bitfield signing subsystem. - pub bitfield_signing: BS, - /// A bitfield distribution subsystem. - pub bitfield_distribution: BD, - /// A provisioner subsystem. - pub provisioner: P, - /// A runtime API subsystem. - pub runtime_api: RA, - /// An availability store subsystem. - pub availability_store: AS, - /// A network bridge subsystem. - pub network_bridge: NB, - /// A Chain API subsystem. - pub chain_api: CA, - /// A Collation Generation subsystem. - pub collation_generation: CG, - /// A Collator Protocol subsystem. - pub collator_protocol: CP, - /// An Approval Distribution subsystem. - pub approval_distribution: ApD, - /// An Approval Voting subsystem. - pub approval_voting: ApV, - /// A Connection Request Issuer subsystem. - pub gossip_support: GS, - /// A Dispute Coordinator subsystem. - pub dispute_coordinator: DC, - /// A Dispute Participation subsystem. - pub dispute_participation: DP, - /// A Dispute Distribution subsystem. - pub dispute_distribution: DD, - /// A Chain Selection subsystem. - pub chain_selection: CS, -} - -impl - AllSubsystems -{ - /// Create a new instance of [`AllSubsystems`]. - /// - /// Each subsystem is set to [`DummySystem`]. - /// - ///# Note - /// - /// Because of a bug in rustc it is required that when calling this function, - /// you provide a "random" type for the first generic parameter: - /// - /// ``` - /// polkadot_overseer::AllSubsystems::<()>::dummy(); - /// ``` - pub fn dummy() -> AllSubsystems< - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - > { - AllSubsystems { - candidate_validation: DummySubsystem, - candidate_backing: DummySubsystem, - statement_distribution: DummySubsystem, - availability_distribution: DummySubsystem, - availability_recovery: DummySubsystem, - bitfield_signing: DummySubsystem, - bitfield_distribution: DummySubsystem, - provisioner: DummySubsystem, - runtime_api: DummySubsystem, - availability_store: DummySubsystem, - network_bridge: DummySubsystem, - chain_api: DummySubsystem, - collation_generation: DummySubsystem, - collator_protocol: DummySubsystem, - approval_distribution: DummySubsystem, - approval_voting: DummySubsystem, - gossip_support: DummySubsystem, - dispute_coordinator: DummySubsystem, - dispute_participation: DummySubsystem, - dispute_distribution: DummySubsystem, - chain_selection: DummySubsystem, - } - } - - /// Reference every individual subsystem. - pub fn as_ref( - &self, - ) -> AllSubsystems< - &'_ CV, - &'_ CB, - &'_ SD, - &'_ AD, - &'_ AR, - &'_ BS, - &'_ BD, - &'_ P, - &'_ RA, - &'_ AS, - &'_ NB, - &'_ CA, - &'_ CG, - &'_ CP, - &'_ ApD, - &'_ ApV, - &'_ GS, - &'_ DC, - &'_ DP, - &'_ DD, - &'_ CS, - > { - AllSubsystems { - candidate_validation: &self.candidate_validation, - candidate_backing: &self.candidate_backing, - statement_distribution: &self.statement_distribution, - availability_distribution: &self.availability_distribution, - availability_recovery: &self.availability_recovery, - bitfield_signing: &self.bitfield_signing, - bitfield_distribution: &self.bitfield_distribution, - provisioner: &self.provisioner, - runtime_api: &self.runtime_api, - availability_store: &self.availability_store, - network_bridge: &self.network_bridge, - chain_api: &self.chain_api, - collation_generation: &self.collation_generation, - collator_protocol: &self.collator_protocol, - approval_distribution: &self.approval_distribution, - approval_voting: &self.approval_voting, - gossip_support: &self.gossip_support, - dispute_coordinator: &self.dispute_coordinator, - dispute_participation: &self.dispute_participation, - dispute_distribution: &self.dispute_distribution, - chain_selection: &self.chain_selection, - } - } - - /// Map each subsystem. - pub fn map_subsystems( - self, - mapper: Mapper, - ) -> AllSubsystems< - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - >::Output, - > - where - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem

, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem, - Mapper: MapSubsystem

, - Mapper: MapSubsystem, - { - AllSubsystems { - candidate_validation: >::map_subsystem( - &mapper, - self.candidate_validation, - ), - candidate_backing: >::map_subsystem( - &mapper, - self.candidate_backing, - ), - statement_distribution: >::map_subsystem( - &mapper, - self.statement_distribution, - ), - availability_distribution: >::map_subsystem( - &mapper, - self.availability_distribution, - ), - availability_recovery: >::map_subsystem( - &mapper, - self.availability_recovery, - ), - bitfield_signing: >::map_subsystem( - &mapper, - self.bitfield_signing, - ), - bitfield_distribution: >::map_subsystem( - &mapper, - self.bitfield_distribution, - ), - provisioner: >::map_subsystem(&mapper, self.provisioner), - runtime_api: >::map_subsystem(&mapper, self.runtime_api), - availability_store: >::map_subsystem( - &mapper, - self.availability_store, - ), - network_bridge: >::map_subsystem( - &mapper, - self.network_bridge, - ), - chain_api: >::map_subsystem(&mapper, self.chain_api), - collation_generation: >::map_subsystem( - &mapper, - self.collation_generation, - ), - collator_protocol: >::map_subsystem( - &mapper, - self.collator_protocol, - ), - approval_distribution: >::map_subsystem( - &mapper, - self.approval_distribution, - ), - approval_voting: >::map_subsystem( - &mapper, - self.approval_voting, - ), - gossip_support: >::map_subsystem( - &mapper, - self.gossip_support, - ), - dispute_coordinator: >::map_subsystem( - &mapper, - self.dispute_coordinator, - ), - dispute_participation: >::map_subsystem( - &mapper, - self.dispute_participation, - ), - dispute_distribution: >::map_subsystem( - &mapper, - self.dispute_distribution, - ), - chain_selection: >::map_subsystem( - &mapper, - self.chain_selection, - ), - } - } -} diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 7564116e7141..520773168207 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -32,7 +32,9 @@ use polkadot_primitives::v1::{ ValidatorIndex, }; -use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer}; +use crate::{ + self as overseer, dummy::dummy_overseer_builder, gen::Delay, HeadSupportsParachains, Overseer, +}; use metered_channel as metered; use assert_matches::assert_matches; @@ -40,6 +42,15 @@ use sp_core::crypto::Pair as _; use super::*; +fn block_info_to_pair(blocks: impl IntoIterator) -> Vec<(Hash, BlockNumber)> { + use std::iter::FromIterator; + Vec::from_iter( + blocks + .into_iter() + .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)), + ) +} + type SpawnedSubsystem = crate::gen::SpawnedSubsystem; struct TestSubsystem1(metered::MeteredSender); @@ -159,14 +170,13 @@ fn overseer_works() { let mut s1_rx = s1_rx.fuse(); let mut s2_rx = s2_rx.fuse(); - - let all_subsystems = AllSubsystems::<()>::dummy() + let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) + .unwrap() .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) - .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)); - - let (overseer, handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); - let mut handle = Handle::Connected(handle); + .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)) + .build() + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -219,17 +229,15 @@ fn overseer_metrics_work() { let third_block = BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 }; - let all_subsystems = AllSubsystems::<()>::dummy(); let registry = prometheus::Registry::new(); - let (overseer, handle) = Overseer::new( - vec![first_block], - all_subsystems, - Some(®istry), - MockSupportsParachains, - spawner, - ) - .unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = + dummy_overseer_builder(spawner, MockSupportsParachains, Some(®istry)) + .unwrap() + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); + + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -256,13 +264,20 @@ fn overseer_metrics_work() { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); - let gather = &gather[2..]; - assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); - assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); - assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; - let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; - let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; + assert!(!gather.is_empty(), "Gathered metrics are not empty. qed"); + let extract = |name: &str| -> u64 { + gather + .iter() + .find(|&mf| dbg!(mf.get_name()) == dbg!(name)) + .expect(&format!("Must contain `{}` metric", name)) + .get_metric()[0] + .get_counter() + .get_value() as u64 + }; + + let activated = extract("parachain_activated_heads_total"); + let deactivated = extract("parachain_deactivated_heads_total"); + let relayed = extract("parachain_messages_relayed_total"); let mut result = HashMap::new(); result.insert("activated", activated); result.insert("deactivated", deactivated); @@ -278,10 +293,11 @@ fn overseer_ends_on_subsystem_exit() { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let all_subsystems = - AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart); - let (overseer, _handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); + let (overseer, _handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) + .unwrap() + .replace_candidate_backing(|_| ReturnOnStart) + .build() + .unwrap(); overseer.run().await.unwrap(); }) @@ -379,13 +395,15 @@ fn overseer_start_stop_works() { let (tx_5, mut rx_5) = metered::channel(64); let (tx_6, mut rx_6) = metered::channel(64); - let all_subsystems = AllSubsystems::<()>::dummy() + + let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) + .unwrap() .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)); - let (overseer, handle) = - Overseer::new(vec![first_block], all_subsystems, None, MockSupportsParachains, spawner) - .unwrap(); - let mut handle = Handle::Connected(handle); + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block])) + .build() + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -475,20 +493,16 @@ fn overseer_finalize_works() { let (tx_5, mut rx_5) = metered::channel(64); let (tx_6, mut rx_6) = metered::channel(64); - let all_subsystems = AllSubsystems::<()>::dummy() - .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) - .replace_candidate_backing(move |_| TestSubsystem6(tx_6)); - // start with two forks of different height. - let (overseer, handle) = Overseer::new( - vec![first_block, second_block], - all_subsystems, - None, - MockSupportsParachains, - spawner, - ) - .unwrap(); - let mut handle = Handle::Connected(handle); + + let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) + .unwrap() + .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) + .replace_candidate_backing(move |_| TestSubsystem6(tx_6)) + .leaves(block_info_to_pair(vec![first_block, second_block])) + .build() + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -570,13 +584,13 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { let (tx_5, mut rx_5) = metered::channel(64); - let all_subsystems = - AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5)); + let (overseer, handle) = dummy_overseer_builder(spawner, MockSupportsParachains, None) + .unwrap() + .replace_candidate_backing(move |_| TestSubsystem6(tx_5)) + .build() + .unwrap(); - let (overseer, handle) = - Overseer::new(Vec::new(), all_subsystems, None, MockSupportsParachains, spawner) - .unwrap(); - let mut handle = Handle::Connected(handle); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -826,32 +840,40 @@ fn overseer_all_subsystems_receive_signals_and_messages() { msgs_received.clone(), ); - let all_subsystems = AllSubsystems { - candidate_validation: subsystem.clone(), - candidate_backing: subsystem.clone(), - collation_generation: subsystem.clone(), - collator_protocol: subsystem.clone(), - statement_distribution: subsystem.clone(), - availability_distribution: subsystem.clone(), - availability_recovery: subsystem.clone(), - bitfield_signing: subsystem.clone(), - bitfield_distribution: subsystem.clone(), - provisioner: subsystem.clone(), - runtime_api: subsystem.clone(), - availability_store: subsystem.clone(), - network_bridge: subsystem.clone(), - chain_api: subsystem.clone(), - approval_distribution: subsystem.clone(), - approval_voting: subsystem.clone(), - gossip_support: subsystem.clone(), - dispute_coordinator: subsystem.clone(), - dispute_participation: subsystem.clone(), - dispute_distribution: subsystem.clone(), - chain_selection: subsystem.clone(), - }; - let (overseer, handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = Overseer::builder() + .candidate_validation(subsystem.clone()) + .candidate_backing(subsystem.clone()) + .collation_generation(subsystem.clone()) + .collator_protocol(subsystem.clone()) + .statement_distribution(subsystem.clone()) + .availability_distribution(subsystem.clone()) + .availability_recovery(subsystem.clone()) + .bitfield_signing(subsystem.clone()) + .bitfield_distribution(subsystem.clone()) + .provisioner(subsystem.clone()) + .runtime_api(subsystem.clone()) + .availability_store(subsystem.clone()) + .network_bridge(subsystem.clone()) + .chain_api(subsystem.clone()) + .approval_distribution(subsystem.clone()) + .approval_voting(subsystem.clone()) + .gossip_support(subsystem.clone()) + .dispute_coordinator(subsystem.clone()) + .dispute_participation(subsystem.clone()) + .dispute_distribution(subsystem.clone()) + .chain_selection(subsystem.clone()) + .leaves(Default::default()) + .span_per_active_leaf(Default::default()) + .active_leaves(Default::default()) + .activation_external_listeners(Default::default()) + .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) + .supports_parachains(MockSupportsParachains) + .spawner(spawner) + .metrics(Metrics::default()) + .build() + .unwrap(); + + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index c2c300fca74b..6e8490b67b91 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -223,7 +223,7 @@ pub struct Collation { pub hrmp_watermark: BlockNumber, } -/// Signal that is being returned back when a collation was seconded by a validator. +/// Signal that is being returned when a collation was seconded by a validator. #[derive(Debug)] pub struct CollationSecondedSignal { /// The hash of the relay chain block that was used as context to sign [`Self::statement`]. diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 213afe4edb1a..66c936e9a6c3 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -26,6 +26,7 @@ sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" } service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "master" } +lru = "0.6" # Substrate Primitives sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index bcb181fa04b8..bbe171dd312c 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -24,12 +24,10 @@ mod parachains_db; mod relay_chain_selection; #[cfg(feature = "full-node")] -mod overseer; +pub mod overseer; #[cfg(feature = "full-node")] -pub use self::overseer::{ - create_default_subsystems, OverseerGen, OverseerGenArgs, RealOverseerGen, -}; +pub use self::overseer::{OverseerGen, OverseerGenArgs, RealOverseerGen}; #[cfg(all(test, feature = "disputes"))] mod tests; @@ -54,7 +52,7 @@ use { pub use sp_core::traits::SpawnNamed; #[cfg(feature = "full-node")] pub use { - polkadot_overseer::{Handle, Overseer, OverseerHandle}, + polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle}, polkadot_primitives::v1::ParachainHost, sc_client_api::AuxStore, sp_authority_discovery::AuthorityDiscoveryApi, @@ -68,6 +66,8 @@ use polkadot_subsystem::jaeger; use std::{sync::Arc, time::Duration}; use prometheus_endpoint::Registry; +#[cfg(feature = "full-node")] +use service::KeystoreContainer; use service::RpcHandlers; use telemetry::TelemetryWorker; #[cfg(feature = "full-node")] @@ -302,14 +302,15 @@ fn jaeger_launch_collector_with_agent( } #[cfg(feature = "full-node")] -type FullSelectChain = relay_chain_selection::SelectRelayChainWithFallback; +type FullSelectChain = relay_chain_selection::SelectRelayChain; #[cfg(feature = "full-node")] -type FullGrandpaBlockImport = grandpa::GrandpaBlockImport< - FullBackend, - Block, - FullClient, - FullSelectChain, ->; +type FullGrandpaBlockImport = + grandpa::GrandpaBlockImport< + FullBackend, + Block, + FullClient, + ChainSelection, + >; #[cfg(feature = "light-node")] type LightBackend = service::TLightBackendWithHash; @@ -319,36 +320,29 @@ type LightClient = service::TLightClientWithBackend; #[cfg(feature = "full-node")] -fn new_partial( +struct Basics +where + RuntimeApi: ConstructRuntimeApi> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: + RuntimeApiCollection>, + ExecutorDispatch: NativeExecutionDispatch + 'static, +{ + task_manager: TaskManager, + client: Arc>, + backend: Arc, + keystore_container: KeystoreContainer, + telemetry: Option, +} + +#[cfg(feature = "full-node")] +fn new_partial_basics( config: &mut Configuration, jaeger_agent: Option, telemetry_worker_handle: Option, -) -> Result< - service::PartialComponents< - FullClient, - FullBackend, - FullSelectChain, - sc_consensus::DefaultImportQueue>, - sc_transaction_pool::FullPool>, - ( - impl service::RpcExtensionBuilder, - ( - babe::BabeBlockImport< - Block, - FullClient, - FullGrandpaBlockImport, - >, - grandpa::LinkHalf, FullSelectChain>, - babe::BabeLink, - beefy_gadget::notification::BeefySignedCommitmentSender, - ), - grandpa::SharedVoterState, - std::time::Duration, // slot-duration - Option, - ), - >, - Error, -> +) -> Result, Error> where RuntimeApi: ConstructRuntimeApi> + Send @@ -391,20 +385,60 @@ where )?; let client = Arc::new(client); - let telemetry = telemetry.map(|(worker, telemetry)| { + jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?; + + let telemetry: Option<_> = telemetry.map(|(worker, telemetry)| { if let Some(worker) = worker { task_manager.spawn_handle().spawn("telemetry", worker.run()); } telemetry }); - jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?; + Ok(Basics { task_manager, client, backend, keystore_container, telemetry }) +} - let select_chain = relay_chain_selection::SelectRelayChainWithFallback::new( - backend.clone(), - Handle::new_disconnected(), - polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?, - ); +#[cfg(feature = "full-node")] +fn new_partial( + config: &mut Configuration, + basics: Basics, + select_chain: ChainSelection, +) -> Result< + service::PartialComponents< + FullClient, + FullBackend, + ChainSelection, + sc_consensus::DefaultImportQueue>, + sc_transaction_pool::FullPool>, + ( + impl service::RpcExtensionBuilder, + ( + babe::BabeBlockImport< + Block, + FullClient, + FullGrandpaBlockImport, + >, + grandpa::LinkHalf, ChainSelection>, + babe::BabeLink, + beefy_gadget::notification::BeefySignedCommitmentSender, + ), + grandpa::SharedVoterState, + std::time::Duration, // slot-duration + Option, + ), + >, + Error, +> +where + RuntimeApi: ConstructRuntimeApi> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: + RuntimeApiCollection>, + ExecutorDispatch: NativeExecutionDispatch + 'static, + ChainSelection: 'static + SelectChain, +{ + let Basics { task_manager, backend, client, keystore_container, telemetry, .. } = basics; let transaction_pool = sc_transaction_pool::BasicPool::new_full( config.transaction_pool.clone(), @@ -674,23 +708,50 @@ where let disable_grandpa = config.disable_grandpa; let name = config.network.node_name.clone(); - let service::PartialComponents { + let overseer_connector = OverseerConnector::default(); + + let handle = Handle(overseer_connector.as_handle().clone()); + + let basics = new_partial_basics::( + &mut config, + jaeger_agent, + telemetry_worker_handle, + )?; + + // we should remove this check before we deploy parachains on polkadot + // TODO: https://github.com/paritytech/polkadot/issues/3326 + let chain_spec = &config.chain_spec as &dyn IdentifyVariant; + + let is_relay_chain = chain_spec.is_kusama() || + chain_spec.is_westend() || + chain_spec.is_rococo() || + chain_spec.is_wococo(); + + let prometheus_registry = config.prometheus_registry().cloned(); + + use relay_chain_selection::SelectRelayChain; + + let select_chain = SelectRelayChain::new( + basics.backend.clone(), + is_relay_chain, + handle.clone(), + polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?, + ); + let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> { client, backend, mut task_manager, keystore_container, - mut select_chain, + select_chain, import_queue, transaction_pool, other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry), - } = new_partial::( + } = new_partial::>( &mut config, - jaeger_agent, - telemetry_worker_handle, + basics, + select_chain, )?; - let prometheus_registry = config.prometheus_registry().cloned(); - let shared_voter_state = rpc_setup; let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; @@ -850,8 +911,10 @@ where local_keystore.and_then(move |k| authority_discovery_service.map(|a| (a, k))); let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params { - let (overseer, overseer_handle) = overseer_gen + // already have access to the handle + let (overseer, _handle) = overseer_gen .generate::>( + overseer_connector, OverseerGenArgs { leaves: active_leaves, keystore, @@ -875,40 +938,29 @@ where dispute_coordinator_config, }, )?; - let handle = Handle::Connected(overseer_handle.clone()); - let handle_clone = handle.clone(); - task_manager.spawn_essential_handle().spawn_blocking( - "overseer", - Box::pin(async move { - use futures::{pin_mut, select, FutureExt}; + { + let handle = handle.clone(); + task_manager.spawn_essential_handle().spawn_blocking( + "overseer", + Box::pin(async move { + use futures::{pin_mut, select, FutureExt}; - let forward = polkadot_overseer::forward_events(overseer_client, handle_clone); + let forward = polkadot_overseer::forward_events(overseer_client, handle); - let forward = forward.fuse(); - let overseer_fut = overseer.run().fuse(); + let forward = forward.fuse(); + let overseer_fut = overseer.run().fuse(); - pin_mut!(overseer_fut); - pin_mut!(forward); + pin_mut!(overseer_fut); + pin_mut!(forward); - select! { - _ = forward => (), - _ = overseer_fut => (), - complete => (), - } - }), - ); - // we should remove this check before we deploy parachains on polkadot - // TODO: https://github.com/paritytech/polkadot/issues/3326 - let should_connect_overseer = chain_spec.is_kusama() || - chain_spec.is_westend() || - chain_spec.is_rococo() || - chain_spec.is_wococo(); - - if should_connect_overseer { - select_chain.connect_to_overseer(overseer_handle.clone()); - } else { - tracing::info!("Overseer is running in the disconnected state"); + select! { + _ = forward => (), + _ = overseer_fut => (), + complete => (), + } + }), + ); } Some(handle) } else { @@ -1228,6 +1280,32 @@ where Ok((task_manager, rpc_handlers)) } +#[cfg(feature = "full-node")] +macro_rules! chain_ops { + ($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{ + let telemetry_worker_handle = $telemetry_worker_handle; + let jaeger_agent = $jaeger_agent; + let mut config = $config; + let basics = new_partial_basics::<$scope::RuntimeApi, $executor>( + config, + jaeger_agent, + telemetry_worker_handle, + )?; + + use ::sc_consensus::LongestChain; + // use the longest chain selection, since there is no overseer available + let chain_selection = LongestChain::new(basics.backend.clone()); + + let service::PartialComponents { client, backend, import_queue, task_manager, .. } = + new_partial::<$scope::RuntimeApi, $executor, LongestChain<_, Block>>( + &mut config, + basics, + chain_selection, + )?; + Ok((Arc::new(Client::$variant(client)), backend, import_queue, task_manager)) + }}; +} + /// Builds a new object suitable for chain operations. #[cfg(feature = "full-node")] pub fn new_chain_ops( @@ -1244,50 +1322,27 @@ pub fn new_chain_ops( > { config.keystore = service::config::KeystoreConfig::InMemory; + let telemetry_worker_handle = None; + #[cfg(feature = "rococo-native")] if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Rococo(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; rococo_runtime, RococoExecutorDispatch, Rococo) } #[cfg(feature = "kusama-native")] if config.chain_spec.is_kusama() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Kusama(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; kusama_runtime, KusamaExecutorDispatch, Kusama) } #[cfg(feature = "westend-native")] if config.chain_spec.is_westend() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Westend(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; westend_runtime, WestendExecutorDispatch, Westend) } #[cfg(feature = "polkadot-native")] { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Polkadot(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot) } - #[cfg(not(feature = "polkadot-native"))] Err(Error::NoRuntime) } diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 66156fd18298..90f571c23cf4 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsCollator, Registry, SpawnNamed}; +use lru::LruCache; use polkadot_availability_distribution::IncomingRequestReceivers; use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; use polkadot_node_core_av_store::Config as AvailabilityConfig; @@ -22,7 +23,15 @@ use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver}; -use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandle}; +#[cfg(feature = "malus")] +pub use polkadot_overseer::{ + dummy::{dummy_overseer_builder, DummySubsystem}, + HeadSupportsParachains, +}; +pub use polkadot_overseer::{ + BlockInfo, MetricsTrait, Overseer, OverseerBuilder, OverseerConnector, OverseerHandle, +}; + use polkadot_primitives::v1::ParachainHost; use sc_authority_discovery::Service as AuthorityDiscoveryService; use sc_client_api::AuxStore; @@ -99,12 +108,11 @@ where pub dispute_coordinator_config: DisputeCoordinatorConfig, } -/// Create a default, unaltered set of subsystems. -/// -/// A convenience for usage with malus, to avoid -/// repetitive code across multiple behavior strain implementations. -pub fn create_default_subsystems<'a, Spawner, RuntimeClient>( +/// Obtain a prepared `OverseerBuilder`, that is initialized +/// with all default values. +pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( OverseerGenArgs { + leaves, keystore, runtime_client, parachains_db, @@ -124,10 +132,11 @@ pub fn create_default_subsystems<'a, Spawner, RuntimeClient>( candidate_validation_config, chain_selection_config, dispute_coordinator_config, - .. }: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result< - AllSubsystems< + OverseerBuilder< + Spawner, + Arc, CandidateValidationSubsystem, CandidateBackingSubsystem, StatementDistributionSubsystem, @@ -161,41 +170,45 @@ where Spawner: 'static + SpawnNamed + Clone + Unpin, { use polkadot_node_subsystem_util::metrics::Metrics; + use std::iter::FromIterator; + + let metrics: polkadot_overseer::Metrics = + ::register(registry)?; - let all_subsystems = AllSubsystems { - availability_distribution: AvailabilityDistributionSubsystem::new( + let builder = Overseer::builder() + .availability_distribution(AvailabilityDistributionSubsystem::new( keystore.clone(), IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, Metrics::register(registry)?, - ), - availability_recovery: AvailabilityRecoverySubsystem::with_chunks_only( + )) + .availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only( available_data_req_receiver, Metrics::register(registry)?, - ), - availability_store: AvailabilityStoreSubsystem::new( + )) + .availability_store(AvailabilityStoreSubsystem::new( parachains_db.clone(), availability_config, Metrics::register(registry)?, - ), - bitfield_distribution: BitfieldDistributionSubsystem::new(Metrics::register(registry)?), - bitfield_signing: BitfieldSigningSubsystem::new( + )) + .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?)) + .bitfield_signing(BitfieldSigningSubsystem::new( spawner.clone(), keystore.clone(), Metrics::register(registry)?, - ), - candidate_backing: CandidateBackingSubsystem::new( + )) + .candidate_backing(CandidateBackingSubsystem::new( spawner.clone(), keystore.clone(), Metrics::register(registry)?, - ), - candidate_validation: CandidateValidationSubsystem::with_config( + )) + .candidate_validation(CandidateValidationSubsystem::with_config( candidate_validation_config, Metrics::register(registry)?, // candidate-validation metrics Metrics::register(registry)?, // validation host metrics - ), - chain_api: ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?), - collation_generation: CollationGenerationSubsystem::new(Metrics::register(registry)?), - collator_protocol: { + )) + .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?)) + .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?)) + .collator_protocol({ let side = match is_collator { IsCollator::Yes(collator_pair) => ProtocolSide::Collator( network_service.local_peer_id().clone(), @@ -210,49 +223,60 @@ where }, }; CollatorProtocolSubsystem::new(side) - }, - network_bridge: NetworkBridgeSubsystem::new( + }) + .network_bridge(NetworkBridgeSubsystem::new( network_service.clone(), authority_discovery_service.clone(), Box::new(network_service.clone()), Metrics::register(registry)?, - ), - provisioner: ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?), - runtime_api: RuntimeApiSubsystem::new( + )) + .provisioner(ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?)) + .runtime_api(RuntimeApiSubsystem::new( runtime_client.clone(), Metrics::register(registry)?, spawner.clone(), - ), - statement_distribution: StatementDistributionSubsystem::new( + )) + .statement_distribution(StatementDistributionSubsystem::new( keystore.clone(), statement_req_receiver, Metrics::register(registry)?, - ), - approval_distribution: ApprovalDistributionSubsystem::new(Metrics::register(registry)?), - approval_voting: ApprovalVotingSubsystem::with_config( + )) + .approval_distribution(ApprovalDistributionSubsystem::new(Metrics::register(registry)?)) + .approval_voting(ApprovalVotingSubsystem::with_config( approval_voting_config, parachains_db.clone(), keystore.clone(), Box::new(network_service.clone()), Metrics::register(registry)?, - ), - gossip_support: GossipSupportSubsystem::new(keystore.clone()), - dispute_coordinator: DisputeCoordinatorSubsystem::new( + )) + .gossip_support(GossipSupportSubsystem::new(keystore.clone())) + .dispute_coordinator(DisputeCoordinatorSubsystem::new( parachains_db.clone(), dispute_coordinator_config, keystore.clone(), Metrics::register(registry)?, - ), - dispute_participation: DisputeParticipationSubsystem::new(), - dispute_distribution: DisputeDistributionSubsystem::new( + )) + .dispute_participation(DisputeParticipationSubsystem::new()) + .dispute_distribution(DisputeDistributionSubsystem::new( keystore.clone(), dispute_req_receiver, authority_discovery_service.clone(), Metrics::register(registry)?, - ), - chain_selection: ChainSelectionSubsystem::new(chain_selection_config, parachains_db), - }; - Ok(all_subsystems) + )) + .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db)) + .leaves(Vec::from_iter( + leaves + .into_iter() + .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)), + )) + .activation_external_listeners(Default::default()) + .span_per_active_leaf(Default::default()) + .active_leaves(Default::default()) + .supports_parachains(runtime_client) + .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) + .metrics(metrics) + .spawner(spawner); + Ok(builder) } /// Trait for the `fn` generating the overseer. @@ -263,6 +287,7 @@ pub trait OverseerGen { /// Overwrite the full generation of the overseer, including the subsystems. fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -271,19 +296,22 @@ pub trait OverseerGen { Spawner: 'static + SpawnNamed + Clone + Unpin, { let gen = RealOverseerGen; - RealOverseerGen::generate::(&gen, args) + RealOverseerGen::generate::(&gen, connector, args) } // It would be nice to make `create_subsystems` part of this trait, // but the amount of generic arguments that would be required as // as consequence make this rather annoying to implement and use. } +use polkadot_overseer::KNOWN_LEAVES_CACHE_SIZE; + /// The regular set of subsystems. pub struct RealOverseerGen; impl OverseerGen for RealOverseerGen { fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -291,14 +319,8 @@ impl OverseerGen for RealOverseerGen { RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { - let spawner = args.spawner.clone(); - let leaves = args.leaves.clone(); - let runtime_client = args.runtime_client.clone(); - let registry = args.registry.clone(); - - let all_subsystems = create_default_subsystems::(args)?; - - Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + prepared_overseer_builder(args)? + .build_with_connector(connector) .map_err(|e| e.into()) } } diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 184d526eac47..36ab34205d94 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -39,7 +39,7 @@ use super::{HeaderProvider, HeaderProviderProvider}; use consensus_common::{Error as ConsensusError, SelectChain}; use futures::channel::oneshot; use polkadot_node_subsystem_util::metrics::{self, prometheus}; -use polkadot_overseer::{AllMessages, Handle, OverseerHandle}; +use polkadot_overseer::{AllMessages, Handle}; use polkadot_primitives::v1::{ Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader, }; @@ -109,66 +109,58 @@ impl Metrics { } /// A chain-selection implementation which provides safety for relay chains. -pub struct SelectRelayChainWithFallback> { - // A fallback to use in case the overseer is disconnected. - // - // This is used on relay chains which have not yet enabled - // parachains as well as situations where the node is offline. - fallback: sc_consensus::LongestChain, - selection: SelectRelayChain, +pub struct SelectRelayChain> { + /// If `false`, will only use the `longest_chain` selection. + is_relay_chain: bool, + longest_chain: sc_consensus::LongestChain, + selection: SelectRelayChainInner, } -impl Clone for SelectRelayChainWithFallback +impl Clone for SelectRelayChain where B: sc_client_api::Backend, - SelectRelayChain: Clone, + SelectRelayChainInner: Clone, { fn clone(&self) -> Self { - Self { fallback: self.fallback.clone(), selection: self.selection.clone() } + Self { + is_relay_chain: self.is_relay_chain, + longest_chain: self.longest_chain.clone(), + selection: self.selection.clone(), + } } } -impl SelectRelayChainWithFallback +impl SelectRelayChain where B: sc_client_api::Backend + 'static, { - /// Create a new [`SelectRelayChainWithFallback`] wrapping the given chain backend + /// Create a new [`SelectRelayChain`] wrapping the given chain backend /// and a handle to the overseer. - pub fn new(backend: Arc, overseer: Handle, metrics: Metrics) -> Self { - SelectRelayChainWithFallback { - fallback: sc_consensus::LongestChain::new(backend.clone()), - selection: SelectRelayChain::new(backend, overseer, metrics), + pub fn new(backend: Arc, is_relay_chain: bool, overseer: Handle, metrics: Metrics) -> Self { + SelectRelayChain { + is_relay_chain, + longest_chain: sc_consensus::LongestChain::new(backend.clone()), + selection: SelectRelayChainInner::new(backend, overseer, metrics), } } } -impl SelectRelayChainWithFallback -where - B: sc_client_api::Backend + 'static, -{ - /// Given an overseer handle, this connects the [`SelectRelayChainWithFallback`]'s - /// internal handle and its clones to the same overseer. - pub fn connect_to_overseer(&mut self, handle: OverseerHandle) { - self.selection.overseer.connect_to_overseer(handle); - } -} - #[async_trait::async_trait] -impl SelectChain for SelectRelayChainWithFallback +impl SelectChain for SelectRelayChain where B: sc_client_api::Backend + 'static, { async fn leaves(&self) -> Result, ConsensusError> { - if self.selection.overseer.is_disconnected() { - return self.fallback.leaves().await + if !self.is_relay_chain { + return self.longest_chain.leaves().await } self.selection.leaves().await } async fn best_chain(&self) -> Result { - if self.selection.overseer.is_disconnected() { - return self.fallback.best_chain().await + if !self.is_relay_chain { + return self.longest_chain.best_chain().await } self.selection.best_chain().await } @@ -179,34 +171,37 @@ where maybe_max_number: Option, ) -> Result, ConsensusError> { let longest_chain_best = - self.fallback.finality_target(target_hash, maybe_max_number).await?; + self.longest_chain.finality_target(target_hash, maybe_max_number).await?; - if self.selection.overseer.is_disconnected() { + if !self.is_relay_chain { return Ok(longest_chain_best) } self.selection - .finality_target_with_fallback(target_hash, longest_chain_best, maybe_max_number) + .finality_target_with_longest_chain(target_hash, longest_chain_best, maybe_max_number) .await } } -/// A chain-selection implementation which provides safety for relay chains -/// but does not handle situations where the overseer is not yet connected. -pub struct SelectRelayChain { +/// A chain-selection implementation which provides safety for relay chains. +/// +/// The chain selection algorithm is aware of ongoing disputes and avoids +/// finalizing those chains, but as consequence requires access to +/// a running [`Overseer`]. +pub struct SelectRelayChainInner { backend: Arc, overseer: OH, metrics: Metrics, } -impl SelectRelayChain +impl SelectRelayChainInner where B: HeaderProviderProvider, OH: OverseerHandleT, { - /// Create a new [`SelectRelayChain`] wrapping the given chain backend + /// Create a new [`SelectRelayChainInner`] wrapping the given chain backend /// and a handle to the overseer. pub fn new(backend: Arc, overseer: OH, metrics: Metrics) -> Self { - SelectRelayChain { backend, overseer, metrics } + SelectRelayChainInner { backend, overseer, metrics } } fn block_header(&self, hash: Hash) -> Result { @@ -234,13 +229,13 @@ where } } -impl Clone for SelectRelayChain +impl Clone for SelectRelayChainInner where B: HeaderProviderProvider + Send + Sync, OH: OverseerHandleT, { fn clone(&self) -> Self { - SelectRelayChain { + SelectRelayChainInner { backend: self.backend.clone(), overseer: self.overseer.clone(), metrics: self.metrics.clone(), @@ -273,7 +268,7 @@ impl OverseerHandleT for Handle { } } -impl SelectRelayChain +impl SelectRelayChainInner where B: HeaderProviderProvider, OH: OverseerHandleT, @@ -317,7 +312,7 @@ where /// /// It will also constrain the chain to only chains which are fully /// approved, and chains which contain no disputes. - pub(crate) async fn finality_target_with_fallback( + pub(crate) async fn finality_target_with_longest_chain( &self, target_hash: Hash, best_leaf: Option, diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index 34ac69d78cc7..7dc5fe19ecbd 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -79,7 +79,7 @@ fn test_harness>( let (finality_target_tx, finality_target_rx) = oneshot::channel::>(); - let select_relay_chain = SelectRelayChain::::new( + let select_relay_chain = SelectRelayChainInner::::new( Arc::new(case_vars.chain.clone()), context.sender().clone(), Default::default(), diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 80dae08825b0..5832ddb7e15f 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -372,7 +372,7 @@ mod tests { use super::*; use futures::executor::block_on; use polkadot_node_subsystem::messages::CollatorProtocolMessage; - use polkadot_overseer::{AllSubsystems, Handle, HeadSupportsParachains, Overseer}; + use polkadot_overseer::{dummy::dummy_overseer_builder, Handle, HeadSupportsParachains}; use polkadot_primitives::v1::Hash; struct AlwaysSupportsParachains; @@ -386,17 +386,15 @@ mod tests { fn forward_subsystem_works() { let spawner = sp_core::testing::TaskExecutor::new(); let (tx, rx) = mpsc::channel(2); - let all_subsystems = - AllSubsystems::<()>::dummy().replace_collator_protocol(|_| ForwardSubsystem(tx)); - let (overseer, handle) = Overseer::new( - Vec::new(), - all_subsystems, - None, - AlwaysSupportsParachains, - spawner.clone(), - ) - .unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = + dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None) + .unwrap() + .replace_collator_protocol(|_| ForwardSubsystem(tx)) + .leaves(vec![]) + .build() + .unwrap(); + + let mut handle = Handle(handle); spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 691289614d14..232d3e68f963 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -513,7 +513,7 @@ pub enum ChainApiMessage { /// Get the cumulative weight of the given block, by hash. /// If the block or weight is unknown, this returns `None`. /// - /// Note: this the weight within the low-level fork-choice rule, + /// Note: this is the weight within the low-level fork-choice rule, /// not the high-level one implemented in the chain-selection subsystem. /// /// Weight is used for comparing blocks in a fork-choice rule. diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index f0918ab1dc02..ffe13e5868a9 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -24,7 +24,9 @@ pub use jaeger::*; pub use polkadot_node_jaeger as jaeger; -pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, DummySubsystem, OverseerSignal}; +pub use polkadot_overseer::{ + self as overseer, dummy::DummySubsystem, ActiveLeavesUpdate, OverseerSignal, +}; pub use polkadot_node_subsystem_types::{ errors::{self, *}, diff --git a/runtime/common/src/crowdloan.rs b/runtime/common/src/crowdloan.rs index a97825d445c2..d7fe48c496f8 100644 --- a/runtime/common/src/crowdloan.rs +++ b/runtime/common/src/crowdloan.rs @@ -275,7 +275,7 @@ pub mod pallet { FirstPeriodTooFarInFuture, /// Last lease period must be greater than first lease period. LastPeriodBeforeFirstPeriod, - /// The last lease period cannot be more then 3 periods after the first period. + /// The last lease period cannot be more than 3 periods after the first period. LastPeriodTooFarInFuture, /// The campaign ends before the current block number. The end must be in the future. CannotEndInPast, diff --git a/runtime/common/src/elections.rs b/runtime/common/src/elections.rs index 4741898f061c..ebf46d5ccb22 100644 --- a/runtime/common/src/elections.rs +++ b/runtime/common/src/elections.rs @@ -67,8 +67,8 @@ pub type GenesisElectionOf = /// pallet-election-provider-multi-phase. pub const MINER_MAX_ITERATIONS: u32 = 10; -/// A source of random balance for the NPoS Solver, which is meant to be run by the offchain worker -/// election miner. +/// A source of random balance for the NPoS Solver, which is meant to be run by the off-chain worker election +/// miner. pub struct OffchainRandomBalancing; impl frame_support::pallet_prelude::Get> for OffchainRandomBalancing diff --git a/runtime/common/src/purchase.rs b/runtime/common/src/purchase.rs index 5f86bf4226d5..42a545730c94 100644 --- a/runtime/common/src/purchase.rs +++ b/runtime/common/src/purchase.rs @@ -34,7 +34,7 @@ use sp_std::prelude::*; type BalanceOf = <::Currency as Currency<::AccountId>>::Balance; -/// The kind of a statement an account needs to make for a claim to be valid. +/// The kind of statement an account needs to make for a claim to be valid. #[derive(Encode, Decode, Clone, Copy, Eq, PartialEq, RuntimeDebug, TypeInfo)] pub enum AccountValidity { /// Account is not valid. diff --git a/runtime/common/src/traits.rs b/runtime/common/src/traits.rs index eb5e0eda517b..938fefd963c0 100644 --- a/runtime/common/src/traits.rs +++ b/runtime/common/src/traits.rs @@ -127,7 +127,7 @@ pub trait Leaser { ) -> Result<(), LeaseError>; /// Return the amount of balance currently held in reserve on `leaser`'s account for leasing `para`. This won't - /// go down outside of a lease period. + /// go down outside a lease period. fn deposit_held( para: ParaId, leaser: &Self::AccountId, diff --git a/runtime/parachains/src/hrmp.rs b/runtime/parachains/src/hrmp.rs index f087dfd3a405..c9cb13edff2b 100644 --- a/runtime/parachains/src/hrmp.rs +++ b/runtime/parachains/src/hrmp.rs @@ -386,7 +386,7 @@ pub mod pallet { /// parameters. /// /// - `proposed_max_capacity` - specifies how many messages can be in the channel at once. - /// - `proposed_max_message_size` - specifies the maximum size of any of the messages. + /// - `proposed_max_message_size` - specifies the maximum size of the messages. /// /// These numbers are a subject to the relay-chain configuration limits. /// diff --git a/xcm/pallet-xcm/src/lib.rs b/xcm/pallet-xcm/src/lib.rs index 972e3f926829..b222c73a9c69 100644 --- a/xcm/pallet-xcm/src/lib.rs +++ b/xcm/pallet-xcm/src/lib.rs @@ -79,7 +79,7 @@ pub mod pallet { /// The overarching event type. type Event: From> + IsType<::Event>; - /// Required origin for sending XCM messages. If successful, the it resolves to `MultiLocation` + /// Required origin for sending XCM messages. If successful, it resolves to `MultiLocation` /// which exists as an interior location within this chain's XCM context. type SendXcmOrigin: EnsureOrigin<::Origin, Success = MultiLocation>; @@ -328,7 +328,7 @@ pub mod pallet { #[pallet::storage] pub(super) type SafeXcmVersion = StorageValue<_, XcmVersion, OptionQuery>; - /// Latest versions that we know various locations support. + /// The Latest versions that we know various locations support. #[pallet::storage] pub(super) type SupportedVersion = StorageDoubleMap< _, diff --git a/xcm/src/v1/multilocation.rs b/xcm/src/v1/multilocation.rs index 9c472553a37c..a8f1872bf042 100644 --- a/xcm/src/v1/multilocation.rs +++ b/xcm/src/v1/multilocation.rs @@ -99,7 +99,7 @@ impl MultiLocation { MultiLocation { parents, interior: Junctions::Here } } - /// Whether or not the `MultiLocation` has no parents and has a `Here` interior. + /// Whether the `MultiLocation` has no parents and has a `Here` interior. pub const fn is_here(&self) -> bool { self.parents == 0 && self.interior.len() == 0 } @@ -119,7 +119,7 @@ impl MultiLocation { self.parents } - /// Returns boolean indicating whether or not `self` contains only the specified amount of + /// Returns boolean indicating whether `self` contains only the specified amount of /// parents and no interior junctions. pub const fn contains_parents_only(&self, count: u8) -> bool { matches!(self.interior, Junctions::Here) && self.parents == count