Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Provisioner should ignore unconfirmed disputes #6294

Merged
merged 5 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ impl Initialized {

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
);
},
Expand Down
29 changes: 23 additions & 6 deletions node/core/dispute-coordinator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use futures::{

use polkadot_node_subsystem_util::database::Database;

use polkadot_node_primitives::{SignedDisputeStatement, SignedFullStatement, Statement};
use polkadot_node_primitives::{
DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement,
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage,
Expand Down Expand Up @@ -682,7 +684,10 @@ fn too_many_unconfirmed_statements_are_considered_spam() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -812,7 +817,10 @@ fn approval_vote_import_works() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -934,7 +942,10 @@ fn dispute_gets_confirmed_via_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1099,7 +1110,10 @@ fn dispute_gets_confirmed_at_byzantine_threshold() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash1)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash1, DisputeStatus::Confirmed)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down Expand Up @@ -1358,7 +1372,10 @@ fn conflicting_votes_lead_to_dispute_participation() {
})
.await;

assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]);
assert_eq!(
rx.await.unwrap(),
vec![(session, candidate_hash, DisputeStatus::Active)]
);

let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ where
onchain.len(),
);

// Filter out unconfirmed disputes. However if the dispute is already onchain - don't skip it.
// In this case we'd better push as much fresh votes as possible to bring it to conclusion faster.
let recent_disputes = recent_disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded() || onchain.contains_key(&(d.0, d.1)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Good reasoning. If it is already on chain, we also know it is good.

.collect::<Vec<_>>();

let partitioned = partition_recent_disputes(recent_disputes, &onchain);
metrics.on_partition_recent_disputes(&partitioned);

Expand Down
89 changes: 78 additions & 11 deletions node/core/provisioner/src/disputes/prioritized_selection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_concluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 80 / 100;
let session_idx = 0;
Expand All @@ -444,7 +444,7 @@ impl TestDisputes {
pub fn add_unconfirmed_disputes_unconcluded_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 40 / 100;
let session_idx = 1;
Expand All @@ -459,22 +459,25 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_unconfirmed_disputes_unknown_onchain(
pub fn add_confirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (u32, usize) {
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 2;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Confirmed);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}
(session_idx, local_votes_count * dispute_count)
}

pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let onchain_votes_count = self.validators_count * 75 / 100;
let session_idx = 3;
Expand All @@ -488,7 +491,10 @@ impl TestDisputes {
(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) {
pub fn add_concluded_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 90 / 100;
let session_idx = 4;
let lf = leaf();
Expand All @@ -500,6 +506,40 @@ impl TestDisputes {
(session_idx, local_votes_count * dispute_count)
}

pub fn add_unconfirmed_disputes_known_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let onchain_votes_count = self.validators_count * 10 / 100;
let session_idx = 5;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
self.add_onchain_dispute(d, onchain_votes_count);
}

(session_idx, (local_votes_count - onchain_votes_count) * dispute_count)
}

pub fn add_unconfirmed_disputes_unknown_onchain(
&mut self,
dispute_count: usize,
) -> (SessionIndex, usize) {
let local_votes_count = self.validators_count * 10 / 100;
let session_idx = 6;
let lf = leaf();
let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone());
for _ in 0..dispute_count {
let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active);
self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone());
}

(session_idx, local_votes_count * dispute_count)
}

fn generate_local_votes<T: Clone>(
statement_kind: T,
start_idx: usize,
Expand Down Expand Up @@ -554,9 +594,9 @@ fn normal_flow() {
// concluded disputes known onchain - these should be ignored
let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH);

// active disputes unknown onchain
// confirmed disputes unknown onchain
let (second_idx, second_votes) =
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
Expand Down Expand Up @@ -635,8 +675,8 @@ fn many_batches() {
// concluded disputes known onchain
input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION);

// active disputes unknown onchain
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);
// confirmed disputes unknown onchain
input.add_confirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
Expand Down Expand Up @@ -720,3 +760,30 @@ fn votes_above_limit() {
ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT
);
}

#[test]
fn unconfirmed_are_handled_correctly() {
const VALIDATOR_COUNT: usize = 10;
const DISPUTES_PER_PARTITION: usize = 50;

let mut input = TestDisputes::new(VALIDATOR_COUNT);

// Add unconfirmed known onchain -> this should be pushed
let (pushed_idx, _) = input.add_unconfirmed_disputes_known_onchain(DISPUTES_PER_PARTITION);

// Add unconfirmed unknown onchain -> this should be ignored
input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION);

let metrics = metrics::Metrics::new_dummy();
let mut vote_queries: usize = 0;
test_harness(
|r| mock_overseer(r, &mut input, &mut vote_queries),
|mut tx: TestSubsystemSender| async move {
let lf = leaf();
let result = select_disputes(&mut tx, &metrics, &lf).await;

assert!(result.len() == DISPUTES_PER_PARTITION);
result.iter().for_each(|d| assert!(d.session == pushed_idx));
},
);
}
64 changes: 24 additions & 40 deletions node/core/provisioner/src/disputes/random_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,51 +42,35 @@ enum RequestType {
}

/// Request open disputes identified by `CandidateHash` and the `SessionIndex`.
async fn request_disputes(
/// Returns only confirmed/concluded disputes. The rest are filtered out.
async fn request_confirmed_disputes(
sender: &mut impl overseer::ProvisionerSenderTrait,
active_or_recent: RequestType,
) -> Vec<(SessionIndex, CandidateHash)> {
let disputes = match active_or_recent {
RequestType::Recent => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::RecentDisputes(tx);
sender.send_unbounded_message(msg);
let recent_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
recent_disputes
.into_iter()
.map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash))
.collect::<Vec<_>>()
},
RequestType::Active => {
let (tx, rx) = oneshot::channel();
let msg = DisputeCoordinatorMessage::ActiveDisputes(tx);
sender.send_unbounded_message(msg);
let active_disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};
active_disputes
let (tx, rx) = oneshot::channel();
let msg = match active_or_recent {
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};

sender.send_unbounded_message(msg);
let disputes = match rx.await {
Ok(r) => r,
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Channel closed: unable to gather {:?} disputes",
active_or_recent
);
Vec::new()
},
};

disputes
.into_iter()
.filter(|d| d.2.is_confirmed_concluded())
.map(|d| (d.0, d.1))
.collect()
}

/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent.
Expand Down Expand Up @@ -132,15 +116,15 @@ where
// In case of an overload condition, we limit ourselves to active disputes, and fill up to the
// upper bound of disputes to pass to wasm `fn create_inherent_data`.
// If the active ones are already exceeding the bounds, randomly select a subset.
let recent = request_disputes(sender, RequestType::Recent).await;
let recent = request_confirmed_disputes(sender, RequestType::Recent).await;
let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
gum::warn!(
target: LOG_TARGET,
"Recent disputes are excessive ({} > {}), reduce to active ones, and selected",
recent.len(),
MAX_DISPUTES_FORWARDED_TO_RUNTIME
);
let mut active = request_disputes(sender, RequestType::Active).await;
let mut active = request_confirmed_disputes(sender, RequestType::Active).await;
let n_active = active.len();
let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME {
let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME);
Expand Down
4 changes: 3 additions & 1 deletion node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,9 @@ async fn get_active_disputes<Context>(

// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(DisputeCoordinatorMessage::ActiveDisputes(tx));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
rx.await
.map_err(|_| JfyiError::AskActiveDisputesCanceled)
.map(|disputes| disputes.into_iter().map(|d| (d.0, d.1)).collect())
}

/// Get all locally available dispute votes for a given dispute.
Expand Down
4 changes: 2 additions & 2 deletions node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ pub enum DisputeCoordinatorMessage {
/// - or the imported statements are backing/approval votes, which are always accepted.
pending_confirmation: Option<oneshot::Sender<ImportStatementsResult>>,
},
/// Fetch a list of all recent disputes the co-ordinator is aware of.
/// Fetch a list of all recent disputes the coordinator is aware of.
/// These are disputes which have occurred any time in recent sessions,
/// and which may have already concluded.
RecentDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Fetch a list of all active disputes that the coordinator is aware of.
/// These disputes are either not yet concluded or recently concluded.
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash, DisputeStatus)>>),
/// Get candidate votes for a candidate.
QueryCandidateVotes(
Vec<(SessionIndex, CandidateHash)>,
Expand Down