From 3365d96c5c34fa74a152be6e5e759bd2d31a4782 Mon Sep 17 00:00:00 2001
From: Chris Sosnin
Date: Thu, 23 Dec 2021 23:25:59 +0300
Subject: [PATCH 1/5] availability-distribution: look for leaf ancestors

---
 .../availability-distribution/src/error.rs | 8 +-
 .../src/requester/mod.rs | 170 +++++++++++++++---
 2 files changed, 156 insertions(+), 22 deletions(-)

diff --git a/node/network/availability-distribution/src/error.rs b/node/network/availability-distribution/src/error.rs
index d3ff182b33a3..72916b31c987 100644
--- a/node/network/availability-distribution/src/error.rs
+++ b/node/network/availability-distribution/src/error.rs
@@ -23,7 +23,7 @@ use thiserror::Error;
 use futures::channel::oneshot;
 
 use polkadot_node_subsystem_util::runtime;
-use polkadot_subsystem::SubsystemError;
+use polkadot_subsystem::{ChainApiError, SubsystemError};
 
 use crate::LOG_TARGET;
 
@@ -62,6 +62,12 @@ pub enum Fatal {
 	/// Errors coming from runtime::Runtime.
 	#[error("Error while accessing runtime information: {0}")]
 	Runtime(#[from] runtime::Fatal),
+
+	#[error("Oneshot for receiving response from Chain API got cancelled")]
+	ChainApiSenderDropped(#[source] oneshot::Canceled),
+
+	#[error("Retrieving response from Chain API unexpectedly failed with error: {0}")]
+	ChainApi(#[from] ChainApiError),
 }
 
 /// Non-fatal errors of this subsystem.
diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs
index 6812fa69e954..665d92e19f33 100644
--- a/node/network/availability-distribution/src/requester/mod.rs
+++ b/node/network/availability-distribution/src/requester/mod.rs
@@ -27,18 +27,23 @@ use std::{
 };
 
 use futures::{
-	channel::mpsc,
+	channel::{mpsc, oneshot},
 	task::{Context, Poll},
 	Stream,
 };
 
-use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
-use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
+use polkadot_node_subsystem_util::{
+	request_session_index_for_child,
+	runtime::{self, get_occupied_cores, RuntimeInfo},
+};
+use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore, SessionIndex};
 use polkadot_subsystem::{
-	messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
+	messages::{AllMessages, ChainApiMessage},
+	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
 };
 
-use super::{Metrics, LOG_TARGET};
+use super::{Metrics, Result, LOG_TARGET};
+use crate::error::Fatal;
 
 /// Cache for session information.
 mod session_cache;
@@ -64,6 +69,13 @@ pub struct Requester {
 	/// Localized information about sessions we are currently interested in.
 	session_cache: SessionCache,
 
+	/// Chain of up to [`LEAF_ANCESTRY_LEN_WITHIN_SESSION`] ancestors of activated leaves.
+	/// leaf -> [leaf parent, leaf grandparent, ...]
+	/// Fetch tasks also get launched for array `relay_ancestors[leaf]`, this helps with
+	/// possibly missing activated leaves updates, as well as with "slow" validators which do not
+	/// manage to fetch their chunks in time.
+	relay_ancestors: HashMap<Hash, Vec<Hash>>,
+
 	/// Sender to be cloned for `FetchTask`s.
 	tx: mpsc::Sender<FromFetchTask>,
 
@@ -75,14 +87,25 @@ pub struct Requester {
 }
 
 impl Requester {
+	/// How many ancestors of the leaf should we consider along with it.
+	pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
+
 	/// Create a new `Requester`.
 	///
 	/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
 	/// by advancing the stream.
 	pub fn new(metrics: Metrics) -> Self {
 		let (tx, rx) = mpsc::channel(1);
-		Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
+		Requester {
+			fetches: HashMap::new(),
+			session_cache: SessionCache::new(),
+			relay_ancestors: HashMap::new(),
+			tx,
+			rx,
+			metrics,
+		}
 	}
+
 	/// Update heads that need availability distribution.
 	///
 	/// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,50 +114,75 @@ impl Requester {
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
 		update: ActiveLeavesUpdate,
-	) -> super::Result<()>
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
 		tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
 		let ActiveLeavesUpdate { activated, deactivated } = update;
-		// Stale leaves happen after a reversion - we don't want to re-run availability there.
-		let activated = activated.and_then(|h| match h.status {
-			LeafStatus::Stale => None,
-			LeafStatus::Fresh => Some(h),
-		});
 		// Order important! We need to handle activated, prior to deactivated, otherwise we might
 		// cancel still needed jobs.
-		self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
+		if let Some(activated) = activated {
+			// Stale leaves happen after a reversion - we don't want to re-run availability there.
+			if let LeafStatus::Fresh = activated.status {
+				self.start_requesting_chunks(ctx, runtime, activated).await?;
+			}
+		}
 		self.stop_requesting_chunks(deactivated.into_iter());
 		Ok(())
 	}
 
-	/// Start requesting chunks for newly imported heads.
+	/// Start requesting chunks for a newly imported head.
+	///
+	/// This will also request [`LEAF_ANCESTRY_LEN_WITHIN_SESSION`] leaf ancestors from the
+	/// same session and start requesting chunks for them too.
 	async fn start_requesting_chunks<Context>(
 		&mut self,
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
-		new_heads: impl Iterator<Item = ActivatedLeaf>,
-	) -> super::Result<()>
+		new_head: ActivatedLeaf,
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
-		for ActivatedLeaf { hash: leaf, .. } in new_heads {
-			let cores = get_occupied_cores(ctx, leaf).await?;
+		let ActivatedLeaf { hash: leaf, .. } = new_head;
+		let ancestors_in_session =
+			get_block_ancestors_in_same_session(ctx, leaf, Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION)
+				.await
+				.unwrap_or_else(|err| {
+					tracing::debug!(
+						target: LOG_TARGET,
+						leaf = ?leaf,
+						"Failed to fetch leaf ancestors in the same session due to an error: {}",
+						err
+					);
+					Vec::new()
+				});
+		for hash in std::iter::once(leaf).chain(ancestors_in_session.clone()) {
+			let cores = get_occupied_cores(ctx, hash).await?;
 			tracing::trace!(
 				target: LOG_TARGET,
 				occupied_cores = ?cores,
 				"Query occupied core"
 			);
-			self.add_cores(ctx, runtime, leaf, cores).await?;
+			self.add_cores(ctx, runtime, hash, cores).await?;
 		}
+
+		// Save leaf ancestors for tasks pruning in the future.
+		self.relay_ancestors.insert(leaf, ancestors_in_session);
+
 		Ok(())
 	}
 
 	/// Stop requesting chunks for obsolete heads.
 	///
 	fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
-		let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
+		let obsolete_leaves: HashSet<_> = obsolete_leaves
+			.flat_map(|leaf| {
+				let leaf_ancestors = self.relay_ancestors.remove(&leaf).unwrap_or_default();
+				std::iter::once(leaf).chain(leaf_ancestors)
+			})
+			.collect();
 		self.fetches.retain(|_, task| {
 			task.remove_leaves(&obsolete_leaves);
 			task.is_live()
 		})
@@ -154,7 +202,7 @@ impl Requester {
 		runtime: &mut RuntimeInfo,
 		leaf: Hash,
 		cores: impl IntoIterator<Item = OccupiedCore>,
-	) -> super::Result<()>
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
@@ -215,3 +263,83 @@ impl Stream for Requester {
 		}
 	}
 }
+
+/// Requests up to `limit` ancestor hashes of relay parent in the same session.
+async fn get_block_ancestors_in_same_session<Context>(
+	ctx: &mut Context,
+	head: Hash,
+	limit: usize,
+) -> Result<Vec<Hash>>
+where
+	Context: SubsystemContext,
+{
+	// The order is parent, grandparent, ...
+	//
+	// `limit + 1` since a session index for the last element in ancestry
+	// is obtained through its parent. It always gets truncated because
+	// `session_ancestry_len` can only be incremented `ancestors.len() - 1` times.
+	let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
+
+	// `head` is the child of the first block in `ancestors`, request its session index.
+	let head_session_index = match ancestors.first() {
+		Some(parent) => get_session_index_for_child(ctx, *parent).await?,
+		None => {
+			// No first element, i.e. empty.
+			return Ok(ancestors)
+		},
+	};
+
+	let mut session_ancestry_len = 0;
+	for parent in ancestors.iter().skip(1) {
+		// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
+		let session_index = get_session_index_for_child(ctx, *parent).await?;
+		if session_index == head_session_index {
+			session_ancestry_len += 1;
+		} else {
+			break
+		}
+	}
+
+	// Drop the rest.
+	ancestors.truncate(session_ancestry_len);
+
+	Ok(ancestors)
+}
+
+/// Request up to `limit` ancestor hashes of relay parent from the Chain API.
+async fn get_block_ancestors<Context>(
+	ctx: &mut Context,
+	relay_parent: Hash,
+	limit: usize,
+) -> Result<Vec<Hash>>
+where
+	Context: SubsystemContext,
+{
+	let (tx, rx) = oneshot::channel();
+	ctx.send_message(ChainApiMessage::Ancestors {
+		hash: relay_parent,
+		k: limit,
+		response_channel: tx,
+	})
+	.await;
+
+	let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
+	Ok(ancestors)
+}
+
+/// Request session index for the child of the relay parent from the Runtime API.
+async fn get_session_index_for_child<Context>(
+	ctx: &mut Context,
+	relay_parent: Hash,
+) -> Result<SessionIndex>
+where
+	Context: SubsystemContext,
+{
+	let rx = request_session_index_for_child(relay_parent, ctx.sender()).await;
+	let session_index = rx
+		.await
+		.map_err(|err| runtime::Error::Fatal(runtime::Fatal::RuntimeRequestCanceled(err)))?
+		.map_err(|err| runtime::Error::NonFatal(runtime::NonFatal::RuntimeRequest(err)))?;
+
+	Ok(session_index)
+}

From b5e97a2e603af48cb93f363564c54d11b0d446a6 Mon Sep 17 00:00:00 2001
From: Chris Sosnin
Date: Mon, 17 Jan 2022 17:31:00 +0300
Subject: [PATCH 2/5] Re-use subsystem-util

---
 .../src/requester/mod.rs | 57 +++++++------------
 1 file changed, 21 insertions(+), 36 deletions(-)

diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs
index 665d92e19f33..eff2beac84bc 100644
--- a/node/network/availability-distribution/src/requester/mod.rs
+++ b/node/network/availability-distribution/src/requester/mod.rs
@@ -32,11 +32,8 @@ use futures::{
 	Stream,
 };
 
-use polkadot_node_subsystem_util::{
-	request_session_index_for_child,
-	runtime::{self, get_occupied_cores, RuntimeInfo},
-};
-use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore, SessionIndex};
+use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
+use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
 use polkadot_subsystem::{
 	messages::{AllMessages, ChainApiMessage},
 	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
@@ -146,18 +143,22 @@ impl Requester {
 		Context: SubsystemContext,
 	{
 		let ActivatedLeaf { hash: leaf, .. } = new_head;
-		let ancestors_in_session =
-			get_block_ancestors_in_same_session(ctx, leaf, Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION)
-				.await
-				.unwrap_or_else(|err| {
-					tracing::debug!(
-						target: LOG_TARGET,
-						leaf = ?leaf,
-						"Failed to fetch leaf ancestors in the same session due to an error: {}",
-						err
-					);
-					Vec::new()
-				});
+		let ancestors_in_session = get_block_ancestors_in_same_session(
+			ctx,
+			runtime,
+			leaf,
+			Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
+		)
+		.await
+		.unwrap_or_else(|err| {
+			tracing::debug!(
+				target: LOG_TARGET,
+				leaf = ?leaf,
+				"Failed to fetch leaf ancestors in the same session due to an error: {}",
+				err
+			);
+			Vec::new()
+		});
 		for hash in std::iter::once(leaf).chain(ancestors_in_session.clone()) {
 			let cores = get_occupied_cores(ctx, hash).await?;
 			tracing::trace!(
@@ -267,6 +268,7 @@ impl Stream for Requester {
 /// Requests up to `limit` ancestor hashes of relay parent in the same session.
 async fn get_block_ancestors_in_same_session<Context>(
 	ctx: &mut Context,
+	runtime: &mut RuntimeInfo,
 	head: Hash,
 	limit: usize,
 ) -> Result<Vec<Hash>>
@@ -282,7 +284,7 @@ where
 
 	// `head` is the child of the first block in `ancestors`, request its session index.
 	let head_session_index = match ancestors.first() {
-		Some(parent) => get_session_index_for_child(ctx, *parent).await?,
+		Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
 		None => {
 			// No first element, i.e. empty.
 			return Ok(ancestors)
@@ -292,7 +294,7 @@ where
 	let mut session_ancestry_len = 0;
 	for parent in ancestors.iter().skip(1) {
 		// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
-		let session_index = get_session_index_for_child(ctx, *parent).await?;
+		let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
 		if session_index == head_session_index {
 			session_ancestry_len += 1;
 		} else {
 			break
@@ -326,20 +328,3 @@ where
 	let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
 	Ok(ancestors)
 }
-
-/// Request session index for the child of the relay parent from the Runtime API.
-async fn get_session_index_for_child<Context>(
-	ctx: &mut Context,
-	relay_parent: Hash,
-) -> Result<SessionIndex>
-where
-	Context: SubsystemContext,
-{
-	let rx = request_session_index_for_child(relay_parent, ctx.sender()).await;
-	let session_index = rx
-		.await
-		.map_err(|err| runtime::Error::Fatal(runtime::Fatal::RuntimeRequestCanceled(err)))?
-		.map_err(|err| runtime::Error::NonFatal(runtime::NonFatal::RuntimeRequest(err)))?;
-
-	Ok(session_index)
-}

From b01fc015b2dae448d8b7ed8d7a928d0bf393297e Mon Sep 17 00:00:00 2001
From: Chris Sosnin
Date: Tue, 18 Jan 2022 18:01:15 +0300
Subject: [PATCH 3/5] Rework ancestry tasks scheduling

---
 .../src/requester/mod.rs | 38 ++++++------------
 1 file changed, 12 insertions(+), 26 deletions(-)

diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs
index eff2beac84bc..13c06eb599ec 100644
--- a/node/network/availability-distribution/src/requester/mod.rs
+++ b/node/network/availability-distribution/src/requester/mod.rs
@@ -66,13 +66,6 @@ pub struct Requester {
 	/// Localized information about sessions we are currently interested in.
 	session_cache: SessionCache,
 
-	/// Chain of up to [`LEAF_ANCESTRY_LEN_WITHIN_SESSION`] ancestors of activated leaves.
-	/// leaf -> [leaf parent, leaf grandparent, ...]
-	/// Fetch tasks also get launched for array `relay_ancestors[leaf]`, this helps with
-	/// possibly missing activated leaves updates, as well as with "slow" validators which do not
-	/// manage to fetch their chunks in time.
-	relay_ancestors: HashMap<Hash, Vec<Hash>>,
-
 	/// Sender to be cloned for `FetchTask`s.
 	tx: mpsc::Sender<FromFetchTask>,
 
@@ -93,14 +86,7 @@ impl Requester {
 	/// by advancing the stream.
 	pub fn new(metrics: Metrics) -> Self {
 		let (tx, rx) = mpsc::channel(1);
-		Requester {
-			fetches: HashMap::new(),
-			session_cache: SessionCache::new(),
-			relay_ancestors: HashMap::new(),
-			tx,
-			rx,
-			metrics,
-		}
+		Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
 	}
 
 	/// Update heads that need availability distribution.
@@ -159,31 +145,31 @@ impl Requester {
 			);
 			Vec::new()
 		});
-		for hash in std::iter::once(leaf).chain(ancestors_in_session.clone()) {
+		// Also spawn or bump tasks for candidates in ancestry in the same session.
+		for hash in std::iter::once(leaf).chain(ancestors_in_session) {
 			let cores = get_occupied_cores(ctx, hash).await?;
 			tracing::trace!(
 				target: LOG_TARGET,
 				occupied_cores = ?cores,
 				"Query occupied core"
 			);
-			self.add_cores(ctx, runtime, hash, cores).await?;
+			// Important:
+			// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
+			// any tasks separately.
+			//
+			// The next time the subsystem receives a leaf update, some of the spawned tasks will
+			// be bumped to be live in a fresh relay parent, while some might get dropped due to
+			// the current leaf being deactivated.
+			self.add_cores(ctx, runtime, leaf, cores).await?;
 		}
 
-		// Save leaf ancestors for tasks pruning in the future.
-		self.relay_ancestors.insert(leaf, ancestors_in_session);
-
 		Ok(())
 	}
 
 	/// Stop requesting chunks for obsolete heads.
 	///
 	fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
-		let obsolete_leaves: HashSet<_> = obsolete_leaves
-			.flat_map(|leaf| {
-				let leaf_ancestors = self.relay_ancestors.remove(&leaf).unwrap_or_default();
-				std::iter::once(leaf).chain(leaf_ancestors)
-			})
-			.collect();
+		let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
 		self.fetches.retain(|_, task| {
 			task.remove_leaves(&obsolete_leaves);
 			task.is_live()
 		})

From 0f54ea3d71c6023e092e70222c09a447c7e8d166 Mon Sep 17 00:00:00 2001
From: Chris Sosnin
Date: Fri, 21 Jan 2022 20:14:31 +0300
Subject: [PATCH 4/5] Requester tests

---
 .../src/requester/fetch_task/mod.rs | 2 +-
 .../src/requester/mod.rs | 3 +
 .../src/requester/tests.rs | 336 ++++++++++++++++++
 3 files changed, 340 insertions(+), 1 deletion(-)
 create mode 100644 node/network/availability-distribution/src/requester/tests.rs

diff --git a/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/node/network/availability-distribution/src/requester/fetch_task/mod.rs
index b3331b54a6de..a05ee0cd2d4c 100644
--- a/node/network/availability-distribution/src/requester/fetch_task/mod.rs
+++ b/node/network/availability-distribution/src/requester/fetch_task/mod.rs
@@ -64,7 +64,7 @@ pub struct FetchTask {
 	/// In other words, for which relay chain parents this candidate is considered live.
 	/// This is updated on every `ActiveLeavesUpdate` and enables us to know when we can safely
 	/// stop keeping track of that candidate/chunk.
-	live_in: HashSet<Hash>,
+	pub(crate) live_in: HashSet<Hash>,
 
 	/// We keep the task around in until `live_in` becomes empty, to make
 	/// sure we won't re-fetch an already fetched candidate.
diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs
index 13c06eb599ec..b61c773f69ce 100644
--- a/node/network/availability-distribution/src/requester/mod.rs
+++ b/node/network/availability-distribution/src/requester/mod.rs
@@ -42,6 +42,9 @@ use polkadot_subsystem::{
 use super::{Metrics, Result, LOG_TARGET};
 use crate::error::Fatal;
 
+#[cfg(test)]
+mod tests;
+
 /// Cache for session information.
 mod session_cache;
 use session_cache::SessionCache;
diff --git a/node/network/availability-distribution/src/requester/tests.rs b/node/network/availability-distribution/src/requester/tests.rs
new file mode 100644
index 000000000000..f44589ff9b88
--- /dev/null
+++ b/node/network/availability-distribution/src/requester/tests.rs
@@ -0,0 +1,336 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+use std::{future::Future, sync::Arc};
+
+use futures::FutureExt;
+
+use polkadot_node_network_protocol::jaeger;
+use polkadot_node_primitives::{BlockData, ErasureChunk, PoV, SpawnNamed};
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;
+use polkadot_primitives::v1::{
+	BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex, SessionInfo,
+};
+use polkadot_subsystem::{
+	messages::{
+		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
+		NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
+	},
+	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
+};
+use polkadot_subsystem_testhelpers::{
+	make_subsystem_context, mock::make_ferdie_keystore, TestSubsystemContext,
+	TestSubsystemContextHandle,
+};
+
+use sp_core::testing::TaskExecutor;
+
+use crate::tests::mock::{get_valid_chunk_data, make_session_info, OccupiedCoreBuilder};
+
+use super::Requester;
+
+fn get_erasure_chunk() -> ErasureChunk {
+	let pov = PoV { block_data: BlockData(vec![45, 46, 47]) };
+	get_valid_chunk_data(pov).1
+}
+
+#[derive(Clone)]
+struct TestState {
+	/// Simulated relay chain heads. For each block except genesis
+	/// there exists a single corresponding candidate, handled in [`spawn_virtual_overseer`].
+	pub relay_chain: Vec<Hash>,
+	pub session_info: SessionInfo,
+	// Defines a way to compute a session index for the block with
+	// a given number. Returns 1 for all blocks by default.
+	pub session_index_for_block: fn(BlockNumber) -> SessionIndex,
+}
+
+impl TestState {
+	fn new() -> Self {
+		let relay_chain: Vec<_> = (0u8..10).map(Hash::repeat_byte).collect();
+		let session_info = make_session_info();
+		let session_index_for_block = |_| 1;
+		Self { relay_chain, session_info, session_index_for_block }
+	}
+}
+
+fn spawn_virtual_overseer(
+	pool: TaskExecutor,
+	test_state: TestState,
+	mut ctx_handle: TestSubsystemContextHandle<AvailabilityDistributionMessage>,
+) {
+	pool.spawn(
+		"virtual-overseer",
+		None,
+		async move {
+			loop {
+				let msg = ctx_handle.try_recv().await;
+				if msg.is_none() {
+					break
+				}
+				match msg.unwrap() {
+					AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(..)) => {},
+					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(
+						..,
+						tx,
+					)) => {
+						let chunk = get_erasure_chunk();
+						tx.send(Some(chunk)).expect("Receiver is expected to be alive");
+					},
+					AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk {
+						tx,
+						..
+					}) => {
+						// Silently accept it.
+						tx.send(Ok(())).expect("Receiver is expected to be alive");
+					},
+					AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, req)) => {
+						match req {
+							RuntimeApiRequest::SessionIndexForChild(tx) => {
+								let chain = &test_state.relay_chain;
+								let block_number = chain
+									.iter()
+									.position(|h| *h == hash)
+									.expect("Invalid session index request");
+								// Compute session index.
+								let session_index_for_block = test_state.session_index_for_block;
+
+								tx.send(Ok(session_index_for_block(block_number as u32 + 1)))
+									.expect("Receiver should still be alive");
+							},
+							RuntimeApiRequest::SessionInfo(_, tx) => {
+								tx.send(Ok(Some(test_state.session_info.clone())))
+									.expect("Receiver should be alive.");
+							},
+							RuntimeApiRequest::AvailabilityCores(tx) => {
+								let para_id = Id::from(1);
+								let maybe_block_position =
+									test_state.relay_chain.iter().position(|h| *h == hash);
+								let cores = match maybe_block_position {
+									Some(block_num) => {
+										let core = if block_num == 0 {
+											CoreState::Scheduled(ScheduledCore {
+												para_id,
+												collator: None,
+											})
+										} else {
+											CoreState::Occupied(
+												OccupiedCoreBuilder {
+													group_responsible: GroupIndex(1),
+													para_id,
+													relay_parent: hash,
+												}
+												.build()
+												.0,
+											)
+										};
+										vec![core]
+									},
+									None => Vec::new(),
+								};
+								tx.send(Ok(cores)).expect("Receiver should be alive.")
+							},
+							_ => {
+								panic!("Unexpected runtime request: {:?}", req);
+							},
+						}
+					},
+					AllMessages::ChainApi(ChainApiMessage::Ancestors {
+						hash,
+						k,
+						response_channel,
+					}) => {
+						let chain = &test_state.relay_chain;
+						let maybe_block_position = chain.iter().position(|h| *h == hash);
+						let ancestors = maybe_block_position
+							.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
+							.unwrap_or_default();
+						response_channel
+							.send(Ok(ancestors))
+							.expect("Receiver is expected to be alive");
+					},
+					msg => panic!("Unexpected overseer message: {:?}", msg),
+				}
+			}
+		}
+		.boxed(),
+	);
+}
+
+fn test_harness<T: Future<Output = ()>>(
+	test_state: TestState,
+	test_fx: impl FnOnce(TestSubsystemContext<AvailabilityDistributionMessage, TaskExecutor>) -> T,
+) {
+	let pool = TaskExecutor::new();
+	let (ctx, ctx_handle) = make_subsystem_context(pool.clone());
+
+	spawn_virtual_overseer(pool, test_state, ctx_handle);
+
+	futures::executor::block_on(test_fx(ctx));
+}
+
+#[test]
+fn check_ancestry_lookup_in_same_session() {
+	let test_state = TestState::new();
+	let mut requester = Requester::new(Default::default());
+	let keystore = make_ferdie_keystore();
+	let mut runtime = RuntimeInfo::new(Some(keystore));
+
+	test_harness(test_state.clone(), |mut ctx| async move {
+		let chain = &test_state.relay_chain;
+
+		let block_number = 1;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 1);
+		let block_1_candidate =
+			*fetch_tasks.keys().next().expect("A task is checked to be present; qed");
+
+		let block_number = 2;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 2);
+		let task = fetch_tasks.get(&block_1_candidate).expect("Leaf hasn't been deactivated yet");
+		// The task should be live in both blocks 1 and 2.
+		assert_eq!(task.live_in.len(), 2);
+		let block_2_candidate = *fetch_tasks
+			.keys()
+			.find(|hash| **hash != block_1_candidate)
+			.expect("Two tasks are present, the first one corresponds to block 1 candidate; qed");
+
+		// Deactivate both blocks but keep the second task as a
+		// part of ancestry.
+		let block_number = 2 + Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: test_state.relay_chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[1], chain[2]].into(),
+		};
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		// The leaf + K its ancestors.
+		assert_eq!(fetch_tasks.len(), Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1);
+
+		let block_2_task = fetch_tasks
+			.get(&block_2_candidate)
+			.expect("Expected to be live as a part of ancestry");
+		assert_eq!(block_2_task.live_in.len(), 1);
+	});
+}
+
+#[test]
+fn check_ancestry_lookup_in_different_sessions() {
+	let mut test_state = TestState::new();
+	let mut requester = Requester::new(Default::default());
+	let keystore = make_ferdie_keystore();
+	let mut runtime = RuntimeInfo::new(Some(keystore));
+
+	test_state.session_index_for_block = |block_number| match block_number {
+		0..=3 => 1,
+		_ => 2,
+	};
+
+	test_harness(test_state.clone(), |mut ctx| async move {
+		let chain = &test_state.relay_chain;
+
+		let block_number = 3;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 3.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
+
+		let block_number = 4;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[1], chain[2], chain[3]].into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 1);
+
+		let block_number = 5;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[4]].into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 2.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
+	});
+}

From d2d2e647f0311431366e5d4b729e2773847c6d1e Mon Sep 17 00:00:00 2001
From: Chris Sosnin
Date: Fri, 21 Jan 2022 22:56:33 +0300
Subject: [PATCH 5/5] Improve readability for ancestors lookup

---
 node/network/availability-distribution/src/requester/mod.rs | 6 ++++--
 node/subsystem-types/src/lib.rs | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs
index b61c773f69ce..2f9a9069cd46 100644
--- a/node/network/availability-distribution/src/requester/mod.rs
+++ b/node/network/availability-distribution/src/requester/mod.rs
@@ -270,9 +270,10 @@ where
 	// is obtained through its parent. It always gets truncated because
 	// `session_ancestry_len` can only be incremented `ancestors.len() - 1` times.
 	let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
+	let mut ancestors_iter = ancestors.iter();
 
 	// `head` is the child of the first block in `ancestors`, request its session index.
-	let head_session_index = match ancestors.first() {
+	let head_session_index = match ancestors_iter.next() {
 		Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
 		None => {
 			// No first element, i.e. empty.
 			return Ok(ancestors)
@@ -281,7 +282,8 @@ where
 	};
 
 	let mut session_ancestry_len = 0;
-	for parent in ancestors.iter().skip(1) {
+	// The first parent is skipped.
+	for parent in ancestors_iter {
 		// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
 		let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
 		if session_index == head_session_index {
 			session_ancestry_len += 1;
 		} else {
 			break
diff --git a/node/subsystem-types/src/lib.rs b/node/subsystem-types/src/lib.rs
index dca7d56a98b4..797e195a5854 100644
--- a/node/subsystem-types/src/lib.rs
+++ b/node/subsystem-types/src/lib.rs
@@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
 const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
 
 /// The status of an activated leaf.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum LeafStatus {
 	/// A leaf is fresh when it's the first time the leaf has been encountered.
 	/// Most leaves should be fresh.