Skip to content

Commit

Permalink
chainHead: Track reported blocks to capture notification gaps (#5856)
Browse files Browse the repository at this point in the history
During warp sync or re-orgs, we can receive a notification for a block whose
parent was never reported. This PR extends the tracking state to catch those
cases and report a `Stop` event to the user.

This PR adds a new state to the RPC-v2 chainHead to track which blocks
have been reported.

In the past, we relied on the pinning mechanism to tell us whether a block
is pinned or not.
However, the pinning state only keeps the minimal information needed for
pinning. Therefore, unpinning a block causes that state to disappear.

Closes: #5761

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
lexnv and skunert authored Oct 7, 2024
1 parent ee0c460 commit d66a5a4
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 18 deletions.
17 changes: 17 additions & 0 deletions prdoc/pr_5856.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Extend state tracking of chainHead to capture notification gaps

doc:
- audience: Node Dev
description: |
This PR extends the state tracking of the RPC-v2 chainHead methods.
ChainHead tracks the reported blocks to detect notification gaps.
This state tracking ensures we can detect `NewBlock` events whose
parent hash was not previously reported.

crates:
- name: sc-rpc-spec-v2
bump: minor

151 changes: 133 additions & 18 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionManagement, SubscriptionManagementError},
subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
Expand Down Expand Up @@ -53,8 +53,6 @@ use std::{
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;

use super::subscription::InsertedSubscriptionData;

/// Generates the events of the `chainHead_follow` method.
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -71,11 +69,76 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
current_best_block: Option<Block::Hash>,
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
/// LRU cache of announced blocks.
announced_blocks: AnnouncedBlocks<Block>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
}

/// Tracks the hashes of the blocks that were already announced.
///
/// Unfinalized and finalized blocks are kept in separate caches, so a hash lives
/// in exactly one of the two fields depending on how it was last inserted.
struct AnnouncedBlocks<Block: BlockT> {
/// Unfinalized blocks.
///
/// LRU map keyed by block hash; the unit value carries no data.
blocks: LruMap<Block::Hash, ()>,
/// Finalized blocks.
///
/// Kept in a dedicated wrapper that looks hashes up without reordering the cache.
finalized: MostRecentFinalizedBlocks<Block>,
}

/// Wrapper over an LRU map that looks up hashes efficiently and evicts entries in FIFO order.
///
/// Finalized blocks are looked up with `peek`, which does not move the accessed entry to the
/// front of the LRU. This effectively turns the LRU into a FIFO queue.
///
/// With a promoting lookup (`get`), an older finalized block could be moved to the front just
/// because it was accessed more recently. The entry overwritten at the end of the LRU would
/// then not necessarily be the oldest finalized block.
struct MostRecentFinalizedBlocks<Block: BlockT>(LruMap<Block::Hash, ()>);

impl<Block: BlockT> MostRecentFinalizedBlocks<Block> {
    /// Insert the finalized block hash into the LRU cache.
    ///
    /// The hash is stored as a key with a unit value; only membership matters.
    fn insert(&mut self, block: Block::Hash) {
        self.0.insert(block, ());
    }

    /// Check if the block is contained in the LRU cache.
    ///
    /// Returns `Some(&())` when the hash is present and `None` otherwise.
    ///
    /// The lookup goes through `peek`, so it does not reorder the cache. Because nothing is
    /// mutated, a shared reference is sufficient here.
    fn contains(&self, block: &Block::Hash) -> Option<&()> {
        self.0.peek(block)
    }
}

impl<Block: BlockT> AnnouncedBlocks<Block> {
/// Creates a new `AnnouncedBlocks`.
fn new() -> Self {
Self {
// The total number of pinned blocks is `MAX_PINNED_BLOCKS`, ensure we don't
// exceed the limit.
blocks: LruMap::new(ByLength::new((MAX_PINNED_BLOCKS - MAX_FINALIZED_BLOCKS) as u32)),
// We are keeping a smaller number of announced finalized blocks in memory.
// This is because the `Finalized` event might be triggered before the `NewBlock` event.
finalized: MostRecentFinalizedBlocks(LruMap::new(ByLength::new(
MAX_FINALIZED_BLOCKS as u32,
))),
}
}

/// Insert the block into the announced blocks.
fn insert(&mut self, block: Block::Hash, finalized: bool) {
if finalized {
// When a block is declared as finalized, it is removed from the unfinalized blocks.
//
// Given that the finalized blocks are bounded to `MAX_FINALIZED_BLOCKS`,
// this ensures we keep the minimum number of blocks in memory.
self.blocks.remove(&block);
self.finalized.insert(block);
} else {
self.blocks.insert(block, ());
}
}

/// Check if the block was previously announced.
fn was_announced(&mut self, block: &Block::Hash) -> bool {
self.blocks.get(block).is_some() || self.finalized.contains(block).is_some()
}
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
/// Create a new [`ChainHeadFollower`].
pub fn new(
Expand All @@ -96,6 +159,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
announced_blocks: AnnouncedBlocks::new(),
max_lagging_distance,
}
}
Expand Down Expand Up @@ -214,7 +278,7 @@ where
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
fn distace_within_reason(
fn distance_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
Expand Down Expand Up @@ -250,7 +314,7 @@ where
// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
self.distance_within_reason(*leaf, finalized)?;
}

for leaf in leaves {
Expand Down Expand Up @@ -326,6 +390,10 @@ where
let finalized_block_hash = startup_point.finalized_hash;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);

for finalized in &finalized_block_hashes {
self.announced_blocks.insert(*finalized, true);
}

let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hashes: finalized_block_hashes.into(),
finalized_block_runtime,
Expand All @@ -336,6 +404,13 @@ where

finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
// If the parent was not announced we have a gap currently.
// This can happen during a WarpSync.
if !self.announced_blocks.was_announced(&parent) {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
}
self.announced_blocks.insert(child, false);

let new_runtime = self.generate_runtime_event(child, Some(parent));

let event = FollowEvent::NewBlock(NewBlock {
Expand All @@ -351,6 +426,11 @@ where
// Generate a new best block event.
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
if !self.announced_blocks.was_announced(&best_block_hash) {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
}
self.announced_blocks.insert(best_block_hash, true);

let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
Expand Down Expand Up @@ -408,21 +488,41 @@ where
notification: BlockImportNotification<Block>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
// The block was already pinned by the initial block events or by the finalized event.
if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
return Ok(Default::default())
}
let block_hash = notification.hash;

// Ensure we are only reporting blocks after the starting point.
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}

Ok(self.generate_import_events(
notification.hash,
*notification.header.parent_hash(),
notification.is_new_best,
))
// Ensure the block can be pinned before generating the events.
if !self.sub_handle.pin_block(&self.sub_id, block_hash)? {
// The block is already pinned, this is similar to the check above.
//
// The `SubscriptionManagement` ensures the block is tracked until (short lived):
// - 2 calls to `pin_block` are made (from `Finalized` and `NewBlock` branches).
// - the block is unpinned by the user
//
// This is rather a sanity check for edge-cases (in theory), where
// [`MAX_FINALIZED_BLOCKS` + 1] finalized events are triggered before the `NewBlock`
// event of the first `Finalized` event.
return Ok(Default::default())
}

if self.announced_blocks.was_announced(&block_hash) {
// Block was already reported by the finalized branch.
return Ok(Default::default())
}

// Double check the parent hash. If the parent hash is not reported, we have a gap.
let parent_block_hash = *notification.header.parent_hash();
if !self.announced_blocks.was_announced(&parent_block_hash) {
// The parent block was not reported, we have a gap.
return Err(SubscriptionManagementError::Custom("Parent block was not reported".into()))
}

self.announced_blocks.insert(block_hash, false);
Ok(self.generate_import_events(block_hash, parent_block_hash, notification.is_new_best))
}

/// Generates new block events from the given finalized hashes.
Expand All @@ -448,19 +548,29 @@ where
return Err(SubscriptionManagementError::BlockHeaderAbsent)
};

if !self.announced_blocks.was_announced(first_header.parent_hash()) {
return Err(SubscriptionManagementError::Custom(
"Parent block was not reported for a finalized block".into(),
));
}

let parents =
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
// Check if the block was already reported and thus, is already pinned.
if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
continue
// Ensure the block is pinned before generating the events.
self.sub_handle.pin_block(&self.sub_id, *hash)?;

// Check if the block was already reported.
if self.announced_blocks.was_announced(hash) {
continue;
}

// Generate `NewBlock` events for all blocks except the last block in the list
let is_last = i + 1 == finalized_block_hashes.len();
if !is_last {
// Generate only the `NewBlock` event for this block.
events.extend(self.generate_import_events(*hash, *parent, false));
self.announced_blocks.insert(*hash, true);
continue;
}

Expand All @@ -483,7 +593,8 @@ where
}

// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
events.extend(self.generate_import_events(*hash, *parent, true))
events.extend(self.generate_import_events(*hash, *parent, true));
self.announced_blocks.insert(*hash, true);
}

Ok(events)
Expand Down Expand Up @@ -545,6 +656,10 @@ where
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;

for finalized in &finalized_block_hashes {
self.announced_blocks.insert(*finalized, true);
}

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
Expand Down
68 changes: 68 additions & 0 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3965,3 +3965,71 @@ async fn follow_report_best_block_of_a_known_block() {
});
assert_eq!(event, expected);
}

#[tokio::test]
async fn follow_event_with_unknown_parent() {
    // Verifies that `chainHead_v1_follow` emits `Stop` when an import notification
    // arrives for a block whose parent was never announced on the subscription.
    let builder = TestClientBuilder::new();
    let backend = builder.backend();
    let client = Arc::new(builder.build());
    let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));

    let config = ChainHeadConfig {
        global_max_pinned_blocks: MAX_PINNED_BLOCKS,
        subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
        subscription_max_ongoing_operations: MAX_OPERATIONS,
        max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
        max_lagging_distance: MAX_LAGGING_DISTANCE,
    };
    let executor = Arc::new(TokioTestExecutor::default());
    let api = ChainHead::new(client_mock.clone(), backend, executor, config).into_rpc();

    let finalized_hash = client.info().finalized_hash;
    let mut sub = api.subscribe_unbounded("chainHead_v1_follow", [false]).await.unwrap();

    // The very first event on a fresh subscription must be `Initialized`.
    let first_event: FollowEvent<String> = get_next_event(&mut sub).await;
    assert_eq!(
        first_event,
        FollowEvent::Initialized(Initialized {
            finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
            finalized_block_runtime: None,
            with_runtime: false,
        })
    );

    // Block tree:
    //
    // finalized -> (gap: block 1) -> block 2
    //
    // Block 1 is imported into the backing client but never announced through the
    // mock, so from chainHead's point of view the parent of block 2 is unknown.

    // Note: `client` only builds and stores the blocks; notifications reach
    // chainHead exclusively through `client_mock`.
    let genesis_hash = client.chain_info().genesis_hash;
    let block_1 = BlockBuilderBuilder::new(&*client)
        .on_parent_block(genesis_hash)
        .with_parent_block_number(0)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let block_1_hash = block_1.hash();
    client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

    let block_2 = BlockBuilderBuilder::new(&*client)
        .on_parent_block(block_1_hash)
        .with_parent_block_number(1)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

    // Announce only block 2: chainHead detects the gap and stops the subscription.
    run_with_timeout(client_mock.trigger_import_stream(block_2.header)).await;
    let stop_event: FollowEvent<String> = get_next_event(&mut sub).await;
    assert_matches!(stop_event, FollowEvent::Stop);
}

0 comments on commit d66a5a4

Please sign in to comment.