From a5baf2249795da4823d8cc0db5a70f3a15595228 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 15 Nov 2022 14:26:04 +0200 Subject: [PATCH 1/5] Fix typos --- node/core/provisioner/src/disputes/random_selection/mod.rs | 2 +- node/subsystem-types/src/messages.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/provisioner/src/disputes/random_selection/mod.rs b/node/core/provisioner/src/disputes/random_selection/mod.rs index a25d3445ac6e..4d7f706cc39e 100644 --- a/node/core/provisioner/src/disputes/random_selection/mod.rs +++ b/node/core/provisioner/src/disputes/random_selection/mod.rs @@ -64,7 +64,7 @@ async fn request_disputes( }; recent_disputes .into_iter() - .map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash)) + .map(|(session_idx, candidate_hash, _)| (session_idx, candidate_hash)) .collect::>() }, RequestType::Active => { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 6e4983813984..23f41b8537d9 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -268,7 +268,7 @@ pub enum DisputeCoordinatorMessage { /// - or the imported statements are backing/approval votes, which are always accepted. pending_confirmation: Option>, }, - /// Fetch a list of all recent disputes the co-ordinator is aware of. + /// Fetch a list of all recent disputes the coordinator is aware of. /// These are disputes which have occurred any time in recent sessions, /// and which may have already concluded. RecentDisputes(oneshot::Sender>), From 2a0d5982d11ba1eb45ebcd6a0336c9c0f44b7895 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 15 Nov 2022 15:52:04 +0200 Subject: [PATCH 2/5] Filter unconfirmed disputes in provisioner - random_selection * Rework dispute coordinator to return `DisputeStatus` with `ActiveDisputes` message. * Rework the random_selection implementation of `select_disptues` in `provisioner` to return only confirmed disputes. --- .../dispute-coordinator/src/initialized.rs | 4 +- node/core/dispute-coordinator/src/tests.rs | 29 +++++++-- .../src/disputes/random_selection/mod.rs | 64 +++++++------------ .../dispute-distribution/src/sender/mod.rs | 4 +- node/subsystem-types/src/messages.rs | 2 +- 5 files changed, 54 insertions(+), 49 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 0df1a620826c..ab9faca39868 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -617,7 +617,9 @@ impl Initialized { let _ = tx.send( get_active_with_status(recent_disputes.into_iter(), now) - .map(|(k, _)| k) + .map(|((session_idx, candidate_hash), dispute_status)| { + (session_idx, candidate_hash, dispute_status) + }) .collect(), ); }, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index b2f779041a4c..d44f46ec3442 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -31,7 +31,9 @@ use futures::{ use polkadot_node_subsystem_util::database::Database; -use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement}; +use polkadot_node_primitives::{ + DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, +}; use polkadot_node_subsystem::{ messages::{ ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage, @@ -682,7 +684,10 @@ fn too_many_unconfirmed_statements_are_considered_spam() { }) .await; - assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]); + assert_eq!( + rx.await.unwrap(), + vec![(session, candidate_hash1, DisputeStatus::Active)] + ); let (tx, rx) = oneshot::channel(); virtual_overseer @@ -812,7 +817,10 @@ fn approval_vote_import_works() { }) .await; - assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]); + assert_eq!( + rx.await.unwrap(), + vec![(session, candidate_hash1, DisputeStatus::Active)] + ); let (tx, rx) = oneshot::channel(); virtual_overseer @@ -934,7 +942,10 @@ fn dispute_gets_confirmed_via_participation() { }) .await; - assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]); + assert_eq!( + rx.await.unwrap(), + vec![(session, candidate_hash1, DisputeStatus::Active)] + ); let (tx, rx) = oneshot::channel(); virtual_overseer @@ -1099,7 +1110,10 @@ fn dispute_gets_confirmed_at_byzantine_threshold() { }) .await; - assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]); + assert_eq!( + rx.await.unwrap(), + vec![(session, candidate_hash1, DisputeStatus::Confirmed)] + ); let (tx, rx) = oneshot::channel(); virtual_overseer @@ -1358,7 +1372,10 @@ fn conflicting_votes_lead_to_dispute_participation() { }) .await; - assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]); + assert_eq!( + rx.await.unwrap(), + vec![(session, candidate_hash, DisputeStatus::Active)] + ); let (tx, rx) = oneshot::channel(); virtual_overseer diff --git a/node/core/provisioner/src/disputes/random_selection/mod.rs b/node/core/provisioner/src/disputes/random_selection/mod.rs index 4d7f706cc39e..9a827475aa55 100644 --- a/node/core/provisioner/src/disputes/random_selection/mod.rs +++ b/node/core/provisioner/src/disputes/random_selection/mod.rs @@ -42,51 +42,35 @@ enum RequestType { } /// Request open disputes identified by `CandidateHash` and the `SessionIndex`. -async fn request_disputes( +/// Returns only confirmed/concluded disputes. The rest are filtered out. +async fn request_confirmed_disputes( sender: &mut impl overseer::ProvisionerSenderTrait, active_or_recent: RequestType, ) -> Vec<(SessionIndex, CandidateHash)> { - let disputes = match active_or_recent { - RequestType::Recent => { - let (tx, rx) = oneshot::channel(); - let msg = DisputeCoordinatorMessage::RecentDisputes(tx); - sender.send_unbounded_message(msg); - let recent_disputes = match rx.await { - Ok(r) => r, - Err(oneshot::Canceled) => { - gum::warn!( - target: LOG_TARGET, - "Channel closed: unable to gather {:?} disputes", - active_or_recent - ); - Vec::new() - }, - }; - recent_disputes - .into_iter() - .map(|(session_idx, candidate_hash, _)| (session_idx, candidate_hash)) - .collect::>() - }, - RequestType::Active => { - let (tx, rx) = oneshot::channel(); - let msg = DisputeCoordinatorMessage::ActiveDisputes(tx); - sender.send_unbounded_message(msg); - let active_disputes = match rx.await { - Ok(r) => r, - Err(oneshot::Canceled) => { - gum::warn!( - target: LOG_TARGET, - "Unable to gather {:?} disputes", - active_or_recent - ); - Vec::new() - }, - }; - active_disputes + let (tx, rx) = oneshot::channel(); + let msg = match active_or_recent { + RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx), + RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx), + }; + + sender.send_unbounded_message(msg); + let disputes = match rx.await { + Ok(r) => r, + Err(oneshot::Canceled) => { + gum::warn!( + target: LOG_TARGET, + "Channel closed: unable to gather {:?} disputes", + active_or_recent + ); + Vec::new() }, }; disputes + .into_iter() + .filter(|d| d.2.is_confirmed_concluded()) + .map(|d| (d.0, d.1)) + .collect() } /// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent. @@ -132,7 +116,7 @@ where // In case of an overload condition, we limit ourselves to active disputes, and fill up to the // upper bound of disputes to pass to wasm `fn create_inherent_data`. // If the active ones are already exceeding the bounds, randomly select a subset. - let recent = request_disputes(sender, RequestType::Recent).await; + let recent = request_confirmed_disputes(sender, RequestType::Recent).await; let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { gum::warn!( target: LOG_TARGET, @@ -140,7 +124,7 @@ where recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME ); - let mut active = request_disputes(sender, RequestType::Active).await; + let mut active = request_confirmed_disputes(sender, RequestType::Active).await; let n_active = active.len(); let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); diff --git a/node/network/dispute-distribution/src/sender/mod.rs b/node/network/dispute-distribution/src/sender/mod.rs index 331fe12c61e2..64299e52bf23 100644 --- a/node/network/dispute-distribution/src/sender/mod.rs +++ b/node/network/dispute-distribution/src/sender/mod.rs @@ -430,7 +430,9 @@ async fn get_active_disputes( // Caller scope is in `update_leaves` and this is bounded by fork count. ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx)); - rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled) + rx.await + .map_err(|_| JfyiError::AskActiveDisputesCanceled) + .map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect()) } /// Get all locally available dispute votes for a given dispute. diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 23f41b8537d9..cb7caebcaa23 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -274,7 +274,7 @@ pub enum DisputeCoordinatorMessage { RecentDisputes(oneshot::Sender>), /// Fetch a list of all active disputes that the coordinator is aware of. /// These disputes are either not yet concluded or recently concluded. - ActiveDisputes(oneshot::Sender>), + ActiveDisputes(oneshot::Sender>), /// Get candidate votes for a candidate. QueryCandidateVotes( Vec<(SessionIndex, CandidateHash)>, From af9ca8f928c9b5a6b772cc664e0fc514e841c26d Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 15 Nov 2022 16:37:25 +0200 Subject: [PATCH 3/5] Filter unconfirmed disputes in provisioner - prioritized_selection --- .../src/disputes/prioritized_selection/mod.rs | 7 +++++++ .../src/disputes/prioritized_selection/tests.rs | 15 ++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/node/core/provisioner/src/disputes/prioritized_selection/mod.rs b/node/core/provisioner/src/disputes/prioritized_selection/mod.rs index 07426ef1a75b..dcfcd0d1c2f0 100644 --- a/node/core/provisioner/src/disputes/prioritized_selection/mod.rs +++ b/node/core/provisioner/src/disputes/prioritized_selection/mod.rs @@ -139,6 +139,13 @@ where onchain.len(), ); + // Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it. + // In this case we'd better push as much fresh votes as possible to bring it to conclusion faster. + let recent_disputes = recent_disputes + .into_iter() + .filter(|d| d.2.is_confirmed_concluded() || onchain.contains_key(&(d.0, d.1))) + .collect::>(); + let partitioned = partition_recent_disputes(recent_disputes, &onchain); metrics.on_partition_recent_disputes(&partitioned); diff --git a/node/core/provisioner/src/disputes/prioritized_selection/tests.rs b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs index f76107dc65d4..0a83f89d5712 100644 --- a/node/core/provisioner/src/disputes/prioritized_selection/tests.rs +++ b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs @@ -459,16 +459,13 @@ impl TestDisputes { (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) } - pub fn add_unconfirmed_disputes_unknown_onchain( - &mut self, - dispute_count: usize, - ) -> (u32, usize) { + pub fn add_confirmed_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) { let local_votes_count = self.validators_count * 90 / 100; let session_idx = 2; let lf = leaf(); let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); for _ in 0..dispute_count { - let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Confirmed); self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); } (session_idx, local_votes_count * dispute_count) @@ -554,9 +551,9 @@ fn normal_flow() { // concluded disputes known onchain - these should be ignored let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH); - // active disputes unknown onchain + // confirmed disputes unknown onchain let (second_idx, second_votes) = - input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH); + input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH); let metrics = metrics::Metrics::new_dummy(); let mut vote_queries: usize = 0; @@ -635,8 +632,8 @@ fn many_batches() { // concluded disputes known onchain input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION); - // active disputes unknown onchain - input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION); + // confirmed disputes unknown onchain + input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION); let metrics = metrics::Metrics::new_dummy(); let mut vote_queries: usize = 0; From 25d8de666667239ddeaacc6a9d18b8386499b827 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 15 Nov 2022 17:39:01 +0200 Subject: [PATCH 4/5] Add test for unconfirmed disputes handling --- .../disputes/prioritized_selection/tests.rs | 80 +++++++++++++++++-- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/node/core/provisioner/src/disputes/prioritized_selection/tests.rs b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs index 0a83f89d5712..1f8d4f180263 100644 --- a/node/core/provisioner/src/disputes/prioritized_selection/tests.rs +++ b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs @@ -426,7 +426,7 @@ impl TestDisputes { pub fn add_unconfirmed_disputes_concluded_onchain( &mut self, dispute_count: usize, - ) -> (u32, usize) { + ) -> (SessionIndex, usize) { let local_votes_count = self.validators_count * 90 / 100; let onchain_votes_count = self.validators_count * 80 / 100; let session_idx = 0; @@ -444,7 +444,7 @@ impl TestDisputes { pub fn add_unconfirmed_disputes_unconcluded_onchain( &mut self, dispute_count: usize, - ) -> (u32, usize) { + ) -> (SessionIndex, usize) { let local_votes_count = self.validators_count * 90 / 100; let onchain_votes_count = self.validators_count * 40 / 100; let session_idx = 1; @@ -459,7 +459,10 @@ impl TestDisputes { (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) } - pub fn add_confirmed_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) { + pub fn add_confirmed_disputes_unknown_onchain( + &mut self, + dispute_count: usize, + ) -> (SessionIndex, usize) { let local_votes_count = self.validators_count * 90 / 100; let session_idx = 2; let lf = leaf(); @@ -471,7 +474,10 @@ impl TestDisputes { (session_idx, local_votes_count * dispute_count) } - pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) { + pub fn add_concluded_disputes_known_onchain( + &mut self, + dispute_count: usize, + ) -> (SessionIndex, usize) { let local_votes_count = self.validators_count * 90 / 100; let onchain_votes_count = self.validators_count * 75 / 100; let session_idx = 3; @@ -485,7 +491,10 @@ impl TestDisputes { (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) } - pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) { + pub fn add_concluded_disputes_unknown_onchain( + &mut self, + dispute_count: usize, + ) -> (SessionIndex, usize) { let local_votes_count = self.validators_count * 90 / 100; let session_idx = 4; let lf = leaf(); @@ -497,6 +506,40 @@ impl TestDisputes { (session_idx, local_votes_count * dispute_count) } + pub fn add_unconfirmed_disputes_known_onchain( + &mut self, + dispute_count: usize, + ) -> (SessionIndex, usize) { + let local_votes_count = self.validators_count * 10 / 100; + let onchain_votes_count = self.validators_count * 10 / 100; + let session_idx = 5; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + self.add_onchain_dispute(d, onchain_votes_count); + } + + (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) + } + + pub fn add_unconfirmed_disputes_unknown_onchain( + &mut self, + dispute_count: usize, + ) -> (SessionIndex, usize) { + let local_votes_count = self.validators_count * 10 / 100; + let session_idx = 6; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + } + + (session_idx, local_votes_count * dispute_count) + } + fn generate_local_votes( statement_kind: T, start_idx: usize, @@ -717,3 +760,30 @@ fn votes_above_limit() { ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT ); } + +#[test] +fn unconfirmed_are_handled_correctly() { + const VALIDATOR_COUNT: usize = 10; + const DISPUTES_PER_PARTITION: usize = 50; + + let mut input = TestDisputes::new(VALIDATOR_COUNT); + + // Add unconfirmed known onchain -> this should be pushed + let (pushed_idx, _) = input.add_unconfirmed_disputes_known_onchain(DISPUTES_PER_PARTITION); + + // Add unconfirmed unknown onchain -> this should be ignored + input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION); + + let metrics = metrics::Metrics::new_dummy(); + let mut vote_queries: usize = 0; + test_harness( + |r| mock_overseer(r, &mut input, &mut vote_queries), + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let result = select_disputes(&mut tx, &metrics, &lf).await; + + assert!(result.len() == DISPUTES_PER_PARTITION); + result.iter().for_each(|d| assert!(d.session == pushed_idx)); + }, + ); +} From b1f5e60638da8205619493a28bdfa2fa9983ae61 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 16 Nov 2022 15:04:57 +0200 Subject: [PATCH 5/5] Fix `dispute-distribution` tests --- node/network/dispute-distribution/src/tests/mod.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs index 56cdd467fd62..d6381239965b 100644 --- a/node/network/dispute-distribution/src/tests/mod.rs +++ b/node/network/dispute-distribution/src/tests/mod.rs @@ -45,7 +45,7 @@ use polkadot_node_network_protocol::{ request_response::{v1::DisputeResponse, Recipient, Requests}, IfDisconnected, }; -use polkadot_node_primitives::{CandidateVotes, UncheckedDisputeMessage}; +use polkadot_node_primitives::{CandidateVotes, DisputeStatus, UncheckedDisputeMessage}; use polkadot_node_subsystem::{ messages::{ AllMessages, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult, @@ -658,7 +658,7 @@ fn dispute_retries_and_works_across_session_boundaries() { Some(old_head), MOCK_SESSION_INDEX, None, - vec![(MOCK_SESSION_INDEX, candidate.hash())], + vec![(MOCK_SESSION_INDEX, candidate.hash(), DisputeStatus::Active)], ) .await; @@ -673,7 +673,7 @@ fn dispute_retries_and_works_across_session_boundaries() { Some(old_head2), MOCK_NEXT_SESSION_INDEX, Some(MOCK_NEXT_SESSION_INFO.clone()), - vec![(MOCK_SESSION_INDEX, candidate.hash())], + vec![(MOCK_SESSION_INDEX, candidate.hash(), DisputeStatus::Active)], ) .await; @@ -832,7 +832,7 @@ async fn activate_leaf( // New session if we expect the subsystem to request it. new_session: Option, // Currently active disputes to send to the subsystem. - active_disputes: Vec<(SessionIndex, CandidateHash)>, + active_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>, ) { let has_active_disputes = !active_disputes.is_empty(); handle @@ -934,7 +934,10 @@ async fn handle_subsystem_startup( None, MOCK_SESSION_INDEX, Some(MOCK_SESSION_INFO.clone()), - ongoing_dispute.into_iter().map(|c| (MOCK_SESSION_INDEX, c)).collect(), + ongoing_dispute + .into_iter() + .map(|c| (MOCK_SESSION_INDEX, c, DisputeStatus::Active)) + .collect(), ) .await; relay_parent