Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Provisioner should ignore unconfirmed disputes (#6294)
Browse files Browse the repository at this point in the history
* Fix typos

* Filter unconfirmed disputes in provisioner -  random_selection

* Rework dispute coordinator to return `DisputeStatus` with
`ActiveDisputes` message.
* Rework the random_selection implementation of `select_disptues` in
  `provisioner` to return only confirmed disputes.

* Filter unconfirmed disputes in provisioner - prioritized_selection

* Add test for unconfirmed disputes handling

* Fix `dispute-distribution` tests
  • Loading branch information
tdimitrov authored Nov 16, 2022
1 parent a3d25c7 commit c289ace
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 66 deletions.
4 changes: 3 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ impl Initialized {

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
);
},
Expand Down
29 changes: 23 additions & 6 deletions node/core/dispute-coordinator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use futures::{

use polkadot_node_subsystem_util::database::Database;

use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
use polkadot_node_primitives::{
DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage,
Expand Down Expand Up @@ -682,7 +684,10 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -812,7 +817,10 @@ fn approval_vote_import_works() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -934,7 +942,10 @@ fn dispute_gets_confirmed_via_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1099,7 +1110,10 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Confirmed)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1358,7 +1372,10 @@ fn conflicting_votes_lead_to_dispute_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ where
onchain.len(),
);

// Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it.
// In this case we'd better push as much fresh votes as possible to bring it to conclusion faster.
let recent_disputes = recent_disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded() || onchain.contains_key(&(d.0, d.1)))
.collect::<Vec<_>>();

let partitioned = partition_recent_disputes(recent_disputes, &onchain);
metrics.on_partition_recent_disputes(&partitioned);

Expand Down
89 changes: 78 additions & 11 deletions node/core/provisioner/src/disputes/prioritized_selection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_concluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 80 / 100;
let session_idx = 0;
Expand All @@ -444,7 +444,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_unconcluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 40 / 100;
let session_idx = 1;
Expand All @@ -459,22 +459,25 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_unconfirmed_disputes_unknown_onchain(
pub fn add_confirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 2;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Confirmed);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}

pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 75 / 100;
let session_idx = 3;
Expand All @@ -488,7 +491,10 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 4;
let lf = leaf();
Expand All @@ -500,6 +506,40 @@ impl TestDisputes {
(session_idx, local_votes_count * dispute_count)
}

pub fn add_unconfirmed_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let onchain_votes_count = self.validators_count * 10 / 100;
let session_idx = 5;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}

(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_unconfirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let session_idx = 6;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}

(session_idx, local_votes_count * dispute_count)
}

fn generate_local_votes<T: Clone>(
statement_kind: T,
start_idx: usize,
Expand Down Expand Up @@ -554,9 +594,9 @@ fn normal_flow() {
// concluded disputes known onchain - these should be ignored
let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH);

// active disputes unknown onchain
// confirmed disputes unknown onchain
let (second_idx, second_votes) =
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
Expand Down Expand Up @@ -635,8 +675,8 @@ fn many_batches() {
// concluded disputes known onchain
input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION);

// active disputes unknown onchain
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
// confirmed disputes unknown onchain
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
Expand Down Expand Up @@ -720,3 +760,30 @@ fn votes_above_limit() {
ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT
);
}

#[test]
fn unconfirmed_are_handled_correctly() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_PARTITION: usize = 50;

let mut input = TestDisputes::new(VALIDATOR_COUNT);

// Add unconfirmed known onchain -> this should be pushed
let (pushed_idx, _) = input.add_unconfirmed_disputes_known_onchain(DISPUTES_PER_PARTITION);

// Add unconfirmed unknown onchain -> this should be ignored
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;

assert!(result.len() == DISPUTES_PER_PARTITION);
result.iter().for_each(|d| assert!(d.session == pushed_idx));
},
);
}
64 changes: 24 additions & 40 deletions node/core/provisioner/src/disputes/random_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,51 +42,35 @@ enum RequestType {
}

/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
/// Returns only confirmed/concluded disputes. The rest are filtered out.
async fn request_confirmed_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let disputes = match active_or_recent {
RequestType::Recent => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
recent_disputes
.into_iter()
.map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash))
.collect::<Vec<_>>()
},
RequestType::Active => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::ActiveDisputes(tx);
sender.send_unbounded_message(msg);
let active_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
active_disputes
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};

sender.send_unbounded_message(msg);
let disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};

disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded())
.map(|d| (d.0, d.1))
.collect()
}

/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
Expand Down Expand Up @@ -132,15 +116,15 @@ where
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let recent = request_confirmed_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let mut active = request_confirmed_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
Expand Down
4 changes: 3 additions & 1 deletion node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,9 @@ async fn get_active_disputes<Context>(

// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
rx.await
.map_err(|_| JfyiError::AskActiveDisputesCanceled)
.map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect())
}

/// Get all locally available dispute votes for a given dispute.
Expand Down
Loading

0 comments on commit c289ace

Please sign in to comment.