availability-distribution: look for leaf ancestors within the same session #4596
@@ -27,18 +27,23 @@ use std::{
 };

 use futures::{
-    channel::mpsc,
+    channel::{mpsc, oneshot},
     task::{Context, Poll},
     Stream,
 };

-use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
-use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
+use polkadot_node_subsystem_util::{
+    request_session_index_for_child,
+    runtime::{self, get_occupied_cores, RuntimeInfo},
+};
+use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore, SessionIndex};
 use polkadot_subsystem::{
-    messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
+    messages::{AllMessages, ChainApiMessage},
+    ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
 };

-use super::{Metrics, LOG_TARGET};
+use super::{Metrics, Result, LOG_TARGET};
+use crate::error::Fatal;

 /// Cache for session information.
 mod session_cache;
@@ -64,6 +69,13 @@ pub struct Requester {
     /// Localized information about sessions we are currently interested in.
     session_cache: SessionCache,

+    /// Chain of up to [`LEAF_ANCESTRY_LEN_WITHIN_SESSION`] ancestors of activated leaves:
+    /// leaf -> [leaf parent, leaf grandparent, ...]
+    /// Fetch tasks also get launched for the ancestors in `relay_ancestors[leaf]`; this helps
+    /// with possibly missed activated-leaves updates, as well as with "slow" validators which
+    /// do not manage to fetch their chunks in time.
+    relay_ancestors: HashMap<Hash, Vec<Hash>>,
+
     /// Sender to be cloned for `FetchTask`s.
     tx: mpsc::Sender<FromFetchTask>,

@@ -75,14 +87,25 @@ pub struct Requester {
 }

 impl Requester {
+    /// How many ancestors of the leaf should we consider along with it.
+    pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
+
     /// Create a new `Requester`.
     ///
     /// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
     /// by advancing the stream.
     pub fn new(metrics: Metrics) -> Self {
         let (tx, rx) = mpsc::channel(1);
-        Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
+        Requester {
+            fetches: HashMap::new(),
+            session_cache: SessionCache::new(),
+            relay_ancestors: HashMap::new(),
+            tx,
+            rx,
+            metrics,
+        }
     }

     /// Update heads that need availability distribution.
     ///
     /// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,50 +114,75 @@ impl Requester {
         ctx: &mut Context,
         runtime: &mut RuntimeInfo,
         update: ActiveLeavesUpdate,
-    ) -> super::Result<()>
+    ) -> Result<()>
     where
         Context: SubsystemContext,
     {
         tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
         let ActiveLeavesUpdate { activated, deactivated } = update;
-        // Stale leaves happen after a reversion - we don't want to re-run availability there.
-        let activated = activated.and_then(|h| match h.status {
-            LeafStatus::Stale => None,
-            LeafStatus::Fresh => Some(h),
-        });
         // Order is important! We need to handle activated prior to deactivated, otherwise we
         // might cancel still-needed jobs.
-        self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
+        if let Some(activated) = activated {
+            // Stale leaves happen after a reversion - we don't want to re-run availability there.
+            if let LeafStatus::Fresh = activated.status {
Review comment (on lines +111 to +113): Is there really no better way to do this check?? Maybe this is more elegant than two `if let`s or the ugly thing that was there before:

Reply: You can also express it with `if matches!(activated, Some(activated) if activated.status == LeafStatus::Fresh) {` (provided one derives the `PartialEq` for status).
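For illustration, a minimal self-contained sketch of the reply's `matches!` form; the types here are simplified stand-ins for the real `polkadot_subsystem` ones, and the derived `PartialEq` is the prerequisite the reply mentions:

```rust
// Simplified stand-ins for the real `ActivatedLeaf` / `LeafStatus` types.
#[derive(PartialEq)]
#[allow(dead_code)]
enum LeafStatus {
    Fresh,
    Stale,
}

struct ActivatedLeaf {
    status: LeafStatus,
}

fn main() {
    let activated = Some(ActivatedLeaf { status: LeafStatus::Fresh });

    // One expression instead of two nested `if let`s; `ref` avoids moving
    // `activated`, so it stays usable afterwards.
    if matches!(activated, Some(ref leaf) if leaf.status == LeafStatus::Fresh) {
        println!("fresh leaf: start requesting chunks");
    }
}
```

The guard only runs when the pattern matches, so the whole check stays a single boolean expression.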
||
self.start_requesting_chunks(ctx, runtime, activated).await?; | ||
} | ||
} | ||
self.stop_requesting_chunks(deactivated.into_iter()); | ||
Ok(()) | ||
} | ||
|
||
-    /// Start requesting chunks for newly imported heads.
+    /// Start requesting chunks for a newly imported head.
+    ///
+    /// This will also request [`LEAF_ANCESTRY_LEN_WITHIN_SESSION`] leaf ancestors from the
+    /// same session and start requesting chunks for them too.
     async fn start_requesting_chunks<Context>(
         &mut self,
         ctx: &mut Context,
         runtime: &mut RuntimeInfo,
-        new_heads: impl Iterator<Item = ActivatedLeaf>,
-    ) -> super::Result<()>
+        new_head: ActivatedLeaf,
+    ) -> Result<()>
     where
         Context: SubsystemContext,
     {
-        for ActivatedLeaf { hash: leaf, .. } in new_heads {
-            let cores = get_occupied_cores(ctx, leaf).await?;
+        let ActivatedLeaf { hash: leaf, .. } = new_head;
+        let ancestors_in_session =
+            get_block_ancestors_in_same_session(ctx, leaf, Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION)
+                .await
+                .unwrap_or_else(|err| {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        leaf = ?leaf,
+                        "Failed to fetch leaf ancestors in the same session due to an error: {}",
+                        err
+                    );
+                    Vec::new()
+                });
+        for hash in std::iter::once(leaf).chain(ancestors_in_session.clone()) {
+            let cores = get_occupied_cores(ctx, hash).await?;
             tracing::trace!(
                 target: LOG_TARGET,
                 occupied_cores = ?cores,
                 "Query occupied core"
             );
-            self.add_cores(ctx, runtime, leaf, cores).await?;
+            self.add_cores(ctx, runtime, hash, cores).await?;
         }

+        // Save the leaf's ancestors for pruning the fetch tasks later.
+        self.relay_ancestors.insert(leaf, ancestors_in_session);
+
         Ok(())
     }

     /// Stop requesting chunks for obsolete heads.
     fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator<Item = Hash>) {
-        let obsolete_leaves: HashSet<_> = obsolete_leaves.collect();
+        let obsolete_leaves: HashSet<_> = obsolete_leaves
+            .flat_map(|leaf| {
+                let leaf_ancestors = self.relay_ancestors.remove(&leaf).unwrap_or_default();
+                std::iter::once(leaf).chain(leaf_ancestors)
+            })
+            .collect();
         self.fetches.retain(|_, task| {
             task.remove_leaves(&obsolete_leaves);
             task.is_live()
@@ -154,7 +202,7 @@ impl Requester {
         runtime: &mut RuntimeInfo,
         leaf: Hash,
         cores: impl IntoIterator<Item = OccupiedCore>,
-    ) -> super::Result<()>
+    ) -> Result<()>
     where
         Context: SubsystemContext,
     {
@@ -215,3 +263,83 @@ impl Stream for Requester {
         }
     }
 }
+
+/// Requests up to `limit` ancestor hashes of the relay parent in the same session.
+async fn get_block_ancestors_in_same_session<Context>(
+    ctx: &mut Context,
+    head: Hash,
+    limit: usize,
+) -> Result<Vec<Hash>>
+where
+    Context: SubsystemContext,
+{
+    // The order is parent, grandparent, ...
+    //
+    // `limit + 1` since the session index for the last element in the ancestry
+    // is obtained through its parent. It always gets truncated because
+    // `session_ancestry_len` can only be incremented `ancestors.len() - 1` times.
+    let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
+
+    // `head` is the child of the first block in `ancestors`, so request its session index.
+    let head_session_index = match ancestors.first() {
+        Some(parent) => get_session_index_for_child(ctx, *parent).await?,
+        None => {
+            // No first element, i.e. empty.
+            return Ok(ancestors)
+        },
+    };
+
+    let mut session_ancestry_len = 0;
+    for parent in ancestors.iter().skip(1) {
+        // `parent` is the i-th ancestor; request the session index for its child,
+        // the (i-1)-th element.
+        let session_index = get_session_index_for_child(ctx, *parent).await?;
+        if session_index == head_session_index {
+            session_ancestry_len += 1;
+        } else {
+            break
+        }
+    }
+
+    // Drop the rest.
+    ancestors.truncate(session_ancestry_len);
+
+    Ok(ancestors)
+}

Review comment on `get_block_ancestors_in_same_session`: Logic here seems accurate accounting for the fact that …
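To make the `limit + 1` arithmetic concrete, here is a hedged, self-contained model of the same walk. Everything in it is illustrative: `u32` stands in for block hashes, `parents` plays the role of the Chain API, and `child_session` answers what `get_session_index_for_child` would, i.e. the session index of a block's *child*:

```rust
use std::collections::HashMap;

fn ancestors_in_same_session(
    parents: &HashMap<u32, u32>,       // block -> parent (the Chain API stand-in)
    child_session: &HashMap<u32, u32>, // block -> session index of its CHILD
    head: u32,
    limit: usize,
) -> Vec<u32> {
    // Fetch `limit + 1` ancestors: the session of the `limit`-th (last useful)
    // ancestor can only be checked through *its* parent, the extra entry.
    let mut ancestors = Vec::new();
    let mut cur = head;
    for _ in 0..limit + 1 {
        match parents.get(&cur) {
            Some(&p) => {
                ancestors.push(p);
                cur = p;
            },
            None => break,
        }
    }

    // The session of `head`, obtained through its parent.
    let head_session = match ancestors.first() {
        Some(parent) => child_session[parent],
        None => return ancestors,
    };

    // The i-th ancestor's session is revealed by asking the (i+1)-th about its child.
    let mut len = 0;
    for parent in ancestors.iter().skip(1) {
        if child_session[parent] == head_session {
            len += 1;
        } else {
            break;
        }
    }
    // `len` reaches at most `ancestors.len() - 1 == limit`, so this always truncates.
    ancestors.truncate(len);
    ancestors
}

fn main() {
    // Chain 10 -> 11 -> 12 -> 13 -> 14 (head).
    let parents: HashMap<u32, u32> = [(14, 13), (13, 12), (12, 11), (11, 10)].into();

    // Session boundary between 11 and 12: blocks 12, 13, 14 are in session 5.
    let sessions_a: HashMap<u32, u32> = [(13, 5), (12, 5), (11, 5), (10, 4)].into();
    assert_eq!(ancestors_in_same_session(&parents, &sessions_a, 14, 3), vec![13, 12]);

    // All ancestors share the session: the 4th fetched ancestor is still dropped,
    // so the result is capped at `limit` = 3.
    let sessions_b: HashMap<u32, u32> = [(13, 5), (12, 5), (11, 5), (10, 5)].into();
    assert_eq!(ancestors_in_same_session(&parents, &sessions_b, 14, 3), vec![13, 12, 11]);
}
```

Running it shows both halves of the comment: the last useful ancestor's session is only learned through the extra `limit + 1`-th fetch, and the truncation always applies.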
+
+/// Request up to `limit` ancestor hashes of the relay parent from the Chain API.
+async fn get_block_ancestors<Context>(
+    ctx: &mut Context,
+    relay_parent: Hash,
+    limit: usize,
+) -> Result<Vec<Hash>>
+where
+    Context: SubsystemContext,
+{
+    let (tx, rx) = oneshot::channel();
+    ctx.send_message(ChainApiMessage::Ancestors {
+        hash: relay_parent,
+        k: limit,
+        response_channel: tx,
+    })
+    .await;
+
+    let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
+    Ok(ancestors)
+}
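The function above is an instance of the usual oneshot request/response pattern. A minimal, self-contained illustration using only the `futures` crate; the spawned thread is a stand-in for the Chain API subsystem, and all names and data are made up:

```rust
use futures::channel::oneshot;
use futures::executor::block_on;

fn main() {
    block_on(async {
        // The requester keeps `rx` and hands `tx` to the responder, just as
        // `get_block_ancestors` hands `response_channel: tx` to the Chain API.
        let (tx, rx) = oneshot::channel::<Result<Vec<u32>, String>>();

        // Stand-in responder; the real one is the Chain API subsystem.
        std::thread::spawn(move || {
            let ancestors = vec![41, 40, 39]; // parent, grandparent, ...
            let _ = tx.send(Ok(ancestors));
        });

        // Two failure layers, as in the real code: the sender being dropped
        // (mapped to `Fatal::ChainApiSenderDropped`) and the request itself
        // failing (mapped to `Fatal::ChainApi`).
        match rx.await {
            Err(_canceled) => eprintln!("sender dropped before answering"),
            Ok(Err(e)) => eprintln!("chain API error: {}", e),
            Ok(Ok(ancestors)) => println!("ancestors: {:?}", ancestors),
        }
    });
}
```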
+
+/// Request session index for the child of the relay parent from the Runtime API.
+async fn get_session_index_for_child<Context>(
+    ctx: &mut Context,
+    relay_parent: Hash,
+) -> Result<SessionIndex>
+where
+    Context: SubsystemContext,
+{
+    let rx = request_session_index_for_child(relay_parent, ctx.sender()).await;
+    let session_index = rx
+        .await
+        .map_err(|err| runtime::Error::Fatal(runtime::Fatal::RuntimeRequestCanceled(err)))?
+        .map_err(|err| runtime::Error::NonFatal(runtime::NonFatal::RuntimeRequest(err)))?;
+
+    Ok(session_index)
+}
Review comment: Nice!