diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs
index cbc8e02cbb62..4e4a8d0298ae 100644
--- a/node/core/dispute-coordinator/src/real/initialized.rs
+++ b/node/core/dispute-coordinator/src/real/initialized.rs
@@ -16,12 +16,16 @@
 //! Dispute coordinator subsystem in initialized state (after first active leaf is received).
 
-use std::{collections::HashSet, sync::Arc};
+use std::{
+	collections::{BTreeMap, HashSet},
+	sync::Arc,
+};
 
 use futures::{
 	channel::{mpsc, oneshot},
 	FutureExt, StreamExt,
 };
+use lru::LruCache;
 
 use sc_keystore::LocalKeystore;
 
@@ -37,7 +41,7 @@ use polkadot_node_subsystem::{
 	overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
 };
 use polkadot_node_subsystem_util::rolling_session_window::{
-	RollingSessionWindow, SessionWindowUpdate,
+	RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
 };
 use polkadot_primitives::{
 	v1::{
@@ -48,11 +52,12 @@ use polkadot_primitives::{
 	v2::SessionInfo,
 };
 
-use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};
-
 use crate::{
-	error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
+	error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
+	metrics::Metrics,
+	real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem},
 	status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
+	LOG_TARGET,
 };
 
 use super::{
@@ -66,6 +71,11 @@ use super::{
 	OverlayedBackend,
 };
 
+// The capacity and scrape depth are equal to the maximum allowed unfinalized depth.
+const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500;
+// This is in sync with `MAX_FINALITY_LAG` in relay chain selection.
+const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;
+
 /// After the first active leaves update we transition to `Initialized` state.
 ///
 /// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -80,6 +90,11 @@ pub struct Initialized {
 	ordering_provider: OrderingProvider,
 	participation_receiver: WorkerMessageReceiver,
 	metrics: Metrics,
+	// This tracks only rolling session window failures.
+	// It can be a `Vec` if the need to track more arises.
+	error: Option<SessionsUnavailable>,
+	/// Latest relay blocks that have been successfully scraped.
+	last_scraped_blocks: LruCache<Hash, ()>,
 }
 
 impl Initialized {
@@ -105,6 +120,8 @@ impl Initialized {
 			participation,
 			participation_receiver,
 			metrics,
+			error: None,
+			last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
 		}
 	}
 
@@ -245,22 +262,26 @@ impl Initialized {
 			.await?;
 		self.participation.process_active_leaves_update(ctx, &update).await?;
 
-		let new_activations = update.activated.into_iter().map(|a| a.hash);
-		for new_leaf in new_activations {
-			match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
+		if let Some(new_leaf) = update.activated {
+			match self
+				.rolling_session_window
+				.cache_session_info_for_head(ctx, new_leaf.hash)
+				.await
+			{
 				Err(e) => {
 					tracing::warn!(
-						target: LOG_TARGET,
-						err = ?e,
-						"Failed to update session cache for disputes",
+						target: LOG_TARGET,
+						err = ?e,
+						"Failed to update session cache for disputes",
					);
-					continue
+					self.error = Some(e);
				},
				Ok(SessionWindowUpdate::Advanced {
					new_window_end: window_end,
					new_window_start,
					..
 				}) => {
+					self.error = None;
 					let session = window_end;
 					if self.highest_session < session {
 						tracing::trace!(
@@ -277,7 +298,82 @@ impl Initialized {
 				},
 				Ok(SessionWindowUpdate::Unchanged) => {},
 			};
-			self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;
+
+			// Scrape the head if the rolling session window update above went well.
+			if self.error.is_none() {
+				let _ = self
+					.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
+					.await
+					.map_err(|err| {
+						tracing::warn!(
+							target: LOG_TARGET,
+							"Skipping scraping block #{}({}) due to error: {}",
+							new_leaf.number,
+							new_leaf.hash,
+							err
+						);
+					});
+			}
+
+			// Try to scrape any blocks for which we could not get the current session or did not receive an
+			// active leaves update.
+			let ancestors = match get_finalized_block_number(ctx.sender()).await {
+				Ok(block_number) => {
+					// Limit our search to last finalized block, or up to max finality lag.
+					let block_number = std::cmp::max(
+						block_number,
+						new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS),
+					);
+					// Fetch ancestry up to and including the last finalized block.
+					// `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to
+					// pass in its parent.
+					OrderingProvider::get_block_ancestors(
+						ctx.sender(),
+						new_leaf.hash,
+						new_leaf.number,
+						block_number.saturating_sub(1),
+						&mut self.last_scraped_blocks,
+					)
+					.await
+					.unwrap_or_else(|err| {
+						tracing::debug!(
+							target: LOG_TARGET,
+							activated_leaf = ?new_leaf,
+							error = ?err,
+							"Skipping leaf ancestors due to an error",
+						);
+						// We assume this is a spurious error so we'll move forward with an
+						// empty ancestry.
+						Vec::new()
+					})
+				},
+				Err(err) => {
+					tracing::debug!(
+						target: LOG_TARGET,
+						activated_leaf = ?new_leaf,
+						error = ?err,
+						"Skipping leaf ancestors scraping",
+					);
+					// We assume this is a spurious error so we'll move forward with an
+					// empty ancestry.
+					Vec::new()
+				},
+			};
+
+			// The `runtime-api` subsystem has an internal queue which serializes the execution,
+			// so there is no point in running these in parallel.
+			for ancestor in ancestors {
+				let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
+					|err| {
+						tracing::warn!(
+							target: LOG_TARGET,
+							hash = ?ancestor,
+							error = ?err,
+							"Skipping scraping block due to error",
+						);
+					},
+				);
+			}
 		}
 
 		Ok(())
@@ -293,6 +389,11 @@ impl Initialized {
 		new_leaf: Hash,
 		now: u64,
 	) -> Result<()> {
+		// Avoid scraping twice.
+		if self.last_scraped_blocks.get(&new_leaf).is_some() {
+			return Ok(())
+		}
+
 		// obtain the concluded disputes as well as the candidate backing votes
 		// from the new leaf
 		let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
@@ -331,6 +432,9 @@ impl Initialized {
 		};
 
 		if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
+			// This block is not interesting as it doesn't contain any backing votes or disputes. We'll
+			// mark it here as scraped to prevent further processing.
+			self.last_scraped_blocks.put(new_leaf, ());
 			return Ok(())
 		}
 
@@ -413,6 +517,7 @@ impl Initialized {
 		}
 
 		if disputes.is_empty() {
+			self.last_scraped_blocks.put(new_leaf, ());
 			return Ok(())
 		}
 
@@ -490,6 +595,8 @@ impl Initialized {
 					"Attempted import of on-chain statement of concluded dispute failed"),
 			}
 		}
+
+		self.last_scraped_blocks.put(new_leaf, ());
 		Ok(())
 	}
 
@@ -533,18 +640,39 @@ impl Initialized {
 					}
 				},
 			DisputeCoordinatorMessage::RecentDisputes(tx) => {
-				let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
+				// Return error if session information is missing.
+				self.ensure_available_session_info()?;
+
+				let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
+					disputes
+				} else {
+					BTreeMap::new()
+				};
+
 				let _ = tx.send(recent_disputes.keys().cloned().collect());
 			},
			DisputeCoordinatorMessage::ActiveDisputes(tx) => {
-				let recent_disputes =
-					overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
-				let _ =
-					tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
+				// Return error if session information is missing.
+				self.ensure_available_session_info()?;
+
+				let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
+					disputes
+				} else {
+					BTreeMap::new()
+				};
+
+				let _ = tx.send(
+					get_active_with_status(recent_disputes.into_iter(), now)
+						.map(|(k, _)| k)
+						.collect(),
+				);
 			},
 			DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
+				// Return error if session information is missing.
+				self.ensure_available_session_info()?;
+
 				let mut query_output = Vec::new();
-				for (session_index, candidate_hash) in query.into_iter() {
+				for (session_index, candidate_hash) in query {
 					if let Some(v) =
 						overlay_db.load_candidate_votes(session_index, &candidate_hash)?
 					{
@@ -581,6 +709,9 @@ impl Initialized {
 				block_descriptions,
 				tx,
 			} => {
+				// Return error if session information is missing.
+				self.ensure_available_session_info()?;
+
 				let undisputed_chain = determine_undisputed_chain(
 					overlay_db,
 					base_number,
@@ -595,6 +726,15 @@ impl Initialized {
 		Ok(Box::new(|| Ok(())))
 	}
 
+	// Helper function for checking subsystem errors in message processing.
+	fn ensure_available_session_info(&self) -> Result<()> {
+		if let Some(subsystem_error) = self.error.clone() {
+			return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
+		}
+
+		Ok(())
+	}
+
 	async fn handle_import_statements(
 		&mut self,
 		ctx: &mut impl SubsystemContext,
diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs
index 5446d91ddc45..6d6d7be85abc 100644
--- a/node/core/dispute-coordinator/src/real/mod.rs
+++ b/node/core/dispute-coordinator/src/real/mod.rs
@@ -203,6 +203,10 @@ impl DisputeCoordinatorSubsystem {
 			},
 		};
 
+		// Before we move to the initialized state we need to check if we got at
+		// least one finality notification to prevent large ancestry block scraping
+		// when the node is syncing.
+
 		let mut overlay_db = OverlayedBackend::new(&mut backend);
 		let (participations, spam_slots, ordering_provider) = match self
 			.handle_startup(
diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs
index c6f1a89904af..52650a9cd252 100644
--- a/node/core/dispute-coordinator/src/real/ordering/mod.rs
+++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs
@@ -184,19 +184,43 @@ impl OrderingProvider {
 		update: &ActiveLeavesUpdate,
 	) -> Result<()> {
 		if let Some(activated) = update.activated.as_ref() {
-			// Fetch ancestors of the activated leaf.
-			let ancestors = self
-				.get_block_ancestors(sender, activated.hash, activated.number)
-				.await
-				.unwrap_or_else(|err| {
+			// Fetch last finalized block.
+			let ancestors = match get_finalized_block_number(sender).await {
+				Ok(block_number) => {
+					// Fetch ancestry up to last finalized block.
+					Self::get_block_ancestors(
+						sender,
+						activated.hash,
+						activated.number,
+						block_number,
+						&mut self.last_observed_blocks,
+					)
+					.await
+					.unwrap_or_else(|err| {
+						tracing::debug!(
+							target: LOG_TARGET,
+							activated_leaf = ?activated,
+							error = ?err,
+							"Skipping leaf ancestors due to an error",
+						);
+						// We assume this is a spurious error so we'll move forward with an
+						// empty ancestry.
+						Vec::new()
+					})
+				},
+				Err(err) => {
 					tracing::debug!(
 						target: LOG_TARGET,
 						activated_leaf = ?activated,
-						"Skipping leaf ancestors due to an error: {}",
-						err
+						error = ?err,
+						"Failed to retrieve last finalized block number",
 					);
+					// We assume this is a spurious error so we'll move forward with an
+					// empty ancestry.
 					Vec::new()
-				});
+				},
+			};
+
 			// Ancestors block numbers are consecutive in the descending order.
 			let earliest_block_number = activated.number - ancestors.len() as u32;
 			let block_numbers = (earliest_block_number..=activated.number).rev();
@@ -242,23 +266,22 @@ impl OrderingProvider {
 	}
 
 	/// Returns ancestors of `head` in the descending order, stopping
-	/// either at the block present in cache or the latest finalized block.
+	/// either at the block present in cache or at `target_ancestor`.
 	///
 	/// Suited specifically for querying non-finalized chains, thus
 	/// doesn't rely on block numbers.
 	///
 	/// Both `head` and last are **not** included in the result.
-	async fn get_block_ancestors<Sender: SubsystemSender>(
-		&mut self,
+	pub async fn get_block_ancestors<Sender: SubsystemSender>(
 		sender: &mut Sender,
 		mut head: Hash,
 		mut head_number: BlockNumber,
+		target_ancestor: BlockNumber,
+		lookup_cache: &mut LruCache<Hash, ()>,
 	) -> Result<Vec<Hash>> {
 		let mut ancestors = Vec::new();
 
-		let finalized_block_number = get_finalized_block_number(sender).await?;
-
-		if self.last_observed_blocks.get(&head).is_some() || head_number <= finalized_block_number {
+		if lookup_cache.get(&head).is_some() || head_number <= target_ancestor {
 			return Ok(ancestors)
 		}
 
@@ -297,10 +320,10 @@ impl OrderingProvider {
 		let block_numbers = (earliest_block_number..head_number).rev();
 
 		for (block_number, hash) in block_numbers.zip(&hashes) {
-			// Return if we either met finalized/cached block or
+			// Return if we either met target/cached block or
 			// hit the size limit for the returned ancestry of head.
-			if self.last_observed_blocks.get(hash).is_some() ||
-				block_number <= finalized_block_number ||
+			if lookup_cache.get(hash).is_some() ||
+				block_number <= target_ancestor ||
 				ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
 			{
 				return Ok(ancestors)
 			}
@@ -345,7 +368,9 @@ async fn get_block_number(
 	send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await
 }
 
-async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult<BlockNumber> {
+pub async fn get_finalized_block_number(
+	sender: &mut impl SubsystemSender,
+) -> FatalResult<BlockNumber> {
 	let (number_tx, number_rx) = oneshot::channel();
 	send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
 }
diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs
index 56789e2aa60a..59f97ab7e2a3 100644
--- a/node/service/src/relay_chain_selection.rs
+++ b/node/service/src/relay_chain_selection.rs
@@ -54,7 +54,7 @@ use std::sync::Arc;
 ///
 /// This is a safety net that should be removed at some point in the future.
 // Until it's not, make sure to also update `MAX_HEADS_LOOK_BACK` in `approval-voting`
-// when changing its value.
+// and `MAX_BATCH_SCRAPE_ANCESTORS` in `dispute-coordinator` when changing its value.
 const MAX_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 500;
 
 const LOG_TARGET: &str = "parachain::chain-selection";
@@ -522,15 +522,32 @@ where
 				std::any::type_name::<Self>(),
 			)
 			.await;
-			let (subchain_number, subchain_head) = rx
-				.await
-				.map_err(Error::DetermineUndisputedChainCanceled)
-				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
-			// The the total lag accounting for disputes.
-			let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
-			self.metrics.note_disputes_finality_lag(lag_disputes);
-			(lag_disputes, subchain_head)
+			// Try to fetch response from `dispute-coordinator`. If an error occurs we just log it
+			// and return `target_hash` as maximal vote. It is safer to contain this error here
+			// and not push it up the stack to cause additional issues in GRANDPA/BABE.
+			let (lag, subchain_head) =
+				match rx.await.map_err(Error::DetermineUndisputedChainCanceled) {
+					// If the request succeeded we will receive (block number, block hash).
+					Ok((subchain_number, subchain_head)) => {
+						// The total lag accounting for disputes.
+						let lag_disputes = initial_leaf_number.saturating_sub(subchain_number);
+						self.metrics.note_disputes_finality_lag(lag_disputes);
+						(lag_disputes, subchain_head)
+					},
+					Err(e) => {
+						tracing::error!(
+							target: LOG_TARGET,
+							error = ?e,
+							"Call to `DetermineUndisputedChain` failed",
+						);
+						// We need to return a sane finality target. But we are unable to ensure we are not
+						// finalizing something that is being disputed or has been concluded as invalid. We will be
+						// conservative here and not vote for finality above the ancestor passed in.
+						return Ok(target_hash)
+					},
+				};
+			(lag, subchain_head)
 		} else {
 			(lag, subchain_head)
 		};
diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs
index 95fb633349c1..5e11d2fe5446 100644
--- a/node/subsystem-util/src/rolling_session_window.rs
+++ b/node/subsystem-util/src/rolling_session_window.rs
@@ -34,8 +34,8 @@ use polkadot_node_subsystem::{
 use thiserror::Error;
 
 /// Sessions unavailable in state to cache.
-#[derive(Debug)]
-pub enum SessionsUnavailableKind {
+#[derive(Debug, Clone)]
+pub enum SessionsUnavailableReason {
 	/// Runtime API subsystem was unavailable.
 	RuntimeApiUnavailable(oneshot::Canceled),
 	/// The runtime API itself returned an error.
@@ -45,7 +45,7 @@ pub enum SessionsUnavailableKind {
 }
 
 /// Information about the sessions being fetched.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SessionsUnavailableInfo {
 	/// The desired window start.
 	pub window_start: SessionIndex,
@@ -56,10 +56,10 @@ pub struct SessionsUnavailableInfo {
 }
 
 /// Sessions were unavailable to fetch from the state for some reason.
-#[derive(Debug, Error)]
+#[derive(Debug, Error, Clone)]
 pub struct SessionsUnavailable {
 	/// The error kind.
-	kind: SessionsUnavailableKind,
+	kind: SessionsUnavailableReason,
 	/// The info about the session window, if any.
 	info: Option<SessionsUnavailableInfo>,
 }
@@ -229,12 +229,12 @@ async fn get_session_index_for_head(
 		Ok(Ok(s)) => Ok(s),
 		Ok(Err(e)) =>
 			return Err(SessionsUnavailable {
-				kind: SessionsUnavailableKind::RuntimeApi(e),
+				kind: SessionsUnavailableReason::RuntimeApi(e),
 				info: None,
 			}),
 		Err(e) =>
 			return Err(SessionsUnavailable {
-				kind: SessionsUnavailableKind::RuntimeApiUnavailable(e),
+				kind: SessionsUnavailableReason::RuntimeApiUnavailable(e),
 				info: None,
 			}),
 	}
@@ -245,7 +245,7 @@ async fn load_all_sessions(
 	block_hash: Hash,
 	start: SessionIndex,
 	end_inclusive: SessionIndex,
-) -> Result<Vec<SessionInfo>, SessionsUnavailableKind> {
+) -> Result<Vec<SessionInfo>, SessionsUnavailableReason> {
 	let mut v = Vec::new();
 	for i in start..=end_inclusive {
 		let (tx, rx) = oneshot::channel();
@@ -257,9 +257,9 @@ async fn load_all_sessions(
 
 		let session_info = match rx.await {
 			Ok(Ok(Some(s))) => s,
-			Ok(Ok(None)) => return Err(SessionsUnavailableKind::Missing(i)),
-			Ok(Err(e)) => return Err(SessionsUnavailableKind::RuntimeApi(e)),
-			Err(canceled) => return Err(SessionsUnavailableKind::RuntimeApiUnavailable(canceled)),
+			Ok(Ok(None)) => return Err(SessionsUnavailableReason::Missing(i)),
+			Ok(Err(e)) => return Err(SessionsUnavailableReason::RuntimeApi(e)),
+			Err(canceled) => return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)),
 		};
 
 		v.push(session_info);
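
Reviewer note (not part of the patch): the core change above is that `get_block_ancestors` now walks back from the activated leaf and stops at the first block it has already seen, at a caller-supplied `target_ancestor`, or at the ancestry size limit. Below is a minimal, dependency-free sketch of just that stop rule; the type aliases, the `HashSet` stand-in for the `lru::LruCache<Hash, ()>` used in the patch, and the `collect_ancestors` name are illustrative assumptions, not code from this tree.

```rust
use std::collections::HashSet;

type Hash = [u8; 32];
type BlockNumber = u32;

// Stand-in for `OrderingProvider::ANCESTRY_SIZE_LIMIT`; the real value lives in the subsystem.
const ANCESTRY_SIZE_LIMIT: usize = 10;

/// Collect ancestors of a head from a child-to-parent ordered chain, stopping at an
/// already-seen block, at `target_ancestor`, or at the size limit -- the same three
/// conditions checked by the updated `get_block_ancestors`.
fn collect_ancestors(
    chain: &[(BlockNumber, Hash)],
    target_ancestor: BlockNumber,
    seen: &HashSet<Hash>,
) -> Vec<Hash> {
    let mut ancestors = Vec::new();
    for (block_number, hash) in chain {
        if seen.contains(hash) ||
            *block_number <= target_ancestor ||
            ancestors.len() >= ANCESTRY_SIZE_LIMIT
        {
            break
        }
        ancestors.push(*hash);
    }
    ancestors
}

fn main() {
    // Blocks 5..=1 in descending (child -> parent) order, none of them seen before.
    let chain: Vec<(BlockNumber, Hash)> = (1..=5u32).rev().map(|n| (n, [n as u8; 32])).collect();
    let seen = HashSet::new();
    // With the target ancestor at block 2 we collect blocks 5, 4 and 3.
    assert_eq!(collect_ancestors(&chain, 2, &seen).len(), 3);
}
```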
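A second reviewer note, also not part of the patch: `Initialized` now latches the last rolling-session-window failure in `self.error`, clears it on the next successful update, and makes `RecentDisputes`, `ActiveDisputes`, `QueryCandidateVotes` and `DetermineUndisputedChain` fail fast while it is set. A minimal sketch of that latch, using a placeholder error type instead of the real `SessionsUnavailable`, could look like this:

```rust
/// Placeholder for `rolling_session_window::SessionsUnavailable` (the real type now derives `Clone`).
#[derive(Debug, Clone)]
struct SessionWindowError(String);

#[derive(Default)]
struct SessionWindowLatch {
    // Mirrors `Initialized::error`: `Some` while the session window is known to be stale.
    error: Option<SessionWindowError>,
}

impl SessionWindowLatch {
    /// Record the outcome of the session-window update for the latest leaf.
    fn on_session_window_update(&mut self, outcome: Result<(), SessionWindowError>) {
        self.error = outcome.err();
    }

    /// Mirrors `ensure_available_session_info`: queries bail out while an error is latched.
    fn ensure_available_session_info(&self) -> Result<(), SessionWindowError> {
        match self.error.clone() {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }
}

fn main() {
    let mut latch = SessionWindowLatch::default();
    latch.on_session_window_update(Err(SessionWindowError("runtime API unavailable".into())));
    assert!(latch.ensure_available_session_info().is_err());
    latch.on_session_window_update(Ok(()));
    assert!(latch.ensure_available_session_info().is_ok());
}
```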