This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

relay chain selection and dispute-coordinator fixes and improvements (#4752)

* Don't error in finality_target_with_longest_chain

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add error flag

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add error flag in dispute-coordinator

Make sure to send errors to subsystems requesting data that depends on missing session info

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Scrape ancestors

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Fix naming

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* 💬 fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* consume

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* typo

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Bump scraped blocks LRU capacity

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* 🧯 🔥

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* remove prints

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Increase scraped blocks cache size

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* more review fixes

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* another fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix target_ancestor

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Scrape up to max finalized block

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* undo comment change

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Limit ancestry lookup to last finalized block or
max finality lag

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* debug damage

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
sandreim authored Jan 26, 2022
1 parent 47a8eb7 commit 102bc8c
Showing 5 changed files with 243 additions and 57 deletions.
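Condensed, the error-flag pattern the dispute coordinator gains in this commit looks roughly like the sketch below. The types here are simplified stand-ins rather than the real subsystem types; the actual plumbing is in `initialized.rs` in the diff that follows.

// Simplified sketch: remember the last rolling-session-window failure and
// surface it to message senders instead of silently answering with
// incomplete data. `SessionsUnavailable`, `Error` and `Coordinator` are
// stand-ins, not the real dispute-coordinator types.
#[derive(Clone, Debug)]
struct SessionsUnavailable;

#[derive(Debug)]
enum Error {
    RollingSessionWindow(SessionsUnavailable),
}

struct Coordinator {
    // Set when the last session-window update failed; cleared on success.
    error: Option<SessionsUnavailable>,
}

impl Coordinator {
    // Bail out of message handling while session info is known to be missing.
    fn ensure_available_session_info(&self) -> Result<(), Error> {
        if let Some(e) = self.error.clone() {
            return Err(Error::RollingSessionWindow(e))
        }
        Ok(())
    }
}

fn main() {
    let failed = Coordinator { error: Some(SessionsUnavailable) };
    assert!(failed.ensure_available_session_info().is_err());

    let healthy = Coordinator { error: None };
    assert!(healthy.ensure_available_session_info().is_ok());
}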
178 changes: 159 additions & 19 deletions node/core/dispute-coordinator/src/real/initialized.rs
@@ -16,12 +16,16 @@

//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};

use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;

use sc_keystore::LocalKeystore;

@@ -37,7 +41,7 @@ use polkadot_node_subsystem::{
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_primitives::{
v1::{
@@ -48,11 +52,12 @@ use polkadot_primitives::{
v2::SessionInfo,
};

use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};

use crate::{
error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
metrics::Metrics,
real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem},
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};

use super::{
@@ -66,6 +71,11 @@ use super::{
OverlayedBackend,
};

// The capacity and scrape depth are equal to the maximum allowed unfinalized depth.
const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500;
// This is in sync with `MAX_FINALITY_LAG` in relay chain selection.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;

/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -80,6 +90,11 @@ pub struct Initialized {
ordering_provider: OrderingProvider,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be a `Vec` if the need to track more arises.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}

impl Initialized {
@@ -105,6 +120,8 @@ impl Initialized {
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}

@@ -245,22 +262,26 @@ impl Initialized {
.await?;
self.participation.process_active_leaves_update(ctx, &update).await?;

let new_activations = update.activated.into_iter().map(|a| a.hash);
for new_leaf in new_activations {
match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx, new_leaf.hash)
.await
{
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
continue
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
tracing::trace!(
Expand All @@ -277,7 +298,82 @@ impl Initialized {
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;

// Scrape the head if above rolling session update went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}

// Try to scrape any blocks for which we could not get the current session or did not receive an
// active leaves update.
let ancestors = match get_finalized_block_number(ctx.sender()).await {
Ok(block_number) => {
// Limit our search to last finalized block, or up to max finality lag.
let block_number = std::cmp::max(
block_number,
new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS),
);
// Fetch ancestry up to and including the last finalized block.
// `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to
// pass in its parent.
OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
block_number.saturating_sub(1),
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors scraping",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
},
};

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
tracing::warn!(
target: LOG_TARGET,
hash = ?ancestor,
error = ?err,
"Skipping scraping block due to error",
);
},
);
}
}

Ok(())
@@ -293,6 +389,11 @@
new_leaf: Hash,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}

// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
@@ -331,6 +432,9 @@ impl Initialized {
};

if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesn't contain any backing votes or disputes. We'll
// mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -413,6 +517,7 @@ impl Initialized {
}

if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -490,6 +595,8 @@
"Attempted import of on-chain statement of concluded dispute failed"),
}
}

self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}

@@ -533,18 +640,39 @@
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
let recent_disputes =
overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
let _ =
tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.collect(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
for (session_index, candidate_hash) in query {
if let Some(v) =
overlay_db.load_candidate_votes(session_index, &candidate_hash)?
{
@@ -581,6 +709,9 @@
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let undisputed_chain = determine_undisputed_chain(
overlay_db,
base_number,
@@ -595,6 +726,15 @@
Ok(Box::new(|| Ok(())))
}

// Helper function for checking subsystem errors in message processing.
fn ensure_available_session_info(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
}

Ok(())
}

async fn handle_import_statements(
&mut self,
ctx: &mut impl SubsystemContext,
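The core of the new ancestry-scraping limit fits in a small standalone sketch: the lower bound never drops below the last finalized block and never trails the activated leaf by more than `MAX_BATCH_SCRAPE_ANCESTORS` blocks. The helper and the `main` driver below are illustrative only, not part of the subsystem API.

// Sketch of the bound used before scraping leaf ancestors in the diff above.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;

// Lowest block number whose votes we still scrape: capped by the last
// finalized block and by the maximum allowed finality lag behind the leaf.
fn ancestry_lower_bound(last_finalized: u32, leaf_number: u32) -> u32 {
    std::cmp::max(last_finalized, leaf_number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS))
}

fn main() {
    // Finality lags far behind the leaf: the lag cap wins (1_000 - 500 = 500).
    assert_eq!(ancestry_lower_bound(10, 1_000), 500);
    // Finality is close to the leaf: the finalized block wins.
    assert_eq!(ancestry_lower_bound(990, 1_000), 990);
}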
4 changes: 4 additions & 0 deletions node/core/dispute-coordinator/src/real/mod.rs
@@ -203,6 +203,10 @@ impl DisputeCoordinatorSubsystem {
},
};

// Before we move to the initialized state we need to check if we got at
// least one finality notification to prevent large ancestry block scraping,
// when the node is syncing.

let mut overlay_db = OverlayedBackend::new(&mut backend);
let (participations, spam_slots, ordering_provider) = match self
.handle_startup(
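The comment added above states the intent only; as a rough standalone illustration of waiting for the first finality notification before moving on to the initialized state (stand-in types, not the real overseer signals or subsystem API):

// Hypothetical signal type for illustration; the real subsystem consumes
// overseer signals instead.
enum Signal {
    ActiveLeaf { number: u32 },
    BlockFinalized { number: u32 },
}

// Drain signals until the first finality notification arrives. A syncing node
// can emit many active leaves before anything is finalized, and scraping
// ancestry without a finalized bound could walk a very long chain.
fn first_finalized(signals: impl IntoIterator<Item = Signal>) -> Option<u32> {
    signals.into_iter().find_map(|signal| match signal {
        Signal::BlockFinalized { number } => Some(number),
        Signal::ActiveLeaf { .. } => None,
    })
}

fn main() {
    let signals = vec![
        Signal::ActiveLeaf { number: 100 },
        Signal::ActiveLeaf { number: 101 },
        Signal::BlockFinalized { number: 90 },
    ];
    // Initialization would proceed only once `Some(90)` is observed.
    assert_eq!(first_finalized(signals), Some(90));
}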
