Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Keep sessions in window for the full unfinalized chain #6054

Merged
merged 6 commits into from
Oct 4, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 110 additions & 7 deletions node/subsystem-util/src/rolling_session_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use polkadot_primitives::v2::{Hash, SessionIndex, SessionInfo};

use futures::channel::oneshot;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{RuntimeApiMessage, RuntimeApiRequest},
errors::{ChainApiError, RuntimeApiError},
messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest},
overseer,
};

const LOG_TARGET: &str = "parachain::rolling-session-window";

/// Sessions unavailable in state to cache.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SessionsUnavailableReason {
Expand All @@ -38,9 +40,18 @@ pub enum SessionsUnavailableReason {
/// The runtime API itself returned an error.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// The chain API itself returned an error.
#[error(transparent)]
ChainApi(#[from] ChainApiError),
/// Missing session info from runtime API for given `SessionIndex`.
#[error("Missing session index {0:?}")]
Missing(SessionIndex),
/// Missing last finalized block number.
#[error("Missing last finalized block number")]
MissingLastFinalizedBlock,
/// Missing last finalized block hash.
#[error("Missing last finalized block hash")]
MissingLastFinalizedBlockHash,
sandreim marked this conversation as resolved.
Show resolved Hide resolved
}

/// Information about the sessions being fetched.
Expand Down Expand Up @@ -98,7 +109,8 @@ impl RollingSessionWindow {
block_hash: Hash,
) -> Result<Self, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>,
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let session_index = get_session_index_for_child(&mut sender, block_hash).await?;

Expand Down Expand Up @@ -146,19 +158,105 @@ impl RollingSessionWindow {
self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1)
}

async fn last_finalized_block_session<Sender>(
&self,
sender: &mut Sender,
) -> Result<u32, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let last_finalized_height = {
let (tx, rx) = oneshot::channel();
sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;
match rx.await {
Ok(Ok(number)) => number,
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::ChainApi(e),
info: None,
}),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?err,
"Failed fetching last finalized block number"
);
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlock,
info: None,
})
},
}
};

let (tx, rx) = oneshot::channel();
// We want to get the parent of the last finalized block.
sandreim marked this conversation as resolved.
Show resolved Hide resolved
sender
.send_message(ChainApiMessage::FinalizedBlockHash(
last_finalized_height.saturating_sub(1),
tx,
))
.await;
let last_finalized_hash_parent = match rx.await {
Ok(Ok(maybe_hash)) => maybe_hash,
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::ChainApi(e),
info: None,
}),
Err(err) => {
gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash");
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash,
info: None,
})
},
};

// Get the session in which the last finalized block was authored.
if let Some(last_finalized_hash_parent) = last_finalized_hash_parent {
let session =
match get_session_index_for_child(sender, last_finalized_hash_parent).await {
Ok(session_index) => session_index,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?err,
?last_finalized_hash_parent,
"Failed fetching session index"
);
return Err(err)
},
};

Ok(session)
} else {
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash,
info: None,
})
}
}

/// When inspecting a new import notification, updates the session info cache to match
/// the session of the imported block's child.
///
/// this only needs to be called on heads where we are directly notified about import, as sessions do
/// not change often and import notifications are expected to be typically increasing in session number.
///
/// some backwards drift in session index is acceptable.
pub async fn cache_session_info_for_head(
pub async fn cache_session_info_for_head<Sender>(
&mut self,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
sender: &mut Sender,
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
) -> Result<SessionWindowUpdate, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let session_index = get_session_index_for_child(sender, block_hash).await?;
let last_finalized_block_session = self.last_finalized_block_session(sender).await?;
sandreim marked this conversation as resolved.
Show resolved Hide resolved

let old_window_start = self.earliest_session;

Expand All @@ -171,7 +269,12 @@ impl RollingSessionWindow {

let old_window_end = latest;

let window_start = session_index.saturating_sub(self.window_size.get() - 1);
// Ensure we keep sessions up to last finalized block by adjusting the window start.
// This will increase the session window to cover the full unfinalized chain.
let window_start = std::cmp::min(
session_index.saturating_sub(self.window_size.get() - 1),
last_finalized_block_session,
);

// keep some of the old window, if applicable.
let overlap_start = window_start.saturating_sub(old_window_start);
Expand Down