Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statement-distribution: fix filtering of statements for elastic parachains #3879

Merged
merged 19 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions polkadot/node/collation-generation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
#[error(transparent)]
UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
#[error(transparent)]
Erasure(#[from] polkadot_erasure_coding::Error),
#[error("Parachain backing state not available in runtime.")]
MissingParaBackingState,
Expand Down
54 changes: 10 additions & 44 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,23 @@ use polkadot_node_primitives::{
SubmitCollationParams,
};
use polkadot_node_subsystem::{
messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
messages::{CollationGenerationMessage, CollatorProtocolMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
has_required_runtime, request_async_backing_params, request_availability_cores,
request_claim_queue, request_para_backing_state, request_persisted_validation_data,
request_validation_code, request_validation_code_hash, request_validators,
request_async_backing_params, request_availability_cores, request_para_backing_state,
request_persisted_validation_data, request_validation_code, request_validation_code_hash,
request_validators,
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
};
use polkadot_primitives::{
collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, ScheduledCore, ValidationCodeHash,
PersistedValidationData, ValidationCodeHash,
};
use sp_core::crypto::Pair;
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use std::sync::Arc;

mod error;

Expand Down Expand Up @@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
let availability_cores = availability_cores??;
let async_backing_params = async_backing_params?.ok();
let n_validators = validators??.len();
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
.await
.map_err(crate::error::Error::UtilRuntime)?;

// The loop bellow will fill in cores that the para is allowed to build on.
let mut cores_to_build_on = Vec::new();
Expand Down Expand Up @@ -655,37 +655,3 @@ fn erasure_root(
let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
async fn fetch_claim_queue(
sender: &mut impl overseer::CollationGenerationSenderTrait,
relay_parent: Hash,
) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
if has_required_runtime(
sender,
relay_parent,
RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
)
.await
{
let res = request_claim_queue(relay_parent, sender).await.await??;
Ok(Some(res))
} else {
gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
Ok(None)
}
}

// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
// This function is supposed to be used in `handle_new_activations` hence the return type.
fn fetch_next_scheduled_on_core(
claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
core_idx: CoreIndex,
) -> Option<ScheduledCore> {
claim_queue
.get(&core_idx)?
.front()
.cloned()
.map(|para_id| ScheduledCore { para_id, collator: None })
}
15 changes: 9 additions & 6 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ use polkadot_node_subsystem::{
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt};
use polkadot_primitives::{
async_backing::{BackingState, CandidatePendingAvailability},
AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
ScheduledCore, ValidationCode,
};
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::pin::Pin;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
};
use test_helpers::{
dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
};
Expand Down Expand Up @@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
_hash,
RuntimeApiRequest::ClaimQueue(tx),
))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
let res = ClaimQueueSnapshot::new();
tx.send(Ok(res)).unwrap();
},
Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
Expand Down Expand Up @@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
Expand Down Expand Up @@ -962,7 +965,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
candidate_hash: Default::default(),
candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
})];
let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
Expand Down Expand Up @@ -1050,7 +1053,7 @@ mod helpers {
async_backing_params: AsyncBackingParams,
cores: Vec<CoreState>,
runtime_version: u32,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
claim_queue: ClaimQueueSnapshot,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/network/statement-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub enum Error {
#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,

Expand Down
107 changes: 75 additions & 32 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core, ClaimQueueSnapshot},
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
Expand Down Expand Up @@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
struct PerRelayParentState {
local_validator: Option<LocalValidatorState>,
statement_store: StatementStore,
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
}

impl PerRelayParentState {
Expand Down Expand Up @@ -693,15 +693,24 @@ pub(crate) async fn handle_active_leaves_update<Context>(
}
});

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we pass per value (can't re-use the result), might make sense to move that call into determine_groups_per_para.

The result of request_availability_cores is used in find_active_validator_state, but wrongly. It uses para_id a function we should remove, as it makes no sense to either return the ParaId of the occupying para or the scheduled one.*)

Regardless, I would move the fetching code into the function that need the data. handle_active_leaves_update is already pages of code.

*) Note: This is obviously unrelated, so can be a separate PR, but it should be fixed for Coretime asap. @tdimitrov

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.await
.map_err(JfyiError::FetchClaimQueue)?;

let groups_per_para = determine_groups_per_para(
&availability_cores,
&group_rotation_info,
&maybe_claim_queue,
seconding_limit - 1,
sandreim marked this conversation as resolved.
Show resolved Hide resolved
);
state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
local_validator,
statement_store: StatementStore::new(&per_session.groups),
availability_cores,
group_rotation_info,
seconding_limit,
session: session_index,
groups_per_para,
},
);
}
Expand Down Expand Up @@ -2126,17 +2135,54 @@ async fn provide_candidate_to_grid<Context>(
}
}

fn group_for_para(
// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
fn determine_groups_per_para(
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
para_id: ParaId,
) -> Option<GroupIndex> {
// Note: this won't work well for on-demand parachains as it assumes that core assignments are
// fixed across blocks.
let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
max_candidate_depth: usize,
) -> HashMap<ParaId, Vec<GroupIndex>> {
// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next block if core has a candidate
// pending availability.
let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| {
match core {
CoreState::Scheduled(scheduled_core) =>
Some((scheduled_core.para_id, CoreIndex(index as u32))),
CoreState::Occupied(occupied_core) => {
if max_candidate_depth >= 1 {
// Use claim queue if available, or fallback to `next_up_on_available`
let maybe_scheduled_core = match maybe_claim_queue {
sandreim marked this conversation as resolved.
Show resolved Hide resolved
Some(claim_queue) => {
// What's up next on this core ?
fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32))
},
None => {
// Runtime doesn't support claim queue runtime api. Fallback to
// `next_up_on_available`
occupied_core.next_up_on_available.clone()
},
};

maybe_scheduled_core
.filter(|scheduled_core| scheduled_core.para_id == occupied_core.para_id())
sandreim marked this conversation as resolved.
Show resolved Hide resolved
.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
} else {
None
}
},
CoreState::Free => None,
}
});

core_index
.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
for (para, core_index) in para_core_indices {
let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len());
alexggh marked this conversation as resolved.
Show resolved Hide resolved
groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
}

groups_per_para
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
Expand Down Expand Up @@ -2192,18 +2238,14 @@ async fn fragment_tree_update_inner<Context>(
let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
let group_index = group_for_para(
&prs.availability_cores,
&prs.group_rotation_info,
receipt.descriptor().para_id,
);

let per_session = state.per_session.get(&prs.session);
if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
// TODO(maybe for sanity): perform an extra check on the candidate backing group
alexggh marked this conversation as resolved.
Show resolved Hide resolved
// index all allowed
if let Some(per_session) = per_session {
send_backing_fresh_statements(
ctx,
candidate_hash,
group_index,
confirmed.group_index(),
&receipt.descriptor().relay_parent,
prs,
confirmed,
Expand Down Expand Up @@ -2311,13 +2353,14 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(x) => x,
};

let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para_id,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para_id);

if expected_group != Some(manifest_summary.claimed_group_index) {
if expected_groups.is_none() ||
!expected_groups
.expect("checked is_some(); qed")
sandreim marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.any(|g| g == &manifest_summary.claimed_group_index)
{
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
return None
}
Expand Down Expand Up @@ -3037,13 +3080,13 @@ pub(crate) async fn handle_response<Context>(
relay_parent_state.session,
|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
|para, g_index| {
let expected_group = group_for_para(
&relay_parent_state.availability_cores,
&relay_parent_state.group_rotation_info,
para,
);
let expected_groups = relay_parent_state.groups_per_para.get(&para);

Some(g_index) == expected_group
expected_groups.is_some() &&
expected_groups
.expect("checked is_some(); qed")
sandreim marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.any(|g| g == &g_index)
},
disabled_mask,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,66 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() {
});
}

// Both validators in the test are part of backing groups assigned to same parachain
#[test]
fn elastic_scaling_useful_cluster_statement_from_non_cluster_peer_rejected() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

let config = TestConfig {
validator_count: 20,
group_size: 3,
local_validator: LocalRole::Validator,
async_backing_params: None,
};

let relay_parent = Hash::repeat_byte(1);
let peer_a = PeerId::random();

test_harness(config, |state, mut overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(42));

let test_leaf = state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 3);

// Peer A is not in our group, but it's group is assigned to same para as we are.
sandreim marked this conversation as resolved.
Show resolved Hide resolved
let not_our_group = GroupIndex(1);

let that_group_validators = state.group_validators(not_our_group, false);
let v_non = that_group_validators[0];

connect_peer(
&mut overseer,
peer_a.clone(),
Some(vec![state.discovery_id(v_non)].into_iter().collect()),
)
.await;

send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;

let statement = state
.sign_statement(
v_non,
CompactStatement::Seconded(candidate_hash),
&SigningContext { parent_hash: relay_parent, session_index: 1 },
)
.as_unchecked()
.clone();

send_peer_message(
&mut overseer,
peer_a.clone(),
protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
)
.await;

assert_matches!(
overseer.recv().await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { }
);

overseer
});
}

#[test]
fn statement_from_non_cluster_originator_unexpected() {
let config = TestConfig {
Expand Down
Loading
Loading