From 86563cb96ddad91517302161b6c35dfa06db94b1 Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Mon, 9 Jan 2023 13:34:30 +0200 Subject: [PATCH 1/6] Remove unused code (#6525) Signed-off-by: Andrei Sandu Signed-off-by: Andrei Sandu --- node/subsystem-types/src/messages.rs | 113 --------------------------- 1 file changed, 113 deletions(-) diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index cb0834c5ba34..bf1dbe552f5f 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -58,12 +58,6 @@ use std::{ pub mod network_bridge_event; pub use network_bridge_event::NetworkBridgeEvent; -/// Subsystem messages where each message is always bound to a relay parent. -pub trait BoundToRelayParent { - /// Returns the relay parent this message is bound to. - fn relay_parent(&self) -> Hash; -} - /// Messages received by the Candidate Backing subsystem. #[derive(Debug)] pub enum CandidateBackingMessage { @@ -78,16 +72,6 @@ pub enum CandidateBackingMessage { Statement(Hash, SignedFullStatement), } -impl BoundToRelayParent for CandidateBackingMessage { - fn relay_parent(&self) -> Hash { - match self { - Self::GetBackedCandidates(hash, _, _) => *hash, - Self::Second(hash, _, _) => *hash, - Self::Statement(hash, _) => *hash, - } - } -} - /// Blanket error for validation failing for internal reasons. #[derive(Debug, Error)] #[error("Validation failed with {0:?}")] @@ -170,17 +154,6 @@ pub enum CandidateValidationMessage { ), } -impl CandidateValidationMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::ValidateFromChainState(_, _, _, _) => None, - Self::ValidateFromExhaustive(_, _, _, _, _, _) => None, - Self::PreCheck(relay_parent, _, _) => Some(*relay_parent), - } - } -} - /// Messages received by the Collator Protocol subsystem. #[derive(Debug, derive_more::From)] pub enum CollatorProtocolMessage { @@ -218,12 +191,6 @@ impl Default for CollatorProtocolMessage { } } -impl BoundToRelayParent for CollatorProtocolMessage { - fn relay_parent(&self) -> Hash { - Default::default() - } -} - /// Messages received by the dispute coordinator subsystem. /// /// NOTE: Any response oneshots might get cancelled if the `DisputeCoordinator` was not yet /// reached. @@ -397,23 +364,6 @@ pub enum NetworkBridgeTxMessage { }, } -impl NetworkBridgeTxMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::ReportPeer(_, _) => None, - Self::DisconnectPeer(_, _) => None, - Self::SendValidationMessage(_, _) => None, - Self::SendCollationMessage(_, _) => None, - Self::SendValidationMessages(_) => None, - Self::SendCollationMessages(_) => None, - Self::ConnectToValidators { .. } => None, - Self::ConnectToResolvedValidators { .. } => None, - Self::SendRequests { .. } => None, - } - } -} - /// Availability Distribution Message. #[derive(Debug)] pub enum AvailabilityDistributionMessage { @@ -463,16 +413,6 @@ pub enum BitfieldDistributionMessage { NetworkBridgeUpdate(NetworkBridgeEvent), } -impl BitfieldDistributionMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::DistributeBitfield(hash, _) => Some(*hash), - Self::NetworkBridgeUpdate(_) => None, - } - } -} - /// Availability store subsystem message.
#[derive(Debug)] pub enum AvailabilityStoreMessage { @@ -526,13 +466,6 @@ pub enum AvailabilityStoreMessage { }, } -impl AvailabilityStoreMessage { - /// In fact, none of the `AvailabilityStore` messages assume a particular relay parent. - pub fn relay_parent(&self) -> Option<Hash> { - None - } -} - /// A response channel for the result of a chain API request. pub type ChainApiResponseChannel<T> = oneshot::Sender<Result<T, ChainApiError>>; @@ -575,13 +508,6 @@ pub enum ChainApiMessage { }, } -impl ChainApiMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - None - } -} - /// Chain selection subsystem messages #[derive(Debug)] pub enum ChainSelectionMessage { @@ -594,20 +520,6 @@ pub enum ChainSelectionMessage { BestLeafContaining(Hash, oneshot::Sender<Option<Hash>>), } -impl ChainSelectionMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - // None of the messages, even the ones containing specific - // block hashes, can be considered to have those blocks as - // a relay parent. - match *self { - ChainSelectionMessage::Approved(_) => None, - ChainSelectionMessage::Leaves(_) => None, - ChainSelectionMessage::BestLeafContaining(..) => None, - } - } -} - /// A sender for the result of a runtime API request. pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, crate::errors::RuntimeApiError>>; @@ -702,15 +614,6 @@ pub enum RuntimeApiMessage { Request(Hash, RuntimeApiRequest), } -impl RuntimeApiMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::Request(hash, _) => Some(*hash), - } - } -} - /// Statement distribution message. #[derive(Debug, derive_more::From)] pub enum StatementDistributionMessage { @@ -762,15 +665,6 @@ pub enum ProvisionerMessage { ProvisionableData(Hash, ProvisionableData), } -impl BoundToRelayParent for ProvisionerMessage { - fn relay_parent(&self) -> Hash { - match self { - Self::RequestInherentData(hash, _) => *hash, - Self::ProvisionableData(hash, _) => *hash, - } - } -} - /// Message to the Collation Generation subsystem. #[derive(Debug)] pub enum CollationGenerationMessage { @@ -778,13 +672,6 @@ pub enum CollationGenerationMessage { Initialize(CollationGenerationConfig), } -impl CollationGenerationMessage { - /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - None - } -} - /// The result type of [`ApprovalVotingMessage::CheckAndImportAssignment`] request. #[derive(Debug, Clone, PartialEq, Eq)] pub enum AssignmentCheckResult { From 9d4173d4b18dad9bd8590567b76eb0e99883ee34 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 9 Jan 2023 13:46:14 +0200 Subject: [PATCH 2/6] Fix flaky test in `dispute-coordinator` (#6524) https://github.com/paritytech/polkadot/pull/6494 updates dispute participation priority on `ActiveLeaves` updates. This operation might trigger participation in some cases, and as a result some of the message ordering is no longer as predictable as it used to be. As a side effect of this, `resume_dispute_without_local_statement` was failing occasionally. The solution is to stop expecting the `BlockNumber`, `CandidateEvents`, `FetchOnChainVotes` and `Ancestors` messages to arrive after `FinalizedBlockNumber` or in any specific order. This should be okay, as the code is in a helper function and doesn't affect the actual test behaviour.
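To illustrate the shape of the fix, here is a self-contained Rust sketch with invented names (the real handler and the actual overseer message types are in the diff below):

// Answer each query as it arrives instead of asserting a fixed sequence.
enum SyncQuery {
    BlockNumber,
    CandidateEvents,
    FetchOnChainVotes,
    Ancestors,
}

fn handle_sync_queries(queries: Vec<SyncQuery>) {
    for query in queries {
        // One arm per expected query; the arrival order no longer matters.
        match query {
            SyncQuery::BlockNumber => { /* reply with the block number */ },
            SyncQuery::CandidateEvents => { /* reply with candidate events */ },
            SyncQuery::FetchOnChainVotes => { /* reply with scraped votes */ },
            SyncQuery::Ancestors => { /* reply with the ancestor hashes */ },
        }
    }
}

fn main() {
    // The same handler accepts any arrival order.
    handle_sync_queries(vec![SyncQuery::Ancestors, SyncQuery::BlockNumber]);
    handle_sync_queries(vec![SyncQuery::BlockNumber, SyncQuery::Ancestors]);
}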
Fixes https://github.com/paritytech/polkadot/issues/6514 --- node/core/dispute-coordinator/src/tests.rs | 80 +++++++++------------- 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 023c95d5e23c..b5c2a6bd8e3f 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -395,57 +395,45 @@ impl TestState { ); finished_steps.got_scraping_information = true; tx.send(Ok(0)).unwrap(); - - // If the activated block number is > 1 the scraper will ask for block ancestors. Handle this case. - if block_number > 1 { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::Ancestors{ - hash, - k, - response_channel, - }) => { - assert_eq!(hash, block_hash); // A bit restrictive, remove if it causes problems. - let target_header = self.headers.get(&hash).expect("The function is called for this block so it should exist"); - let mut response = Vec::new(); - for i in target_header.number.saturating_sub(k as u32)..target_header.number { - response.push(self.block_num_to_header.get(&i).expect("headers and block_num_to_header should always be in sync").clone()); - } - let _ = response_channel.send(Ok(response)); - } - ); - } - - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _new_leaf, - RuntimeApiRequest::CandidateEvents(tx), - )) => { - tx.send(Ok(candidate_events.clone())).unwrap(); - } - ); - gum::trace!("After answering runtime api request"); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _new_leaf, - RuntimeApiRequest::FetchOnChainVotes(tx), - )) => { - //add some `BackedCandidates` or resolved disputes here as needed - tx.send(Ok(Some(ScrapedOnChainVotes { - session, - backing_validators_per_candidate: Vec::default(), - disputes: MultiDisputeStatementSet::default(), - }))).unwrap(); - } - ); - gum::trace!("After answering runtime API request (votes)"); }, AllMessages::ChainApi(ChainApiMessage::BlockNumber(hash, tx)) => { let block_num = self.headers.get(&hash).map(|header| header.number); tx.send(Ok(block_num)).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::CandidateEvents(tx), + )) => { + tx.send(Ok(candidate_events.clone())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _new_leaf, + RuntimeApiRequest::FetchOnChainVotes(tx), + )) => { + //add some `BackedCandidates` or resolved disputes here as needed + tx.send(Ok(Some(ScrapedOnChainVotes { + session, + backing_validators_per_candidate: Vec::default(), + disputes: MultiDisputeStatementSet::default(), + }))) + .unwrap(); + }, + AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => { + let target_header = self + .headers + .get(&hash) + .expect("The function is called for this block so it should exist"); + let mut response = Vec::new(); + for i in target_header.number.saturating_sub(k as u32)..target_header.number { + response.push( + self.block_num_to_header + .get(&i) + .expect("headers and block_num_to_header should always be in sync") + .clone(), + ); + } + let _ = response_channel.send(Ok(response)); + }, msg => { panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg); }, From 1c935f35139f3f43cb22009dc5c010dfbe6ad9e7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" 
<49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Jan 2023 12:05:11 +0000 Subject: [PATCH 3/6] Bump tokio from 1.22.0 to 1.24.1 (#6523) Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.22.0 to 1.24.1. - [Release notes](https://github.com/tokio-rs/tokio/releases) - [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.22.0...tokio-1.24.1) --- updated-dependencies: - dependency-name: tokio dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- node/jaeger/Cargo.toml | 2 +- node/metrics/Cargo.toml | 2 +- node/test/service/Cargo.toml | 4 ++-- node/zombienet-backchannel/Cargo.toml | 2 +- parachain/test-parachains/adder/collator/Cargo.toml | 2 +- parachain/test-parachains/undying/collator/Cargo.toml | 2 +- runtime/kusama/Cargo.toml | 2 +- runtime/polkadot/Cargo.toml | 2 +- runtime/rococo/Cargo.toml | 2 +- runtime/westend/Cargo.toml | 2 +- utils/remote-ext-tests/bags-list/Cargo.toml | 2 +- utils/staking-miner/Cargo.toml | 2 +- 14 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a9f908457ad..6c5420adb46c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11852,9 +11852,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" dependencies = [ "autocfg", "bytes", @@ -11867,7 +11867,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6d01e2b25553..4621c91d5511 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ tikv-jemallocator = "0.5.0" assert_cmd = "2.0.4" nix = "0.24.1" tempfile = "3.2.0" -tokio = "1.22.0" +tokio = "1.24.1" substrate-rpc-client = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-core-primitives = { path = "core-primitives" } diff --git a/node/jaeger/Cargo.toml b/node/jaeger/Cargo.toml index b940307a4920..938ae3853e39 100644 --- a/node/jaeger/Cargo.toml +++ b/node/jaeger/Cargo.toml @@ -14,6 +14,6 @@ polkadot-node-primitives = { path = "../primitives" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } thiserror = "1.0.31" -tokio = "1.22.0" +tokio = "1.24.1" log = "0.4.17" parity-scale-codec = { version = "3.1.5", default-features = false } diff --git a/node/metrics/Cargo.toml b/node/metrics/Cargo.toml index b49928dc8462..815df94a8488 100644 --- a/node/metrics/Cargo.toml +++ b/node/metrics/Cargo.toml @@ -28,7 +28,7 @@ assert_cmd = "2.0.4" nix = "0.24.1" tempfile = "3.2.0" hyper = { version = "0.14.20", default-features = false, features = ["http1", "tcp"] } -tokio = "1.22.0" +tokio = "1.24.1" polkadot-test-service = { path = "../test/service", features=["runtime-metrics"]} substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/test/service/Cargo.toml b/node/test/service/Cargo.toml index 
6fab3150a623..9fc695210a82 100644 --- a/node/test/service/Cargo.toml +++ b/node/test/service/Cargo.toml @@ -10,7 +10,7 @@ hex = "0.4.3" gum = { package = "tracing-gum", path = "../../gum" } rand = "0.8.5" tempfile = "3.2.0" -tokio = "1.22.0" +tokio = "1.24.1" # Polkadot dependencies polkadot-overseer = { path = "../../overseer" } @@ -61,7 +61,7 @@ substrate-test-client = { git = "https://github.com/paritytech/substrate", branc pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } serde_json = "1.0.81" substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } [features] runtime-metrics=["polkadot-test-runtime/runtime-metrics"] diff --git a/node/zombienet-backchannel/Cargo.toml b/node/zombienet-backchannel/Cargo.toml index 74e6751eb7de..f80ff23dd6ac 100644 --- a/node/zombienet-backchannel/Cargo.toml +++ b/node/zombienet-backchannel/Cargo.toml @@ -9,7 +9,7 @@ authors.workspace = true edition.workspace = true [dependencies] -tokio = { version = "1.22.0", default-features = false, features = ["macros", "net", "rt-multi-thread", "sync"] } +tokio = { version = "1.24.1", default-features = false, features = ["macros", "net", "rt-multi-thread", "sync"] } url = "2.0.0" tokio-tungstenite = "0.17" futures-util = "0.3.23" diff --git a/parachain/test-parachains/adder/collator/Cargo.toml b/parachain/test-parachains/adder/collator/Cargo.toml index 3f13eecb96e3..51b9ccad8097 100644 --- a/parachain/test-parachains/adder/collator/Cargo.toml +++ b/parachain/test-parachains/adder/collator/Cargo.toml @@ -44,4 +44,4 @@ substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } diff --git a/parachain/test-parachains/undying/collator/Cargo.toml b/parachain/test-parachains/undying/collator/Cargo.toml index 2543b6f91930..36a40cb779ec 100644 --- a/parachain/test-parachains/undying/collator/Cargo.toml +++ b/parachain/test-parachains/undying/collator/Cargo.toml @@ -44,4 +44,4 @@ substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } diff --git a/runtime/kusama/Cargo.toml b/runtime/kusama/Cargo.toml index 744220a84681..d53082803f8e 100644 --- a/runtime/kusama/Cargo.toml +++ b/runtime/kusama/Cargo.toml @@ -109,7 +109,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" } separator = "0.4.1" serde_json = "1.0.81" remote-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", package = "frame-remote-externalities" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } [build-dependencies] diff --git a/runtime/polkadot/Cargo.toml b/runtime/polkadot/Cargo.toml index 941171392dac..8c715491cbf4 100644 --- 
a/runtime/polkadot/Cargo.toml +++ b/runtime/polkadot/Cargo.toml @@ -101,7 +101,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" } serde_json = "1.0.81" separator = "0.4.1" remote-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", package = "frame-remote-externalities" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } [build-dependencies] diff --git a/runtime/rococo/Cargo.toml b/runtime/rococo/Cargo.toml index 0e07e073bd35..2b944a508f8f 100644 --- a/runtime/rococo/Cargo.toml +++ b/runtime/rococo/Cargo.toml @@ -98,7 +98,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" } separator = "0.4.1" serde_json = "1.0.81" sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } [build-dependencies] substrate-wasm-builder = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/runtime/westend/Cargo.toml b/runtime/westend/Cargo.toml index 5afa4469876f..76f45bd5306e 100644 --- a/runtime/westend/Cargo.toml +++ b/runtime/westend/Cargo.toml @@ -101,7 +101,7 @@ keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substra sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" } serde_json = "1.0.81" remote-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", package = "frame-remote-externalities" } -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } [build-dependencies] diff --git a/utils/remote-ext-tests/bags-list/Cargo.toml b/utils/remote-ext-tests/bags-list/Cargo.toml index d77d2017aab3..f8b08767f898 100644 --- a/utils/remote-ext-tests/bags-list/Cargo.toml +++ b/utils/remote-ext-tests/bags-list/Cargo.toml @@ -19,4 +19,4 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } clap = { version = "4.0.9", features = ["derive"] } log = "0.4.17" -tokio = { version = "1.22.0", features = ["macros"] } +tokio = { version = "1.24.1", features = ["macros"] } diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index 63810aaa2fd6..20d19451489a 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -14,7 +14,7 @@ paste = "1.0.7" serde = "1.0.137" serde_json = "1.0" thiserror = "1.0.31" -tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread", "sync"] } +tokio = { version = "1.24.1", features = ["macros", "rt-multi-thread", "sync"] } remote-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", package = "frame-remote-externalities" } signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } From 7b3eb681b3a42743333ceac71a0b42046b661f64 Mon Sep 17 00:00:00 2001 From: Bradley Olson <34992650+BradleyOlson64@users.noreply.github.com> Date: Mon, 9 Jan 2023 07:27:19 -0800 Subject: [PATCH 4/6] Guide changes 3 (#6520) * Guide changes * Adding clarification --- .../src/node/collators/collation-generation.md | 1 - 
.../implementers-guide/src/node/utility/chain-selection.md | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/roadmap/implementers-guide/src/node/collators/collation-generation.md b/roadmap/implementers-guide/src/node/collators/collation-generation.md index 0a17a8619ab1..d7c62fee39f8 100644 --- a/roadmap/implementers-guide/src/node/collators/collation-generation.md +++ b/roadmap/implementers-guide/src/node/collators/collation-generation.md @@ -99,7 +99,6 @@ On `ActiveLeavesUpdate`: * If there is no collation generation config, ignore. * Otherwise, for each `activated` head in the update: * Determine if the para is scheduled on any core by fetching the `availability_cores` Runtime API. - > TODO: figure out what to do in the case of occupied cores; see [this issue](https://github.com/paritytech/polkadot/issues/1573). * Determine an occupied core assumption to make about the para. Scheduled cores can make `OccupiedCoreAssumption::Free`. * Use the Runtime API subsystem to fetch the full validation data. * Invoke the `collator`, and use its outputs to produce a `CandidateReceipt`, signed with the configuration's `key`. diff --git a/roadmap/implementers-guide/src/node/utility/chain-selection.md b/roadmap/implementers-guide/src/node/utility/chain-selection.md index 423b5ed406c0..fc6a9820143b 100644 --- a/roadmap/implementers-guide/src/node/utility/chain-selection.md +++ b/roadmap/implementers-guide/src/node/utility/chain-selection.md @@ -24,10 +24,13 @@ Delete data for all orphaned chains and update all metadata descending from the Update the approval status of the referenced block. If the block was stagnant and thus non-viable and is now viable, then the metadata of all of its descendants needs to be updated as well, as they may no longer be stagnant either. Update the set of viable leaves accordingly. +### `ChainSelectionMessage::Leaves` + +Gets all leaves of the chain, i.e. block hashes that are suitable to build upon and have no suitable children. Supplies the leaves in descending order by score. + ### `ChainSelectionMessage::BestLeafContaining` -If the required block is unknown or not viable, then return `None`. -Iterate over all leaves, returning the first leaf containing the required block in its chain, and `None` otherwise. +If the required block is unknown or not viable, then return `None`. Iterate over all leaves in order of descending weight, returning the first leaf containing the required block in its chain, and `None` otherwise. ### Periodically From be487ae3f8b3fda3cb0a2efe27b1500b550d1ae4 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 10 Jan 2023 04:51:13 -0500 Subject: [PATCH 5/6] Replace async-std with tokio in PVF subsystem (#6419) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Replace async-std with tokio in PVF subsystem * Rework workers to use `select!` instead of a mutex The improvement in code readability is more important than the thread overhead. 
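(A minimal, self-contained sketch of that `select!`-based shape: the names, timeout and channel choice below are illustrative only, assuming `tokio` with the `rt-multi-thread` and `macros` features plus the `futures` crate; the workers' actual implementation follows in the diff.)

use futures::{pin_mut, select_biased, FutureExt};
use std::{sync::mpsc::channel, time::Duration};

#[tokio::main]
async fn main() {
    // The job thread signals the monitor thread through this channel when it is done.
    let (finished_tx, finished_rx) = channel::<()>();
    let handle = tokio::runtime::Handle::current();

    // Stand-in for `cpu_time_monitor_loop`: returns Some(..) only if the deadline elapsed.
    let monitor_fut = handle
        .spawn_blocking(move || finished_rx.recv_timeout(Duration::from_secs(2)).err())
        .fuse();
    // Stand-in for the actual prepare/execute job.
    let job_fut = handle
        .spawn_blocking(move || {
            std::thread::sleep(Duration::from_millis(100));
            let _ = finished_tx.send(());
            42u32
        })
        .fuse();

    pin_mut!(monitor_fut);
    pin_mut!(job_fut);

    // Whichever thread finishes first decides the outcome. If the job branch is
    // selected, the monitor's join handle is simply dropped; no mutex or atomic
    // flag is needed to coordinate the two threads.
    select_biased! {
        timed_out = monitor_fut => println!("monitor finished (None = job was in time): {:?}", timed_out),
        result = job_fut => println!("job finished: {:?}", result),
    }
}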
* Remove unnecessary `fuse` * Add explanation for `expect()` * Update node/core/pvf/src/worker_common.rs Co-authored-by: Bastian Köcher * Update node/core/pvf/src/worker_common.rs Co-authored-by: Bastian Köcher * Address some review comments * Shutdown tokio runtime * Run cargo fmt * Add a small note about retries * Fix up merge * Rework `cpu_time_monitor_loop` to return when other thread finishes * Add error string to PrepareError::IoErr variant * Log when artifacts fail to prepare * Fix `cpu_time_monitor_loop`; fix test * Fix text * Fix a couple of potential minor data races. First data race was due to logging in the CPU monitor thread even if the job (other thread) finished. It can technically finish before or after the log. Maybe best would be to move this log to the `select!`s, where we are guaranteed to have chosen the timed-out branch, although there would be a bit of duplication. Also, it was possible for this thread to complete before we executed `finished_tx.send` in the other thread, which would trigger an error as the receiver has already been dropped. And right now, such a spurious error from `send` would be returned even if the job otherwise succeeded. * Update Cargo.lock Co-authored-by: Bastian Köcher --- Cargo.lock | 149 +-------------- node/core/candidate-validation/src/lib.rs | 9 +- node/core/candidate-validation/src/tests.rs | 2 +- node/core/pvf/Cargo.toml | 3 +- node/core/pvf/src/artifacts.rs | 19 +- node/core/pvf/src/error.rs | 6 +- node/core/pvf/src/execute/queue.rs | 3 +- node/core/pvf/src/execute/worker.rs | 101 +++++----- node/core/pvf/src/host.rs | 49 +++-- node/core/pvf/src/lib.rs | 1 + node/core/pvf/src/prepare/pool.rs | 13 +- node/core/pvf/src/prepare/queue.rs | 16 +- node/core/pvf/src/prepare/worker.rs | 164 ++++++++-------- node/core/pvf/src/worker_common.rs | 201 ++++++++------------ node/core/pvf/tests/it/adder.rs | 8 +- node/core/pvf/tests/it/main.rs | 28 ++- node/core/pvf/tests/it/worker_common.rs | 4 +- node/primitives/src/lib.rs | 2 +- 18 files changed, 302 insertions(+), 476 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c5420adb46c..ad9d2b913bca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,57 +305,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn", -] - -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-mutex", - "blocking", - "futures-lite", - "num_cpus", - 
"once_cell", -] - [[package]] name = "async-io" version = "1.6.0" @@ -384,65 +333,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-process" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" -dependencies = [ - "async-io", - "blocking", - "cfg-if", - "event-listener", - "futures-lite", - "libc", - "once_cell", - "signal-hook", - "winapi", -] - -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-attributes", - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite 0.2.7", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "async-trait" version = "0.1.58" @@ -753,20 +643,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" -[[package]] -name = "blocking" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "bounded-vec" version = "0.6.0" @@ -2911,19 +2787,6 @@ dependencies = [ "regex", ] -[[package]] -name = "gloo-timers" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "group" version = "0.12.1" @@ -3665,15 +3528,6 @@ dependencies = [ "sp-weights", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "kvdb" version = "0.13.0" @@ -6930,8 +6784,6 @@ version = "0.9.33" dependencies = [ "always-assert", "assert_matches", - "async-process", - "async-std", "cpu-time", "futures", "futures-timer", @@ -6956,6 +6808,7 @@ dependencies = [ "tempfile", "test-parachain-adder", "test-parachain-halt", + "tokio", "tracing-gum", ] diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 743a053f2ec7..dd2827e751fe 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -627,7 +627,8 @@ trait ValidationBackend { self.validate_candidate(pvf.clone(), timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the - // 
assumption that the conditions that caused this error may have been transient. + // assumption that the conditions that caused this error may have been transient. Note that + // this error is only a result of execution itself and not of preparation. if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = validation_result { @@ -676,12 +677,12 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { let (tx, rx) = oneshot::channel(); - if let Err(_) = self.precheck_pvf(pvf, tx).await { + if let Err(err) = self.precheck_pvf(pvf, tx).await { // Return an IO error if there was an error communicating with the host. - return Err(PrepareError::IoErr) + return Err(PrepareError::IoErr(err)) } - let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; + let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?; precheck_result } diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index c6003c734973..476e4ea7f985 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); - inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed); + inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed); } diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 2aaf408ae56d..e00092826428 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs" [dependencies] always-assert = "0.1" -async-std = { version = "1.11.0", features = ["attributes"] } -async-process = "1.3.0" assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" tempfile = "3.3.0" +tokio = { version = "1.22.0", features = ["fs", "process"] } rayon = "1.5.1" parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] } diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 413d73b4c558..297ed0829cca 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -16,10 +16,10 @@ use crate::{error::PrepareError, host::PrepareResultSender}; use always_assert::always; -use async_std::path::{Path, PathBuf}; use polkadot_parachain::primitives::ValidationCodeHash; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -136,8 +136,8 @@ impl Artifacts { pub async fn new(cache_path: &Path) -> Self { // Make sure that the cache path directory and all its parents are created. // First delete the entire cache. Nodes are long-running so this should populate shortly. 
- let _ = async_std::fs::remove_dir_all(cache_path).await; - let _ = async_std::fs::create_dir_all(cache_path).await; + let _ = tokio::fs::remove_dir_all(cache_path).await; + let _ = tokio::fs::create_dir_all(cache_path).await; Self { artifacts: HashMap::new() } } @@ -214,9 +214,8 @@ impl Artifacts { #[cfg(test)] mod tests { use super::{ArtifactId, Artifacts}; - use async_std::path::Path; use sp_core::H256; - use std::str::FromStr; + use std::{path::Path, str::FromStr}; #[test] fn from_file_name() { @@ -252,11 +251,9 @@ mod tests { ); } - #[test] - fn artifacts_removes_cache_on_startup() { - let fake_cache_path = async_std::task::block_on(async move { - crate::worker_common::tmpfile("test-cache").await.unwrap() - }); + #[tokio::test] + async fn artifacts_removes_cache_on_startup() { + let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap(); let fake_artifact_path = { let mut p = fake_cache_path.clone(); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); @@ -271,7 +268,7 @@ mod tests { // this should remove it and re-create. let p = &fake_cache_path; - async_std::task::block_on(async { Artifacts::new(p).await }); + Artifacts::new(p).await; assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0); diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 01d8c78d39ca..a679b2f96062 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -34,7 +34,7 @@ pub enum PrepareError { TimedOut, /// An IO error occurred while receiving the result from the worker process. This state is reported by the /// validation host (not by the worker). - IoErr, + IoErr(String), /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// validation host (not by the worker). CreateTmpFileErr(String), @@ -54,7 +54,7 @@ impl PrepareError { use PrepareError::*; match self { Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, + TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, } } } @@ -67,7 +67,7 @@ impl fmt::Display for PrepareError { Preparation(err) => write!(f, "preparation: {}", err), Panic(err) => write!(f, "panic: {}", err), TimedOut => write!(f, "prepare: timeout"), - IoErr => write!(f, "prepare: io error while receiving response"), + IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), } diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 72b6e450351b..f2f1b4e0cfff 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -24,7 +24,6 @@ use crate::{ worker_common::{IdleWorker, WorkerHandle}, InvalidCandidate, ValidationError, LOG_TARGET, }; -use async_std::path::PathBuf; use futures::{ channel::mpsc, future::BoxFuture, @@ -32,7 +31,7 @@ use futures::{ Future, FutureExt, }; use slotmap::HopSlotMap; -use std::{collections::VecDeque, fmt, time::Duration}; +use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration}; slotmap::new_key_type! 
{ struct Worker; } diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 105accf18e2b..df928efaa642 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -19,30 +19,22 @@ use crate::{ executor_intf::Executor, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle, + spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; -use futures::FutureExt; +use futures::{pin_mut, select_biased, FutureExt}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, + path::{Path, PathBuf}, + sync::{mpsc::channel, Arc}, time::Duration, }; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -235,10 +227,10 @@ impl Response { /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("execute", socket_path, |mut stream| async move { - let executor = Executor::new().map_err(|e| { + worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move { + let executor = Arc::new(Executor::new().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?; + })?); loop { let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; @@ -249,52 +241,61 @@ pub fn worker_entrypoint(socket_path: &str) { artifact_path.display(), ); - // Create a lock flag. We set it when either thread finishes. - let lock = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, execution_timeout, lock.clone()); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { - cpu_time_monitor_loop( - JobKind::Execute, - stream_2, - cpu_time_start_2, - execution_timeout_2, - lock_2, - ) - .await; - }) - })?; + // Spawn a new thread that runs the CPU time monitor. 
+ let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) + }) + .fuse(); + let executor_2 = executor.clone(); + let execute_fut = rt_handle + .spawn_blocking(move || { + validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) + }) + .fuse(); - let response = - validate_using_artifact(&artifact_path, ¶ms, &executor, cpu_time_start).await; + pin_mut!(thread_fut); + pin_mut!(execute_fut); - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it - // and return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the process, - // nothing to do. - continue - } + let response = select_biased! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + join_res = thread_fut => { + match join_res { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_time_elapsed.as_millis(), + execution_timeout.as_millis(), + ); + Response::TimedOut + }, + Ok(None) => Response::InternalError("error communicating over finished channel".into()), + Err(e) => Response::InternalError(format!("{}", e)), + } + }, + execute_res = execute_fut => { + let _ = finished_tx.send(()); + execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e))) + }, + }; send_response(&mut stream, response).await?; } }); } -async fn validate_using_artifact( +fn validate_using_artifact( artifact_path: &Path, params: &[u8], - executor: &Executor, + executor: Arc, cpu_time_start: ProcessTime, ) -> Response { let descriptor_bytes = match unsafe { diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 96aed4eae7a8..b41ebd3c425b 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -28,7 +28,6 @@ use crate::{ prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt, @@ -36,6 +35,7 @@ use futures::{ use polkadot_parachain::primitives::ValidationResult; use std::{ collections::HashMap, + path::{Path, PathBuf}, time::{Duration, SystemTime}, }; @@ -171,7 +171,7 @@ pub struct Config { impl Config { /// Create a new instance of the configuration. pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self { - // Do not contaminate the other parts of the codebase with the types from `async_std`. + // Do not contaminate the other parts of the codebase with the types from `tokio`. 
let cache_path = PathBuf::from(cache_path); let program_path = PathBuf::from(program_path); @@ -723,10 +723,19 @@ async fn handle_prepare_done( *state = match result { Ok(cpu_time_elapsed) => ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed }, - Err(error) => ArtifactState::FailedToProcess { - last_time_failed: SystemTime::now(), - num_failures: *num_failures + 1, - error, + Err(error) => { + gum::debug!( + target: LOG_TARGET, + artifact_id = ?artifact_id, + num_failures = ?num_failures, + "Failed to process artifact: {}", + error + ); + ArtifactState::FailedToProcess { + last_time_failed: SystemTime::now(), + num_failures: *num_failures + 1, + error, + } }, }; @@ -778,7 +787,7 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver) { match sweeper_rx.next().await { None => break, Some(condemned) => { - let result = async_std::fs::remove_file(&condemned).await; + let result = tokio::fs::remove_file(&condemned).await; gum::trace!( target: LOG_TARGET, ?result, @@ -827,7 +836,7 @@ mod tests { const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); - #[async_std::test] + #[tokio::test] async fn pulse_test() { let pulse = pulse_every(Duration::from_millis(100)); futures::pin_mut!(pulse); @@ -1017,19 +1026,19 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn shutdown_on_handle_drop() { let test = Builder::default().build(); - let join_handle = async_std::task::spawn(test.run); + let join_handle = tokio::task::spawn(test.run); // Dropping the handle will lead to conclusion of the read part and thus will make the event // loop to stop, which in turn will resolve the join handle. drop(test.to_host_tx); - join_handle.await; + join_handle.await.unwrap(); } - #[async_std::test] + #[tokio::test] async fn pruning() { let mock_now = SystemTime::now() - Duration::from_millis(1000); @@ -1059,7 +1068,7 @@ mod tests { test.poll_ensure_to_sweeper_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1159,7 +1168,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn precheck_pvf() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1214,7 +1223,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1301,7 +1310,7 @@ mod tests { // Test that multiple prechecking requests do not trigger preparation retries if the first one // failed. - #[async_std::test] + #[tokio::test] async fn test_precheck_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1344,7 +1353,7 @@ mod tests { // Test that multiple execution requests trigger preparation retries if the first one failed due // to a potentially non-reproducible error. - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1414,7 +1423,7 @@ mod tests { // Test that multiple execution requests don't trigger preparation retries if the first one // failed due to a reproducible error (e.g. Prevalidation). - #[async_std::test] + #[tokio::test] async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1480,7 +1489,7 @@ mod tests { } // Test that multiple heads-up requests trigger preparation retries if the first one failed. 
- #[async_std::test] + #[tokio::test] async fn test_heads_up_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); @@ -1521,7 +1530,7 @@ mod tests { ); } - #[async_std::test] + #[tokio::test] async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 1aabb1100437..0e858147bd29 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -113,6 +113,7 @@ pub use pvf::Pvf; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; +pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; pub use execute::worker_entrypoint as execute_worker_entrypoint; pub use prepare::worker_entrypoint as prepare_worker_entrypoint; diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 3319d44e7fb4..0d39623c99db 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -22,12 +22,17 @@ use crate::{ LOG_TARGET, }; use always_assert::never; -use async_std::path::{Path, PathBuf}; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; use slotmap::HopSlotMap; -use std::{fmt, sync::Arc, task::Poll, time::Duration}; +use std::{ + fmt, + path::{Path, PathBuf}, + sync::Arc, + task::Poll, + time::Duration, +}; slotmap::new_key_type! { pub struct Worker; } @@ -322,14 +327,14 @@ fn handle_mux( Ok(()) }, - Outcome::IoErr => { + Outcome::IoErr(err) => { if attempt_retire(metrics, spawned, worker) { reply( from_pool, FromPool::Concluded { worker, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr(err)), }, )?; } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index e78351af9839..c44301c7427b 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -19,10 +19,10 @@ use super::pool::{self, Worker}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; -use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use std::{ collections::{HashMap, VecDeque}, + path::PathBuf, time::Duration, }; @@ -603,7 +603,7 @@ mod tests { } } - #[async_std::test] + #[tokio::test] async fn properly_concludes() { let mut test = Test::new(2, 2); @@ -625,7 +625,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -669,7 +669,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); } - #[async_std::test] + #[tokio::test] async fn cull_unwanted() { let mut test = Test::new(1, 2); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; @@ -707,7 +707,7 @@ mod tests { assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } - #[async_std::test] + #[tokio::test] async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); @@ -741,7 +741,7 @@ mod tests { assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } - #[async_std::test] + #[tokio::test] async fn doesnt_resurrect_ripped_worker_if_no_work() { let mut test = Test::new(2, 2); @@ -761,12 +761,12 @@ mod tests { 
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, - result: Err(PrepareError::IoErr), + result: Err(PrepareError::IoErr("test".into())), }); test.poll_ensure_to_pool_is_empty().await; } - #[async_std::test] + #[tokio::test] async fn rip_for_start_work() { let mut test = Test::new(2, 2); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 5b4212e1e313..d3550fe3afe6 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -19,29 +19,22 @@ use crate::{ error::{PrepareError, PrepareResult}, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr, - WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use async_std::{ - io, - os::unix::net::UnixStream, - path::{Path, PathBuf}, - task, -}; use cpu_time::ProcessTime; +use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ panic, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, + path::{Path, PathBuf}, + sync::{mpsc::channel, Arc}, time::Duration, }; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -71,7 +64,7 @@ pub enum Outcome { /// An IO error occurred while receiving the result from the worker process. /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. - IoErr, + IoErr(String), } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -86,7 +79,7 @@ pub async fn start_work( artifact_path: PathBuf, preparation_timeout: Duration, ) -> Outcome { - let IdleWorker { mut stream, pid } = worker; + let IdleWorker { stream, pid } = worker; gum::debug!( target: LOG_TARGET, @@ -95,7 +88,7 @@ pub async fn start_work( artifact_path.display(), ); - with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move { + with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, @@ -116,7 +109,7 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; - let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await; + let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await; match result { // Received bytes from worker within the time limit. @@ -138,7 +131,7 @@ pub async fn start_work( "failed to recv a prepare response: {:?}", err, ); - Outcome::IoErr + Outcome::IoErr(err.to_string()) }, Err(_) => { // Timed out here on the host. @@ -169,7 +162,7 @@ async fn handle_response_bytes( // By convention we expect encoded `PrepareResult`. let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { Ok(result) => result, - Err(_) => { + Err(err) => { // We received invalid bytes from the worker. 
let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; gum::warn!( @@ -178,7 +171,7 @@ async fn handle_response_bytes( "received unexpected response from the prepare worker: {}", HexDisplay::from(&bound_bytes), ); - return Outcome::IoErr + return Outcome::IoErr(err.to_string()) }, }; let cpu_time_elapsed = match result { @@ -198,11 +191,6 @@ async fn handle_response_bytes( preparation_timeout.as_millis(), tmp_file.display(), ); - - // Return a timeout error. - // - // NOTE: The artifact exists, but is located in a temporary file which - // will be cleared by `with_tmp_file`. return Outcome::TimedOut } @@ -214,8 +202,8 @@ async fn handle_response_bytes( artifact_path.display(), ); - match async_std::fs::rename(&tmp_file, &artifact_path).await { - Ok(_) => Outcome::Concluded { worker, result }, + match tokio::fs::rename(&tmp_file, &artifact_path).await { + Ok(()) => Outcome::Concluded { worker, result }, Err(err) => { gum::warn!( target: LOG_TARGET, @@ -237,7 +225,7 @@ async fn handle_response_bytes( async fn with_tmp_file(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome where Fut: futures::Future, - F: FnOnce(PathBuf) -> Fut, + F: FnOnce(PathBuf, UnixStream) -> Fut, { let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { Ok(f) => f, @@ -255,14 +243,14 @@ where }, }; - let outcome = f(tmp_file.clone()).await; + let outcome = f(tmp_file.clone(), stream).await; // The function called above is expected to move `tmp_file` to a new location upon success. However, // the function may as well fail and in that case we should remove the tmp file here. // // In any case, we try to remove the file here so that there are no leftovers. We only report // errors that are different from the `NotFound`. - match async_std::fs::remove_file(tmp_file).await { + match tokio::fs::remove_file(tmp_file).await { Ok(()) => (), Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), Err(err) => { @@ -312,74 +300,78 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { - worker_event_loop("prepare", socket_path, |mut stream| async move { + worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; - gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), "worker: preparing artifact", ); - // Create a lock flag. We set it when either thread finishes. - let lock = Arc::new(AtomicBool::new(false)); + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. Continuously wakes up from - // sleeping and then either sleeps for the remaining CPU time, or kills the process if - // we exceed the CPU timeout. - let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) = - (stream.clone(), cpu_time_start, preparation_timeout, lock.clone()); - let handle = - thread::Builder::new().name("CPU time monitor".into()).spawn(move || { - task::block_on(async { - cpu_time_monitor_loop( - JobKind::Prepare, - stream_2, - cpu_time_start_2, - preparation_timeout_2, - lock_2, - ) - .await; - }) - })?; - - // Prepares the artifact in a separate thread. 
- let result = match prepare_artifact(&code).await { - Err(err) => { - // Serialized error will be written into the socket. - Err(err) + // Spawn a new thread that runs the CPU time monitor. + let thread_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx) + }) + .fuse(); + let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + + pin_mut!(thread_fut); + pin_mut!(prepare_fut); + + let result = select_biased! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + join_res = thread_fut => { + match join_res { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_time_elapsed.as_millis(), + preparation_timeout.as_millis(), + ); + Err(PrepareError::TimedOut) + }, + Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), + Err(err) => Err(PrepareError::IoErr(err.to_string())), + } }, - Ok(compiled_artifact) => { + compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - - let lock_result = - lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if lock_result.is_err() { - // The other thread is still sending an error response over the socket. Wait on it and - // return. - let _ = handle.join(); - // Monitor thread detected timeout and likely already terminated the - // process, nothing to do. - continue + let _ = finished_tx.send(()); + + match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { + Err(err) => { + // Serialized error will be written into the socket. + Err(err) + }, + Ok(compiled_artifact) => { + // Write the serialized artifact into a temp file. + // + // PVF host only keeps artifacts statuses in its memory, successfully + // compiled code gets stored on the disk (and consequently deserialized + // by execute-workers). The prepare worker is only required to send `Ok` + // to the pool to indicate the success. + + gum::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: writing artifact to {}", + dest.display(), + ); + tokio::fs::write(&dest, &compiled_artifact).await?; + + Ok(cpu_time_elapsed) + }, } - - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored - // on the disk (and consequently deserialized by execute-workers). The prepare worker is only - // required to send `Ok` to the pool to indicate the success. - - gum::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: writing artifact to {}", - dest.display(), - ); - async_std::fs::write(&dest, &compiled_artifact).await?; - - Ok(cpu_time_elapsed) }, }; @@ -388,7 +380,7 @@ pub fn worker_entrypoint(socket_path: &str) { }); } -async fn prepare_artifact(code: &[u8]) -> Result { +fn prepare_artifact(code: &[u8]) -> Result { panic::catch_unwind(|| { let blob = match crate::executor_intf::prevalidate(code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index e052bd77ed06..9cda5f8cd0b7 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -16,31 +16,26 @@ //! 
Common logic for implementation of worker processes. -use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; -use async_std::{ - io, - net::Shutdown, - os::unix::net::{UnixListener, UnixStream}, - path::{Path, PathBuf}, -}; +use crate::LOG_TARGET; use cpu_time::ProcessTime; -use futures::{ - never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _, -}; +use futures::{never::Never, FutureExt as _}; use futures_timer::Delay; -use parity_scale_codec::Encode; use pin_project::pin_project; use rand::Rng; use std::{ fmt, mem, + path::{Path, PathBuf}, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::mpsc::{Receiver, RecvTimeoutError}, task::{Context, Poll}, time::Duration, }; +use tokio::{ + io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}, + net::{UnixListener, UnixStream}, + process, + runtime::{Handle, Runtime}, +}; /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in /// wall clock time). This is lenient because CPU time may go slower than wall clock time. @@ -50,21 +45,6 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; /// child process. pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); -#[derive(Copy, Clone, Debug)] -pub enum JobKind { - Prepare, - Execute, -} - -impl fmt::Display for JobKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Prepare => write!(f, "prepare"), - Self::Execute => write!(f, "execute"), - } - } -} - /// This is publicly exposed only for integration tests. #[doc(hidden)] pub async fn spawn_with_program_path( @@ -77,7 +57,7 @@ pub async fn spawn_with_program_path( with_transient_socket_path(debug_id, |socket_path| { let socket_path = socket_path.to_owned(); async move { - let listener = UnixListener::bind(&socket_path).await.map_err(|err| { + let listener = UnixListener::bind(&socket_path).map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, @@ -132,7 +112,7 @@ where // Best effort to remove the socket file. Under normal circumstances the socket will be removed // by the worker. We make sure that it is removed here, just in case a failed rendezvous. - let _ = async_std::fs::remove_file(socket_path).await; + let _ = tokio::fs::remove_file(socket_path).await; result } @@ -163,7 +143,7 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result { for _ in 0..NUM_RETRIES { let candidate_path = tmppath(prefix, dir); - if !candidate_path.exists().await { + if !candidate_path.exists() { return Ok(candidate_path) } } @@ -179,28 +159,22 @@ pub async fn tmpfile(prefix: &str) -> io::Result { pub fn worker_event_loop(debug_id: &'static str, socket_path: &str, mut event_loop: F) where - F: FnMut(UnixStream) -> Fut, + F: FnMut(Handle, UnixStream) -> Fut, Fut: futures::Future>, { - let err = async_std::task::block_on::<_, io::Result>(async move { - let stream = UnixStream::connect(socket_path).await?; - let _ = async_std::fs::remove_file(socket_path).await; - - let result = event_loop(stream.clone()).await; - - if let Err(err) = stream.shutdown(Shutdown::Both) { - // Log, but don't return error here, as it may shadow any error from `event_loop`. - gum::debug!( - target: LOG_TARGET, - "error shutting down stream at path {}: {}", - socket_path, - err - ); - } + let rt = Runtime::new().expect("Creates tokio runtime. 
If this panics the worker will die and the host will detect that and deal with it."); + let handle = rt.handle(); + let err = rt + .block_on(async move { + let stream = UnixStream::connect(socket_path).await?; + let _ = tokio::fs::remove_file(socket_path).await; - result - }) - .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` + let result = event_loop(handle.clone(), stream).await; + + result + }) + // It's never `Ok` because it's `Ok(Never)`. + .unwrap_err(); gum::debug!( target: LOG_TARGET, @@ -209,74 +183,45 @@ where debug_id, err, ); + + // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast + // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, + // but may be in the future. + rt.shutdown_background(); } /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up -/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error -/// if we exceed the CPU timeout. +/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. /// -/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the -/// background. When it wakes, it will see that the flag has been set and return. -pub async fn cpu_time_monitor_loop( - job_kind: JobKind, - mut stream: UnixStream, +/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return +/// `None` if the other thread finishes first, without us timing out. +/// +/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or +/// execution, to be killed by the host. We do not kill the process here because it would interfere +/// with the proper handling of this error. +pub fn cpu_time_monitor_loop( cpu_time_start: ProcessTime, timeout: Duration, - lock: Arc, -) { + finished_rx: Receiver<()>, +) -> Option { loop { let cpu_time_elapsed = cpu_time_start.elapsed(); // Treat the timeout as CPU time, which is less subject to variance due to load. - if cpu_time_elapsed > timeout { - let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); - if result.is_err() { - // Hit the job-completed case first, return from this thread. - return - } - - // Log if we exceed the timeout. - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms", - cpu_time_elapsed.as_millis(), - timeout.as_millis(), - ); - - // Send back a `TimedOut` error. - // - // NOTE: This will cause the worker, whether preparation or execution, to be killed by - // the host. We do not kill the process here because it would interfere with the proper - // handling of this error. - let encoded_result = match job_kind { - JobKind::Prepare => { - let result: Result<(), PrepareError> = Err(PrepareError::TimedOut); - result.encode() - }, - JobKind::Execute => { - let result = ExecuteResponse::TimedOut; - result.encode() - }, - }; - // If we error here there is nothing we can do apart from log it. The receiving side - // will just have to time out. - if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "{job_kind} worker -> pvf host: error sending result over the socket: {:?}", - err - ); + if cpu_time_elapsed <= timeout { + // Sleep for the remaining CPU time, plus a bit to account for overhead. 
Note that the sleep + // is wall clock time. The CPU clock may be slower than the wall clock. + let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; + match finished_rx.recv_timeout(sleep_interval) { + // Received finish signal. + Ok(()) => return None, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return None, } - - return } - // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep - // is wall clock time. The CPU clock may be slower than the wall clock. - let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD; - std::thread::sleep(sleep_interval); + return Some(cpu_time_elapsed) } } @@ -317,9 +262,10 @@ pub enum SpawnErr { /// This future relies on the fact that a child process's stdout `fd` is closed upon it's termination. #[pin_project] pub struct WorkerHandle { - child: async_process::Child, + child: process::Child, + child_id: u32, #[pin] - stdout: async_process::ChildStdout, + stdout: process::ChildStdout, program: PathBuf, drop_box: Box<[u8]>, } @@ -330,13 +276,16 @@ impl WorkerHandle { extra_args: &[&str], socket_path: impl AsRef, ) -> io::Result { - let mut child = async_process::Command::new(program.as_ref()) + let mut child = process::Command::new(program.as_ref()) .args(extra_args) .arg(socket_path.as_ref().as_os_str()) - .stdout(async_process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) .kill_on_drop(true) .spawn()?; + let child_id = child + .id() + .ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?; let stdout = child .stdout .take() @@ -344,6 +293,7 @@ impl WorkerHandle { Ok(WorkerHandle { child, + child_id, stdout, program: program.as_ref().to_path_buf(), // We don't expect the bytes to be ever read. But in case we do, we should not use a buffer @@ -361,7 +311,7 @@ impl WorkerHandle { /// Returns the process id of this worker. pub fn id(&self) -> u32 { - self.child.id() + self.child_id } } @@ -370,15 +320,20 @@ impl futures::Future for WorkerHandle { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut *me.drop_box)) { - Ok(0) => { - // 0 means `EOF` means the child was terminated. Resolve. - Poll::Ready(()) - }, - Ok(_bytes_read) => { - // weird, we've read something. Pretend that never happened and reschedule ourselves. - cx.waker().wake_by_ref(); - Poll::Pending + // Create a `ReadBuf` here instead of storing it in `WorkerHandle` to avoid a lifetime + // parameter on `WorkerHandle`. Creating the `ReadBuf` is fairly cheap. + let mut read_buf = ReadBuf::new(&mut *me.drop_box); + match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) { + Ok(()) => { + if read_buf.filled().len() > 0 { + // weird, we've read something. Pretend that never happened and reschedule + // ourselves. + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // Nothing read means `EOF` means the child was terminated. Resolve. + Poll::Ready(()) + } }, Err(err) => { // The implementation is guaranteed to not to return `WouldBlock` and Interrupted. This @@ -387,8 +342,8 @@ impl futures::Future for WorkerHandle { // Log the status code. 
gum::debug!( target: LOG_TARGET, - worker_pid = %me.child.id(), - status_code = ?me.child.try_status(), + worker_pid = %me.child_id, + status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()), "pvf worker ({}): {:?}", me.program.display(), err, diff --git a/node/core/pvf/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs index 69b6b7d21979..8eb57e4d9026 100644 --- a/node/core/pvf/tests/it/adder.rs +++ b/node/core/pvf/tests/it/adder.rs @@ -22,7 +22,7 @@ use polkadot_parachain::primitives::{ ValidationParams, }; -#[async_std::test] +#[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -50,7 +50,7 @@ async fn execute_good_block_on_parent() { assert_eq!(new_head.post_state, hash_state(512)); } -#[async_std::test] +#[tokio::test] async fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; @@ -88,7 +88,7 @@ async fn execute_good_chain_on_parent() { } } -#[async_std::test] +#[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; @@ -113,7 +113,7 @@ async fn execute_bad_block_on_parent() { .unwrap_err(); } -#[async_std::test] +#[tokio::test] async fn stress_spawn() { let host = std::sync::Arc::new(TestHost::new()); diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index a6aaf5d369d4..07754ef8693d 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -14,13 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use async_std::sync::Mutex; +use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use std::time::Duration; +use tokio::sync::Mutex; mod adder; mod worker_common; @@ -47,7 +49,7 @@ impl TestHost { let mut config = Config::new(cache_dir.path().to_owned(), program_path); f(&mut config); let (host, task) = start(config, Metrics::default()); - let _ = async_std::task::spawn(task); + let _ = tokio::task::spawn(task); Self { _cache_dir: cache_dir, host: Mutex::new(host) } } @@ -77,10 +79,11 @@ impl TestHost { } } -#[async_std::test] +#[tokio::test] async fn terminates_on_timeout() { let host = TestHost::new(); + let start = std::time::Instant::now(); let result = host .validate_candidate( halt::wasm_binary_unwrap(), @@ -97,10 +100,14 @@ async fn terminates_on_timeout() { Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, r => panic!("{:?}", r), } + + let duration = std::time::Instant::now().duration_since(start); + assert!(duration >= TEST_EXECUTION_TIMEOUT); + assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } -#[async_std::test] -async fn parallel_execution() { +#[tokio::test] +async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out. 
let host = TestHost::new(); let execute_pvf_future_1 = host.validate_candidate( @@ -123,7 +130,14 @@ async fn parallel_execution() { ); let start = std::time::Instant::now(); - let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); + assert_matches!( + (res1, res2), + ( + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) + ) + ); // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel). let duration = std::time::Instant::now().duration_since(start); @@ -136,7 +150,7 @@ async fn parallel_execution() { ); } -#[async_std::test] +#[tokio::test] async fn execute_queue_doesnt_stall_if_workers_died() { let host = TestHost::new_with_config(|cfg| { cfg.execute_workers_max_num = 5; diff --git a/node/core/pvf/tests/it/worker_common.rs b/node/core/pvf/tests/it/worker_common.rs index 464b80a9fe58..7e00d005df19 100644 --- a/node/core/pvf/tests/it/worker_common.rs +++ b/node/core/pvf/tests/it/worker_common.rs @@ -18,7 +18,7 @@ use crate::PUPPET_EXE; use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; use std::time::Duration; -#[async_std::test] +#[tokio::test] async fn spawn_timeout() { let result = spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2)) @@ -26,7 +26,7 @@ async fn spawn_timeout() { assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); } -#[async_std::test] +#[tokio::test] async fn should_connect() { let _ = spawn_with_program_path( "integration-test", diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index da0a0eca80be..9af65b3d601e 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -227,7 +227,7 @@ pub type UncheckedSignedFullStatement = UncheckedSigned Date: Tue, 10 Jan 2023 12:04:05 +0100 Subject: [PATCH 6/6] Fix cycle dispute-coordinator <-> dispute-distribution (#6489) * First iteration of message sender. * dyn Fn variant (no cloning) * Full implementation + Clone, without allocs on `Send` * Further clarifications/cleanup. * MessageSender -> NestingSender * Doc update/clarification. * dispute-coordinator: Send disputes on startup. + Some fixes, cleanup. * Fix whitespace. * Dispute distribution fixes, cleanup. * Cargo.lock * Fix spaces. * More format fixes. What is cargo fmt doing actually? * More fmt fixes. * Fix nesting sender. * Fixes. * Whitespace * Enable logging. * Guide update. * Fmt fixes, typos. * Remove unused function. * Simplifications, doc fixes. * Update roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md Co-authored-by: Marcin S. * Fmt + doc example fix. Co-authored-by: eskimor Co-authored-by: Marcin S. 
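The cycle fix above hinges on the new `NestingSender` (added in this series as node/subsystem-util/src/nesting_sender.rs): a background task sends its own message type, which is converted into the parent's message type on its way into a single shared channel, so messages only ever flow bottom-up and the dispute-coordinator <-> dispute-distribution cycle disappears. Below is a minimal sketch of that idea only, with simplified two-parameter types and a hypothetical `nest` helper; the real API spells this `NestingSender::new_root(..)` and `NestingSender::new(parent, conversion)` and differs in detail.

use std::sync::Arc;

use futures::{channel::mpsc, SinkExt};

/// Illustrative stand-in: `M` is the module-local message type, `Root` the
/// message type consumed by the subsystem main loop.
pub struct NestingSender<M, Root> {
    sender: mpsc::Sender<Root>,
    wrap: Arc<dyn Fn(M) -> Root + Send + Sync>,
}

impl<M, Root> Clone for NestingSender<M, Root> {
    fn clone(&self) -> Self {
        Self { sender: self.sender.clone(), wrap: self.wrap.clone() }
    }
}

impl<Root: 'static> NestingSender<Root, Root> {
    /// Root sender: messages pass through unchanged.
    pub fn new_root(capacity: usize) -> (Self, mpsc::Receiver<Root>) {
        let (sender, receiver) = mpsc::channel(capacity);
        (Self { sender, wrap: Arc::new(|m| m) }, receiver)
    }
}

impl<M: 'static, Root: 'static> NestingSender<M, Root> {
    /// Derive a sender for a nested module: its messages are wrapped into the
    /// parent's message type before they hit the shared channel.
    pub fn nest<M2: 'static>(
        &self,
        f: impl Fn(M2) -> M + Send + Sync + 'static,
    ) -> NestingSender<M2, Root> {
        let wrap = self.wrap.clone();
        NestingSender {
            sender: self.sender.clone(),
            wrap: Arc::new(move |m| wrap(f(m))),
        }
    }

    /// Send a module-local message, converting it to the root type.
    pub async fn send_message(&mut self, m: M) -> Result<(), mpsc::SendError> {
        let root = (self.wrap)(m);
        self.sender.send(root).await
    }
}

In the diff below this pattern shows up as `let (tx, sender_rx) = NestingSender::new_root(1);` when constructing the subsystem and as `NestingSender::new(self.tx.clone(), DisputeSenderMessage::TaskFinish)` when spawning send tasks.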
--- Cargo.lock | 2 + node/core/dispute-coordinator/Cargo.toml | 1 + node/core/dispute-coordinator/src/import.rs | 104 +++--- .../dispute-coordinator/src/initialized.rs | 136 ++----- node/core/dispute-coordinator/src/lib.rs | 267 +++++++++++--- node/core/dispute-coordinator/src/tests.rs | 168 +++------ node/network/dispute-distribution/src/lib.rs | 70 ++-- .../dispute-distribution/src/sender/mod.rs | 344 +++++++----------- .../src/sender/send_task.rs | 22 +- .../dispute-distribution/src/tests/mod.rs | 62 +--- node/primitives/src/disputes/message.rs | 2 +- node/subsystem-util/Cargo.toml | 1 + node/subsystem-util/src/lib.rs | 6 + node/subsystem-util/src/nesting_sender.rs | 207 +++++++++++ .../src/node/disputes/dispute-coordinator.md | 21 +- .../src/node/disputes/dispute-distribution.md | 7 +- 16 files changed, 778 insertions(+), 642 deletions(-) create mode 100644 node/subsystem-util/src/nesting_sender.rs diff --git a/Cargo.lock b/Cargo.lock index ad9d2b913bca..7bd0f3bf3d56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6737,6 +6737,7 @@ dependencies = [ "sp-core", "sp-keyring", "sp-keystore", + "sp-tracing", "thiserror", "tracing-gum", ] @@ -7005,6 +7006,7 @@ dependencies = [ "env_logger 0.9.0", "fatality", "futures", + "futures-channel", "itertools", "kvdb", "kvdb-memorydb", diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml index 2ecc4d5e331f..d4efffd0fee8 100644 --- a/node/core/dispute-coordinator/Cargo.toml +++ b/node/core/dispute-coordinator/Cargo.toml @@ -31,6 +31,7 @@ assert_matches = "1.4.0" test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } futures-timer = "3.0.2" sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } [features] # If not enabled, the dispute coordinator will do nothing. diff --git a/node/core/dispute-coordinator/src/import.rs b/node/core/dispute-coordinator/src/import.rs index 84adae167c7a..6573fd6907a9 100644 --- a/node/core/dispute-coordinator/src/import.rs +++ b/node/core/dispute-coordinator/src/import.rs @@ -87,60 +87,69 @@ impl<'a> CandidateEnvironment<'a> { /// Whether or not we already issued some statement about a candidate. pub enum OwnVoteState { - /// We already voted/issued a statement for the candidate. - Voted, - /// We already voted/issued a statement for the candidate and it was an approval vote. + /// Our votes, if any. + Voted(Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>), + + /// We are not a parachain validator in the session. /// - /// Needs special treatment as we have to make sure to propagate it to peers, to guarantee the - /// dispute can conclude. - VotedApproval(Vec<(ValidatorIndex, ValidatorSignature)>), - /// We not yet voted for the dispute. - NoVote, + /// Hence we cannot vote. 
+ CannotVote, } impl OwnVoteState { fn new<'a>(votes: &CandidateVotes, env: &CandidateEnvironment<'a>) -> Self { - let mut our_valid_votes = env - .controlled_indices() + let controlled_indices = env.controlled_indices(); + if controlled_indices.is_empty() { + return Self::CannotVote + } + + let our_valid_votes = controlled_indices .iter() .filter_map(|i| votes.valid.raw().get_key_value(i)) - .peekable(); - let mut our_invalid_votes = - env.controlled_indices.iter().filter_map(|i| votes.invalid.get_key_value(i)); - let has_valid_votes = our_valid_votes.peek().is_some(); - let has_invalid_votes = our_invalid_votes.next().is_some(); - let our_approval_votes: Vec<_> = our_valid_votes - .filter_map(|(index, (k, sig))| { - if let ValidDisputeStatementKind::ApprovalChecking = k { - Some((*index, sig.clone())) - } else { - None - } - }) - .collect(); + .map(|(index, (kind, sig))| (*index, (DisputeStatement::Valid(*kind), sig.clone()))); + let our_invalid_votes = controlled_indices + .iter() + .filter_map(|i| votes.invalid.get_key_value(i)) + .map(|(index, (kind, sig))| (*index, (DisputeStatement::Invalid(*kind), sig.clone()))); - if !our_approval_votes.is_empty() { - return Self::VotedApproval(our_approval_votes) - } - if has_valid_votes || has_invalid_votes { - return Self::Voted - } - Self::NoVote + Self::Voted(our_valid_votes.chain(our_invalid_votes).collect()) } - /// Whether or not we issued a statement for the candidate already. - fn voted(&self) -> bool { + /// Is a vote from us missing but we are a validator able to vote? + fn vote_missing(&self) -> bool { match self { - Self::Voted | Self::VotedApproval(_) => true, - Self::NoVote => false, + Self::Voted(votes) if votes.is_empty() => true, + Self::Voted(_) | Self::CannotVote => false, } } /// Get own approval votes, if any. - fn approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> { + /// + /// Empty iterator means, no approval votes. `None` means, there will never be any (we cannot + /// vote). + fn approval_votes( + &self, + ) -> Option> { match self { - Self::VotedApproval(votes) => Some(&votes), - _ => None, + Self::Voted(votes) => Some(votes.iter().filter_map(|(index, (kind, sig))| { + if let DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) = kind { + Some((*index, sig)) + } else { + None + } + })), + Self::CannotVote => None, + } + } + + /// Get our votes if there are any. + /// + /// Empty iterator means, no votes. `None` means, there will never be any (we cannot + /// vote). + fn votes(&self) -> Option<&Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>> { + match self { + Self::Voted(votes) => Some(&votes), + Self::CannotVote => None, } } } @@ -170,7 +179,7 @@ impl CandidateVoteState { valid: ValidCandidateVotes::new(), invalid: BTreeMap::new(), }; - Self { votes, own_vote: OwnVoteState::NoVote, dispute_status: None } + Self { votes, own_vote: OwnVoteState::CannotVote, dispute_status: None } } /// Create a new `CandidateVoteState` from already existing votes. @@ -327,16 +336,25 @@ impl CandidateVoteState { self.dispute_status.map_or(false, |s| s.is_confirmed_concluded()) } - /// This machine already cast some vote in that dispute/for that candidate. - pub fn has_own_vote(&self) -> bool { - self.own_vote.voted() + /// Are we a validator in the session, but have not yet voted? 
+ pub fn own_vote_missing(&self) -> bool { + self.own_vote.vote_missing() } /// Own approval votes if any: - pub fn own_approval_votes(&self) -> Option<&Vec<(ValidatorIndex, ValidatorSignature)>> { + pub fn own_approval_votes( + &self, + ) -> Option> { self.own_vote.approval_votes() } + /// Get own votes if there are any. + pub fn own_votes( + &self, + ) -> Option<&Vec<(ValidatorIndex, (DisputeStatement, ValidatorSignature))>> { + self.own_vote.votes() + } + /// Whether or not there is a dispute and it has already enough valid votes to conclude. pub fn has_concluded_for(&self) -> bool { self.dispute_status.map_or(false, |s| s.has_concluded_for()) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 9f4415ba36a6..a186ae17e6d8 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,8 +26,7 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - disputes::ValidCandidateVotes, CandidateVotes, DisputeMessage, DisputeMessageCheckError, - DisputeStatus, SignedDisputeStatement, Timestamp, + disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, }; use polkadot_node_subsystem::{ messages::{ @@ -48,6 +47,7 @@ use polkadot_primitives::v2::{ use crate::{ error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, + is_potential_spam, metrics::Metrics, status::{get_active_with_status, Clock}, DisputeCoordinatorSubsystem, LOG_TARGET, @@ -55,7 +55,7 @@ use crate::{ use super::{ backend::Backend, - db, + db, make_dispute_message, participation::{ self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement, WorkerMessageReceiver, @@ -396,19 +396,19 @@ impl Initialized { CompactStatement::Valid(_) => ValidDisputeStatementKind::BackingValid(relay_parent), }; - debug_assert!( - SignedDisputeStatement::new_checked( + debug_assert!( + SignedDisputeStatement::new_checked( DisputeStatement::Valid(valid_statement_kind), candidate_hash, session, validator_public.clone(), validator_signature.clone(), - ).is_ok(), - "Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}", - candidate_hash, - session, - validator_public, - ); + ).is_ok(), + "Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}", + candidate_hash, + session, + validator_public, + ); let signed_dispute_statement = SignedDisputeStatement::new_unchecked_from_trusted_source( DisputeStatement::Valid(valid_statement_kind), @@ -492,20 +492,20 @@ impl Initialized { }) .cloned()?; - debug_assert!( - SignedDisputeStatement::new_checked( + debug_assert!( + SignedDisputeStatement::new_checked( dispute_statement.clone(), candidate_hash, session, validator_public.clone(), validator_signature.clone(), - ).is_ok(), - "Scraped dispute votes had invalid signature! candidate: {:?}, session: {:?}, dispute_statement: {:?}, validator_public: {:?}", - candidate_hash, - session, + ).is_ok(), + "Scraped dispute votes had invalid signature! 
candidate: {:?}, session: {:?}, dispute_statement: {:?}, validator_public: {:?}", + candidate_hash, + session, dispute_statement, - validator_public, - ); + validator_public, + ); Some(( SignedDisputeStatement::new_unchecked_from_trusted_source( @@ -845,18 +845,16 @@ impl Initialized { let is_included = self.scraper.is_candidate_included(&candidate_hash); let is_backed = self.scraper.is_candidate_backed(&candidate_hash); - let has_own_vote = new_state.has_own_vote(); + let own_vote_missing = new_state.own_vote_missing(); let is_disputed = new_state.is_disputed(); - let has_controlled_indices = !env.controlled_indices().is_empty(); let is_confirmed = new_state.is_confirmed(); - let potential_spam = - !is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote(); - // We participate only in disputes which are included, backed or confirmed - let allow_participation = is_included || is_backed || is_confirmed; + let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash); + // We participate only in disputes which are not potential spam. + let allow_participation = !potential_spam; gum::trace!( target: LOG_TARGET, - has_own_vote = ?new_state.has_own_vote(), + ?own_vote_missing, ?potential_spam, ?is_included, ?candidate_hash, @@ -903,7 +901,7 @@ impl Initialized { // - `is_included` lands in prioritised queue // - `is_confirmed` | `is_backed` lands in best effort queue // We don't participate in disputes on finalized candidates. - if !has_own_vote && is_disputed && has_controlled_indices && allow_participation { + if own_vote_missing && is_disputed && allow_participation { let priority = ParticipationPriority::with_priority_if(is_included); gum::trace!( target: LOG_TARGET, @@ -930,9 +928,8 @@ impl Initialized { target: LOG_TARGET, ?candidate_hash, ?is_confirmed, - ?has_own_vote, + ?own_vote_missing, ?is_disputed, - ?has_controlled_indices, ?allow_participation, ?is_included, ?is_backed, @@ -946,10 +943,9 @@ impl Initialized { // Also send any already existing approval vote on new disputes: if import_result.is_freshly_disputed() { - let no_votes = Vec::new(); - let our_approval_votes = new_state.own_approval_votes().unwrap_or(&no_votes); + let our_approval_votes = new_state.own_approval_votes().into_iter().flatten(); for (validator_index, sig) in our_approval_votes { - let pub_key = match env.validators().get(*validator_index) { + let pub_key = match env.validators().get(validator_index) { None => { gum::error!( target: LOG_TARGET, @@ -979,7 +975,7 @@ impl Initialized { env.session_info(), &new_state.votes(), statement, - *validator_index, + validator_index, ) { Err(err) => { gum::error!( @@ -1150,9 +1146,9 @@ impl Initialized { Ok(None) => {}, Err(e) => { gum::error!( - target: LOG_TARGET, - err = ?e, - "Encountered keystore error while signing dispute statement", + target: LOG_TARGET, + err = ?e, + "Encountered keystore error while signing dispute statement", ); }, } @@ -1251,74 +1247,6 @@ impl MaybeCandidateReceipt { } } -#[derive(Debug, thiserror::Error)] -enum DisputeMessageCreationError { - #[error("There was no opposite vote available")] - NoOppositeVote, - #[error("Found vote had an invalid validator index that could not be found")] - InvalidValidatorIndex, - #[error("Statement found in votes had invalid signature.")] - InvalidStoredStatement, - #[error(transparent)] - InvalidStatementCombination(DisputeMessageCheckError), -} - -fn make_dispute_message( - info: &SessionInfo, - votes: &CandidateVotes, - our_vote: SignedDisputeStatement, - 
our_index: ValidatorIndex, -) -> std::result::Result { - let validators = &info.validators; - - let (valid_statement, valid_index, invalid_statement, invalid_index) = - if let DisputeStatement::Valid(_) = our_vote.statement() { - let (validator_index, (statement_kind, validator_signature)) = - votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?; - let other_vote = SignedDisputeStatement::new_checked( - DisputeStatement::Invalid(*statement_kind), - *our_vote.candidate_hash(), - our_vote.session_index(), - validators - .get(*validator_index) - .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)? - .clone(), - validator_signature.clone(), - ) - .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?; - (our_vote, our_index, other_vote, *validator_index) - } else { - let (validator_index, (statement_kind, validator_signature)) = votes - .valid - .raw() - .iter() - .next() - .ok_or(DisputeMessageCreationError::NoOppositeVote)?; - let other_vote = SignedDisputeStatement::new_checked( - DisputeStatement::Valid(*statement_kind), - *our_vote.candidate_hash(), - our_vote.session_index(), - validators - .get(*validator_index) - .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)? - .clone(), - validator_signature.clone(), - ) - .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?; - (other_vote, *validator_index, our_vote, our_index) - }; - - DisputeMessage::from_signed_statements( - valid_statement, - valid_index, - invalid_statement, - invalid_index, - votes.candidate_receipt.clone(), - info, - ) - .map_err(DisputeMessageCreationError::InvalidStatementCombination) -} - /// Determine the best block and its block number. /// Assumes `block_descriptions` are sorted from the one /// with the lowest `BlockNumber` to the highest. 
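The `OwnVoteState` rework earlier in this patch (import.rs) replaces the old `Voted`/`VotedApproval`/`NoVote` trio with `Voted(Vec<..>)` and `CannotVote`, so "no vote yet" and "cannot vote at all" are no longer conflated. A self-contained toy version, with stand-in types rather than the real ones, showing what `vote_missing()` reports in each case:

enum OwnVoteState {
    /// Our own votes, if any (empty when we control a validator key but have
    /// not voted yet).
    Voted(Vec<&'static str>),
    /// We are not a parachain validator in this session, so we can never vote.
    CannotVote,
}

impl OwnVoteState {
    /// Mirrors the semantics of `vote_missing` above: only a controlled
    /// validator with no vote yet needs to participate.
    fn vote_missing(&self) -> bool {
        matches!(self, OwnVoteState::Voted(votes) if votes.is_empty())
    }
}

fn main() {
    // Already voted: nothing is missing.
    assert!(!OwnVoteState::Voted(vec!["explicit-valid"]).vote_missing());
    // We control a validator key but have not voted: participation is needed.
    assert!(OwnVoteState::Voted(Vec::new()).vote_missing());
    // Not a validator in this session: a vote can never be "missing".
    assert!(!OwnVoteState::CannotVote.vote_missing());
}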
diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index e7ac66ce2ece..fd7ae67beaa5 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -28,17 +28,21 @@ use std::sync::Arc; use futures::FutureExt; +use gum::CandidateHash; use sc_keystore::LocalKeystore; -use polkadot_node_primitives::CandidateVotes; +use polkadot_node_primitives::{ + CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, +}; use polkadot_node_subsystem::{ - overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, + messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, + SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_util::{ database::Database, rolling_session_window::{DatabaseParams, RollingSessionWindow}, }; -use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair}; +use polkadot_primitives::v2::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; use crate::{ error::{FatalResult, JfyiError, Result}, @@ -50,6 +54,7 @@ use db::v1::DbBackend; use fatality::Split; use self::{ + import::{CandidateEnvironment, CandidateVoteState}, participation::{ParticipationPriority, ParticipationRequest}, spam_slots::{SpamSlots, UnconfirmedDisputes}, }; @@ -274,10 +279,13 @@ impl DisputeCoordinatorSubsystem { // Prune obsolete disputes: db::v1::note_earliest_session(overlay_db, rolling_session_window.earliest_session())?; + let now = clock.now(); + let active_disputes = match overlay_db.load_recent_disputes() { - Ok(Some(disputes)) => - get_active_with_status(disputes.into_iter(), clock.now()).collect(), - Ok(None) => Vec::new(), + Ok(disputes) => disputes + .map(|disputes| get_active_with_status(disputes.into_iter(), now)) + .into_iter() + .flatten(), Err(e) => { gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e); return Err(e.into()) @@ -285,9 +293,23 @@ impl DisputeCoordinatorSubsystem { }; let mut participation_requests = Vec::new(); - let mut unconfirmed_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); + let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; - for ((session, ref candidate_hash), status) in active_disputes { + for ((session, ref candidate_hash), _) in active_disputes { + let env = + match CandidateEnvironment::new(&self.keystore, &rolling_session_window, session) { + None => { + gum::warn!( + target: LOG_TARGET, + session, + "We are lacking a `SessionInfo` for handling db votes on startup." 
+ ); + + continue + }, + Some(env) => env, + }; + let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { Ok(Some(votes)) => votes.into(), @@ -301,60 +323,52 @@ impl DisputeCoordinatorSubsystem { continue }, }; + let vote_state = CandidateVoteState::new(votes, &env, now); - let validators = match rolling_session_window.session_info(session) { - None => { - gum::warn!( + let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash); + let is_included = + scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash()); + + if potential_spam { + gum::trace!( + target: LOG_TARGET, + ?session, + ?candidate_hash, + "Found potential spam dispute on startup" + ); + spam_disputes + .insert((session, *candidate_hash), vote_state.votes().voted_indices()); + } else { + // Participate if need be: + if vote_state.own_vote_missing() { + gum::trace!( target: LOG_TARGET, - session, - "Missing info for session which has an active dispute", + ?session, + ?candidate_hash, + "Found valid dispute, with no vote from us on startup - participating." ); - continue - }, - Some(info) => info.validators.clone(), - }; - - let voted_indices = votes.voted_indices(); - - // Determine if there are any missing local statements for this dispute. Validators are - // filtered if: - // 1) their statement already exists, or - // 2) the validator key is not in the local keystore (i.e. the validator is remote). - // The remaining set only contains local validators that are also missing statements. - let missing_local_statement = validators - .iter() - .enumerate() - .map(|(index, validator)| (ValidatorIndex(index as _), validator)) - .any(|(index, validator)| { - !voted_indices.contains(&index) && - self.keystore - .key_pair::(validator) - .ok() - .map_or(false, |v| v.is_some()) - }); - - let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); - - if !status.is_confirmed_concluded() && !is_included { - unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); - } - - // Participate for all non-concluded disputes which do not have a - // recorded local statement. - if missing_local_statement { - participation_requests.push(( - ParticipationPriority::with_priority_if(is_included), - ParticipationRequest::new(votes.candidate_receipt.clone(), session), - )); + participation_requests.push(( + ParticipationPriority::with_priority_if(is_included), + ParticipationRequest::new( + vote_state.votes().candidate_receipt.clone(), + session, + ), + )); + } + // Else make sure our own vote is distributed: + else { + gum::trace!( + target: LOG_TARGET, + ?session, + ?candidate_hash, + "Found valid dispute, with vote from us on startup - send vote." + ); + send_dispute_messages(ctx, &env, &vote_state).await; + } } } - Ok(( - participation_requests, - votes, - SpamSlots::recover_from_state(unconfirmed_disputes), - scraper, - )) + Ok((participation_requests, votes, SpamSlots::recover_from_state(spam_disputes), scraper)) } } @@ -407,3 +421,144 @@ async fn wait_for_first_leaf(ctx: &mut Context) -> Result( + scraper: &ChainScraper, + vote_state: &CandidateVoteState, + candidate_hash: &CandidateHash, +) -> bool { + let is_disputed = vote_state.is_disputed(); + let is_included = scraper.is_candidate_included(candidate_hash); + let is_backed = scraper.is_candidate_backed(candidate_hash); + let is_confirmed = vote_state.is_confirmed(); + + is_disputed && !is_included && !is_backed && !is_confirmed +} + +/// Tell dispute-distribution to send all our votes. 
+/// +/// Should be called on startup for all active disputes where there are votes from us already. +#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] +async fn send_dispute_messages( + ctx: &mut Context, + env: &CandidateEnvironment<'_>, + vote_state: &CandidateVoteState, +) { + for own_vote in vote_state.own_votes().into_iter().flatten() { + let (validator_index, (kind, sig)) = own_vote; + let public_key = if let Some(key) = env.session_info().validators.get(*validator_index) { + key.clone() + } else { + gum::error!( + target: LOG_TARGET, + ?validator_index, + session_index = ?env.session_index(), + "Could not find our own key in `SessionInfo`" + ); + continue + }; + let our_vote_signed = SignedDisputeStatement::new_checked( + kind.clone(), + vote_state.votes().candidate_receipt.hash(), + env.session_index(), + public_key, + sig.clone(), + ); + let our_vote_signed = match our_vote_signed { + Ok(signed) => signed, + Err(()) => { + gum::error!( + target: LOG_TARGET, + "Checking our own signature failed - db corruption?" + ); + continue + }, + }; + let dispute_message = match make_dispute_message( + env.session_info(), + vote_state.votes(), + our_vote_signed, + *validator_index, + ) { + Err(err) => { + gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed."); + continue + }, + Ok(dispute_message) => dispute_message, + }; + + ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await; + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DisputeMessageCreationError { + #[error("There was no opposite vote available")] + NoOppositeVote, + #[error("Found vote had an invalid validator index that could not be found")] + InvalidValidatorIndex, + #[error("Statement found in votes had invalid signature.")] + InvalidStoredStatement, + #[error(transparent)] + InvalidStatementCombination(DisputeMessageCheckError), +} + +/// Create a `DisputeMessage` to be sent to `DisputeDistribution`. +pub fn make_dispute_message( + info: &SessionInfo, + votes: &CandidateVotes, + our_vote: SignedDisputeStatement, + our_index: ValidatorIndex, +) -> std::result::Result { + let validators = &info.validators; + + let (valid_statement, valid_index, invalid_statement, invalid_index) = + if let DisputeStatement::Valid(_) = our_vote.statement() { + let (validator_index, (statement_kind, validator_signature)) = + votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?; + let other_vote = SignedDisputeStatement::new_checked( + DisputeStatement::Invalid(*statement_kind), + *our_vote.candidate_hash(), + our_vote.session_index(), + validators + .get(*validator_index) + .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)? + .clone(), + validator_signature.clone(), + ) + .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?; + (our_vote, our_index, other_vote, *validator_index) + } else { + let (validator_index, (statement_kind, validator_signature)) = votes + .valid + .raw() + .iter() + .next() + .ok_or(DisputeMessageCreationError::NoOppositeVote)?; + let other_vote = SignedDisputeStatement::new_checked( + DisputeStatement::Valid(*statement_kind), + *our_vote.candidate_hash(), + our_vote.session_index(), + validators + .get(*validator_index) + .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)? 
+ .clone(), + validator_signature.clone(), + ) + .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?; + (other_vote, *validator_index, our_vote, our_index) + }; + + DisputeMessage::from_signed_statements( + valid_statement, + valid_index, + invalid_statement, + invalid_index, + votes.candidate_receipt.clone(), + info, + ) + .map_err(DisputeMessageCreationError::InvalidStatementCombination) +} diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index b5c2a6bd8e3f..12f7cfd3eb43 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -32,7 +32,7 @@ use futures::{ use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{ - DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, + DisputeMessage, DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, }; use polkadot_node_subsystem::{ messages::{ @@ -291,6 +291,7 @@ impl TestState { .await; } + /// Returns any sent `DisputeMessage`s. async fn handle_sync_queries( &mut self, virtual_overseer: &mut VirtualOverseer, @@ -298,7 +299,7 @@ impl TestState { block_number: BlockNumber, session: SessionIndex, candidate_events: Vec, - ) { + ) -> Vec { // Order of messages is not fixed (different on initializing): #[derive(Debug)] struct FinishedSteps { @@ -316,6 +317,7 @@ impl TestState { } let mut finished_steps = FinishedSteps::new(); + let mut sent_disputes = Vec::new(); while !finished_steps.is_done() { let recv = overseer_recv(virtual_overseer).await; @@ -400,6 +402,9 @@ impl TestState { let block_num = self.headers.get(&hash).map(|header| header.number); tx.send(Ok(block_num)).unwrap(); }, + AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => { + sent_disputes.push(msg); + }, AllMessages::RuntimeApi(RuntimeApiMessage::Request( _new_leaf, RuntimeApiRequest::CandidateEvents(tx), @@ -439,14 +444,25 @@ impl TestState { }, } } + return sent_disputes } async fn handle_resume_sync( &mut self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex, - ) { + ) -> Vec { + self.handle_resume_sync_with_events(virtual_overseer, session, Vec::new()).await + } + + async fn handle_resume_sync_with_events( + &mut self, + virtual_overseer: &mut VirtualOverseer, + session: SessionIndex, + mut initial_events: Vec, + ) -> Vec { let leaves: Vec = self.headers.keys().cloned().collect(); + let mut messages = Vec::new(); for (n, leaf) in leaves.iter().enumerate() { gum::debug!( block_number= ?n, @@ -463,15 +479,14 @@ impl TestState { ))) .await; - self.handle_sync_queries( - virtual_overseer, - *leaf, - n as BlockNumber, - session, - Vec::new(), - ) - .await; + let events = if n == 1 { std::mem::take(&mut initial_events) } else { Vec::new() }; + + let mut new_messages = self + .handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session, events) + .await; + messages.append(&mut new_messages); } + messages } fn session_info(&self) -> SessionInfo { @@ -2148,6 +2163,7 @@ fn concluded_supermajority_against_non_active_after_time() { #[test] fn resume_dispute_without_local_statement() { + sp_tracing::init_for_tests(); let session = 1; test_harness(|mut test_state, mut virtual_overseer| { @@ -2188,10 +2204,8 @@ fn resume_dispute_without_local_statement() { handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; - // Missing availability -> No local vote. 
// Participation won't happen here because the dispute is neither backed, not confirmed // nor the candidate is included. Or in other words - we'll refrain from participation. - assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); { @@ -2216,7 +2230,17 @@ fn resume_dispute_without_local_statement() { // local statement for the active dispute. .resume(|mut test_state, mut virtual_overseer| { Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = make_valid_candidate_receipt(); + // Candidate is now backed: + let dispute_messages = test_state + .handle_resume_sync_with_events( + &mut virtual_overseer, + session, + vec![make_candidate_backed_event(candidate_receipt.clone())], + ) + .await; + + assert_eq!(dispute_messages.len(), 0, "We don't expect any messages sent here."); let candidate_receipt = make_valid_candidate_receipt(); let candidate_hash = candidate_receipt.hash(); @@ -2282,6 +2306,7 @@ fn resume_dispute_without_local_statement() { #[test] fn resume_dispute_with_local_statement() { + sp_tracing::init_for_tests(); let session = 1; test_harness(|mut test_state, mut virtual_overseer| { @@ -2359,10 +2384,19 @@ fn resume_dispute_with_local_statement() { }) }) // Alice should not send a DisputeParticiationMessage::Participate on restart since she has a - // local statement for the active dispute. + // local statement for the active dispute, instead she should try to (re-)send her vote. .resume(|mut test_state, mut virtual_overseer| { + let candidate_receipt = make_valid_candidate_receipt(); Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let messages = test_state + .handle_resume_sync_with_events( + &mut virtual_overseer, + session, + vec![make_candidate_backed_event(candidate_receipt.clone())], + ) + .await; + + assert_eq!(messages.len(), 1, "A message should have gone out."); // Assert that subsystem is not sending Participation messages because we issued a local statement assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); @@ -2390,7 +2424,12 @@ fn resume_dispute_without_local_statement_or_local_key() { let candidate_hash = candidate_receipt.hash(); test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![make_candidate_included_event(candidate_receipt.clone())], + ) .await; let (valid_vote, invalid_vote) = generate_opposing_votes_pair( @@ -2464,101 +2503,6 @@ fn resume_dispute_without_local_statement_or_local_key() { }); } -#[test] -fn resume_dispute_with_local_statement_without_local_key() { - let session = 1; - - let test_state = TestState::default(); - let mut test_state = test_state.resume(|mut test_state, mut virtual_overseer| { - Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; - - let candidate_receipt = make_valid_candidate_receipt(); - let candidate_hash = candidate_receipt.hash(); - - test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, Vec::new()) - .await; - - let local_valid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(0), - candidate_hash, - session, - true, - ) - .await; - - let (valid_vote, invalid_vote) = generate_opposing_votes_pair( - &test_state, - ValidatorIndex(1), - ValidatorIndex(2), - candidate_hash, - session, - VoteType::Explicit, - ) - .await; - - let (pending_confirmation, confirmation_rx) = 
oneshot::channel(); - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: candidate_receipt.clone(), - session, - statements: vec![ - (local_valid_vote, ValidatorIndex(0)), - (valid_vote, ValidatorIndex(1)), - (invalid_vote, ValidatorIndex(2)), - ], - pending_confirmation: Some(pending_confirmation), - }, - }) - .await; - handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) - .await; - - assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); - - { - let (tx, rx) = oneshot::channel(); - - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ActiveDisputes(tx), - }) - .await; - - assert_eq!(rx.await.unwrap().len(), 1); - } - - virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); - - test_state - }) - }); - - // No keys: - test_state.subsystem_keystore = - make_keystore(vec![Sr25519Keyring::Two.to_seed()].into_iter()).into(); - // Two should not send a DisputeParticiationMessage::Participate on restart since we gave - // her a non existing key. - test_state.resume(|mut test_state, mut virtual_overseer| { - Box::pin(async move { - test_state.handle_resume_sync(&mut virtual_overseer, session).await; - - // Assert that subsystem is not sending Participation messages because we don't - // have a key. - assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); - - virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); - - test_state - }) - }); -} - #[test] fn issue_valid_local_statement_does_cause_distribution_but_not_duplicate_participation() { issue_local_statement_does_cause_distribution_but_not_duplicate_participation(true); diff --git a/node/network/dispute-distribution/src/lib.rs b/node/network/dispute-distribution/src/lib.rs index f109d5e6a40e..babdb303bbda 100644 --- a/node/network/dispute-distribution/src/lib.rs +++ b/node/network/dispute-distribution/src/lib.rs @@ -29,6 +29,7 @@ use std::{num::NonZeroUsize, time::Duration}; use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt}; use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery; +use polkadot_node_subsystem_util::nesting_sender::NestingSender; use sp_keystore::SyncCryptoStorePtr; use polkadot_node_network_protocol::request_response::{incoming::IncomingRequestReceiver, v1}; @@ -51,33 +52,33 @@ use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo}; /// to this subsystem, unknown dispute. This is to make sure, we get our vote out, even on /// restarts. /// -/// The actual work of sending and keeping track of transmission attempts to each validator for a -/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep -/// track of all ongoing disputes and start and clean up `SendTask`s accordingly. +/// The actual work of sending and keeping track of transmission attempts to each validator for a +/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep +/// track of all ongoing disputes and start and clean up `SendTask`s accordingly. 
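// Note: a simplified, self-contained sketch of the message muxing used by the
// subsystem main loop in this file: overseer messages are interleaved with
// messages from spawned background tasks (`DisputeSenderMessage` in the real
// code). Types here are illustrative stand-ins; the real
// `MuxedMessage::receive` fuses both streams inside a `poll_fn`.

use futures::{channel::mpsc, select, FutureExt, StreamExt};

enum MuxedMessage<S, B> {
    /// A message from the overseer / other subsystems.
    Subsystem(S),
    /// A message from a spawned background task; `None` means all task-side
    /// senders are gone.
    Sender(Option<B>),
}

async fn receive<S, B>(
    from_overseer: &mut mpsc::Receiver<S>,
    from_sender: &mut mpsc::Receiver<B>,
) -> MuxedMessage<S, B> {
    select! {
        msg = from_overseer.next().fuse() =>
            MuxedMessage::Subsystem(msg.expect("overseer stays alive; qed")),
        msg = from_sender.next().fuse() => MuxedMessage::Sender(msg),
    }
}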
mod sender; -use self::sender::{DisputeSender, TaskFinish}; +use self::sender::{DisputeSender, DisputeSenderMessage}; -/// ## The receiver [`DisputesReceiver`] +/// ## The receiver [`DisputesReceiver`] /// -/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within -/// this subsystem ([`DisputesReceiver::run`]). +/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within +/// this subsystem ([`DisputesReceiver::run`]). /// -/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in -/// via a dedicated channel and forwarding them to the dispute coordinator via -/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted -/// nodes, the reality is not that simple of course. Before importing statements the receiver will -/// batch up imports as well as possible for efficient imports while maintaining timely dispute -/// resolution and handling of spamming validators: +/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in +/// via a dedicated channel and forwarding them to the dispute coordinator via +/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted +/// nodes, the reality is not that simple of course. Before importing statements the receiver will +/// batch up imports as well as possible for efficient imports while maintaining timely dispute +/// resolution and handling of spamming validators: /// -/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`] -/// service. -/// - Drop messages from a node, if it sends at a too high rate. -/// - Filter out duplicate messages (over some period of time). -/// - Drop any obviously invalid votes (invalid signatures for example). -/// - Ban peers whose votes were deemed invalid. +/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`] +/// service. +/// - Drop messages from a node, if it sends at a too high rate. +/// - Filter out duplicate messages (over some period of time). +/// - Drop any obviously invalid votes (invalid signatures for example). +/// - Ban peers whose votes were deemed invalid. /// -/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to -/// do, while at the same time making it aware of new disputes as fast as possible. +/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to +/// do, while at the same time making it aware of new disputes as fast as possible. /// /// For successfully imported votes, we will confirm the receipt of the message back to the sender. /// This way a received confirmation guarantees, that the vote has been stored to disk by the @@ -87,7 +88,7 @@ use self::receiver::DisputesReceiver; /// Error and [`Result`] type for this subsystem. mod error; -use error::{log_error, FatalError, FatalResult, Result}; +use error::{log_error, Error, FatalError, FatalResult, Result}; #[cfg(test)] mod tests; @@ -118,10 +119,10 @@ pub struct DisputeDistributionSubsystem { runtime: RuntimeInfo, /// Sender for our dispute requests. - disputes_sender: DisputeSender, + disputes_sender: DisputeSender, - /// Receive messages from `SendTask`. - sender_rx: mpsc::Receiver, + /// Receive messages from `DisputeSender` background tasks. + sender_rx: mpsc::Receiver, /// Receiver for incoming requests. 
req_receiver: Option>, @@ -167,7 +168,7 @@ where session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) .expect("Dispute window can not be 0; qed"), }); - let (tx, sender_rx) = mpsc::channel(1); + let (tx, sender_rx) = NestingSender::new_root(1); let disputes_sender = DisputeSender::new(tx, metrics.clone()); Self { runtime, @@ -216,9 +217,16 @@ where log_error(result, "on FromOrchestra")?; }, MuxedMessage::Sender(result) => { - self.disputes_sender - .on_task_message(result.ok_or(FatalError::SenderExhausted)?) - .await; + let result = self + .disputes_sender + .on_message( + &mut ctx, + &mut self.runtime, + result.ok_or(FatalError::SenderExhausted)?, + ) + .await + .map_err(Error::Sender); + log_error(result, "on_message")?; }, } } @@ -260,14 +268,14 @@ enum MuxedMessage { /// Messages from other subsystems. Subsystem(FatalResult>), /// Messages from spawned sender background tasks. - Sender(Option), + Sender(Option), } #[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] impl MuxedMessage { async fn receive( ctx: &mut Context, - from_sender: &mut mpsc::Receiver, + from_sender: &mut mpsc::Receiver, ) -> Self { // We are only fusing here to make `select` happy, in reality we will quit if the stream // ends. diff --git a/node/network/dispute-distribution/src/sender/mod.rs b/node/network/dispute-distribution/src/sender/mod.rs index a54033945d6f..d373b1861f19 100644 --- a/node/network/dispute-distribution/src/sender/mod.rs +++ b/node/network/dispute-distribution/src/sender/mod.rs @@ -21,19 +21,17 @@ use std::{ time::Duration, }; -use futures::{ - channel::{mpsc, oneshot}, - future::poll_fn, - Future, -}; +use futures::{channel::oneshot, future::poll_fn, Future}; use futures_timer::Delay; use indexmap::{map::Entry, IndexMap}; use polkadot_node_network_protocol::request_response::v1::DisputeRequest; -use polkadot_node_primitives::{CandidateVotes, DisputeMessage, SignedDisputeStatement}; -use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate}; -use polkadot_node_subsystem_util::runtime::RuntimeInfo; -use polkadot_primitives::v2::{CandidateHash, DisputeStatement, Hash, SessionIndex}; +use polkadot_node_primitives::{DisputeMessage, DisputeStatus}; +use polkadot_node_subsystem::{ + messages::DisputeCoordinatorMessage, overseer, ActiveLeavesUpdate, SubsystemSender, +}; +use polkadot_node_subsystem_util::{nesting_sender::NestingSender, runtime::RuntimeInfo}; +use polkadot_primitives::v2::{CandidateHash, Hash, SessionIndex}; /// For each ongoing dispute we have a `SendTask` which takes care of it. /// @@ -53,6 +51,15 @@ pub use error::{Error, FatalError, JfyiError, Result}; use self::error::JfyiErrorResult; use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT}; +/// Messages as sent by background tasks. +#[derive(Debug)] +pub enum DisputeSenderMessage { + /// A task finished. + TaskFinish(TaskFinish), + /// A request for active disputes to the dispute-coordinator finished. + ActiveDisputesReady(JfyiErrorResult>), +} + /// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out. /// /// For each dispute a `SendTask` is responsible for sending to the concerned validators for that @@ -60,7 +67,7 @@ use crate::{Metrics, LOG_TARGET, SEND_RATE_LIMIT}; /// sessions/validator sets and cleans them up when they become obsolete. /// /// The unit of work for the `DisputeSender` is a dispute, represented by `SendTask`s. 
-pub struct DisputeSender { +pub struct DisputeSender { /// All heads we currently consider active. active_heads: Vec, @@ -72,10 +79,13 @@ pub struct DisputeSender { /// All ongoing dispute sendings this subsystem is aware of. /// /// Using an `IndexMap` so items can be iterated in the order of insertion. - disputes: IndexMap, + disputes: IndexMap>, /// Sender to be cloned for `SendTask`s. - tx: mpsc::Sender, + tx: NestingSender, + + /// `Some` if we are waiting for a response `DisputeCoordinatorMessage::ActiveDisputes`. + waiting_for_active_disputes: Option, /// Future for delaying too frequent creation of dispute sending tasks. rate_limit: RateLimit, @@ -84,15 +94,25 @@ pub struct DisputeSender { metrics: Metrics, } +/// State we keep while waiting for active disputes. +/// +/// When we send `DisputeCoordinatorMessage::ActiveDisputes`, this is the state we keep while +/// waiting for the response. +struct WaitForActiveDisputesState { + /// Have we seen any new sessions since last refresh? + have_new_sessions: bool, +} + #[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] -impl DisputeSender { +impl DisputeSender { /// Create a new `DisputeSender` which can be used to start dispute sendings. - pub fn new(tx: mpsc::Sender, metrics: Metrics) -> Self { + pub fn new(tx: NestingSender, metrics: Metrics) -> Self { Self { active_heads: Vec::new(), active_sessions: HashMap::new(), disputes: IndexMap::new(), tx, + waiting_for_active_disputes: None, rate_limit: RateLimit::new(), metrics, } @@ -122,7 +142,7 @@ impl DisputeSender { ctx, runtime, &self.active_sessions, - self.tx.clone(), + NestingSender::new(self.tx.clone(), DisputeSenderMessage::TaskFinish), req, &self.metrics, ) @@ -133,14 +153,47 @@ impl DisputeSender { Ok(()) } + /// Receive message from a background task. + pub async fn on_message( + &mut self, + ctx: &mut Context, + runtime: &mut RuntimeInfo, + msg: DisputeSenderMessage, + ) -> Result<()> { + match msg { + DisputeSenderMessage::TaskFinish(msg) => { + let TaskFinish { candidate_hash, receiver, result } = msg; + + self.metrics.on_sent_request(result.as_metrics_label()); + + let task = match self.disputes.get_mut(&candidate_hash) { + None => { + // Can happen when a dispute ends, with messages still in queue: + gum::trace!( + target: LOG_TARGET, + ?result, + "Received `FromSendingTask::Finished` for non existing dispute." + ); + return Ok(()) + }, + Some(task) => task, + }; + task.on_finished_send(&receiver, result); + }, + DisputeSenderMessage::ActiveDisputesReady(result) => { + let state = self.waiting_for_active_disputes.take(); + let have_new_sessions = state.map(|s| s.have_new_sessions).unwrap_or(false); + let active_disputes = result?; + self.handle_new_active_disputes(ctx, runtime, active_disputes, have_new_sessions) + .await?; + }, + } + Ok(()) + } + /// Take care of a change in active leaves. /// - /// - Initiate a retry of failed sends which are still active. - /// - Get new authorities to send messages to. - /// - Get rid of obsolete tasks and disputes. - /// - Get dispute sending started in case we missed one for some reason (e.g. on node startup) - /// - /// This function ensures the `SEND_RATE_LIMIT`, therefore it might block. + /// Update our knowledge on sessions and initiate fetching for new active disputes. 
pub async fn update_leaves( &mut self, ctx: &mut Context, @@ -154,14 +207,58 @@ impl DisputeSender { let have_new_sessions = self.refresh_sessions(ctx, runtime).await?; - let active_disputes = get_active_disputes(ctx).await?; - let unknown_disputes = { - let mut disputes = active_disputes.clone(); - disputes.retain(|(_, c)| !self.disputes.contains_key(c)); - disputes - }; + // Not yet waiting for data, request an update: + match self.waiting_for_active_disputes.take() { + None => { + self.waiting_for_active_disputes = + Some(WaitForActiveDisputesState { have_new_sessions }); + let mut sender = ctx.sender().clone(); + let mut tx = self.tx.clone(); + + let get_active_disputes_task = async move { + let result = get_active_disputes(&mut sender).await; + let result = + tx.send_message(DisputeSenderMessage::ActiveDisputesReady(result)).await; + if let Err(err) = result { + gum::debug!( + target: LOG_TARGET, + ?err, + "Sending `DisputeSenderMessage` from background task failed." + ); + } + }; + + ctx.spawn("get_active_disputes", Box::pin(get_active_disputes_task)) + .map_err(FatalError::SpawnTask)?; + }, + Some(state) => { + let have_new_sessions = state.have_new_sessions || have_new_sessions; + let new_state = WaitForActiveDisputesState { have_new_sessions }; + self.waiting_for_active_disputes = Some(new_state); + gum::debug!( + target: LOG_TARGET, + "Dispute coordinator slow? We are still waiting for data on next active leaves update." + ); + }, + } + Ok(()) + } - let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c)| c).collect(); + /// Handle new active disputes response. + /// + /// - Initiate a retry of failed sends which are still active. + /// - Get new authorities to send messages to. + /// - Get rid of obsolete tasks and disputes. + /// + /// This function ensures the `SEND_RATE_LIMIT`, therefore it might block. + async fn handle_new_active_disputes( + &mut self, + ctx: &mut Context, + runtime: &mut RuntimeInfo, + active_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>, + have_new_sessions: bool, + ) -> Result<()> { + let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c, _)| c).collect(); // Cleanup obsolete senders (retain keeps order of remaining elements): self.disputes @@ -188,165 +285,9 @@ impl DisputeSender { should_rate_limit = sends_happened && have_new_sessions; } } - - // This should only be non-empty on startup, but if not - we got you covered. - // - // Initial order will not be maintained in that case, but that should be fine as disputes - // recovered at startup will be relatively "old" anyway and we assume that no more than a - // third of the validators will go offline at any point in time anyway. - for dispute in unknown_disputes { - // Rate limiting handled inside `start_send_for_dispute` (calls `start_sender`). - self.start_send_for_dispute(ctx, runtime, dispute).await?; - } Ok(()) } - /// Receive message from a sending task. - pub async fn on_task_message(&mut self, msg: TaskFinish) { - let TaskFinish { candidate_hash, receiver, result } = msg; - - self.metrics.on_sent_request(result.as_metrics_label()); - - let task = match self.disputes.get_mut(&candidate_hash) { - None => { - // Can happen when a dispute ends, with messages still in queue: - gum::trace!( - target: LOG_TARGET, - ?result, - "Received `FromSendingTask::Finished` for non existing dispute." - ); - return - }, - Some(task) => task, - }; - task.on_finished_send(&receiver, result); - } - - /// Call `start_sender` on all passed in disputes. 
- /// - /// Recover necessary votes for building up `DisputeMessage` and start sending for all of them. - async fn start_send_for_dispute( - &mut self, - ctx: &mut Context, - runtime: &mut RuntimeInfo, - dispute: (SessionIndex, CandidateHash), - ) -> Result<()> { - let (session_index, candidate_hash) = dispute; - // A relay chain head is required as context for receiving session info information from runtime and - // storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at - // least one active head which, by `session_index`, is at least as recent as the `dispute` passed in. - // We need to avoid picking an older one from a session that might not yet exist in storage. - // Related to . - let ref_head = self - .active_sessions - .iter() - .find_map(|(active_session_index, head_hash)| { - // There might be more than one session index that is at least as recent as the dispute - // so we just pick the first one. Keep in mind we are talking about the session index for the - // child of block identified by `head_hash` and not the session index for the block. - if active_session_index >= &session_index { - Some(head_hash) - } else { - None - } - }) - .ok_or(JfyiError::NoActiveHeads)?; - - let info = runtime - .get_session_info_by_index(ctx.sender(), *ref_head, session_index) - .await?; - let our_index = match info.validator_info.our_index { - None => { - gum::trace!( - target: LOG_TARGET, - "Not a validator in that session - not starting dispute sending." - ); - return Ok(()) - }, - Some(index) => index, - }; - - let votes = match get_candidate_votes(ctx, session_index, candidate_hash).await? { - None => { - gum::debug!( - target: LOG_TARGET, - ?session_index, - ?candidate_hash, - "No votes for active dispute?! - possible, due to race." - ); - return Ok(()) - }, - Some(votes) => votes, - }; - - let our_valid_vote = votes.valid.raw().get(&our_index); - - let our_invalid_vote = votes.invalid.get(&our_index); - - let (valid_vote, invalid_vote) = if let Some(our_valid_vote) = our_valid_vote { - // Get some invalid vote as well: - let invalid_vote = - votes.invalid.iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?; - ((&our_index, our_valid_vote), invalid_vote) - } else if let Some(our_invalid_vote) = our_invalid_vote { - // Get some valid vote as well: - let valid_vote = - votes.valid.raw().iter().next().ok_or(JfyiError::MissingVotesFromCoordinator)?; - (valid_vote, (&our_index, our_invalid_vote)) - } else { - // There is no vote from us yet - nothing to do. 
- return Ok(()) - }; - let (valid_index, (kind, signature)) = valid_vote; - let valid_public = info - .session_info - .validators - .get(*valid_index) - .ok_or(JfyiError::InvalidStatementFromCoordinator)?; - let valid_signed = SignedDisputeStatement::new_checked( - DisputeStatement::Valid(*kind), - candidate_hash, - session_index, - valid_public.clone(), - signature.clone(), - ) - .map_err(|()| JfyiError::InvalidStatementFromCoordinator)?; - - let (invalid_index, (kind, signature)) = invalid_vote; - let invalid_public = info - .session_info - .validators - .get(*invalid_index) - .ok_or(JfyiError::InvalidValidatorIndexFromCoordinator)?; - let invalid_signed = SignedDisputeStatement::new_checked( - DisputeStatement::Invalid(*kind), - candidate_hash, - session_index, - invalid_public.clone(), - signature.clone(), - ) - .map_err(|()| JfyiError::InvalidValidatorIndexFromCoordinator)?; - - // Reconstructing the checked signed dispute statements is hardly useful here and wasteful, - // but I don't want to enable a bypass for the below smart constructor and this code path - // is supposed to be only hit on startup basically. - // - // Revisit this decision when the `from_signed_statements` is unneeded for the normal code - // path as well. - let message = DisputeMessage::from_signed_statements( - valid_signed, - *valid_index, - invalid_signed, - *invalid_index, - votes.candidate_receipt, - &info.session_info, - ) - .map_err(JfyiError::InvalidDisputeFromCoordinator)?; - - // Finally, get the party started: - self.start_sender(ctx, runtime, message).await - } - /// Make active sessions correspond to currently active heads. /// /// Returns: true if sessions changed. @@ -431,33 +372,14 @@ async fn get_active_session_indices( } /// Retrieve Set of active disputes from the dispute coordinator. -#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] -async fn get_active_disputes( - ctx: &mut Context, -) -> JfyiErrorResult> { +async fn get_active_disputes( + sender: &mut Sender, +) -> JfyiErrorResult> +where + Sender: SubsystemSender, +{ let (tx, rx) = oneshot::channel(); - // Caller scope is in `update_leaves` and this is bounded by fork count. - ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx)); - rx.await - .map_err(|_| JfyiError::AskActiveDisputesCanceled) - .map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect()) -} - -/// Get all locally available dispute votes for a given dispute. -#[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] -async fn get_candidate_votes( - ctx: &mut Context, - session_index: SessionIndex, - candidate_hash: CandidateHash, -) -> JfyiErrorResult> { - let (tx, rx) = oneshot::channel(); - // Caller scope is in `update_leaves` and this is bounded by fork count. 
- ctx.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes( - vec![(session_index, candidate_hash)], - tx, - )); - rx.await - .map(|v| v.get(0).map(|inner| inner.to_owned().2)) - .map_err(|_| JfyiError::AskCandidateVotesCanceled) + sender.send_message(DisputeCoordinatorMessage::ActiveDisputes(tx)).await; + rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled) } diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs index 3852adbc141b..20d72efc145c 100644 --- a/node/network/dispute-distribution/src/sender/send_task.rs +++ b/node/network/dispute-distribution/src/sender/send_task.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; -use futures::{channel::mpsc, future::RemoteHandle, Future, FutureExt, SinkExt}; +use futures::{future::RemoteHandle, Future, FutureExt}; use polkadot_node_network_protocol::{ request_response::{ @@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{ IfDisconnected, }; use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer}; -use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo}; +use polkadot_node_subsystem_util::{metrics, nesting_sender::NestingSender, runtime::RuntimeInfo}; use polkadot_primitives::v2::{ AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex, }; @@ -44,7 +44,7 @@ use crate::{ /// Keeps track of all the validators that have to be reached for a dispute. /// /// The unit of work for a `SendTask` is an authority/validator. -pub struct SendTask { +pub struct SendTask { /// The request we are supposed to get out to all `parachain` validators of the dispute's session /// and to all current authorities. request: DisputeRequest, @@ -58,7 +58,7 @@ pub struct SendTask { has_failed_sends: bool, /// Sender to be cloned for tasks. - tx: mpsc::Sender, + tx: NestingSender, } /// Status of a particular vote/statement delivery to a particular validator. @@ -100,7 +100,7 @@ impl TaskResult { } #[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] -impl SendTask { +impl SendTask { /// Initiates sending a dispute message to peers. /// /// Creation of new `SendTask`s is subject to rate limiting. As each `SendTask` will trigger @@ -110,7 +110,7 @@ impl SendTask { ctx: &mut Context, runtime: &mut RuntimeInfo, active_sessions: &HashMap, - tx: mpsc::Sender, + tx: NestingSender, request: DisputeRequest, metrics: &Metrics, ) -> Result { @@ -272,9 +272,9 @@ impl SendTask { /// /// And spawn tasks for handling the response. #[overseer::contextbounds(DisputeDistribution, prefix = self::overseer)] -async fn send_requests( +async fn send_requests( ctx: &mut Context, - tx: mpsc::Sender, + tx: NestingSender, receivers: Vec, req: DisputeRequest, metrics: &Metrics, @@ -307,11 +307,11 @@ async fn send_requests( } /// Future to be spawned in a task for awaiting a response. 
-async fn wait_response_task( +async fn wait_response_task( pending_response: impl Future>, candidate_hash: CandidateHash, receiver: AuthorityDiscoveryId, - mut tx: mpsc::Sender, + mut tx: NestingSender, _timer: Option, ) { let result = pending_response.await; @@ -320,7 +320,7 @@ async fn wait_response_task( Ok(DisputeResponse::Confirmed) => TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded }, }; - if let Err(err) = tx.feed(msg).await { + if let Err(err) = tx.send_message(msg).await { gum::debug!( target: LOG_TARGET, %err, diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs index d6381239965b..42709ecd5475 100644 --- a/node/network/dispute-distribution/src/tests/mod.rs +++ b/node/network/dispute-distribution/src/tests/mod.rs @@ -45,7 +45,7 @@ use polkadot_node_network_protocol::{ request_response::{v1::DisputeResponse, Recipient, Requests}, IfDisconnected, }; -use polkadot_node_primitives::{CandidateVotes, DisputeStatus, UncheckedDisputeMessage}; +use polkadot_node_primitives::DisputeStatus; use polkadot_node_subsystem::{ messages::{ AllMessages, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult, @@ -479,65 +479,6 @@ fn receive_rate_limit_is_enforced() { test_harness(test); } -#[test] -fn disputes_are_recovered_at_startup() { - let test = |mut handle: TestSubsystemContextHandle, _| async move { - let relay_parent = Hash::random(); - let candidate = make_candidate_receipt(relay_parent); - - let _ = handle_subsystem_startup(&mut handle, Some(candidate.hash())).await; - - let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX).await; - // Requests needed session info: - assert_matches!( - handle.recv().await, - AllMessages::DisputeCoordinator( - DisputeCoordinatorMessage::QueryCandidateVotes( - query, - tx, - ) - ) => { - let (session_index, candidate_hash) = query.get(0).unwrap().clone(); - assert_eq!(session_index, MOCK_SESSION_INDEX); - assert_eq!(candidate_hash, candidate.hash()); - let unchecked: UncheckedDisputeMessage = message.into(); - tx.send(vec![(session_index, candidate_hash, CandidateVotes { - candidate_receipt: candidate, - valid: [( - unchecked.valid_vote.validator_index, - (unchecked.valid_vote.kind, - unchecked.valid_vote.signature - ), - )].into_iter().collect(), - invalid: [( - unchecked.invalid_vote.validator_index, - ( - unchecked.invalid_vote.kind, - unchecked.invalid_vote.signature - ), - )].into_iter().collect(), - })]) - .expect("Receiver should stay alive."); - } - ); - - let expected_receivers = { - let info = &MOCK_SESSION_INFO; - info.discovery_keys - .clone() - .into_iter() - .filter(|a| a != &Sr25519Keyring::Ferdie.public().into()) - .collect() - // All validators are also authorities in the first session, so we are - // done here. 
-		};
-		check_sent_requests(&mut handle, expected_receivers, true).await;
-
-		conclude(&mut handle).await;
-	};
-	test_harness(test);
-}
-
 #[test]
 fn send_dispute_gets_cleaned_up() {
 	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _| async move {
@@ -605,6 +546,7 @@ fn send_dispute_gets_cleaned_up() {
 
 #[test]
 fn dispute_retries_and_works_across_session_boundaries() {
+	sp_tracing::try_init_simple();
 	let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>, _| async move {
 		let old_head = handle_subsystem_startup(&mut handle, None).await;
diff --git a/node/primitives/src/disputes/message.rs b/node/primitives/src/disputes/message.rs
index c31ff1ecb283..02b33434b9b2 100644
--- a/node/primitives/src/disputes/message.rs
+++ b/node/primitives/src/disputes/message.rs
@@ -28,7 +28,7 @@ use polkadot_primitives::v2::{
 	CandidateReceipt, DisputeStatement, SessionIndex, SessionInfo, ValidatorIndex,
 };
 
-/// A dispute initiating/participating message that is guaranteed to have been built from signed
+/// A dispute initiating/participating message that has been built from signed
 /// statements.
 ///
 /// And most likely has been constructed correctly. This is used with
diff --git a/node/subsystem-util/Cargo.toml b/node/subsystem-util/Cargo.toml
index 90abc62f97af..6d8f12be16c0 100644
--- a/node/subsystem-util/Cargo.toml
+++ b/node/subsystem-util/Cargo.toml
@@ -8,6 +8,7 @@ edition.workspace = true
 [dependencies]
 async-trait = "0.1.57"
 futures = "0.3.21"
+futures-channel = "0.3.23"
 itertools = "0.10"
 parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
 parking_lot = "0.11.2"
diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs
index 7bfada5757a1..293577279eb8 100644
--- a/node/subsystem-util/src/lib.rs
+++ b/node/subsystem-util/src/lib.rs
@@ -72,6 +72,12 @@ pub mod runtime;
 /// Database trait for subsystem.
 pub mod database;
 
+/// Nested message sending.
+///
+/// Useful for having mostly synchronous code, with submodules spawning short-lived asynchronous
+/// tasks, sending messages back.
+pub mod nesting_sender;
+
 mod determine_new_blocks;
 
 #[cfg(test)]
diff --git a/node/subsystem-util/src/nesting_sender.rs b/node/subsystem-util/src/nesting_sender.rs
new file mode 100644
index 000000000000..34ff7d84cbf1
--- /dev/null
+++ b/node/subsystem-util/src/nesting_sender.rs
@@ -0,0 +1,207 @@
+// Copyright 2022-2023 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! ## Background
+//!
+//! Writing concurrent and even multithreaded-by-default code is inconvenient and slow: no
+//! references, hence lots of needless cloning and data duplication, locks, mutexes, ... We should
+//! reach for concurrency and parallelism when there is an actual need, not just because we can
+//! and it is reasonably safe in Rust.
+//!
+//! I very much agree with many points in this blog post, for example:
+//!
+//!
+//!
+//! Another very good post by Pierre (Tomaka):
+//!
+//!
+//! ## Architecture
+//!
+//! This module helps with this in part. It does not break the multithreaded-by-default approach,
+//! but it breaks the `spawn everything` approach. So once you `spawn` you will still be
+//! multithreaded by default, even though for most tasks we spawn (which just wait for the network
+//! or for some message to arrive) that is pointless and needless overhead. You will simply spawn
+//! less in the first place.
+//!
+//! By default your code is single threaded, except when actually needed:
+//! - need to wait for long running synchronous IO (a threaded runtime is actually useful here)
+//! - need to wait for some async event (message to arrive)
+//! - need to do some hefty CPU bound processing (a thread is required here as well)
+//!
+//! and it is not acceptable to block the main task while waiting for the result, because we
+//! actually have other things to do, or at least need to stay responsive just in case.
+//!
+//! With the types and traits in this module you can achieve exactly that: You write modules which
+//! just execute logic and can call into the functions of other modules - yes, we are calling
+//! normal functions. For the case that a module you are calling into requires an occasional
+//! background task, you provide it with a `NestingSender` that it can pass to any spawned tasks.
+//!
+//! This way you don't have to spawn a task for each module just for it to be able to handle
+//! asynchronous events. The module relies on the using/enclosing code/module to forward it any
+//! asynchronous messages in a structured way.
+//!
+//! What makes this architecture nice is the separation of concerns - at the top you only have to
+//! provide a sender and dispatch received messages to the root module - it is completely
+//! irrelevant how complex that module is: it might consist of child modules also having the need
+//! to spawn and receive messages, which in turn do the same, and still the root logic stays
+//! unchanged. Everything is isolated to the level where it belongs, while we still keep a single
+//! task scope in all non-blocking/non-CPU-intensive parts, which allows us to share data via
+//! references, for example.
+//!
+//! Because the wrapping is optional and transparent to the lower modules, each module can also be
+//! used at the top directly without any wrapping, e.g. for standalone use or for testing purposes.
+//!
+//! Check out the documentation of [`NestingSender`][nesting_sender::NestingSender] below for a
+//! basic usage example. For real-world usage, I would like to point you to the
+//! dispute-distribution subsystem, which makes use of this architecture.
+//!
+//! ## Limitations
+//!
+//! Nothing is ever for free, of course: Each level adds an indirect function call to message
+//! sending, which should be cheap enough for most applications, but is something to keep in mind.
+//! In particular, we avoided the use of async traits, which would have required memory
+//! allocations on each send. Also, cloning a [`NestingSender`][nesting_sender::NestingSender] is
+//! more expensive than cloning a plain `mpsc::Sender`; the overhead should be negligible though.
+//!
+//! A further limitation: Because everything is routed through the same channel, it is not
+//! possible with this approach to put back-pressure on only a single source (as all of them share
+//! that channel). If a module has a task that requires this, it indeed has to spawn a long-running
+//! task which can apply the back-pressure on that message source, or we make it its own
+//! subsystem. This is just one of the situations that justifies the complexity of asynchrony.
+
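To make the dispatch half of this architecture concrete, here is a minimal sketch of the "provide a sender and dispatch received messages to the root module" pattern, using only the `NestingSender` API introduced in this file. The `RootMessage` and `ChildMessage` types (and the terminating `break`) are invented for the example and are not part of the patch:

```rust
use futures::StreamExt;
use polkadot_node_subsystem_util::nesting_sender::NestingSender;

enum RootMessage {
	// Messages coming from a child module are wrapped in one variant:
	Child(ChildMessage),
}

enum ChildMessage {
	TaskFinished(u32),
}

async fn run_root() {
	// One channel for the whole module tree:
	let (root_tx, mut rx) = NestingSender::<RootMessage, RootMessage>::new_root(1);
	// Derived sender, handed to the child module and its spawned tasks:
	let mut child_tx = NestingSender::new(root_tx.clone(), RootMessage::Child);

	// A background task spawned by the child would report back like this:
	let _ = child_tx.send_message(ChildMessage::TaskFinished(42)).await;

	// Single dispatch point at the root: everything arrives as `RootMessage`.
	while let Some(msg) = rx.next().await {
		match msg {
			RootMessage::Child(ChildMessage::TaskFinished(_result)) => {
				// Hand `_result` to the child module's synchronous handler here.
				break // Only so this sketch terminates.
			},
		}
	}
}
```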
+use std::{convert::identity, sync::Arc};
+
+use futures::{channel::mpsc, SinkExt};
+
+/// A message sender that supports sending nested messages.
+///
+/// This sender wraps an `mpsc::Sender` and a conversion function for converting given messages of
+/// type `Mnested` to the message type actually supported by the mpsc (`M`).
+///
+/// Example:
+///
+/// ```rust
+/// # use polkadot_node_subsystem_util::nesting_sender::NestingSender;
+///
+/// enum RootMessage {
+///     Child1Message(ChildMessage),
+///     Child2Message(OtherChildMessage),
+///     SomeOwnMessage,
+/// }
+///
+/// enum ChildMessage {
+///     TaskFinished(u32),
+/// }
+///
+/// enum OtherChildMessage {
+///     QueryResult(bool),
+/// }
+///
+/// // We would then pass in a `NestingSender` to our child module of the following type:
+/// type ChildSender = NestingSender<RootMessage, ChildMessage>;
+///
+/// // Types in the child module can (and should) be generic over the root type:
+/// struct ChildState<M> {
+///     tx: NestingSender<M, ChildMessage>,
+/// }
+///
+///
+/// // Create the root message sender:
+///
+/// let (root_sender, receiver) = NestingSender::new_root(1);
+/// // Get a sender for the child module based on that root sender:
+/// let child_sender = NestingSender::new(root_sender.clone(), RootMessage::Child1Message);
+/// // pass `child_sender` to child module ...
+/// ```
+///
+/// `ChildMessage` could itself have a variant carrying messages of a child of its own, which can
+/// then use `NestingSender::new` with its own sender and a conversion function to provide a
+/// further nested sender, suitable for that nested child module.
+pub struct NestingSender<M, Mnested> {
+	sender: mpsc::Sender<M>,
+	conversion: Arc<dyn Fn(Mnested) -> M + 'static + Send + Sync>,
+}
+
+impl<M> NestingSender<M, M>
+where
+	M: 'static,
+{
+	/// Create a new "root" sender.
+	///
+	/// This is a sender that directly passes messages to the internal mpsc.
+	///
+	/// Params: The channel size of the created mpsc.
+	/// Returns: The newly constructed `NestingSender` and the corresponding mpsc receiver.
+	pub fn new_root(channel_size: usize) -> (Self, mpsc::Receiver<M>) {
+		let (sender, receiver) = mpsc::channel(channel_size);
+		let s = Self { sender, conversion: Arc::new(identity) };
+		(s, receiver)
+	}
+}
+
+impl<M, Mnested> NestingSender<M, Mnested>
+where
+	M: 'static,
+	Mnested: 'static,
+{
+	/// Create a new `NestingSender` which wraps a given "parent" sender.
+	///
+	/// By passing in a necessary conversion from `Mnested` to `Mparent` (the `Mnested` of the
+	/// parent sender), we can construct a derived `NestingSender<M, Mnested>` from a
+	/// `NestingSender<M, Mparent>`.
+	///
+	/// The resulting sender does the following conversion:
+	///
+	/// ```text
+	/// Mnested -> Mparent -> M
+	/// Inputs:
+	///     F(Mparent) -> M (via parent)
+	///     F(Mnested) -> Mparent (via child_conversion)
+	/// Result: F(Mnested) -> M
+	/// ```
+	pub fn new<Mparent>(
+		parent: NestingSender<M, Mparent>,
+		child_conversion: fn(Mnested) -> Mparent,
+	) -> Self
+	where
+		Mparent: 'static,
+	{
+		let NestingSender { sender, conversion } = parent;
+		Self { sender, conversion: Arc::new(move |x| conversion(child_conversion(x))) }
+	}
+
+	/// Send a message via the underlying mpsc.
+	///
+	/// The necessary conversion is applied automatically.
+	pub async fn send_message(&mut self, m: Mnested) -> Result<(), mpsc::SendError> {
+		// Flushing on an mpsc means waiting for the receiver to pick up the data - we don't want
+		// to wait for that.
+		self.sender.feed((self.conversion)(m)).await
+	}
+}
+
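Because `new` takes the parent sender by value and composes the conversion functions internally, nesting also works across more than one level, as the last paragraph of the struct documentation hints. A short sketch with two levels, again with message types invented for the example:

```rust
use polkadot_node_subsystem_util::nesting_sender::NestingSender;

enum RootMessage {
	Child(ChildMessage),
}

enum ChildMessage {
	// The child wraps its own child's messages in a dedicated variant:
	Grandchild(GrandchildMessage),
}

enum GrandchildMessage {
	QueryResult(bool),
}

fn wire_up(root: NestingSender<RootMessage, RootMessage>) {
	// `ChildMessage -> RootMessage` via the `RootMessage::Child` constructor:
	let child: NestingSender<RootMessage, ChildMessage> =
		NestingSender::new(root, RootMessage::Child);
	// `GrandchildMessage -> ChildMessage -> RootMessage`: the conversions are
	// composed internally, so sends from the grandchild still end up as
	// `RootMessage` on the single root channel.
	let _grandchild: NestingSender<RootMessage, GrandchildMessage> =
		NestingSender::new(child, ChildMessage::Grandchild);
}
```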
+// Helper traits and implementations:
+
+impl<M, M1> Clone for NestingSender<M, M1>
+where
+	M: 'static,
+	M1: 'static,
+{
+	fn clone(&self) -> Self {
+		Self { sender: self.sender.clone(), conversion: self.conversion.clone() }
+	}
+}
diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
index dc01664c295a..3076bbd1380a 100644
--- a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
+++ b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
@@ -400,12 +400,12 @@ and only if all of the following conditions are satisfied:
 Whenever any vote on a dispute is imported these conditions are checked. If the
 dispute is found not to be potential spam, then spam slots for the disputed
 candidate hash are cleared. This decrements the spam count for every validator
-which had voted invalid. 
+which had voted invalid.
 
-To keep spam slots from filling up unnecessarily we want to clear spam slots 
+To keep spam slots from filling up unnecessarily we want to clear spam slots
 whenever a candidate is seen to be backed or included. Fortunately this
 behavior is achieved by clearing slots on vote import as described above. Because on-chain
-backing votes are processed when a block backing the disputed candidate is discovered, spam slots are cleared for every backed candidate. Included 
+backing votes are processed when a block backing the disputed candidate is discovered, spam slots are cleared for every backed candidate. Included
 candidates have also been seen as backed on the same fork, so decrementing spam
 slots is handled in that case as well.
@@ -681,12 +681,17 @@ struct State {
 ```
 
 ### On startup
-When the subsystem is initialised it waits for a new leaf (message `OverseerSignal::ActiveLeaves`).
-The leaf is used to initialise a `RollingSessionWindow` instance (contains leaf hash and
-`DISPUTE_WINDOW` which is a constant.
-Next the active disputes are loaded from the DB. The subsystem checks if there are disputes for
-which a local statement is not issued. A list of these is passed to the main loop.
+When the subsystem is initialised it waits for a new leaf (message
+`OverseerSignal::ActiveLeaves`). The leaf is used to initialise a
+`RollingSessionWindow` instance (contains the leaf hash and `DISPUTE_WINDOW`,
+which is a constant).
+
+Next, the active disputes are loaded from the DB and spam slots are initialized
+accordingly. Then, for each loaded dispute, we either send a
+`DisputeDistribution::SendDispute` if a local vote from us is available, or, if
+there is none and participation is in order, we push the dispute to
+participation.
 
 ### The main loop
diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md b/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
index 6b8e5ec03cf4..3a45f53c45d7 100644
--- a/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
+++ b/roadmap/implementers-guide/src/node/disputes/dispute-distribution.md
@@ -339,11 +339,8 @@ coordinator.
 
 ### Node Startup
 
-On startup we need to check with the dispute coordinator for any ongoing
-disputes and assume we have not yet sent our statement for those. In case we
-find an explicit statement from ourselves via
-`DisputeCoordinatorMessage::QueryCandidateVotes` we will pretend to just have
-received a `SendDispute` message for that candidate.
+Nothing special happens on node startup. We expect the `dispute-coordinator` to
+inform us about any ongoing disputes via `SendDispute` messages.
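For illustration, the per-dispute startup decision described in the `dispute-coordinator` section above could be sketched as below. Only `DisputeDistributionMessage::SendDispute` and the imported types are real; the function shape, the `local_statement` parameter, and the `queue_participation` callback are assumptions standing in for the coordinator's internals:

```rust
use polkadot_node_primitives::DisputeMessage;
use polkadot_node_subsystem::{messages::DisputeDistributionMessage, SubsystemSender};
use polkadot_primitives::v2::{CandidateHash, SessionIndex};

/// Sketch of the startup handling for one dispute loaded from the DB.
/// `local_statement` stands in for "do we have a vote of our own on disk for
/// this dispute"; queueing for participation is left abstract on purpose.
async fn replay_dispute_on_startup<Sender>(
	sender: &mut Sender,
	session: SessionIndex,
	candidate_hash: CandidateHash,
	local_statement: Option<DisputeMessage>,
	queue_participation: impl FnOnce(SessionIndex, CandidateHash),
) where
	Sender: SubsystemSender<DisputeDistributionMessage>,
{
	match local_statement {
		// We already voted: make sure the network learns about our vote.
		Some(msg) => sender.send_message(DisputeDistributionMessage::SendDispute(msg)).await,
		// No local vote yet: participate; a statement is sent out once we voted.
		None => queue_participation(session, candidate_hash),
	}
}
```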
 
 ## Backing and Approval Votes