Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Update dispute participation on active leaves update (#6303)
Browse files Browse the repository at this point in the history
* Passed candidate events from scraper to participation

* First draft PR 5875

* Added support for timestamp in changes

* Some necessary refactoring

* Removed SessionIndex from unconfirmed_disputes key

* Removed duplicate logic in import statements

* Replaced queue_participation call with re-prio

* Simplifying refactor. Backed were already handled

* Removed unneeded spam slots logic

* Implementers guide edits

* Undid the spam slots refactor

* Added comments and implementers guide edit

* Added test for participation upon backing

* Round of fixes + ran fmt

* Round of changes + fmt

* Error handling draft

* Changed errors to bubble up from reprioritization

* Starting to construct new test

* Clarifying participation function rename

* Reprio test draft

* Very rough bump to priority queue test draft

* Improving logging

* Most concise reproduction of error on third import

* Add `handle_approval_vote_request`

* Removing reprioritization on included event test

* Removing unneeded test config

* cargo fmt

* Test works

* Fixing final nits

* Tweaks to test Tsveto figured out

Co-authored-by: eskimor <eskimor@no-such-url.com>
Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
  • Loading branch information
3 people authored Dec 30, 2022
1 parent 5fe334d commit db498ce
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 36 deletions.
35 changes: 22 additions & 13 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,13 @@ impl Initialized {
update: ActiveLeavesUpdate,
now: u64,
) -> Result<()> {
let on_chain_votes =
let scraped_updates =
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
log_error(
self.participation
.bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts)
.await,
)?;
self.participation.process_active_leaves_update(ctx, &update).await?;

if let Some(new_leaf) = update.activated {
Expand Down Expand Up @@ -308,7 +313,7 @@ impl Initialized {

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for votes in on_chain_votes {
for votes in scraped_updates.on_chain_votes {
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|error| {
gum::warn!(
Expand Down Expand Up @@ -416,6 +421,8 @@ impl Initialized {
})
.collect();

// Importantly, handling import statements for backing votes also
// clears spam slots for any newly backed candidates
let import_result = self
.handle_import_statements(
ctx,
Expand Down Expand Up @@ -837,8 +844,15 @@ impl Initialized {
let new_state = import_result.new_state();

let is_included = self.scraper.is_candidate_included(&candidate_hash);

let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_confirmed = new_state.is_confirmed();
let potential_spam =
!is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

gum::trace!(
target: LOG_TARGET,
Expand All @@ -851,8 +865,11 @@ impl Initialized {
"Is spam?"
);

// This check is responsible for all clearing of spam slots. It runs
// whenever a vote is imported from on or off chain, and decrements
// slots whenever a candidate is newly backed, confirmed, or has our
// own vote.
if !potential_spam {
// Former spammers have not been spammers after all:
self.spam_slots.clear(&(session, candidate_hash));

// Potential spam:
Expand Down Expand Up @@ -880,14 +897,6 @@ impl Initialized {
}
}

let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let is_confirmed = new_state.is_confirmed();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

// Participate in dispute if we did not cast a vote before and actually have keys to cast a
// local vote. Disputes should fall in one of the categories below, otherwise we will refrain
// from participation:
Expand Down
16 changes: 16 additions & 0 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
/// due to multi-core architectures, but the fetching itself can not be improved by parallel
/// requests. This means that higher numbers make it harder for a single dispute to resolve fast.
#[cfg(not(test))]
const MAX_PARALLEL_PARTICIPATIONS: usize = 3;
#[cfg(test)]
pub(crate) const MAX_PARALLEL_PARTICIPATIONS: usize = 1;

/// Keep track of disputes we need to participate in.
///
Expand Down Expand Up @@ -212,6 +215,19 @@ impl Participation {
Ok(())
}

/// Moving any request concerning the given candidates from best-effort to
/// priority, ignoring any candidates that don't have any queued participation requests.
pub async fn bump_to_priority_for_candidates<Context>(
&mut self,
ctx: &mut Context,
included_receipts: &Vec<CandidateReceipt>,
) -> Result<()> {
for receipt in included_receipts {
self.queue.prioritize_if_present(ctx.sender(), receipt).await?;
}
Ok(())
}

/// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached.
async fn dequeue_until_capacity<Context>(
&mut self,
Expand Down
25 changes: 25 additions & 0 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,31 @@ impl Queues {
self.pop_best_effort().map(|d| d.1)
}

/// Reprioritizes any participation requests pertaining to the
/// passed candidates from best effort to priority.
pub async fn prioritize_if_present(
&mut self,
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
receipt: &CandidateReceipt,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, receipt).await?;
self.prioritize_with_comparator(comparator)?;
Ok(())
}

fn prioritize_with_comparator(
&mut self,
comparator: CandidateComparator,
) -> std::result::Result<(), QueueError> {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
}
Ok(())
}

fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
Expand Down
52 changes: 39 additions & 13 deletions node/core/dispute-coordinator/src/scraping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes};
use polkadot_primitives::v2::{
BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes,
BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, ScrapedOnChainVotes,
};

use crate::{
Expand All @@ -51,6 +51,24 @@ const LRU_OBSERVED_BLOCKS_CAPACITY: NonZeroUsize = match NonZeroUsize::new(20) {
None => panic!("Observed blocks cache size must be non-zero"),
};

/// ScrapedUpdates
///
/// Updates to on_chain_votes and included receipts for new active leaf and its unprocessed
/// ancestors.
///
/// on_chain_votes: New votes as seen on chain
/// included_receipts: Newly included parachain block candidate receipts as seen on chain
pub struct ScrapedUpdates {
pub on_chain_votes: Vec<ScrapedOnChainVotes>,
pub included_receipts: Vec<CandidateReceipt>,
}

impl ScrapedUpdates {
pub fn new() -> Self {
Self { on_chain_votes: Vec::new(), included_receipts: Vec::new() }
}
}

/// Chain scraper
///
/// Scrapes unfinalized chain in order to collect information from blocks.
Expand Down Expand Up @@ -104,8 +122,8 @@ impl ChainScraper {
};
let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
let votes = s.process_active_leaves_update(sender, &update).await?;
Ok((s, votes))
let updates = s.process_active_leaves_update(sender, &update).await?;
Ok((s, updates.on_chain_votes))
}

/// Check whether we have seen a candidate included on any chain.
Expand All @@ -122,18 +140,19 @@ impl ChainScraper {
///
/// and updates current heads, so we can query candidates for all non finalized blocks.
///
/// Returns: On chain vote for the leaf and any ancestors we might not yet have seen.
/// Returns: On chain votes and included candidate receipts for the leaf and any
/// ancestors we might not yet have seen.
pub async fn process_active_leaves_update<Sender>(
&mut self,
sender: &mut Sender,
update: &ActiveLeavesUpdate,
) -> Result<Vec<ScrapedOnChainVotes>>
) -> Result<ScrapedUpdates>
where
Sender: overseer::DisputeCoordinatorSenderTrait,
{
let activated = match update.activated.as_ref() {
Some(activated) => activated,
None => return Ok(Vec::new()),
None => return Ok(ScrapedUpdates::new()),
};

// Fetch ancestry up to last finalized block.
Expand All @@ -147,20 +166,22 @@ impl ChainScraper {

let block_hashes = std::iter::once(activated.hash).chain(ancestors);

let mut on_chain_votes = Vec::new();
let mut scraped_updates = ScrapedUpdates::new();
for (block_number, block_hash) in block_numbers.zip(block_hashes) {
gum::trace!(?block_number, ?block_hash, "In ancestor processing.");

self.process_candidate_events(sender, block_number, block_hash).await?;
let receipts_for_block =
self.process_candidate_events(sender, block_number, block_hash).await?;
scraped_updates.included_receipts.extend(receipts_for_block);

if let Some(votes) = get_on_chain_votes(sender, block_hash).await? {
on_chain_votes.push(votes);
scraped_updates.on_chain_votes.push(votes);
}
}

self.last_observed_blocks.put(activated.hash, ());

Ok(on_chain_votes)
Ok(scraped_updates)
}

/// Prune finalized candidates.
Expand All @@ -187,17 +208,21 @@ impl ChainScraper {
/// Process candidate events of a block.
///
/// Keep track of all included and backed candidates.
///
/// Returns freshly included candidate receipts
async fn process_candidate_events<Sender>(
&mut self,
sender: &mut Sender,
block_number: BlockNumber,
block_hash: Hash,
) -> Result<()>
) -> Result<Vec<CandidateReceipt>>
where
Sender: overseer::DisputeCoordinatorSenderTrait,
{
let events = get_candidate_events(sender, block_hash).await?;
let mut included_receipts: Vec<CandidateReceipt> = Vec::new();
// Get included and backed events:
for ev in get_candidate_events(sender, block_hash).await? {
for ev in events {
match ev {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
let candidate_hash = receipt.hash();
Expand All @@ -208,6 +233,7 @@ impl ChainScraper {
"Processing included event"
);
self.included_candidates.insert(block_number, candidate_hash);
included_receipts.push(receipt);
},
CandidateEvent::CandidateBacked(receipt, _, _, _) => {
let candidate_hash = receipt.hash();
Expand All @@ -224,7 +250,7 @@ impl ChainScraper {
},
}
}
Ok(())
Ok(included_receipts)
}

/// Returns ancestors of `head` in the descending order, stopping
Expand Down
Loading

0 comments on commit db498ce

Please sign in to comment.