Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor and fix usage of get_session_index() and get_session_info_by_index() #4735

Merged
merged 14 commits into from
Jan 26, 2022
Merged
38 changes: 18 additions & 20 deletions node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,36 +98,33 @@ impl Requester {
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) {
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, leaf).await?;
}

self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}

/// Start requesting chunks for newly imported heads.
/// Start requesting chunks for newly imported relay chain head.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
leaf: ActivatedLeaf,
) -> super::Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = get_occupied_cores(ctx, leaf).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
self.add_cores(ctx, runtime, leaf, cores).await?;
}
let cores = get_occupied_cores(ctx, leaf.hash).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
self.add_cores(ctx, runtime, leaf.hash, cores).await?;
Ok(())
}

Expand Down Expand Up @@ -175,8 +172,9 @@ impl Requester {
ctx,
runtime,
// We use leaf here, as relay_parent must be in the same session as the
// leaf. (Cores are dropped at session boundaries.) At the same time,
// only leaves are guaranteed to be fetchable by the state trie.
// leaf. This is guaranteed by runtime which ensures that cores are cleared
// at session boundaries. At the same time, only leaves are guaranteed to
// be fetchable by the state trie.
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl SessionCache {
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;

if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
Expand Down Expand Up @@ -177,13 +177,15 @@ impl SessionCache {
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
relay_parent: Hash,
sandreim marked this conversation as resolved.
Show resolved Hide resolved
session_index: SessionIndex,
) -> Result<Option<SessionInfo>, Error>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;

let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
Expand Down
2 changes: 1 addition & 1 deletion node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = &runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?
Expand Down
29 changes: 24 additions & 5 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,26 @@ impl DisputeSender {
dispute: (SessionIndex, CandidateHash),
) -> Result<()> {
let (session_index, candidate_hash) = dispute;
// We need some relay chain head for context for receiving session info information:
let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
// A relay chain head is required as context for receiving session info information from runtime and
// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
// We need to avoid picking an older one from a session that might not yet exist in storage.
// Related to <https://github.com/paritytech/polkadot/issues/4730> .
let ref_head = self
.active_sessions
.iter()
.find_map(|(active_session_index, head_hash)| {
// There might be more than one session index that is at least as recent as the dispute
// so we just pick the first one. Keep in mind we are talking about the session index for the
// child of block identified by `head_hash` and not the session index for the block.
if active_session_index >= &session_index {
Some(head_hash)
} else {
None
}
})
.ok_or(NonFatal::NoActiveHeads)?;

let info = runtime
.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
.await?;
Expand Down Expand Up @@ -293,7 +311,7 @@ impl DisputeSender {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
) -> Result<bool> {
let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
let updated = new_sessions_raw != old_sessions_raw;
Expand All @@ -306,14 +324,15 @@ impl DisputeSender {
/// Retrieve the currently active sessions.
///
/// List is all indices of all active sessions together with the head that was used for the query.
async fn get_active_session_indeces<Context: SubsystemContext>(
async fn get_active_session_indices<Context: SubsystemContext>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_heads: &Vec<Hash>,
) -> Result<HashMap<SessionIndex, Hash>> {
let mut indeces = HashMap::new();
// Iterate all heads we track as active and fetch the child' session indices.
for head in active_heads {
let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
indeces.insert(session_index, *head);
}
Ok(indeces)
Expand Down
6 changes: 4 additions & 2 deletions node/network/dispute-distribution/src/sender/send_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ impl SendTask {
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<HashSet<AuthorityDiscoveryId>> {
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
// Parachain validators:
// Retrieve all authorities which participated in the parachain consensus of the session
// in which the candidate was backed.
let info = runtime
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
.await?;
Expand All @@ -219,7 +220,8 @@ impl SendTask {
.map(|(_, v)| v.clone())
.collect();

// Current authorities:
// Retrieve all authorities for the current session as indicated by the active
// heads we are tracking.
for (session_index, head) in active_sessions.iter() {
let info =
runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
Expand Down
7 changes: 4 additions & 3 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,9 @@ struct ActiveHeadData {
statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
/// Large statements we are waiting for with associated meta data.
waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
/// The validators at this head.
/// The parachain validators at the head's child session index.
validators: Vec<ValidatorId>,
/// The session index this head is at.
/// The current session index of this fork.
session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>,
Expand Down Expand Up @@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem {
"New active leaf",
);

// Retrieve the parachain validators at the child of the head we track.
let session_index =
runtime.get_session_index(ctx.sender(), relay_parent).await?;
runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;

/// The status of an activated leaf.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, PartialEq)]
pub enum LeafStatus {
/// A leaf is fresh when it's the first time the leaf has been encountered.
/// Most leaves should be fresh.
Expand Down
11 changes: 8 additions & 3 deletions node/subsystem-util/src/rolling_session_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl RollingSessionWindow {
window_size: SessionWindowSize,
block_hash: Hash,
) -> Result<Self, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let window_start = session_index.saturating_sub(window_size.get() - 1);

Expand Down Expand Up @@ -160,7 +160,7 @@ impl RollingSessionWindow {
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let old_window_start = self.earliest_session;

Expand Down Expand Up @@ -212,7 +212,12 @@ impl RollingSessionWindow {
}
}

async fn get_session_index_for_head(
// Returns the session index expected at any child of the `parent` block.
//
// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's
// cleaner to just call the runtime API directly without needing to create an instance
// of `RuntimeInfo`.
async fn get_session_index_for_child(
sandreim marked this conversation as resolved.
Show resolved Hide resolved
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionIndex, SessionsUnavailable> {
Expand Down
19 changes: 10 additions & 9 deletions node/subsystem-util/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ impl RuntimeInfo {
}
}

/// Retrieve the current session index.
pub async fn get_session_index<Sender>(
/// Returns the session index expected at any child of the `parent` block.
/// This does not return the session index for the `parent` block.
pub async fn get_session_index_for_child<Sender>(
&mut self,
sender: &mut Sender,
parent: Hash,
Expand All @@ -141,14 +142,14 @@ impl RuntimeInfo {
pub async fn get_session_info<'a, Sender>(
&'a mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
) -> Result<&'a ExtendedSessionInfo>
where
Sender: SubsystemSender,
{
let session_index = self.get_session_index(sender, parent).await?;
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;

self.get_session_info_by_index(sender, parent, session_index).await
self.get_session_info_by_index(sender, relay_parent, session_index).await
}

/// Get `ExtendedSessionInfo` by session index.
Expand Down Expand Up @@ -185,7 +186,7 @@ impl RuntimeInfo {
pub async fn check_signature<Sender, Payload, RealPayload>(
&mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
signed: UncheckedSigned<Payload, RealPayload>,
) -> Result<
std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>,
Expand All @@ -195,9 +196,9 @@ impl RuntimeInfo {
Payload: EncodeAs<RealPayload> + Clone,
RealPayload: Encode + Clone,
{
let session_index = self.get_session_index(sender, parent).await?;
let info = self.get_session_info_by_index(sender, parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, parent, signed))
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, relay_parent, signed))
}

/// Build `ValidatorInfo` for the current session.
Expand Down