From 5e6e720074de718416c4588506d1989dc626db24 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sun, 26 Mar 2023 15:07:17 +0300 Subject: [PATCH 01/40] Pass `SessionInfo` directly to `CandidateEnvironment::new` otherwise it should be an async function --- node/core/dispute-coordinator/src/import.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/import.rs b/node/core/dispute-coordinator/src/import.rs index 4f6edc5fcef0..4097c8c2c683 100644 --- a/node/core/dispute-coordinator/src/import.rs +++ b/node/core/dispute-coordinator/src/import.rs @@ -31,7 +31,6 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use polkadot_node_primitives::{ disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, }; -use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow; use polkadot_primitives::{ CandidateReceipt, DisputeStatement, IndexedVec, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, @@ -56,10 +55,9 @@ impl<'a> CandidateEnvironment<'a> { /// Return: `None` in case session is outside of session window. pub fn new( keystore: &LocalKeystore, - session_window: &'a RollingSessionWindow, + session: &'a SessionInfo, session_index: SessionIndex, ) -> Option { - let session = session_window.session_info(session_index)?; let controlled_indices = find_controlled_validator_indices(keystore, &session.validators); Some(Self { session_index, session, controlled_indices }) } From 2e8f1ec789d218ee9d7f761fd651d22660772126 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 27 Mar 2023 13:56:40 +0300 Subject: [PATCH 02/40] Replace calls to `RollingSessionWindow` with `RuntimeInfo` Adjust `dispute-coordinator` initialization to use `RuntimeInfo` --- .../dispute-coordinator/src/initialized.rs | 90 +++++++++---------- node/core/dispute-coordinator/src/lib.rs | 79 +++++++++------- 2 files changed, 89 insertions(+), 80 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index f1e6d516d6cf..0078edd4fc91 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,7 +26,8 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, + disputes::ValidCandidateVotes, new_session_window_size, CandidateVotes, DisputeStatus, + SessionWindowSize, SignedDisputeStatement, Timestamp, }; use polkadot_node_subsystem::{ messages::{ @@ -35,8 +36,9 @@ use polkadot_node_subsystem::{ }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, }; -use polkadot_node_subsystem_util::rolling_session_window::{ - RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable, +use polkadot_node_subsystem_util::{ + rolling_session_window::{SessionWindowUpdate, SessionsUnavailable}, + runtime::RuntimeInfo, }; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, @@ -65,6 +67,8 @@ use super::{ OverlayedBackend, }; +const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); + /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming @@ -72,7 +76,7 @@ use super::{ /// ... pub struct Initialized { keystore: Arc, - rolling_session_window: RollingSessionWindow, + runtime_info: RuntimeInfo, highest_session: SessionIndex, spam_slots: SpamSlots, participation: Participation, @@ -89,19 +93,19 @@ impl Initialized { /// Make initialized subsystem, ready to `run`. pub fn new( subsystem: DisputeCoordinatorSubsystem, - rolling_session_window: RollingSessionWindow, + runtime_info: RuntimeInfo, spam_slots: SpamSlots, scraper: ChainScraper, + highest_session: SessionIndex, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; let (participation_sender, participation_receiver) = mpsc::channel(1); let participation = Participation::new(participation_sender, metrics.clone()); - let highest_session = rolling_session_window.latest_session(); Self { keystore, - rolling_session_window, + runtime_info, highest_session, spam_slots, scraper, @@ -360,23 +364,13 @@ impl Initialized { // the new active leaf as if we received them via gossip. for (candidate_receipt, backers) in backing_validators_per_candidate { // Obtain the session info, for sake of `ValidatorId`s - // either from the rolling session window. - // Must be called _after_ `fn cache_session_info_for_head` - // which guarantees that the session info is available - // for the current session. - let session_info: &SessionInfo = - if let Some(session_info) = self.rolling_session_window.session_info(session) { - session_info - } else { - gum::warn!( - target: LOG_TARGET, - ?session, - "Could not retrieve session info from rolling session window", - ); - return Ok(()) - }; - let relay_parent = candidate_receipt.descriptor.relay_parent; + let session_info: &SessionInfo = &self + .runtime_info + .get_session_info_by_index(ctx.sender(), relay_parent, session) // TODO: is it correct to use `relay_parent` here?? + .await? + .session_info; + let candidate_hash = candidate_receipt.hash(); gum::trace!( target: LOG_TARGET, @@ -470,18 +464,11 @@ impl Initialized { ?session, "Importing dispute votes from chain for candidate" ); - let session_info = - if let Some(session_info) = self.rolling_session_window.session_info(session) { - session_info - } else { - gum::warn!( - target: LOG_TARGET, - ?candidate_hash, - ?session, - "Could not retrieve session info from rolling session window for recently concluded dispute" - ); - continue - }; + let session_info = &self + .runtime_info + .get_session_info_by_index(ctx.sender(), fix_me, session) + .await? + .session_info; let statements = statements .into_iter() @@ -717,16 +704,18 @@ impl Initialized { now: Timestamp, ) -> FatalResult { gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements"); - if !self.rolling_session_window.contains(session) { + if self.session_is_ancient(session) { // It is not valid to participate in an ancient dispute (spam?) or too new. return Ok(ImportStatementsResult::InvalidImport) } - let env = match CandidateEnvironment::new( - &self.keystore, - &self.rolling_session_window, - session, - ) { + let session_info = &self + .runtime_info + .get_session_info_by_index(ctx.sender(), fix_me, session) + .await? + .session_info; + + let env = match CandidateEnvironment::new(&self.keystore, session_info, session) { None => { gum::warn!( target: LOG_TARGET, @@ -1120,12 +1109,17 @@ impl Initialized { ?now, "Issuing local statement for candidate!" ); + let session_info = &self + .runtime_info + .get_session_info_by_index( + ctx.sender(), + candidate_receipt.descriptor.relay_parent, + session, + ) + .await? + .session_info; // Load environment: - let env = match CandidateEnvironment::new( - &self.keystore, - &self.rolling_session_window, - session, - ) { + let env = match CandidateEnvironment::new(&self.keystore, session_info, session) { None => { gum::warn!( target: LOG_TARGET, @@ -1233,6 +1227,10 @@ impl Initialized { Ok(()) } + + fn session_is_ancient(self, session_idx: SessionIndex) -> bool { + return session_idx < self.highest_session.saturating_sub(6 - 1) // TODO: use constant here + } } /// Messages to be handled in this subsystem. diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index d82c3a06c65e..e6a250a91f08 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -41,6 +41,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::{ database::Database, rolling_session_window::{DatabaseParams, RollingSessionWindow}, + runtime::RuntimeInfo, }; use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; @@ -213,28 +214,20 @@ impl DisputeCoordinatorSubsystem { let db_params = DatabaseParams { db: self.store.clone(), db_column: self.config.col_session_data }; - let (first_leaf, rolling_session_window) = - match get_rolling_session_window(ctx, db_params).await { - Ok(Some(update)) => update, - Ok(None) => { - gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); - return Ok(None) - }, - Err(e) => { - e.split()?.log(); - continue - }, - }; + let first_leaf = match wait_for_first_leaf(ctx).await { + Ok(Some(activated_leaf)) => activated_leaf, + Ok(None) => continue, + Err(e) => { + e.split()?.log(); + continue + }, + }; + let mut runtime_info = RuntimeInfo::new(None); + // TODO: fill in the cache here? let mut overlay_db = OverlayedBackend::new(&mut backend); let (participations, votes, spam_slots, ordering_provider) = match self - .handle_startup( - ctx, - first_leaf.clone(), - &rolling_session_window, - &mut overlay_db, - clock, - ) + .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) .await { Ok(v) => v, @@ -248,11 +241,21 @@ impl DisputeCoordinatorSubsystem { backend.write(ops)?; } + // We assume the highest session means in the new context + let highest_session = + runtime_info.get_session_index_for_child(ctx.sender(), first_leaf.hash).await?; + return Ok(Some(( participations, votes, first_leaf, - Initialized::new(self, rolling_session_window, spam_slots, ordering_provider), + Initialized::new( + self, + runtime_info, + spam_slots, + ordering_provider, + highest_session, + ), backend, ))) } @@ -267,7 +270,7 @@ impl DisputeCoordinatorSubsystem { &self, ctx: &mut Context, initial_head: ActivatedLeaf, - rolling_session_window: &RollingSessionWindow, + runtime_info: &mut RuntimeInfo, overlay_db: &mut OverlayedBackend<'_, impl Backend>, clock: &dyn Clock, ) -> Result<( @@ -276,8 +279,13 @@ impl DisputeCoordinatorSubsystem { SpamSlots, ChainScraper, )> { + // TODO: this is supposed to be 'earliest session'. What it means in this context? + let session_idx_at_startup = runtime_info + .get_session_index_for_child(ctx.sender(), initial_head.hash) + .await?; + // Prune obsolete disputes: - db::v1::note_earliest_session(overlay_db, rolling_session_window.earliest_session())?; + db::v1::note_earliest_session(overlay_db, session_idx_at_startup)?; let now = clock.now(); @@ -292,23 +300,26 @@ impl DisputeCoordinatorSubsystem { }, }; + let session_info = &runtime_info + .get_session_info(ctx.sender(), initial_head.hash) + .await? + .session_info; let mut participation_requests = Vec::new(); let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; for ((session, ref candidate_hash), _) in active_disputes { - let env = - match CandidateEnvironment::new(&self.keystore, &rolling_session_window, session) { - None => { - gum::warn!( - target: LOG_TARGET, - session, - "We are lacking a `SessionInfo` for handling db votes on startup." - ); + let env = match CandidateEnvironment::new(&self.keystore, &session_info, session) { + None => { + gum::warn!( + target: LOG_TARGET, + session, + "We are lacking a `SessionInfo` for handling db votes on startup." + ); - continue - }, - Some(env) => env, - }; + continue + }, + Some(env) => env, + }; let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { From 0fe349bf6332e7f10a8f91b77e78e3d06de0d5f8 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 27 Mar 2023 15:38:59 +0300 Subject: [PATCH 03/40] Modify `dispute-coordinator` initialization --- .../dispute-coordinator/src/initialized.rs | 45 +++++++++---------- node/core/dispute-coordinator/src/lib.rs | 4 +- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 0078edd4fc91..f9a8d95a3fb8 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -69,12 +69,19 @@ use super::{ const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); +// Initial data for `dispute-coordinator`. It is provided only at first start. +pub struct InitialData { + pub participations: Vec<(ParticipationPriority, ParticipationRequest)>, + pub votes: Vec, + pub leaf: ActivatedLeaf, +} + /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming /// statements for validity, we cannot query orderings, we have no valid `RollingSessionWindow`, /// ... -pub struct Initialized { +pub(crate) struct Initialized { keystore: Arc, runtime_info: RuntimeInfo, highest_session: SessionIndex, @@ -118,30 +125,20 @@ impl Initialized { /// Run the initialized subsystem. /// - /// Optionally supply initial participations and a first leaf to process. + /// `initialization_data` is optional. It is passed on first start and is `None` on subsystem restarts. pub async fn run( mut self, mut ctx: Context, mut backend: B, - mut participations: Vec<(ParticipationPriority, ParticipationRequest)>, - mut votes: Vec, - mut first_leaf: Option, + mut initial_data: Option, clock: Box, ) -> FatalResult<()> where B: Backend, { loop { - let res = self - .run_until_error( - &mut ctx, - &mut backend, - &mut participations, - &mut votes, - &mut first_leaf, - &*clock, - ) - .await; + let res = + self.run_until_error(&mut ctx, &mut backend, &mut initial_data, &*clock).await; if let Ok(()) = res { gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); return Ok(()) @@ -159,21 +156,21 @@ impl Initialized { &mut self, ctx: &mut Context, backend: &mut B, - participations: &mut Vec<(ParticipationPriority, ParticipationRequest)>, - on_chain_votes: &mut Vec, - first_leaf: &mut Option, + initial_data: &mut Option, clock: &dyn Clock, ) -> Result<()> where B: Backend, { - for (priority, request) in participations.drain(..) { - self.participation.queue_participation(ctx, priority, request).await?; - } - + if let Some(InitialData { participations, votes: on_chain_votes, leaf: first_leaf }) = + initial_data.take() { + for (priority, request) in participations { + self.participation.queue_participation(ctx, priority, request).await?; + } + let mut overlay_db = OverlayedBackend::new(backend); - for votes in on_chain_votes.drain(..) { + for votes in on_chain_votes { let _ = self .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) .await @@ -189,9 +186,7 @@ impl Initialized { let ops = overlay_db.into_write_ops(); backend.write(ops)?; } - } - if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index e6a250a91f08..76b13299fc51 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -66,7 +66,7 @@ pub(crate) mod error; /// Subsystem after receiving the first active leaf. mod initialized; -use initialized::Initialized; +use initialized::{InitialData, Initialized}; /// Provider of data scraped from chain. /// @@ -188,7 +188,7 @@ impl DisputeCoordinatorSubsystem { }; initialized - .run(ctx, backend, participations, votes, Some(first_leaf), clock) + .run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock) .await } From bd4e7b61e018f25eb7e3692ac83c52c54109b9bb Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 27 Mar 2023 15:45:34 +0300 Subject: [PATCH 04/40] Pass `Hash` to `process_on_chain_votes` so that `RuntimeInfo` calls can be made Remove some fixmes --- .../dispute-coordinator/src/initialized.rs | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index f9a8d95a3fb8..8ab1d5d16198 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -172,7 +172,13 @@ impl Initialized { let mut overlay_db = OverlayedBackend::new(backend); for votes in on_chain_votes { let _ = self - .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) + .process_on_chain_votes( + ctx, + &mut overlay_db, + votes, + clock.now(), + first_leaf.hash, + ) .await .map_err(|error| { gum::warn!( @@ -324,15 +330,16 @@ impl Initialized { // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel for votes in scraped_updates.on_chain_votes { - let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( - |error| { + let _ = self + .process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash) + .await + .map_err(|error| { gum::warn!( target: LOG_TARGET, ?error, "Skipping scraping block due to error", ); - }, - ); + }); } } @@ -348,6 +355,7 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, votes: ScrapedOnChainVotes, now: u64, + leaf_hash: Hash, ) -> Result<()> { let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; @@ -461,7 +469,7 @@ impl Initialized { ); let session_info = &self .runtime_info - .get_session_info_by_index(ctx.sender(), fix_me, session) + .get_session_info_by_index(ctx.sender(), leaf_hash, session) .await? .session_info; From bdd43a52fc5853e90eb3c225e306195e1173378f Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 27 Mar 2023 16:10:13 +0300 Subject: [PATCH 05/40] Pass `Hash` to `handle_import_statements` to perform `RuntimeInfo` calls --- node/core/dispute-coordinator/src/initialized.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 8ab1d5d16198..1a2d93edc30b 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -440,6 +440,7 @@ impl Initialized { session, statements, now, + leaf_hash, ) .await?; match import_result { @@ -516,6 +517,7 @@ impl Initialized { session, statements, now, + leaf_hash, ) .await?; match import_result { @@ -557,6 +559,7 @@ impl Initialized { ?session, "DisputeCoordinatorMessage::ImportStatements" ); + let parent_hash = candidate_receipt.descriptor.relay_parent; let outcome = self .handle_import_statements( ctx, @@ -565,6 +568,7 @@ impl Initialized { session, statements, now, + parent_hash, // TODO: will the relay parent do the job? ) .await?; let report = move || match pending_confirmation { @@ -705,6 +709,7 @@ impl Initialized { session: SessionIndex, statements: Vec<(SignedDisputeStatement, ValidatorIndex)>, now: Timestamp, + leaf_hash: Hash, ) -> FatalResult { gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements"); if self.session_is_ancient(session) { @@ -714,7 +719,7 @@ impl Initialized { let session_info = &self .runtime_info - .get_session_info_by_index(ctx.sender(), fix_me, session) + .get_session_info_by_index(ctx.sender(), leaf_hash, session) .await? .session_info; @@ -1197,6 +1202,7 @@ impl Initialized { } // Do import + let relay_parent = candidate_receipt.descriptor.relay_parent; //TODO: Again... is this okay? if !statements.is_empty() { match self .handle_import_statements( @@ -1206,6 +1212,7 @@ impl Initialized { session, statements, now, + relay_parent, ) .await? { @@ -1231,7 +1238,7 @@ impl Initialized { Ok(()) } - fn session_is_ancient(self, session_idx: SessionIndex) -> bool { + fn session_is_ancient(&self, session_idx: SessionIndex) -> bool { return session_idx < self.highest_session.saturating_sub(6 - 1) // TODO: use constant here } } From 1f16bc5239bbe56083bc66f431ab7cba27dc33ee Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 27 Mar 2023 20:33:57 +0300 Subject: [PATCH 06/40] remove todo comments --- node/core/dispute-coordinator/src/initialized.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 1a2d93edc30b..a89cc8ce3768 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -568,7 +568,7 @@ impl Initialized { session, statements, now, - parent_hash, // TODO: will the relay parent do the job? + parent_hash, ) .await?; let report = move || match pending_confirmation { @@ -1202,7 +1202,7 @@ impl Initialized { } // Do import - let relay_parent = candidate_receipt.descriptor.relay_parent; //TODO: Again... is this okay? + let relay_parent = candidate_receipt.descriptor.relay_parent; if !statements.is_empty() { match self .handle_import_statements( From 00d36556ff821f36e73daf48a281704095287d54 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 28 Mar 2023 15:29:24 +0300 Subject: [PATCH 07/40] Remove `error` from `Initialized` Rework new session handling code --- .../dispute-coordinator/src/initialized.rs | 77 ++++++------------- node/core/dispute-coordinator/src/lib.rs | 30 +------- 2 files changed, 24 insertions(+), 83 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index a89cc8ce3768..5ccc8cd0f525 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -36,10 +36,7 @@ use polkadot_node_subsystem::{ }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, }; -use polkadot_node_subsystem_util::{ - rolling_session_window::{SessionWindowUpdate, SessionsUnavailable}, - runtime::RuntimeInfo, -}; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo, @@ -47,7 +44,7 @@ use polkadot_primitives::{ }; use crate::{ - error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, + error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, is_potential_spam, metrics::Metrics, @@ -57,7 +54,7 @@ use crate::{ use super::{ backend::Backend, - db, make_dispute_message, + make_dispute_message, participation::{ self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement, WorkerMessageReceiver, @@ -90,9 +87,6 @@ pub(crate) struct Initialized { scraper: ChainScraper, participation_receiver: WorkerMessageReceiver, metrics: Metrics, - // This tracks only rolling session window failures. - // It can be a `Vec` if the need to track more arises. - error: Option, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -119,7 +113,6 @@ impl Initialized { participation, participation_receiver, metrics, - error: None, } } @@ -288,37 +281,31 @@ impl Initialized { self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { - match self - .rolling_session_window - .cache_session_info_for_head(ctx.sender(), new_leaf.hash) - .await - { + let session_idx = + self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await; + + match session_idx { + Ok(session_idx) => { + // Dummy fetch to ensure the session is cached + let _ = self + .runtime_info + .get_session_info_by_index(ctx.sender(), new_leaf.hash, session_idx) + .await?; + + if session_idx > self.highest_session { + self.highest_session = session_idx; + self.spam_slots + .prune_old(session_idx.saturating_sub(SESSION_WINDOW_SIZE.get() - 1)); + } + }, Err(e) => { gum::warn!( target: LOG_TARGET, err = ?e, "Failed to update session cache for disputes", ); - self.error = Some(e); }, - Ok(SessionWindowUpdate::Advanced { - new_window_end: window_end, - new_window_start, - .. - }) => { - self.error = None; - let session = window_end; - if self.highest_session < session { - gum::trace!(target: LOG_TARGET, session, "Observed new session. Pruning"); - - self.highest_session = session; - - db::v1::note_earliest_session(overlay_db, new_window_start)?; - self.spam_slots.prune_old(new_window_start); - } - }, - Ok(SessionWindowUpdate::Unchanged) => {}, - }; + } gum::trace!( target: LOG_TARGET, @@ -587,9 +574,6 @@ impl Initialized { } }, DisputeCoordinatorMessage::RecentDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "Loading recent disputes from db"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes @@ -603,9 +587,6 @@ impl Initialized { ); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { @@ -623,9 +604,6 @@ impl Initialized { ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes"); let mut query_output = Vec::new(); @@ -667,8 +645,6 @@ impl Initialized { block_descriptions, tx, } => { - // Return error if session information is missing. - self.ensure_available_session_info()?; gum::trace!( target: LOG_TARGET, "DisputeCoordinatorMessage::DetermineUndisputedChain" @@ -688,15 +664,6 @@ impl Initialized { Ok(Box::new(|| Ok(()))) } - // Helper function for checking subsystem errors in message processing. - fn ensure_available_session_info(&self) -> Result<()> { - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::RollingSessionWindow(subsystem_error)) - } - - Ok(()) - } - // We use fatal result rather than result here. Reason being, We for example increase // spam slots in this function. If then the import fails for some non fatal and // unrelated reason, we should likely actually decrement previously incremented spam @@ -1239,7 +1206,7 @@ impl Initialized { } fn session_is_ancient(&self, session_idx: SessionIndex) -> bool { - return session_idx < self.highest_session.saturating_sub(6 - 1) // TODO: use constant here + return session_idx < self.highest_session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1) } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 76b13299fc51..9afd324bb858 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -38,15 +38,11 @@ use polkadot_node_subsystem::{ messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_subsystem_util::{ - database::Database, - rolling_session_window::{DatabaseParams, RollingSessionWindow}, - runtime::RuntimeInfo, -}; +use polkadot_node_subsystem_util::{database::Database, runtime::RuntimeInfo}; use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; use crate::{ - error::{FatalResult, JfyiError, Result}, + error::{FatalResult, Result}, metrics::Metrics, status::{get_active_with_status, SystemClock}, }; @@ -211,9 +207,6 @@ impl DisputeCoordinatorSubsystem { B: Backend + 'static, { loop { - let db_params = - DatabaseParams { db: self.store.clone(), db_column: self.config.col_session_data }; - let first_leaf = match wait_for_first_leaf(ctx).await { Ok(Some(activated_leaf)) => activated_leaf, Ok(None) => continue, @@ -385,25 +378,6 @@ impl DisputeCoordinatorSubsystem { } } -/// Wait for `ActiveLeavesUpdate` on startup, returns `None` if `Conclude` signal came first. -#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] -async fn get_rolling_session_window( - ctx: &mut Context, - db_params: DatabaseParams, -) -> Result> { - if let Some(leaf) = { wait_for_first_leaf(ctx) }.await? { - let sender = ctx.sender().clone(); - Ok(Some(( - leaf.clone(), - RollingSessionWindow::new(sender, leaf.hash, db_params) - .await - .map_err(JfyiError::RollingSessionWindow)?, - ))) - } else { - Ok(None) - } -} - /// Wait for `ActiveLeavesUpdate`, returns `None` if `Conclude` signal came first. #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] async fn wait_for_first_leaf(ctx: &mut Context) -> Result> { From d9fa73e88f5d61b0ad1142cd3410b5fb912125fd Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 28 Mar 2023 22:23:56 +0300 Subject: [PATCH 08/40] Remove db code which is no longer used --- node/core/dispute-coordinator/src/backend.rs | 16 ------- node/core/dispute-coordinator/src/db/v1.rs | 49 +------------------- node/core/dispute-coordinator/src/lib.rs | 8 ---- 3 files changed, 2 insertions(+), 71 deletions(-) diff --git a/node/core/dispute-coordinator/src/backend.rs b/node/core/dispute-coordinator/src/backend.rs index d49ace492549..5e7d8dc8d38f 100644 --- a/node/core/dispute-coordinator/src/backend.rs +++ b/node/core/dispute-coordinator/src/backend.rs @@ -91,15 +91,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { self.candidate_votes.is_empty() } - /// Load the earliest session, if any. - pub fn load_earliest_session(&self) -> FatalResult> { - if let Some(val) = self.earliest_session { - return Ok(Some(val)) - } - - self.inner.load_earliest_session() - } - /// Load the recent disputes, if any. pub fn load_recent_disputes(&self) -> FatalResult> { if let Some(val) = &self.recent_disputes { @@ -122,13 +113,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { self.inner.load_candidate_votes(session, candidate_hash) } - /// Prepare a write to the "earliest session" field of the DB. - /// - /// Later calls to this function will override earlier ones. - pub fn write_earliest_session(&mut self, session: SessionIndex) { - self.earliest_session = Some(session); - } - /// Prepare a write to the recent disputes stored in the DB. /// /// Later calls to this function will override earlier ones. diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index c0f3c9925f1b..da087377bff9 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -19,7 +19,7 @@ use polkadot_node_primitives::DisputeStatus; use polkadot_node_subsystem_util::database::{DBTransaction, Database}; use polkadot_primitives::{ - CandidateHash, CandidateReceipt, Hash, InvalidDisputeStatementKind, SessionIndex, + CandidateHash, CandidateReceipt, InvalidDisputeStatementKind, SessionIndex, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature, }; @@ -28,7 +28,7 @@ use std::sync::Arc; use parity_scale_codec::{Decode, Encode}; use crate::{ - backend::{Backend, BackendWriteOp, OverlayedBackend}, + backend::{Backend, BackendWriteOp}, error::{FatalError, FatalResult}, metrics::Metrics, LOG_TARGET, @@ -309,51 +309,6 @@ pub(crate) fn load_recent_disputes( .map_err(|e| FatalError::DbReadFailed(e)) } -/// Maybe prune data in the DB based on the provided session index. -/// -/// This is intended to be called on every block, and as such will be used to populate the DB on -/// first launch. If the on-disk data does not need to be pruned, only a single storage read -/// will be performed. -/// -/// If one or more ancient sessions are pruned, all metadata on candidates within the ancient -/// session will be deleted. -pub(crate) fn note_earliest_session( - overlay_db: &mut OverlayedBackend<'_, impl Backend>, - new_earliest_session: SessionIndex, -) -> FatalResult<()> { - match overlay_db.load_earliest_session()? { - None => { - // First launch - write new-earliest. - overlay_db.write_earliest_session(new_earliest_session); - }, - Some(prev_earliest) if new_earliest_session > prev_earliest => { - // Prune all data in the outdated sessions. - overlay_db.write_earliest_session(new_earliest_session); - - // Clear recent disputes metadata. - { - let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); - - let lower_bound = (new_earliest_session, CandidateHash(Hash::repeat_byte(0x00))); - - let new_recent_disputes = recent_disputes.split_off(&lower_bound); - // Any remanining disputes are considered ancient and must be pruned. - let pruned_disputes = recent_disputes; - - if pruned_disputes.len() != 0 { - overlay_db.write_recent_disputes(new_recent_disputes); - // Note: Deleting old candidate votes is handled in `write` based on the earliest session. - } - } - }, - Some(_) => { - // nothing to do. - }, - } - - Ok(()) -} - /// Until what session votes have been cleaned up already. /// /// That is the db has already been purged of votes for sessions older than the returned diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 9afd324bb858..4f58d8df9d51 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -272,14 +272,6 @@ impl DisputeCoordinatorSubsystem { SpamSlots, ChainScraper, )> { - // TODO: this is supposed to be 'earliest session'. What it means in this context? - let session_idx_at_startup = runtime_info - .get_session_index_for_child(ctx.sender(), initial_head.hash) - .await?; - - // Prune obsolete disputes: - db::v1::note_earliest_session(overlay_db, session_idx_at_startup)?; - let now = clock.now(); let active_disputes = match overlay_db.load_recent_disputes() { From e34dbcf1bd857434c11c4623c4cbe6ea896b0428 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 28 Mar 2023 22:25:52 +0300 Subject: [PATCH 09/40] Update stale comment and remove unneeded type specification --- node/core/dispute-coordinator/src/initialized.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 5ccc8cd0f525..cb1fa22461e9 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -39,8 +39,8 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, - DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, + DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind, + ValidatorId, ValidatorIndex, }; use crate::{ @@ -118,7 +118,7 @@ impl Initialized { /// Run the initialized subsystem. /// - /// `initialization_data` is optional. It is passed on first start and is `None` on subsystem restarts. + /// `initial_data` is optional. It is passed on first start and is `None` on subsystem restarts. pub async fn run( mut self, mut ctx: Context, @@ -355,7 +355,7 @@ impl Initialized { for (candidate_receipt, backers) in backing_validators_per_candidate { // Obtain the session info, for sake of `ValidatorId`s let relay_parent = candidate_receipt.descriptor.relay_parent; - let session_info: &SessionInfo = &self + let session_info = &self .runtime_info .get_session_info_by_index(ctx.sender(), relay_parent, session) // TODO: is it correct to use `relay_parent` here?? .await? From 234581f36b5f8988c53e58103b27ee3ab8c76910 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 14:19:56 +0300 Subject: [PATCH 10/40] Cache SessionInfo on startup --- .../dispute-coordinator/src/initialized.rs | 7 +-- node/core/dispute-coordinator/src/lib.rs | 61 ++++++++++++++++--- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index cb1fa22461e9..f5fe44177553 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,8 +26,7 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - disputes::ValidCandidateVotes, new_session_window_size, CandidateVotes, DisputeStatus, - SessionWindowSize, SignedDisputeStatement, Timestamp, + disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, }; use polkadot_node_subsystem::{ messages::{ @@ -49,7 +48,7 @@ use crate::{ is_potential_spam, metrics::Metrics, status::{get_active_with_status, Clock}, - DisputeCoordinatorSubsystem, LOG_TARGET, + DisputeCoordinatorSubsystem, LOG_TARGET, SESSION_WINDOW_SIZE, }; use super::{ @@ -64,8 +63,6 @@ use super::{ OverlayedBackend, }; -const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); - // Initial data for `dispute-coordinator`. It is provided only at first start. pub struct InitialData { pub participations: Vec<(ParticipationPriority, ParticipationRequest)>, diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 4f58d8df9d51..d71fc87937be 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -24,7 +24,7 @@ //! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from //! another node, this will trigger dispute participation to recover and validate the block. -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use futures::FutureExt; @@ -32,13 +32,17 @@ use gum::CandidateHash; use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, + new_session_window_size, CandidateVotes, DisputeMessage, DisputeMessageCheckError, + SessionWindowSize, SignedDisputeStatement, }; use polkadot_node_subsystem::{ messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_subsystem_util::{database::Database, runtime::RuntimeInfo}; +use polkadot_node_subsystem_util::{ + database::Database, + runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, +}; use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; use crate::{ @@ -108,6 +112,8 @@ mod tests; pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator"; +const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); + /// An implementation of the dispute coordinator subsystem. pub struct DisputeCoordinatorSubsystem { config: Config, @@ -216,8 +222,13 @@ impl DisputeCoordinatorSubsystem { }, }; - let mut runtime_info = RuntimeInfo::new(None); - // TODO: fill in the cache here? + // `RuntimeInfo` cache should match the dispute `SESSION_WINDOW_SIZE` so that we can + // keep all sessions for a dispute window + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(SESSION_WINDOW_SIZE.get() as usize) + .expect("SESSION_WINDOW_SIZE can't be 0; qed."), + }); let mut overlay_db = OverlayedBackend::new(&mut backend); let (participations, votes, spam_slots, ordering_provider) = match self .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) @@ -234,9 +245,45 @@ impl DisputeCoordinatorSubsystem { backend.write(ops)?; } - // We assume the highest session means in the new context + // We assume the highest session is the passed leaf. If we can't get the session index + // we can't initialize the subsystem so we'll wait for a new leaf let highest_session = - runtime_info.get_session_index_for_child(ctx.sender(), first_leaf.hash).await?; + match runtime_info.get_session_index_for_child(ctx.sender(), first_leaf.hash).await + { + Ok(session_idx) => session_idx, + Err(e) => { + gum::warn!( + target: LOG_TARGET, + leaf_hash = ?first_leaf.hash, + err = ?e, + "Can't get session index during subsystem initialization. Retrying." + ); + continue // the initialization loop + }, + }; + + // Cache the sessions. A failure to fetch a session here is not that critical so we + // won't abort the initialization + for idx in + highest_session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1)..highest_session + { + match runtime_info + .get_session_info_by_index(ctx.sender(), first_leaf.hash, idx) + .await + { + Ok(_) => { /* do nothing */ }, + Err(e) => { + gum::warn!( + target: LOG_TARGET, + leaf_hash = ?first_leaf.hash, + session_idx = idx, + err = ?e, + "Can't cache SessionInfo during subsystem initialization. Skipping session." + ); + continue // the caching loop + }, + } + } return Ok(Some(( participations, From 7e2a5498c27b651b773064e7d2aca6ad8efd3826 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 14:27:10 +0300 Subject: [PATCH 11/40] Use `DISPUTE_WINDOW` from primitives --- node/core/dispute-coordinator/src/initialized.rs | 9 +++++---- node/core/dispute-coordinator/src/lib.rs | 16 ++++++---------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index f5fe44177553..f54fa54bd16f 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,7 +26,8 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, + disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, + Timestamp, DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::{ @@ -48,7 +49,7 @@ use crate::{ is_potential_spam, metrics::Metrics, status::{get_active_with_status, Clock}, - DisputeCoordinatorSubsystem, LOG_TARGET, SESSION_WINDOW_SIZE, + DisputeCoordinatorSubsystem, LOG_TARGET, }; use super::{ @@ -292,7 +293,7 @@ impl Initialized { if session_idx > self.highest_session { self.highest_session = session_idx; self.spam_slots - .prune_old(session_idx.saturating_sub(SESSION_WINDOW_SIZE.get() - 1)); + .prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); } }, Err(e) => { @@ -1203,7 +1204,7 @@ impl Initialized { } fn session_is_ancient(&self, session_idx: SessionIndex) -> bool { - return session_idx < self.highest_session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1) + return session_idx < self.highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1) } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index d71fc87937be..f344ff4c8a23 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -32,8 +32,8 @@ use gum::CandidateHash; use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - new_session_window_size, CandidateVotes, DisputeMessage, DisputeMessageCheckError, - SessionWindowSize, SignedDisputeStatement, + CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, + DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, @@ -112,8 +112,6 @@ mod tests; pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator"; -const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); - /// An implementation of the dispute coordinator subsystem. pub struct DisputeCoordinatorSubsystem { config: Config, @@ -222,12 +220,12 @@ impl DisputeCoordinatorSubsystem { }, }; - // `RuntimeInfo` cache should match the dispute `SESSION_WINDOW_SIZE` so that we can + // `RuntimeInfo` cache should match `DISPUTE_WINDOW` so that we can // keep all sessions for a dispute window let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, - session_cache_lru_size: NonZeroUsize::new(SESSION_WINDOW_SIZE.get() as usize) - .expect("SESSION_WINDOW_SIZE can't be 0; qed."), + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), }); let mut overlay_db = OverlayedBackend::new(&mut backend); let (participations, votes, spam_slots, ordering_provider) = match self @@ -264,9 +262,7 @@ impl DisputeCoordinatorSubsystem { // Cache the sessions. A failure to fetch a session here is not that critical so we // won't abort the initialization - for idx in - highest_session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1)..highest_session - { + for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..highest_session { match runtime_info .get_session_info_by_index(ctx.sender(), first_leaf.hash, idx) .await From 8f8b427b6acf2bd2582b6457d5b514c73dea4c33 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 14:45:14 +0300 Subject: [PATCH 12/40] Fix caching in `process_active_leaves_update` --- .../dispute-coordinator/src/initialized.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index f54fa54bd16f..7c8df45d3ee0 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -284,11 +284,23 @@ impl Initialized { match session_idx { Ok(session_idx) => { - // Dummy fetch to ensure the session is cached - let _ = self + // Dummy fetch to ensure the session is cached. + match self .runtime_info .get_session_info_by_index(ctx.sender(), new_leaf.hash, session_idx) - .await?; + .await + { + Ok(_) => { /* do nothing */ }, + Err(e) => { + gum::warn!( + target: LOG_TARGET, + session_idx, + leaf_hash = ?new_leaf.hash, + err = ?e, + "Error caching SessionInfo on ActiveLeaves update" + ); + }, + } if session_idx > self.highest_session { self.highest_session = session_idx; From 903e3b2276ebac55722e675202af933ad364ac93 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 15:05:57 +0300 Subject: [PATCH 13/40] handle_import_statements: leaf_hash -> block_hash --- node/core/dispute-coordinator/src/initialized.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 7c8df45d3ee0..1d6fabfbe4c6 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -686,7 +686,7 @@ impl Initialized { session: SessionIndex, statements: Vec<(SignedDisputeStatement, ValidatorIndex)>, now: Timestamp, - leaf_hash: Hash, + block_hash: Hash, ) -> FatalResult { gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements"); if self.session_is_ancient(session) { @@ -696,7 +696,7 @@ impl Initialized { let session_info = &self .runtime_info - .get_session_info_by_index(ctx.sender(), leaf_hash, session) + .get_session_info_by_index(ctx.sender(), block_hash, session) .await? .session_info; From ab6a7cf03fca71d0ea0c2ff9140ac20144bfe6ce Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 15:59:44 +0300 Subject: [PATCH 14/40] Restore `ensure_available_session_info` --- node/core/dispute-coordinator/src/error.rs | 6 ++-- .../dispute-coordinator/src/initialized.rs | 32 +++++++++++++++++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/node/core/dispute-coordinator/src/error.rs b/node/core/dispute-coordinator/src/error.rs index 7a059b8861d8..6fa31b1699ff 100644 --- a/node/core/dispute-coordinator/src/error.rs +++ b/node/core/dispute-coordinator/src/error.rs @@ -18,7 +18,7 @@ use fatality::Nested; use futures::channel::oneshot; use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError}; -use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime}; +use polkadot_node_subsystem_util::runtime; use crate::{db, participation, LOG_TARGET}; use parity_scale_codec::Error as CodecError; @@ -96,8 +96,8 @@ pub enum Error { Codec(#[from] CodecError), /// `RollingSessionWindow` was not able to retrieve `SessionInfo`s. - #[error("Sessions unavailable in `RollingSessionWindow`: {0}")] - RollingSessionWindow(#[from] SessionsUnavailable), + #[error("Session can't be fetched via `RuntimeInfo`")] + SessionInfo, #[error(transparent)] QueueError(#[from] participation::QueueError), diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 1d6fabfbe4c6..20f48abe3fe2 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -36,7 +36,7 @@ use polkadot_node_subsystem::{ }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, }; -use polkadot_node_subsystem_util::runtime::RuntimeInfo; +use polkadot_node_subsystem_util::runtime::{Error as RuntimeError, RuntimeInfo}; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind, @@ -44,7 +44,7 @@ use polkadot_primitives::{ }; use crate::{ - error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, + error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, is_potential_spam, metrics::Metrics, @@ -85,6 +85,9 @@ pub(crate) struct Initialized { scraper: ChainScraper, participation_receiver: WorkerMessageReceiver, metrics: Metrics, + // This tracks only rolling session window failures. + // It can be a `Vec` if the need to track more arises. + error: Option, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -111,6 +114,7 @@ impl Initialized { participation, participation_receiver, metrics, + error: None, } } @@ -290,7 +294,9 @@ impl Initialized { .get_session_info_by_index(ctx.sender(), new_leaf.hash, session_idx) .await { - Ok(_) => { /* do nothing */ }, + Ok(_) => { + self.error = None; + }, Err(e) => { gum::warn!( target: LOG_TARGET, @@ -299,6 +305,7 @@ impl Initialized { err = ?e, "Error caching SessionInfo on ActiveLeaves update" ); + self.error = Some(e); }, } @@ -584,6 +591,9 @@ impl Initialized { } }, DisputeCoordinatorMessage::RecentDisputes(tx) => { + // Return error if session information is missing. + self.ensure_available_session_info()?; + gum::trace!(target: LOG_TARGET, "Loading recent disputes from db"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes @@ -597,6 +607,9 @@ impl Initialized { ); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { + // Return error if session information is missing. + self.ensure_available_session_info()?; + gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { @@ -614,6 +627,9 @@ impl Initialized { ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { + // Return error if session information is missing. + self.ensure_available_session_info()?; + gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes"); let mut query_output = Vec::new(); @@ -655,6 +671,8 @@ impl Initialized { block_descriptions, tx, } => { + // Return error if session information is missing. + self.ensure_available_session_info()?; gum::trace!( target: LOG_TARGET, "DisputeCoordinatorMessage::DetermineUndisputedChain" @@ -674,6 +692,14 @@ impl Initialized { Ok(Box::new(|| Ok(()))) } + // Helper function for checking subsystem errors in message processing. + fn ensure_available_session_info(&self) -> Result<()> { + match self.error { + Some(_) => Err(Error::SessionInfo), + None => Ok(()), + } + } + // We use fatal result rather than result here. Reason being, We for example increase // spam slots in this function. If then the import fails for some non fatal and // unrelated reason, we should likely actually decrement previously incremented spam From 493f006f49ebbe1fbda6a8317ba2bdba853d2160 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 16:08:40 +0300 Subject: [PATCH 15/40] Don't interrupt `process_on_chain_votes` if SessionInfo can't be fetched --- .../dispute-coordinator/src/initialized.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 20f48abe3fe2..9e4a598c3ea9 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -472,11 +472,23 @@ impl Initialized { ?session, "Importing dispute votes from chain for candidate" ); - let session_info = &self + let session_info = match self .runtime_info .get_session_info_by_index(ctx.sender(), leaf_hash, session) - .await? - .session_info; + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?candidate_hash, + ?session, + ?err, + "Could not retrieve session info for recently concluded dispute" + ); + continue + }, + }; let statements = statements .into_iter() From 4212016aea439ca5ea429d1dd28cb8cfb1e9ade0 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 16:12:52 +0300 Subject: [PATCH 16/40] Small style improvements in logging --- node/core/dispute-coordinator/src/initialized.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 9e4a598c3ea9..9448482c2b5a 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -297,15 +297,15 @@ impl Initialized { Ok(_) => { self.error = None; }, - Err(e) => { + Err(err) => { gum::warn!( target: LOG_TARGET, session_idx, leaf_hash = ?new_leaf.hash, - err = ?e, + ?err, "Error caching SessionInfo on ActiveLeaves update" ); - self.error = Some(e); + self.error = Some(err); }, } @@ -315,10 +315,10 @@ impl Initialized { .prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); } }, - Err(e) => { + Err(err) => { gum::warn!( target: LOG_TARGET, - err = ?e, + ?err, "Failed to update session cache for disputes", ); }, @@ -1192,10 +1192,10 @@ impl Initialized { statements.push((signed_dispute_statement, *index)); }, Ok(None) => {}, - Err(e) => { + Err(err) => { gum::error!( target: LOG_TARGET, - err = ?e, + ?err, "Encountered keystore error while signing dispute statement", ); }, From 16d4647a5fff28594931f81cf866edc125bd9c98 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 29 Mar 2023 16:17:10 +0300 Subject: [PATCH 17/40] process_on_chain_votes: leaf_hash -> block_hash --- node/core/dispute-coordinator/src/initialized.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 9448482c2b5a..45f99244f439 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -359,7 +359,7 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, votes: ScrapedOnChainVotes, now: u64, - leaf_hash: Hash, + block_hash: Hash, ) -> Result<()> { let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; @@ -444,7 +444,7 @@ impl Initialized { session, statements, now, - leaf_hash, + block_hash, ) .await?; match import_result { @@ -474,7 +474,7 @@ impl Initialized { ); let session_info = match self .runtime_info - .get_session_info_by_index(ctx.sender(), leaf_hash, session) + .get_session_info_by_index(ctx.sender(), block_hash, session) .await { Ok(extended_session_info) => &extended_session_info.session_info, @@ -533,7 +533,7 @@ impl Initialized { session, statements, now, - leaf_hash, + block_hash, ) .await?; match import_result { From c2775a6d1e2c2931c14e8c049673bda2a731b695 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Thu, 30 Mar 2023 15:23:45 +0300 Subject: [PATCH 18/40] Restore `note_earliest_session` - it is required to prune disputes and votes --- node/core/dispute-coordinator/src/backend.rs | 16 ++++++ node/core/dispute-coordinator/src/db/v1.rs | 49 ++++++++++++++++++- .../dispute-coordinator/src/initialized.rs | 5 ++ node/core/dispute-coordinator/src/lib.rs | 12 ++++- 4 files changed, 79 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/backend.rs b/node/core/dispute-coordinator/src/backend.rs index 5e7d8dc8d38f..d49ace492549 100644 --- a/node/core/dispute-coordinator/src/backend.rs +++ b/node/core/dispute-coordinator/src/backend.rs @@ -91,6 +91,15 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { self.candidate_votes.is_empty() } + /// Load the earliest session, if any. + pub fn load_earliest_session(&self) -> FatalResult> { + if let Some(val) = self.earliest_session { + return Ok(Some(val)) + } + + self.inner.load_earliest_session() + } + /// Load the recent disputes, if any. pub fn load_recent_disputes(&self) -> FatalResult> { if let Some(val) = &self.recent_disputes { @@ -113,6 +122,13 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { self.inner.load_candidate_votes(session, candidate_hash) } + /// Prepare a write to the "earliest session" field of the DB. + /// + /// Later calls to this function will override earlier ones. + pub fn write_earliest_session(&mut self, session: SessionIndex) { + self.earliest_session = Some(session); + } + /// Prepare a write to the recent disputes stored in the DB. /// /// Later calls to this function will override earlier ones. diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index da087377bff9..c0f3c9925f1b 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -19,7 +19,7 @@ use polkadot_node_primitives::DisputeStatus; use polkadot_node_subsystem_util::database::{DBTransaction, Database}; use polkadot_primitives::{ - CandidateHash, CandidateReceipt, InvalidDisputeStatementKind, SessionIndex, + CandidateHash, CandidateReceipt, Hash, InvalidDisputeStatementKind, SessionIndex, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature, }; @@ -28,7 +28,7 @@ use std::sync::Arc; use parity_scale_codec::{Decode, Encode}; use crate::{ - backend::{Backend, BackendWriteOp}, + backend::{Backend, BackendWriteOp, OverlayedBackend}, error::{FatalError, FatalResult}, metrics::Metrics, LOG_TARGET, @@ -309,6 +309,51 @@ pub(crate) fn load_recent_disputes( .map_err(|e| FatalError::DbReadFailed(e)) } +/// Maybe prune data in the DB based on the provided session index. +/// +/// This is intended to be called on every block, and as such will be used to populate the DB on +/// first launch. If the on-disk data does not need to be pruned, only a single storage read +/// will be performed. +/// +/// If one or more ancient sessions are pruned, all metadata on candidates within the ancient +/// session will be deleted. +pub(crate) fn note_earliest_session( + overlay_db: &mut OverlayedBackend<'_, impl Backend>, + new_earliest_session: SessionIndex, +) -> FatalResult<()> { + match overlay_db.load_earliest_session()? { + None => { + // First launch - write new-earliest. + overlay_db.write_earliest_session(new_earliest_session); + }, + Some(prev_earliest) if new_earliest_session > prev_earliest => { + // Prune all data in the outdated sessions. + overlay_db.write_earliest_session(new_earliest_session); + + // Clear recent disputes metadata. + { + let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); + + let lower_bound = (new_earliest_session, CandidateHash(Hash::repeat_byte(0x00))); + + let new_recent_disputes = recent_disputes.split_off(&lower_bound); + // Any remanining disputes are considered ancient and must be pruned. + let pruned_disputes = recent_disputes; + + if pruned_disputes.len() != 0 { + overlay_db.write_recent_disputes(new_recent_disputes); + // Note: Deleting old candidate votes is handled in `write` based on the earliest session. + } + } + }, + Some(_) => { + // nothing to do. + }, + } + + Ok(()) +} + /// Until what session votes have been cleaned up already. /// /// That is the db has already been purged of votes for sessions older than the returned diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 45f99244f439..144a6bb8eb87 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -44,6 +44,7 @@ use polkadot_primitives::{ }; use crate::{ + db, error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, is_potential_spam, @@ -311,6 +312,10 @@ impl Initialized { if session_idx > self.highest_session { self.highest_session = session_idx; + db::v1::note_earliest_session( + overlay_db, + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), + )?; self.spam_slots .prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index f344ff4c8a23..dcd9a510d3ea 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -328,10 +328,20 @@ impl DisputeCoordinatorSubsystem { }, }; + let session_idx = runtime_info + .get_session_index_for_child(ctx.sender(), initial_head.hash) + .await?; let session_info = &runtime_info - .get_session_info(ctx.sender(), initial_head.hash) + .get_session_info_by_index(ctx.sender(), initial_head.hash, session_idx) .await? .session_info; + + // Prune obsolete disputes: + db::v1::note_earliest_session( + overlay_db, + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), + )?; + let mut participation_requests = Vec::new(); let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; From c027da372eb1b8aa09f7fcc47624d8a061226f4c Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 31 Mar 2023 16:01:14 +0300 Subject: [PATCH 19/40] Cache new sessions only when there is an actual session change --- .../dispute-coordinator/src/initialized.rs | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 144a6bb8eb87..74d3cc97ac4c 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -289,28 +289,30 @@ impl Initialized { match session_idx { Ok(session_idx) => { - // Dummy fetch to ensure the session is cached. - match self - .runtime_info - .get_session_info_by_index(ctx.sender(), new_leaf.hash, session_idx) - .await - { - Ok(_) => { - self.error = None; - }, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - session_idx, - leaf_hash = ?new_leaf.hash, - ?err, - "Error caching SessionInfo on ActiveLeaves update" - ); - self.error = Some(err); - }, - } - if session_idx > self.highest_session { + // Dummy fetch to ensure the sessions are cached. + for idx in self.highest_session + 1..=session_idx { + match self + .runtime_info + .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) + .await + { + Ok(_) => { + self.error = None; + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + session_idx, + leaf_hash = ?new_leaf.hash, + ?err, + "Error caching SessionInfo on ActiveLeaves update" + ); + self.error = Some(err); + }, + } + } + self.highest_session = session_idx; db::v1::note_earliest_session( overlay_db, From 5ad43195145f60352187f6b517f6467d81f2f89e Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 31 Mar 2023 16:01:36 +0300 Subject: [PATCH 20/40] Fix tests --- node/core/dispute-coordinator/src/tests.rs | 68 +++++----------------- 1 file changed, 14 insertions(+), 54 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 8d72ed79100d..5036ddc320dd 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -281,14 +281,8 @@ impl TestState { ))) .await; - self.handle_sync_queries( - virtual_overseer, - block_hash, - block_number, - session, - candidate_events, - ) - .await; + self.handle_sync_queries(virtual_overseer, block_hash, session, candidate_events) + .await; } /// Returns any sent `DisputeMessage`s. @@ -296,7 +290,6 @@ impl TestState { &mut self, virtual_overseer: &mut VirtualOverseer, block_hash: Hash, - block_number: BlockNumber, session: SessionIndex, candidate_events: Vec, ) -> Vec { @@ -336,59 +329,20 @@ impl TestState { // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`. if self.known_session.is_none() { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(block_number)); - } - ); - - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - number, - s_tx, - )) => { - assert_eq!(block_number, number); - let _ = s_tx.send(Ok(Some(block_hash))); - } - ); - assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( h, - RuntimeApiRequest::SessionIndexForChild(s_tx), + RuntimeApiRequest::SessionInfo(session_index, tx), )) => { assert_eq!(h, block_hash); - let _ = s_tx.send(Ok(session)); + assert_eq!(session_index, 1); // the session of the first leaf + let _ = tx.send(Ok(Some(self.session_info()))); } ); } - // No queries, if subsystem knows about this session already. - if self.known_session == Some(session) { - continue - } self.known_session = Some(session); - - loop { - // answer session info queries until the current session is reached. - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(session_index, tx), - )) => { - assert_eq!(h, block_hash); - - let _ = tx.send(Ok(Some(self.session_info()))); - if session_index == session { break } - } - ); - } }, AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => { assert!( @@ -439,6 +393,13 @@ impl TestState { } let _ = response_channel.send(Ok(response)); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(session_index, 0); // caching session 0 + let _ = tx.send(Ok(Some(self.session_info()))); + }, msg => { panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg); }, @@ -481,9 +442,8 @@ impl TestState { let events = if n == 1 { std::mem::take(&mut initial_events) } else { Vec::new() }; - let mut new_messages = self - .handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session, events) - .await; + let mut new_messages = + self.handle_sync_queries(virtual_overseer, *leaf, session, events).await; messages.append(&mut new_messages); } messages From 5b0c103647f7e0e2e1acc8ac68fa34beeb332906 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sat, 1 Apr 2023 14:31:55 +0300 Subject: [PATCH 21/40] `CandidateEnvironment::new` gets `session_idx` and fetches SessionInfo by itself to avoid the invariant where the input SessionIndex and SessionInfo parameters don't match --- node/core/dispute-coordinator/src/import.rs | 26 ++++++++++---- .../dispute-coordinator/src/initialized.rs | 36 ++++++++++--------- node/core/dispute-coordinator/src/lib.rs | 15 +++++--- 3 files changed, 49 insertions(+), 28 deletions(-) diff --git a/node/core/dispute-coordinator/src/import.rs b/node/core/dispute-coordinator/src/import.rs index 4097c8c2c683..4853702416ab 100644 --- a/node/core/dispute-coordinator/src/import.rs +++ b/node/core/dispute-coordinator/src/import.rs @@ -31,8 +31,10 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use polkadot_node_primitives::{ disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, }; +use polkadot_node_subsystem::overseer; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ - CandidateReceipt, DisputeStatement, IndexedVec, SessionIndex, SessionInfo, + CandidateReceipt, DisputeStatement, Hash, IndexedVec, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use sc_keystore::LocalKeystore; @@ -49,17 +51,29 @@ pub struct CandidateEnvironment<'a> { controlled_indices: HashSet, } +#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] impl<'a> CandidateEnvironment<'a> { /// Create `CandidateEnvironment`. /// /// Return: `None` in case session is outside of session window. - pub fn new( + pub async fn new( keystore: &LocalKeystore, - session: &'a SessionInfo, + ctx: &mut Context, + runtime_info: &'a mut RuntimeInfo, session_index: SessionIndex, - ) -> Option { - let controlled_indices = find_controlled_validator_indices(keystore, &session.validators); - Some(Self { session_index, session, controlled_indices }) + relay_parent: Hash, + ) -> Option> { + let session_info = match runtime_info + .get_session_info_by_index(ctx.sender(), relay_parent, session_index) + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(_) => return None, + }; + + let controlled_indices = + find_controlled_validator_indices(keystore, &session_info.validators); + Some(Self { session_index, session: session_info, controlled_indices }) } /// Validators in the candidate's session. diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 74d3cc97ac4c..4e9a0d358dca 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -739,13 +739,15 @@ impl Initialized { return Ok(ImportStatementsResult::InvalidImport) } - let session_info = &self - .runtime_info - .get_session_info_by_index(ctx.sender(), block_hash, session) - .await? - .session_info; - - let env = match CandidateEnvironment::new(&self.keystore, session_info, session) { + let env = match CandidateEnvironment::new( + &self.keystore, + ctx, + &mut self.runtime_info, + session, + block_hash, + ) + .await + { None => { gum::warn!( target: LOG_TARGET, @@ -1139,17 +1141,17 @@ impl Initialized { ?now, "Issuing local statement for candidate!" ); - let session_info = &self - .runtime_info - .get_session_info_by_index( - ctx.sender(), - candidate_receipt.descriptor.relay_parent, - session, - ) - .await? - .session_info; + // Load environment: - let env = match CandidateEnvironment::new(&self.keystore, session_info, session) { + let env = match CandidateEnvironment::new( + &self.keystore, + ctx, + &mut self.runtime_info, + session, + candidate_receipt.descriptor.relay_parent, + ) + .await + { None => { gum::warn!( target: LOG_TARGET, diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index dcd9a510d3ea..bb51413fb878 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -331,10 +331,6 @@ impl DisputeCoordinatorSubsystem { let session_idx = runtime_info .get_session_index_for_child(ctx.sender(), initial_head.hash) .await?; - let session_info = &runtime_info - .get_session_info_by_index(ctx.sender(), initial_head.hash, session_idx) - .await? - .session_info; // Prune obsolete disputes: db::v1::note_earliest_session( @@ -344,9 +340,18 @@ impl DisputeCoordinatorSubsystem { let mut participation_requests = Vec::new(); let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); + let leaf_hash = initial_head.hash; let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; for ((session, ref candidate_hash), _) in active_disputes { - let env = match CandidateEnvironment::new(&self.keystore, &session_info, session) { + let env = match CandidateEnvironment::new( + &self.keystore, + ctx, + runtime_info, + session_idx, + leaf_hash, + ) + .await + { None => { gum::warn!( target: LOG_TARGET, From 41be7ae16791182d302c7c3ebdb61327340434b5 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sat, 1 Apr 2023 15:00:36 +0300 Subject: [PATCH 22/40] Fix handling of missing session info --- .../dispute-coordinator/src/initialized.rs | 81 ++++++++++--------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 4e9a0d358dca..58d8dce9f2b3 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -288,45 +288,43 @@ impl Initialized { self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await; match session_idx { - Ok(session_idx) => { - if session_idx > self.highest_session { - // Dummy fetch to ensure the sessions are cached. - for idx in self.highest_session + 1..=session_idx { - match self - .runtime_info - .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) - .await - { - Ok(_) => { - self.error = None; - }, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - session_idx, - leaf_hash = ?new_leaf.hash, - ?err, - "Error caching SessionInfo on ActiveLeaves update" - ); - self.error = Some(err); - }, - } + Ok(session_idx) if session_idx > self.highest_session => { + // There is a new sesion. Perform a dummy fetch to cache it. + for idx in self.highest_session + 1..=session_idx { + match self + .runtime_info + .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) + .await + { + Ok(_) => { + self.error = None; + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + session_idx, + leaf_hash = ?new_leaf.hash, + ?err, + "Error caching SessionInfo on ActiveLeaves update" + ); + self.error = Some(err); + }, } - - self.highest_session = session_idx; - db::v1::note_earliest_session( - overlay_db, - session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), - )?; - self.spam_slots - .prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); } + + self.highest_session = session_idx; + db::v1::note_earliest_session( + overlay_db, + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), + )?; + self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); }, + Ok(_) => { /* no new session => nothing to cache */ }, Err(err) => { gum::warn!( target: LOG_TARGET, ?err, - "Failed to update session cache for disputes", + "Failed to update session cache for disputes - can't fetch session index", ); }, } @@ -379,11 +377,22 @@ impl Initialized { for (candidate_receipt, backers) in backing_validators_per_candidate { // Obtain the session info, for sake of `ValidatorId`s let relay_parent = candidate_receipt.descriptor.relay_parent; - let session_info = &self + let session_info = match self .runtime_info - .get_session_info_by_index(ctx.sender(), relay_parent, session) // TODO: is it correct to use `relay_parent` here?? - .await? - .session_info; + .get_session_info_by_index(ctx.sender(), relay_parent, session) + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?session, + ?err, + "Could not retrieve session info from RuntimeInfo", + ); + return Ok(()) + }, + }; let candidate_hash = candidate_receipt.hash(); gum::trace!( From eb01fe523d160f5a2a8a4e76badbf0478daded0c Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sat, 1 Apr 2023 22:05:43 +0300 Subject: [PATCH 23/40] Move sessions caching in `handle_startup` and fix tests --- node/core/dispute-coordinator/src/lib.rs | 82 ++++++++++------------ node/core/dispute-coordinator/src/tests.rs | 33 ++++----- 2 files changed, 52 insertions(+), 63 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index bb51413fb878..eb96be5172be 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -43,7 +43,9 @@ use polkadot_node_subsystem_util::{ database::Database, runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, }; -use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; +use polkadot_primitives::{ + DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex, +}; use crate::{ error::{FatalResult, Result}, @@ -228,7 +230,7 @@ impl DisputeCoordinatorSubsystem { .expect("DISPUTE_WINDOW can't be 0; qed."), }); let mut overlay_db = OverlayedBackend::new(&mut backend); - let (participations, votes, spam_slots, ordering_provider) = match self + let (participations, votes, spam_slots, ordering_provider, highest_session) = match self .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) .await { @@ -243,44 +245,6 @@ impl DisputeCoordinatorSubsystem { backend.write(ops)?; } - // We assume the highest session is the passed leaf. If we can't get the session index - // we can't initialize the subsystem so we'll wait for a new leaf - let highest_session = - match runtime_info.get_session_index_for_child(ctx.sender(), first_leaf.hash).await - { - Ok(session_idx) => session_idx, - Err(e) => { - gum::warn!( - target: LOG_TARGET, - leaf_hash = ?first_leaf.hash, - err = ?e, - "Can't get session index during subsystem initialization. Retrying." - ); - continue // the initialization loop - }, - }; - - // Cache the sessions. A failure to fetch a session here is not that critical so we - // won't abort the initialization - for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..highest_session { - match runtime_info - .get_session_info_by_index(ctx.sender(), first_leaf.hash, idx) - .await - { - Ok(_) => { /* do nothing */ }, - Err(e) => { - gum::warn!( - target: LOG_TARGET, - leaf_hash = ?first_leaf.hash, - session_idx = idx, - err = ?e, - "Can't cache SessionInfo during subsystem initialization. Skipping session." - ); - continue // the caching loop - }, - } - } - return Ok(Some(( participations, votes, @@ -314,6 +278,7 @@ impl DisputeCoordinatorSubsystem { Vec, SpamSlots, ChainScraper, + SessionIndex, )> { let now = clock.now(); @@ -328,14 +293,37 @@ impl DisputeCoordinatorSubsystem { }, }; - let session_idx = runtime_info + // We assume the highest session is the passed leaf. If we can't get the session index + // we can't initialize the subsystem so we'll wait for a new leaf + let highest_session = runtime_info .get_session_index_for_child(ctx.sender(), initial_head.hash) .await?; + // Cache the sessions. A failure to fetch a session here is not that critical so we + // won't abort the initialization + for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session { + match runtime_info + .get_session_info_by_index(ctx.sender(), initial_head.hash, idx) + .await + { + Ok(_) => { /* do nothing */ }, + Err(e) => { + gum::warn!( + target: LOG_TARGET, + leaf_hash = ?initial_head.hash, + session_idx = idx, + err = ?e, + "Can't cache SessionInfo during subsystem initialization. Skipping session." + ); + continue + }, + } + } + // Prune obsolete disputes: db::v1::note_earliest_session( overlay_db, - session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), + highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1), )?; let mut participation_requests = Vec::new(); @@ -347,7 +335,7 @@ impl DisputeCoordinatorSubsystem { &self.keystore, ctx, runtime_info, - session_idx, + highest_session, leaf_hash, ) .await @@ -424,7 +412,13 @@ impl DisputeCoordinatorSubsystem { } } - Ok((participation_requests, votes, SpamSlots::recover_from_state(spam_disputes), scraper)) + Ok(( + participation_requests, + votes, + SpamSlots::recover_from_state(spam_disputes), + scraper, + highest_session, + )) } } diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 5036ddc320dd..66cf63586985 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -327,19 +327,21 @@ impl TestState { assert_eq!(h, block_hash); let _ = tx.send(Ok(session)); - // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`. + // Queries for session caching - see `handle_startup` if self.known_session.is_none() { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(session_index, tx), - )) => { - assert_eq!(h, block_hash); - assert_eq!(session_index, 1); // the session of the first leaf - let _ = tx.send(Ok(Some(self.session_info()))); - } - ); + for i in 0..=session { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(h, block_hash); + assert_eq!(session_index, i); + let _ = tx.send(Ok(Some(self.session_info()))); + } + ); + } } self.known_session = Some(session); @@ -393,13 +395,6 @@ impl TestState { } let _ = response_channel.send(Ok(response)); }, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _, - RuntimeApiRequest::SessionInfo(session_index, tx), - )) => { - assert_eq!(session_index, 0); // caching session 0 - let _ = tx.send(Ok(Some(self.session_info()))); - }, msg => { panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg); }, From 3581e34699a3f3cf22553d5ab9e3796828494d26 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 3 Apr 2023 14:23:50 +0300 Subject: [PATCH 24/40] Load `relay_parent` from db in `handle_import_statements` instead of passing it as a parameter via two functions --- .../dispute-coordinator/src/initialized.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 58d8dce9f2b3..c5d7111e21df 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -460,7 +460,6 @@ impl Initialized { session, statements, now, - block_hash, ) .await?; match import_result { @@ -549,7 +548,6 @@ impl Initialized { session, statements, now, - block_hash, ) .await?; match import_result { @@ -591,7 +589,6 @@ impl Initialized { ?session, "DisputeCoordinatorMessage::ImportStatements" ); - let parent_hash = candidate_receipt.descriptor.relay_parent; let outcome = self .handle_import_statements( ctx, @@ -600,7 +597,6 @@ impl Initialized { session, statements, now, - parent_hash, ) .await?; let report = move || match pending_confirmation { @@ -740,7 +736,6 @@ impl Initialized { session: SessionIndex, statements: Vec<(SignedDisputeStatement, ValidatorIndex)>, now: Timestamp, - block_hash: Hash, ) -> FatalResult { gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements"); if self.session_is_ancient(session) { @@ -748,12 +743,31 @@ impl Initialized { return Ok(ImportStatementsResult::InvalidImport) } + let relay_parent = match &candidate_receipt { + MaybeCandidateReceipt::Provides(candidate_receipt) => + candidate_receipt.descriptor().relay_parent, + MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => { + match overlay_db.load_candidate_votes(session, &candidate_hash)? { + Some(votes) => votes.candidate_receipt.descriptor().relay_parent, + None => { + gum::warn!( + target: LOG_TARGET, + session, + ?candidate_hash, + "Cannot import votes, without `CandidateReceipt` available!" + ); + return Ok(ImportStatementsResult::InvalidImport) + }, + } + }, + }; + let env = match CandidateEnvironment::new( &self.keystore, ctx, &mut self.runtime_info, session, - block_hash, + relay_parent, ) .await { @@ -1235,7 +1249,6 @@ impl Initialized { } // Do import - let relay_parent = candidate_receipt.descriptor.relay_parent; if !statements.is_empty() { match self .handle_import_statements( @@ -1245,7 +1258,6 @@ impl Initialized { session, statements, now, - relay_parent, ) .await? { From dbae23c9451939882449952e9cd4172b5dd5d70a Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Mon, 3 Apr 2023 14:52:20 +0300 Subject: [PATCH 25/40] Don't do two db reads --- .../dispute-coordinator/src/initialized.rs | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c5d7111e21df..915acbf0deb1 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -743,12 +743,19 @@ impl Initialized { return Ok(ImportStatementsResult::InvalidImport) } - let relay_parent = match &candidate_receipt { + // At this point we should either have a `CandidateReceipt` or have the relay parent stored + // in the database. It's extracted here so that we can obtain `SessionInfo` from the runtime. + // In case we do query the database we'll save the result in `maybe_votes_in_db` which is + // `Option>`. We need nested option to differentiate later if the db + // call was made and it returned `None` (`Some(None)`/`Some(Some(votes))`) or the db call was + // not made at all (`None`). + let (relay_parent, maybe_votes_in_db) = match &candidate_receipt { MaybeCandidateReceipt::Provides(candidate_receipt) => - candidate_receipt.descriptor().relay_parent, + (candidate_receipt.descriptor().relay_parent, None), MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => { match overlay_db.load_candidate_votes(session, &candidate_hash)? { - Some(votes) => votes.candidate_receipt.descriptor().relay_parent, + Some(votes) => + (votes.candidate_receipt.descriptor().relay_parent, Some(Some(votes))), None => { gum::warn!( target: LOG_TARGET, @@ -793,6 +800,12 @@ impl Initialized { "Number of validators" ); + // If we have already fetched the votes - use them here. + let votes_in_db = match maybe_votes_in_db { + Some(votes) => votes, + None => overlay_db.load_candidate_votes(session, &candidate_hash)?, + }; + // In case we are not provided with a candidate receipt // we operate under the assumption, that a previous vote // which included a `CandidateReceipt` was seen. @@ -801,10 +814,7 @@ impl Initialized { // There is one exception: A sufficiently sophisticated attacker could prevent // us from seeing the backing votes by withholding arbitrary blocks, and hence we do // not have a `CandidateReceipt` available. - let old_state = match overlay_db - .load_candidate_votes(session, &candidate_hash)? - .map(CandidateVotes::from) - { + let old_state = match votes_in_db.map(CandidateVotes::from) { Some(votes) => CandidateVoteState::new(votes, &env, now), None => if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt { From 866e4e69bdcee74e690e520631a855837963396c Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 4 Apr 2023 11:18:32 +0300 Subject: [PATCH 26/40] Fix the twisted logic in `handle_import_statements` --- .../dispute-coordinator/src/initialized.rs | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 915acbf0deb1..bb68d778900e 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -743,29 +743,29 @@ impl Initialized { return Ok(ImportStatementsResult::InvalidImport) } + let candidate_hash = candidate_receipt.hash(); + let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?; + // At this point we should either have a `CandidateReceipt` or have the relay parent stored // in the database. It's extracted here so that we can obtain `SessionInfo` from the runtime. // In case we do query the database we'll save the result in `maybe_votes_in_db` which is // `Option>`. We need nested option to differentiate later if the db // call was made and it returned `None` (`Some(None)`/`Some(Some(votes))`) or the db call was // not made at all (`None`). - let (relay_parent, maybe_votes_in_db) = match &candidate_receipt { + let relay_parent = match &candidate_receipt { MaybeCandidateReceipt::Provides(candidate_receipt) => - (candidate_receipt.descriptor().relay_parent, None), - MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => { - match overlay_db.load_candidate_votes(session, &candidate_hash)? { - Some(votes) => - (votes.candidate_receipt.descriptor().relay_parent, Some(Some(votes))), - None => { - gum::warn!( - target: LOG_TARGET, - session, - ?candidate_hash, - "Cannot import votes, without `CandidateReceipt` available!" - ); - return Ok(ImportStatementsResult::InvalidImport) - }, - } + candidate_receipt.descriptor().relay_parent, + MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => match &votes_in_db { + Some(votes) => votes.candidate_receipt.descriptor().relay_parent, + None => { + gum::warn!( + target: LOG_TARGET, + session, + ?candidate_hash, + "Cannot obtain relay parent without `CandidateReceipt` available!" + ); + return Ok(ImportStatementsResult::InvalidImport) + }, }, }; @@ -790,8 +790,6 @@ impl Initialized { Some(env) => env, }; - let candidate_hash = candidate_receipt.hash(); - gum::trace!( target: LOG_TARGET, ?candidate_hash, @@ -800,12 +798,6 @@ impl Initialized { "Number of validators" ); - // If we have already fetched the votes - use them here. - let votes_in_db = match maybe_votes_in_db { - Some(votes) => votes, - None => overlay_db.load_candidate_votes(session, &candidate_hash)?, - }; - // In case we are not provided with a candidate receipt // we operate under the assumption, that a previous vote // which included a `CandidateReceipt` was seen. From 6ba8b4fbb932a2fe59274487a9219c47a6139f56 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 4 Apr 2023 11:54:23 +0300 Subject: [PATCH 27/40] fixup --- node/core/dispute-coordinator/src/initialized.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index bb68d778900e..c1c4c8dbda2f 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -745,13 +745,6 @@ impl Initialized { let candidate_hash = candidate_receipt.hash(); let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?; - - // At this point we should either have a `CandidateReceipt` or have the relay parent stored - // in the database. It's extracted here so that we can obtain `SessionInfo` from the runtime. - // In case we do query the database we'll save the result in `maybe_votes_in_db` which is - // `Option>`. We need nested option to differentiate later if the db - // call was made and it returned `None` (`Some(None)`/`Some(Some(votes))`) or the db call was - // not made at all (`None`). let relay_parent = match &candidate_receipt { MaybeCandidateReceipt::Provides(candidate_receipt) => candidate_receipt.descriptor().relay_parent, From f248b2033444fe071e36acd1f130a210021961a9 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 4 Apr 2023 12:01:56 +0300 Subject: [PATCH 28/40] Small style fix --- node/core/dispute-coordinator/src/lib.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index eb96be5172be..21f2723bba1a 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -302,22 +302,19 @@ impl DisputeCoordinatorSubsystem { // Cache the sessions. A failure to fetch a session here is not that critical so we // won't abort the initialization for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session { - match runtime_info + if let Err(e) = runtime_info .get_session_info_by_index(ctx.sender(), initial_head.hash, idx) .await { - Ok(_) => { /* do nothing */ }, - Err(e) => { - gum::warn!( - target: LOG_TARGET, - leaf_hash = ?initial_head.hash, - session_idx = idx, - err = ?e, - "Can't cache SessionInfo during subsystem initialization. Skipping session." - ); - continue - }, - } + gum::warn!( + target: LOG_TARGET, + leaf_hash = ?initial_head.hash, + session_idx = idx, + err = ?e, + "Can't cache SessionInfo during subsystem initialization. Skipping session." + ); + continue + }; } // Prune obsolete disputes: From 7010b0a3241fd2506f396b2da447594aaa1073bf Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 4 Apr 2023 15:17:02 +0300 Subject: [PATCH 29/40] Decrease log levels for caching errors to debug and fix a typo --- node/core/dispute-coordinator/src/initialized.rs | 6 +++--- node/core/dispute-coordinator/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c1c4c8dbda2f..97c70bf0c8ec 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -289,7 +289,7 @@ impl Initialized { match session_idx { Ok(session_idx) if session_idx > self.highest_session => { - // There is a new sesion. Perform a dummy fetch to cache it. + // There is a new session. Perform a dummy fetch to cache it. for idx in self.highest_session + 1..=session_idx { match self .runtime_info @@ -300,7 +300,7 @@ impl Initialized { self.error = None; }, Err(err) => { - gum::warn!( + gum::debug!( target: LOG_TARGET, session_idx, leaf_hash = ?new_leaf.hash, @@ -321,7 +321,7 @@ impl Initialized { }, Ok(_) => { /* no new session => nothing to cache */ }, Err(err) => { - gum::warn!( + gum::debug!( target: LOG_TARGET, ?err, "Failed to update session cache for disputes - can't fetch session index", diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 21f2723bba1a..664da2cb342b 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -306,7 +306,7 @@ impl DisputeCoordinatorSubsystem { .get_session_info_by_index(ctx.sender(), initial_head.hash, idx) .await { - gum::warn!( + gum::debug!( target: LOG_TARGET, leaf_hash = ?initial_head.hash, session_idx = idx, From 034947303d7598705ca18a1e611f6ad88988c27d Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 5 Apr 2023 14:21:20 +0300 Subject: [PATCH 30/40] Update outdated comment --- node/core/dispute-coordinator/src/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 97c70bf0c8ec..5a0f368853ca 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -75,7 +75,7 @@ pub struct InitialData { /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming -/// statements for validity, we cannot query orderings, we have no valid `RollingSessionWindow`, +/// statements for validity, we cannot query orderings, we have no valid `SessionInfo`, /// ... pub(crate) struct Initialized { keystore: Arc, From fd865f003964c73ade724cc96da056db5deada12 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 5 Apr 2023 14:26:55 +0300 Subject: [PATCH 31/40] Remove `ensure_available_session_info` --- .../dispute-coordinator/src/initialized.rs | 34 ++----------------- 1 file changed, 3 insertions(+), 31 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 5a0f368853ca..c9420c30ee38 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -36,7 +36,7 @@ use polkadot_node_subsystem::{ }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, }; -use polkadot_node_subsystem_util::runtime::{Error as RuntimeError, RuntimeInfo}; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind, @@ -45,7 +45,7 @@ use polkadot_primitives::{ use crate::{ db, - error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, + error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, is_potential_spam, metrics::Metrics, @@ -86,9 +86,6 @@ pub(crate) struct Initialized { scraper: ChainScraper, participation_receiver: WorkerMessageReceiver, metrics: Metrics, - // This tracks only rolling session window failures. - // It can be a `Vec` if the need to track more arises. - error: Option, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -115,7 +112,6 @@ impl Initialized { participation, participation_receiver, metrics, - error: None, } } @@ -296,9 +292,7 @@ impl Initialized { .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) .await { - Ok(_) => { - self.error = None; - }, + Ok(_) => {}, Err(err) => { gum::debug!( target: LOG_TARGET, @@ -307,7 +301,6 @@ impl Initialized { ?err, "Error caching SessionInfo on ActiveLeaves update" ); - self.error = Some(err); }, } } @@ -615,9 +608,6 @@ impl Initialized { } }, DisputeCoordinatorMessage::RecentDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "Loading recent disputes from db"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes @@ -631,11 +621,7 @@ impl Initialized { ); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes"); - let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes } else { @@ -651,11 +637,7 @@ impl Initialized { ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes"); - let mut query_output = Vec::new(); for (session_index, candidate_hash) in query { if let Some(v) = @@ -695,8 +677,6 @@ impl Initialized { block_descriptions, tx, } => { - // Return error if session information is missing. - self.ensure_available_session_info()?; gum::trace!( target: LOG_TARGET, "DisputeCoordinatorMessage::DetermineUndisputedChain" @@ -716,14 +696,6 @@ impl Initialized { Ok(Box::new(|| Ok(()))) } - // Helper function for checking subsystem errors in message processing. - fn ensure_available_session_info(&self) -> Result<()> { - match self.error { - Some(_) => Err(Error::SessionInfo), - None => Ok(()), - } - } - // We use fatal result rather than result here. Reason being, We for example increase // spam slots in this function. If then the import fails for some non fatal and // unrelated reason, we should likely actually decrement previously incremented spam From 978ad4f223d517faa7a7fbad96e3f8de4fa17501 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 5 Apr 2023 15:07:26 +0300 Subject: [PATCH 32/40] Load relay parent from db in `process_on_chain_votes` --- .../dispute-coordinator/src/initialized.rs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c9420c30ee38..0c20d5de4c31 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -164,13 +164,7 @@ impl Initialized { let mut overlay_db = OverlayedBackend::new(backend); for votes in on_chain_votes { let _ = self - .process_on_chain_votes( - ctx, - &mut overlay_db, - votes, - clock.now(), - first_leaf.hash, - ) + .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) .await .map_err(|error| { gum::warn!( @@ -332,16 +326,15 @@ impl Initialized { // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel for votes in scraped_updates.on_chain_votes { - let _ = self - .process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash) - .await - .map_err(|error| { + let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( + |error| { gum::warn!( target: LOG_TARGET, ?error, "Skipping scraping block due to error", ); - }); + }, + ); } } @@ -357,7 +350,6 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, votes: ScrapedOnChainVotes, now: u64, - block_hash: Hash, ) -> Result<()> { let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; @@ -480,9 +472,23 @@ impl Initialized { ?session, "Importing dispute votes from chain for candidate" ); + + let relay_parent = match overlay_db.load_candidate_votes(session, &candidate_hash)? { + Some(votes) => votes.candidate_receipt.descriptor().relay_parent, + None => { + gum::warn!( + target: LOG_TARGET, + ?candidate_hash, + ?session, + "Can't load `CandidateReceipt`. Skipping." + ); + continue + }, + }; + let session_info = match self .runtime_info - .get_session_info_by_index(ctx.sender(), block_hash, session) + .get_session_info_by_index(ctx.sender(), relay_parent, session) .await { Ok(extended_session_info) => &extended_session_info.session_info, From b65ff6ea109aaed788f29517c135099862bbedaf Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 5 Apr 2023 18:51:40 +0300 Subject: [PATCH 33/40] Revert "Load relay parent from db in `process_on_chain_votes`" This reverts commit 978ad4f223d517faa7a7fbad96e3f8de4fa17501. --- .../dispute-coordinator/src/initialized.rs | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 0c20d5de4c31..c9420c30ee38 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -164,7 +164,13 @@ impl Initialized { let mut overlay_db = OverlayedBackend::new(backend); for votes in on_chain_votes { let _ = self - .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) + .process_on_chain_votes( + ctx, + &mut overlay_db, + votes, + clock.now(), + first_leaf.hash, + ) .await .map_err(|error| { gum::warn!( @@ -326,15 +332,16 @@ impl Initialized { // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel for votes in scraped_updates.on_chain_votes { - let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( - |error| { + let _ = self + .process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash) + .await + .map_err(|error| { gum::warn!( target: LOG_TARGET, ?error, "Skipping scraping block due to error", ); - }, - ); + }); } } @@ -350,6 +357,7 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, votes: ScrapedOnChainVotes, now: u64, + block_hash: Hash, ) -> Result<()> { let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; @@ -472,23 +480,9 @@ impl Initialized { ?session, "Importing dispute votes from chain for candidate" ); - - let relay_parent = match overlay_db.load_candidate_votes(session, &candidate_hash)? { - Some(votes) => votes.candidate_receipt.descriptor().relay_parent, - None => { - gum::warn!( - target: LOG_TARGET, - ?candidate_hash, - ?session, - "Can't load `CandidateReceipt`. Skipping." - ); - continue - }, - }; - let session_info = match self .runtime_info - .get_session_info_by_index(ctx.sender(), relay_parent, session) + .get_session_info_by_index(ctx.sender(), block_hash, session) .await { Ok(extended_session_info) => &extended_session_info.session_info, From 71e2b7b68aa1007d1ac1d9953f63f754bfbdbc24 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 5 Apr 2023 22:13:49 +0300 Subject: [PATCH 34/40] Keep track of highest seen session and last session cached without gaps. --- .../dispute-coordinator/src/initialized.rs | 65 ++++++++++++++----- node/core/dispute-coordinator/src/lib.rs | 32 ++++++--- 2 files changed, 71 insertions(+), 26 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c9420c30ee38..d02a0ce9be8f 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -72,6 +72,20 @@ pub struct InitialData { pub leaf: ActivatedLeaf, } +// Wrapper types used in `new()` to avoid wrong initialization +pub(crate) struct LastConsecutiveCachedSessionIndex(SessionIndex); +impl From for LastConsecutiveCachedSessionIndex { + fn from(session: SessionIndex) -> Self { + LastConsecutiveCachedSessionIndex(session) + } +} +pub(crate) struct HighestSeenSessionIndex(SessionIndex); +impl From for HighestSeenSessionIndex { + fn from(session: SessionIndex) -> Self { + HighestSeenSessionIndex(session) + } +} + /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming @@ -80,7 +94,16 @@ pub struct InitialData { pub(crate) struct Initialized { keystore: Arc, runtime_info: RuntimeInfo, - highest_session: SessionIndex, + /// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doen't matter if it was + /// cached successfully or not. It is used to detect ancient disputes. + highest_session_seen: SessionIndex, + /// This is the `SessionIndex` up to which the cache contains no gaps. For we have seen session 5 + /// and we managed successfully to cache sessions 1, 2, 3 and 5 (caching session 4 failed for + /// some reason). In this case we have got `highest_session_seen == 5` and + /// `last_consecutive_cached_session == 3`. + /// On each `ActiveLeavesUpdate` we'll start fetching sessions from + /// `last_consecutive_cached_session + 1` up to the session index of the leaf. + last_consecutive_cached_session: SessionIndex, spam_slots: SpamSlots, participation: Participation, scraper: ChainScraper, @@ -96,7 +119,8 @@ impl Initialized { runtime_info: RuntimeInfo, spam_slots: SpamSlots, scraper: ChainScraper, - highest_session: SessionIndex, + highest_session_seen: HighestSeenSessionIndex, + last_consecutive_cached_session: LastConsecutiveCachedSessionIndex, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; @@ -106,7 +130,8 @@ impl Initialized { Self { keystore, runtime_info, - highest_session, + highest_session_seen: highest_session_seen.0, + last_consecutive_cached_session: last_consecutive_cached_session.0, spam_slots, scraper, participation, @@ -284,28 +309,32 @@ impl Initialized { self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await; match session_idx { - Ok(session_idx) if session_idx > self.highest_session => { + Ok(session_idx) if session_idx > self.last_consecutive_cached_session => { // There is a new session. Perform a dummy fetch to cache it. - for idx in self.highest_session + 1..=session_idx { - match self + let mut gap_in_cache = false; + for idx in self.last_consecutive_cached_session + 1..=session_idx { + if let Err(err) = self .runtime_info .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) .await { - Ok(_) => {}, - Err(err) => { - gum::debug!( - target: LOG_TARGET, - session_idx, - leaf_hash = ?new_leaf.hash, - ?err, - "Error caching SessionInfo on ActiveLeaves update" - ); - }, + gum::debug!( + target: LOG_TARGET, + session_idx, + leaf_hash = ?new_leaf.hash, + ?err, + "Error caching SessionInfo on ActiveLeaves update" + ); + gap_in_cache = true; + } + + if !gap_in_cache { + self.last_consecutive_cached_session = idx; } } - self.highest_session = session_idx; + self.highest_session_seen = session_idx; + db::v1::note_earliest_session( overlay_db, session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), @@ -1251,7 +1280,7 @@ impl Initialized { } fn session_is_ancient(&self, session_idx: SessionIndex) -> bool { - return session_idx < self.highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1) + return session_idx < self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1) } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 664da2cb342b..176284f7dc96 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -43,9 +43,7 @@ use polkadot_node_subsystem_util::{ database::Database, runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, }; -use polkadot_primitives::{ - DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex, -}; +use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; use crate::{ error::{FatalResult, Result}, @@ -68,7 +66,9 @@ pub(crate) mod error; /// Subsystem after receiving the first active leaf. mod initialized; -use initialized::{InitialData, Initialized}; +use initialized::{ + HighestSeenSessionIndex, InitialData, Initialized, LastConsecutiveCachedSessionIndex, +}; /// Provider of data scraped from chain. /// @@ -230,7 +230,14 @@ impl DisputeCoordinatorSubsystem { .expect("DISPUTE_WINDOW can't be 0; qed."), }); let mut overlay_db = OverlayedBackend::new(&mut backend); - let (participations, votes, spam_slots, ordering_provider, highest_session) = match self + let ( + participations, + votes, + spam_slots, + ordering_provider, + highest_session_seen, + last_consecutive_cached_session, + ) = match self .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) .await { @@ -254,7 +261,8 @@ impl DisputeCoordinatorSubsystem { runtime_info, spam_slots, ordering_provider, - highest_session, + highest_session_seen, + last_consecutive_cached_session, ), backend, ))) @@ -278,7 +286,8 @@ impl DisputeCoordinatorSubsystem { Vec, SpamSlots, ChainScraper, - SessionIndex, + HighestSeenSessionIndex, + LastConsecutiveCachedSessionIndex, )> { let now = clock.now(); @@ -299,6 +308,8 @@ impl DisputeCoordinatorSubsystem { .get_session_index_for_child(ctx.sender(), initial_head.hash) .await?; + let mut last_consecutive_cached_session = 0; + let mut gap_in_cache = false; // Cache the sessions. A failure to fetch a session here is not that critical so we // won't abort the initialization for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session { @@ -313,8 +324,12 @@ impl DisputeCoordinatorSubsystem { err = ?e, "Can't cache SessionInfo during subsystem initialization. Skipping session." ); + gap_in_cache = true; continue }; + if !gap_in_cache { + last_consecutive_cached_session = idx; + } } // Prune obsolete disputes: @@ -414,7 +429,8 @@ impl DisputeCoordinatorSubsystem { votes, SpamSlots::recover_from_state(spam_disputes), scraper, - highest_session, + HighestSeenSessionIndex::from(highest_session), + LastConsecutiveCachedSessionIndex::from(last_consecutive_cached_session), )) } } From 8466d1524834400b744c6470a7df249de32247f5 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 12 Apr 2023 13:14:03 +0300 Subject: [PATCH 35/40] Apply suggestions from code review Co-authored-by: ordian --- node/core/dispute-coordinator/src/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index d02a0ce9be8f..52dad65e2290 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -97,7 +97,7 @@ pub(crate) struct Initialized { /// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doen't matter if it was /// cached successfully or not. It is used to detect ancient disputes. highest_session_seen: SessionIndex, - /// This is the `SessionIndex` up to which the cache contains no gaps. For we have seen session 5 + /// This is the `SessionIndex` up to which the cache contains no gaps. For example, we have seen session 5 /// and we managed successfully to cache sessions 1, 2, 3 and 5 (caching session 4 failed for /// some reason). In this case we have got `highest_session_seen == 5` and /// `last_consecutive_cached_session == 3`. From e0ebe59f7c5fe145a3ea1442d30e01e99bc867ef Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 12 Apr 2023 14:17:59 +0300 Subject: [PATCH 36/40] Handle session caching failure on startup correctly --- .../dispute-coordinator/src/initialized.rs | 23 ++++++++++++++----- node/core/dispute-coordinator/src/lib.rs | 11 +++++---- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 52dad65e2290..cb0e0090dca1 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -103,7 +103,8 @@ pub(crate) struct Initialized { /// `last_consecutive_cached_session == 3`. /// On each `ActiveLeavesUpdate` we'll start fetching sessions from /// `last_consecutive_cached_session + 1` up to the session index of the leaf. - last_consecutive_cached_session: SessionIndex, + /// None means that the cache is empty. + last_consecutive_cached_session: Option, spam_slots: SpamSlots, participation: Participation, scraper: ChainScraper, @@ -120,7 +121,7 @@ impl Initialized { spam_slots: SpamSlots, scraper: ChainScraper, highest_session_seen: HighestSeenSessionIndex, - last_consecutive_cached_session: LastConsecutiveCachedSessionIndex, + last_consecutive_cached_session: Option, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; @@ -131,7 +132,7 @@ impl Initialized { keystore, runtime_info, highest_session_seen: highest_session_seen.0, - last_consecutive_cached_session: last_consecutive_cached_session.0, + last_consecutive_cached_session: last_consecutive_cached_session.map(|v| v.0), spam_slots, scraper, participation, @@ -309,10 +310,20 @@ impl Initialized { self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await; match session_idx { - Ok(session_idx) if session_idx > self.last_consecutive_cached_session => { + Ok(session_idx) + if self.last_consecutive_cached_session.is_none() || + session_idx > + self.last_consecutive_cached_session.expect( + "The first clause explicitly handles `None` case. qed.", + ) => + { // There is a new session. Perform a dummy fetch to cache it. let mut gap_in_cache = false; - for idx in self.last_consecutive_cached_session + 1..=session_idx { + for idx in self + .last_consecutive_cached_session + .unwrap_or(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 2)) + + 1..=session_idx + { if let Err(err) = self .runtime_info .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) @@ -329,7 +340,7 @@ impl Initialized { } if !gap_in_cache { - self.last_consecutive_cached_session = idx; + self.last_consecutive_cached_session = Some(idx); } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 176284f7dc96..d44cb2583ba3 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -287,7 +287,7 @@ impl DisputeCoordinatorSubsystem { SpamSlots, ChainScraper, HighestSeenSessionIndex, - LastConsecutiveCachedSessionIndex, + Option, )> { let now = clock.now(); @@ -308,7 +308,9 @@ impl DisputeCoordinatorSubsystem { .get_session_index_for_child(ctx.sender(), initial_head.hash) .await?; - let mut last_consecutive_cached_session = 0; + // `Option` because caching might fail. In that case an actual value will be populated on the + // next leaf update. + let mut last_consecutive_cached_session = None; let mut gap_in_cache = false; // Cache the sessions. A failure to fetch a session here is not that critical so we // won't abort the initialization @@ -328,7 +330,8 @@ impl DisputeCoordinatorSubsystem { continue }; if !gap_in_cache { - last_consecutive_cached_session = idx; + last_consecutive_cached_session = + Some(LastConsecutiveCachedSessionIndex::from(idx)); } } @@ -430,7 +433,7 @@ impl DisputeCoordinatorSubsystem { SpamSlots::recover_from_state(spam_disputes), scraper, HighestSeenSessionIndex::from(highest_session), - LastConsecutiveCachedSessionIndex::from(last_consecutive_cached_session), + last_consecutive_cached_session, )) } } From 838dda0aba271db6b20b9f19bbdab9aeac6cf147 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 12 Apr 2023 21:47:13 +0300 Subject: [PATCH 37/40] Update node/core/dispute-coordinator/src/initialized.rs Co-authored-by: ordian --- node/core/dispute-coordinator/src/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index cb0e0090dca1..626056d2df26 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -321,7 +321,7 @@ impl Initialized { let mut gap_in_cache = false; for idx in self .last_consecutive_cached_session - .unwrap_or(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 2)) + + .unwrap_or(session_idx.saturating_sub(DISPUTE_WINDOW.get())) + 1..=session_idx { if let Err(err) = self From 358f4804b61b3a0de70bc768b70b4d3e02440500 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 12 Apr 2023 22:09:52 +0300 Subject: [PATCH 38/40] Simplify session caching retries --- .../dispute-coordinator/src/initialized.rs | 51 ++++++------------- node/core/dispute-coordinator/src/lib.rs | 27 ++++------ 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 626056d2df26..a9ff7e28b28d 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -72,20 +72,6 @@ pub struct InitialData { pub leaf: ActivatedLeaf, } -// Wrapper types used in `new()` to avoid wrong initialization -pub(crate) struct LastConsecutiveCachedSessionIndex(SessionIndex); -impl From for LastConsecutiveCachedSessionIndex { - fn from(session: SessionIndex) -> Self { - LastConsecutiveCachedSessionIndex(session) - } -} -pub(crate) struct HighestSeenSessionIndex(SessionIndex); -impl From for HighestSeenSessionIndex { - fn from(session: SessionIndex) -> Self { - HighestSeenSessionIndex(session) - } -} - /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming @@ -104,7 +90,7 @@ pub(crate) struct Initialized { /// On each `ActiveLeavesUpdate` we'll start fetching sessions from /// `last_consecutive_cached_session + 1` up to the session index of the leaf. /// None means that the cache is empty. - last_consecutive_cached_session: Option, + gaps_in_cache: bool, spam_slots: SpamSlots, participation: Participation, scraper: ChainScraper, @@ -120,8 +106,8 @@ impl Initialized { runtime_info: RuntimeInfo, spam_slots: SpamSlots, scraper: ChainScraper, - highest_session_seen: HighestSeenSessionIndex, - last_consecutive_cached_session: Option, + highest_session_seen: SessionIndex, + gaps_in_cache: bool, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; @@ -131,8 +117,8 @@ impl Initialized { Self { keystore, runtime_info, - highest_session_seen: highest_session_seen.0, - last_consecutive_cached_session: last_consecutive_cached_session.map(|v| v.0), + highest_session_seen, + gaps_in_cache, spam_slots, scraper, participation, @@ -311,19 +297,18 @@ impl Initialized { match session_idx { Ok(session_idx) - if self.last_consecutive_cached_session.is_none() || - session_idx > - self.last_consecutive_cached_session.expect( - "The first clause explicitly handles `None` case. qed.", - ) => + if self.gaps_in_cache || session_idx > self.highest_session_seen => { + // If error has occurred during last session caching - fetch the whole window + // Otherwise - cache only the new sessions + let lower_bound = if self.gaps_in_cache { + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) + } else { + self.highest_session_seen + }; + // There is a new session. Perform a dummy fetch to cache it. - let mut gap_in_cache = false; - for idx in self - .last_consecutive_cached_session - .unwrap_or(session_idx.saturating_sub(DISPUTE_WINDOW.get())) + - 1..=session_idx - { + for idx in lower_bound..=session_idx { if let Err(err) = self .runtime_info .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) @@ -336,11 +321,7 @@ impl Initialized { ?err, "Error caching SessionInfo on ActiveLeaves update" ); - gap_in_cache = true; - } - - if !gap_in_cache { - self.last_consecutive_cached_session = Some(idx); + self.gaps_in_cache = true; } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index d44cb2583ba3..a4fbceb8b73e 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -43,7 +43,9 @@ use polkadot_node_subsystem_util::{ database::Database, runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, }; -use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; +use polkadot_primitives::{ + DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex, +}; use crate::{ error::{FatalResult, Result}, @@ -66,9 +68,7 @@ pub(crate) mod error; /// Subsystem after receiving the first active leaf. mod initialized; -use initialized::{ - HighestSeenSessionIndex, InitialData, Initialized, LastConsecutiveCachedSessionIndex, -}; +use initialized::{InitialData, Initialized}; /// Provider of data scraped from chain. /// @@ -236,7 +236,7 @@ impl DisputeCoordinatorSubsystem { spam_slots, ordering_provider, highest_session_seen, - last_consecutive_cached_session, + gaps_in_cache, ) = match self .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) .await @@ -262,7 +262,7 @@ impl DisputeCoordinatorSubsystem { spam_slots, ordering_provider, highest_session_seen, - last_consecutive_cached_session, + gaps_in_cache, ), backend, ))) @@ -286,8 +286,8 @@ impl DisputeCoordinatorSubsystem { Vec, SpamSlots, ChainScraper, - HighestSeenSessionIndex, - Option, + SessionIndex, + bool, )> { let now = clock.now(); @@ -308,9 +308,6 @@ impl DisputeCoordinatorSubsystem { .get_session_index_for_child(ctx.sender(), initial_head.hash) .await?; - // `Option` because caching might fail. In that case an actual value will be populated on the - // next leaf update. - let mut last_consecutive_cached_session = None; let mut gap_in_cache = false; // Cache the sessions. A failure to fetch a session here is not that critical so we // won't abort the initialization @@ -329,10 +326,6 @@ impl DisputeCoordinatorSubsystem { gap_in_cache = true; continue }; - if !gap_in_cache { - last_consecutive_cached_session = - Some(LastConsecutiveCachedSessionIndex::from(idx)); - } } // Prune obsolete disputes: @@ -432,8 +425,8 @@ impl DisputeCoordinatorSubsystem { votes, SpamSlots::recover_from_state(spam_disputes), scraper, - HighestSeenSessionIndex::from(highest_session), - last_consecutive_cached_session, + highest_session, + gap_in_cache, )) } } From a7981b8c4ab168bd537f88778a7a1fc7f873fbc6 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 12 Apr 2023 22:18:35 +0300 Subject: [PATCH 39/40] Update stale comment --- node/core/dispute-coordinator/src/initialized.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index a9ff7e28b28d..e68cfd2c6833 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -83,13 +83,7 @@ pub(crate) struct Initialized { /// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doen't matter if it was /// cached successfully or not. It is used to detect ancient disputes. highest_session_seen: SessionIndex, - /// This is the `SessionIndex` up to which the cache contains no gaps. For example, we have seen session 5 - /// and we managed successfully to cache sessions 1, 2, 3 and 5 (caching session 4 failed for - /// some reason). In this case we have got `highest_session_seen == 5` and - /// `last_consecutive_cached_session == 3`. - /// On each `ActiveLeavesUpdate` we'll start fetching sessions from - /// `last_consecutive_cached_session + 1` up to the session index of the leaf. - /// None means that the cache is empty. + /// Will be set to `true` if an error occured during the last caching attempt gaps_in_cache: bool, spam_slots: SpamSlots, participation: Participation, From 9a47ca6aa04dd0dd7d24ab1e3594516c8fd719b2 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Sat, 15 Apr 2023 12:37:20 +0300 Subject: [PATCH 40/40] Fix lower bound calculation for session caching --- node/core/dispute-coordinator/src/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index e68cfd2c6833..b3937c025908 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -298,7 +298,7 @@ impl Initialized { let lower_bound = if self.gaps_in_cache { session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) } else { - self.highest_session_seen + self.highest_session_seen + 1 }; // There is a new session. Perform a dummy fetch to cache it.