[DNM] Validator side of the collator protocol uses CQ #4675

Closed · wants to merge 11 commits
1 change: 1 addition & 0 deletions Cargo.lock

48 changes: 48 additions & 0 deletions cumulus/zombienet/tests/0009-ct-shared-core.toml
@@ -0,0 +1,48 @@
[settings]
timeout = 1000

[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params]
max_candidate_depth = 2
allowed_ancestry_len = 2

[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params]
max_validators_per_core = 1
lookahead = 2
num_cores = 4

[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params]
needed_approvals = 3
max_approval_coalesce_count = 5

[relaychain]
default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
chain = "rococo-local"
command = "polkadot"

[[relaychain.nodes]]
name = "alice"
args = ["-lruntime=debug,parachain=trace,runtime::parachains::scheduler=trace,runtime::inclusion-inherent=trace,runtime::inclusion=trace,parachain::collator-protocol=trace" ]

[[relaychain.node_groups]]
name = "validator"
args = ["-lruntime=debug,parachain=trace,runtime::parachains::scheduler=trace,runtime::inclusion-inherent=trace,runtime::inclusion=trace,parachain::collator-protocol=trace" ]
count = 4

{% for id in range(2000,2003) %}
[[parachains]]
id = {{id}}
addToGenesis = true
cumulus_based = true
chain = "glutton-westend-local-{{id}}"
[parachains.genesis.runtimeGenesis.patch.glutton]
compute = "50000000"
storage = "2500000000"
trashDataCount = 5120

[parachains.collator]
name = "collator"
image = "{{CUMULUS_IMAGE}}"
command = "polkadot-parachain"
args = ["-lparachain=debug,aura::cumulus=trace,parachain::collator-protocol=trace"]

{% endfor %}
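
The `{% for id in range(2000,2003) %}` block above is Zombienet's Jinja-style templating; `range` excludes its upper bound, so the file registers three glutton parachains with ids 2000, 2001 and 2002 at genesis. These are the same three paras that `assign-core.js` later packs onto a single core.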
23 changes: 23 additions & 0 deletions cumulus/zombienet/tests/0009-ct-shared-core.zndsl
@@ -0,0 +1,23 @@
Description: CT shared core test
Network: ./0009-ct-shared-core.toml
Creds: config

alice: is up
collator: is up
collator-1: is up

# configure relay chain
alice: js-script ./assign-core.js return is 0 within 600 seconds

#collator-single-core: reports block height is at least 20 within 225 seconds
#collator-elastic: reports block height is at least 20 within 225 seconds

#collator-elastic: restart after 30 seconds
#sleep 10 seconds
#collator-single-core: restart after 30 seconds

#collator-single-core: reports block height is at least 40 within 225 seconds
#collator-elastic: reports block height is at least 80 within 225 seconds


sleep 43200 seconds
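
The commented-out assertions and the closing `sleep 43200 seconds` (12 hours) suggest this is currently a manual soak harness rather than a pass/fail test: the network is simply kept alive for inspection, consistent with the [DNM] marker in the PR title.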
49 changes: 49 additions & 0 deletions cumulus/zombienet/tests/assign-core-parts.js
@@ -0,0 +1,49 @@
// Assign a parachain to a core.
//
// First argument should be the parachain id.
// Second argument should be the core.
// Third argument should be the PartsOf57600 share assigned to the parachain.
async function run(nodeName, networkInfo, args) {
const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName];
const api = await zombie.connect(wsUri, userDefinedTypes);

let para = Number(args[0]);
let core = Number(args[1]);
let parts_of_57600 = Number(args[2]);

console.log(`Assigning para ${para} to core ${core} parts_of_57600 ${parts_of_57600}`);

await zombie.util.cryptoWaitReady();

// Submit transaction with Alice account
const keyring = new zombie.Keyring({ type: "sr25519" });
const alice = keyring.addFromUri("//Alice");

// Wait for this transaction to be finalized in a block.
await new Promise(async (resolve, reject) => {
const unsub = await api.tx.sudo
.sudo(api.tx.coretime.assignCore(core, 0, [[{ task: para }, parts_of_57600]], null))
.signAndSend(alice, ({ status, isError }) => {
if (status.isInBlock) {
console.log(
`Transaction included at blockhash ${status.asInBlock}`,
);
} else if (status.isFinalized) {
console.log(
`Transaction finalized at blockHash ${status.asFinalized}`,
);
unsub();
return resolve();
} else if (isError) {
console.log(`Transaction error`);
reject(`Transaction error`);
}
});
});

return 0;
}

module.exports = { run };
59 changes: 59 additions & 0 deletions cumulus/zombienet/tests/assign-core.js
@@ -0,0 +1,59 @@
// Assign a parachain to a core.
async function run(nodeName, networkInfo, args) {
const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName];
const api = await zombie.connect(wsUri, userDefinedTypes);

await zombie.util.cryptoWaitReady();

// Submit transaction with Alice account
const keyring = new zombie.Keyring({ type: "sr25519" });
const alice = keyring.addFromUri("//Alice");

// Wait for this transaction to be finalized in a block.
await new Promise(async (resolve, reject) => {
const unsub = await api.tx.sudo
.sudo(api.tx.system.killPrefix("0x638595eebaa445ce03a13547bece90e704e6ac775a3245623103ffec2cb2c92f", 10))
.signAndSend(alice, ({ status, isError }) => {
if (status.isInBlock) {
console.log(
`killPrefix transaction included at blockhash ${status.asInBlock}`,
);
} else if (status.isFinalized) {
console.log(
`killPrefix transaction finalized at blockHash ${status.asFinalized}`,
);
unsub();
return resolve();
} else if (isError) {
console.log(`killPrefix error`);
reject(`killPrefix error`);
}
});
});

// Wait for this transaction to be finalized in a block.
await new Promise(async (resolve, reject) => {
const unsub = await api.tx.sudo
.sudo(api.tx.coretime.assignCore(0, 0, [[{ task: 2000 }, 19200], [{ task: 2001 }, 19200], [{ task: 2002 }, 19200]], null))
.signAndSend(alice, ({ status, isError }) => {
if (status.isInBlock) {
console.log(
`assignCore transaction included at blockhash ${status.asInBlock}`,
);
} else if (status.isFinalized) {
console.log(
`assignCore transaction finalized at blockHash ${status.asFinalized}`,
);
unsub();
return resolve();
} else if (isError) {
console.log(`assignCore error`);
reject(`assignCore error`);
}
});
});

return 0;
}

module.exports = { run };
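
`PartsOf57600` is the unit of a core's scheduling share: 57600 parts make up the whole core. The `assignCore` call above gives each of paras 2000, 2001 and 2002 a share of 19200, and 3 × 19200 = 57600, so the three paras evenly and fully time-share core 0. `assign-core-parts.js` is the parameterized variant of the same extrinsic for assigning one para an arbitrary share.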
1 change: 1 addition & 0 deletions polkadot/node/core/prospective-parachains/src/lib.rs
@@ -886,6 +886,7 @@ async fn fetch_upcoming_paras<Context>(
for core in cores {
match core {
CoreState::Occupied(occupied) => {
upcoming.insert(occupied.para_id());
if let Some(next_up_on_available) = occupied.next_up_on_available {
upcoming.insert(next_up_on_available.para_id);
}
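
The single added line above changes `fetch_upcoming_paras` so that the para currently occupying a core is itself treated as upcoming, rather than only the para next up on availability. A minimal sketch of the resulting scan, using simplified stand-in types (not the actual polkadot-primitives definitions):

use std::collections::HashSet;

type ParaId = u32;

// Simplified stand-ins for the primitives types.
struct OccupiedCore {
    para_id: ParaId,                      // para whose candidate occupies the core
    next_up_on_available: Option<ParaId>, // next claim queued for this core
}

enum CoreState {
    Occupied(OccupiedCore),
    Scheduled(ParaId),
    Free,
}

fn upcoming_paras(cores: &[CoreState]) -> HashSet<ParaId> {
    let mut upcoming = HashSet::new();
    for core in cores {
        match core {
            CoreState::Occupied(occupied) => {
                // New in this PR: the occupying para itself counts as upcoming.
                upcoming.insert(occupied.para_id);
                if let Some(next) = occupied.next_up_on_available {
                    upcoming.insert(next);
                }
            },
            CoreState::Scheduled(para_id) => {
                upcoming.insert(*para_id);
            },
            CoreState::Free => {},
        }
    }
    upcoming
}
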
@@ -587,7 +587,7 @@ async fn determine_cores(
if relay_parent_mode.is_enabled() {
// With async backing we don't care about the core state,
// it is only needed for figuring out the validators group.
Some(occupied.candidate_descriptor.para_id)
occupied.next_up_on_available.as_ref().map(|c| c.para_id)
} else {
None
},
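
This hunk is the heart of the PR: with async backing enabled, an occupied core is now attributed to the para next up in the claim queue (`next_up_on_available`) rather than the para whose candidate currently occupies it, so the validator fetches collations for what will be scheduled next. A condensed before/after sketch with placeholder field names:

type ParaId = u32;

// Placeholder shapes; the real types live in polkadot-primitives.
struct NextUpOnAvailable {
    para_id: ParaId,
}

struct OccupiedCore {
    occupying_para: ParaId,
    next_up_on_available: Option<NextUpOnAvailable>,
}

// Before: the occupied core was attributed to the para currently on it.
fn para_before(core: &OccupiedCore) -> Option<ParaId> {
    Some(core.occupying_para)
}

// After: ask which para is next in the claim queue for this core and
// accept/fetch collations for that para instead.
fn para_after(core: &OccupiedCore) -> Option<ParaId> {
    core.next_up_on_available.as_ref().map(|n| n.para_id)
}
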
@@ -498,7 +498,8 @@ where
let core_now = rotation_info.core_for_group(group, cores.len());

cores.get(core_now.0 as usize).and_then(|c| match c {
CoreState::Occupied(core) if relay_parent_mode.is_enabled() => Some(core.para_id()),
CoreState::Occupied(core) if relay_parent_mode.is_enabled() =>
core.next_up_on_available.as_ref().map(|c| c.para_id),
CoreState::Scheduled(core) => Some(core.para_id),
CoreState::Occupied(_) | CoreState::Free => None,
})
@@ -1709,6 +1710,7 @@ async fn run_inner<Context>(
target: LOG_TARGET,
?relay_parent,
?collator_id,
?maybe_candidate_hash,
"Timeout hit - already seconded?"
);
dequeue_next_collation_and_fetch(
2 changes: 2 additions & 0 deletions polkadot/runtime/parachains/src/inclusion/mod.rs
@@ -615,6 +615,8 @@ impl<T: Config> Pallet<T> {
where
GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
{
log::trace!(target: LOG_TARGET, "Processing {} candidates {:?}", candidates.len(), candidates);

if candidates.is_empty() {
return Ok(ProcessedCandidates::default())
}
3 changes: 3 additions & 0 deletions polkadot/runtime/parachains/src/paras_inherent/mod.rs
@@ -612,6 +612,9 @@ impl<T: Config> Pallet<T> {
scheduler::Pallet::<T>::group_validators,
core_index_enabled,
)?;

log::trace!(target: LOG_TARGET, "Processed candidates {:?}", candidate_receipt_with_backing_validator_indices.iter().map(|c| (c.0.descriptor.para_id, c.0.descriptor.relay_parent, c.0.descriptor.para_head)).collect::<Vec<_>>());

// Note which of the scheduled cores were actually occupied by a backed candidate.
scheduler::Pallet::<T>::occupied(occupied.into_iter().map(|e| (e.0, e.1)).collect());

50 changes: 50 additions & 0 deletions polkadot/runtime/parachains/src/scheduler.rs
@@ -350,6 +350,11 @@ impl<T: Config> Pallet<T> {
/// `AssignmentProvider`. A claim is considered expired if its `ttl` field is lower than the
/// current block height.
fn drop_expired_claims_from_claimqueue() {
log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] enter {:?}",
ClaimQueue::<T>::get(),
);
let now = frame_system::Pallet::<T>::block_number();
let availability_cores = AvailabilityCores::<T>::get();
let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;
@@ -363,6 +368,12 @@ impl<T: Config> Pallet<T> {
while i < core_claimqueue.len() {
let maybe_dropped = if let Some(entry) = core_claimqueue.get(i) {
if entry.ttl < now {
log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] Dropping expired claim from claimqueue; idx={} para_id={:?}",
i,
entry.para_id()
);
core_claimqueue.remove(i)
} else {
None
@@ -379,17 +390,40 @@ impl<T: Config> Pallet<T> {
}
}

log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] After iterating the claimqueue for core_idx={:?}, num_dropped={:?}",
core_idx,
num_dropped
);

for _ in 0..num_dropped {
log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] Attempting to pop a new entry for core_idx={:?}",
core_idx,
);
// For all claims dropped due to TTL, attempt to pop a new entry to
// the back of the claimqueue.
if let Some(assignment) =
T::AssignmentProvider::pop_assignment_for_core(core_idx)
{
log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] Popped a new entry for core_idx={:?}",
core_idx,
);
core_claimqueue.push_back(ParasEntry::new(assignment, now + ttl));
}
}
}
}

log::trace!(
target: LOG_TARGET,
"[drop_expired_claims_from_claimqueue] before return {:?}",
ClaimQueue::<T>::get(),
);
});
}
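
A condensed sketch of what `drop_expired_claims_from_claimqueue` does between the new log statements, with std collections standing in for the runtime's storage map and assignment provider:

use std::collections::{BTreeMap, VecDeque};

type BlockNumber = u32;
type CoreIndex = u32;
type ParaId = u32;

struct ParasEntry {
    para_id: ParaId,
    ttl: BlockNumber, // claim expires once `now` passes this block
}

fn drop_expired_claims(
    claim_queue: &mut BTreeMap<CoreIndex, VecDeque<ParasEntry>>,
    now: BlockNumber,
    ttl: BlockNumber,
    mut pop_assignment: impl FnMut(CoreIndex) -> Option<ParaId>,
) {
    for (&core_idx, claims) in claim_queue.iter_mut() {
        let before = claims.len();
        // Drop every claim whose deadline has passed.
        claims.retain(|entry| entry.ttl >= now);
        let num_dropped = before - claims.len();
        // For each dropped claim, try to refill the back of the queue
        // from the assignment provider so the core does not idle.
        for _ in 0..num_dropped {
            if let Some(para_id) = pop_assignment(core_idx) {
                claims.push_back(ParasEntry { para_id, ttl: now + ttl });
            }
        }
    }
}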

@@ -537,6 +571,7 @@ impl<T: Config> Pallet<T> {

// on new session
fn push_claimqueue_items_to_assignment_provider() {
log::debug!(target: LOG_TARGET, "[push_claimqueue_items_to_assignment_provider] enter");
for (_, claim_queue) in ClaimQueue::<T>::take() {
// Push back in reverse order so that when we pop from the provider again,
// the entries in the claimqueue are in the same order as they are right now.
@@ -550,6 +585,7 @@ impl<T: Config> Pallet<T> {
/// timed out on availability before.
fn maybe_push_assignment(pe: ParasEntryType<T>) {
if pe.availability_timeouts == 0 {
log::debug!(target: LOG_TARGET, "[maybe_push_assignment] pushing {:?}", pe);
T::AssignmentProvider::push_back_assignment(pe.assignment);
}
}
@@ -566,6 +602,8 @@ impl<T: Config> Pallet<T> {
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
now: BlockNumberFor<T>,
) {
log::debug!(target: LOG_TARGET, "[free_cores_and_fill_claimqueue] enter now: {:?}, cq: {:?}", now, ClaimQueue::<T>::get());

let (mut concluded_paras, mut timedout_paras) = Self::free_cores(just_freed_cores);

// This can only happen on new sessions at which we move all assignments back to the
Expand All @@ -586,6 +624,8 @@ impl<T: Config> Pallet<T> {

// add previously timedout paras back into the queue
if let Some(mut entry) = timedout_paras.remove(&core_idx) {
log::debug!(target: LOG_TARGET, "[free_cores_and_fill_claimqueue] timedout para: entry: {:?} core: {:?}", entry, core_idx);

if entry.availability_timeouts < max_availability_timeouts {
// Increment the timeout counter.
entry.availability_timeouts += 1;
@@ -594,6 +634,8 @@ impl<T: Config> Pallet<T> {
Self::add_to_claimqueue(core_idx, entry);
// The claim has been added back into the claimqueue.
// Do not pop another assignment for the core.
log::debug!(target: LOG_TARGET, "[free_cores_and_fill_claimqueue] claim has been added back into the claimqueue");

continue
} else {
// Consider timed out assignments for on demand parachains as concluded for
@@ -611,13 +653,16 @@ impl<T: Config> Pallet<T> {
if Self::is_core_occupied(core_idx) { 1 } else { 0 };
for _ in n_lookahead_used..n_lookahead {
if let Some(assignment) = T::AssignmentProvider::pop_assignment_for_core(core_idx) {
log::debug!(target: LOG_TARGET, "[free_cores_and_fill_claimqueue] will add to claim queue for core {:?}", core_idx);

Self::add_to_claimqueue(core_idx, ParasEntry::new(assignment, now + ttl));
}
}
}

debug_assert!(timedout_paras.is_empty());
debug_assert!(concluded_paras.is_empty());
log::debug!(target: LOG_TARGET, "[free_cores_and_fill_claimqueue] before return now: {:?}, cq: {:?}", now, ClaimQueue::<T>::get());
}

fn is_core_occupied(core_idx: CoreIndex) -> bool {
@@ -628,6 +673,8 @@ impl<T: Config> Pallet<T> {
}

fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntryType<T>) {
log::debug!(target: LOG_TARGET, "[add_to_claimqueue] core_idx {:?} pe {:?}", core_idx, pe);

ClaimQueue::<T>::mutate(|la| {
la.entry(core_idx).or_default().push_back(pe);
});
@@ -638,6 +685,7 @@ impl<T: Config> Pallet<T> {
core_idx: CoreIndex,
para_id: ParaId,
) -> Result<(PositionInClaimqueue, ParasEntryType<T>), &'static str> {
log::debug!(target: LOG_TARGET, "[remove_from_claimqueue] enter");
ClaimQueue::<T>::mutate(|cq| {
let core_claims = cq.get_mut(&core_idx).ok_or("core_idx not found in lookahead")?;

@@ -648,6 +696,8 @@ impl<T: Config> Pallet<T> {

let pe = core_claims.remove(pos).ok_or("remove returned None")?;

log::debug!(target: LOG_TARGET, "[remove_from_claimqueue] removed pos {}", pos);

Ok((pos as u32, pe))
})
}
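
For the `free_cores_and_fill_claimqueue` hunk above, the timeout path can be read as: a para that timed out on availability is put back in the claim queue with an incremented counter until it exhausts `max_availability_timeouts`, after which the assignment is treated as concluded and no fresh assignment is popped for it. A stand-alone sketch under those assumptions:

use std::collections::{BTreeMap, VecDeque};

type CoreIndex = u32;

struct ParasEntry {
    availability_timeouts: u32, // how often this claim has timed out already
}

// Returns true if the claim went back into the queue, false if the
// assignment is considered concluded.
fn requeue_timed_out(
    claim_queue: &mut BTreeMap<CoreIndex, VecDeque<ParasEntry>>,
    core_idx: CoreIndex,
    mut entry: ParasEntry,
    max_availability_timeouts: u32,
) -> bool {
    if entry.availability_timeouts < max_availability_timeouts {
        // Count the timeout and put the claim back; no fresh assignment
        // is popped for this core in that case.
        entry.availability_timeouts += 1;
        claim_queue.entry(core_idx).or_default().push_back(entry);
        true
    } else {
        false
    }
}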