Skip to content

Commit

Permalink
Merge branch 'brad-implement-assign-core' into rk-core-time-prep
Browse files Browse the repository at this point in the history
+ Remove random insert.
  • Loading branch information
eskimor committed Nov 8, 2023
2 parents 79cc200 + 1027244 commit dcec120
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
mock::MockGenesisConfig,
paras::{ParaGenesisArgs, ParaKind},
};
use sp_runtime::Perbill;

use primitives::{Balance, HeadData, ValidationCode};

Expand Down
144 changes: 67 additions & 77 deletions polkadot/runtime/parachains/src/assigner_bulk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,12 @@ use crate::{
},
};

use frame_support::{
pallet_prelude::*,
traits::{
Currency,
ExistenceRequirement::{self, AllowDeath, KeepAlive},
WithdrawReasons,
},
};
use frame_support::{pallet_prelude::*, defensive};
use frame_system::pallet_prelude::*;
use pallet_broker::CoreAssignment;
use primitives::{CoreIndex, Id as ParaId};
use sp_runtime::{
traits::{One, SaturatedConversion},
FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
};

use sp_std::{collections::vec_deque::VecDeque, prelude::*};

const LOG_TARGET: &str = "runtime::parachains::assigner-bulk";
use sp_std::prelude::*;

pub use pallet::*;

Expand All @@ -71,6 +58,7 @@ impl WeightInfo for TestWeightInfo {}
///
/// for a particular core.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
struct Schedule<N> {
// Original assignments
assignments: Vec<(CoreAssignment, PartsOf57600)>,
Expand All @@ -79,9 +67,6 @@ struct Schedule<N> {
/// If this is `Some`, then this `CoreState` will be dropped at that block number. If this is
/// `None`, then we will keep serving our core assignments in a circle until a new set of
/// assignments is scheduled.
///
/// Note: This is used to chain schedules: If there is any more schedules for this core you
/// will find the next at `end_hint` + 1 block number in `CoreSchedules`.
end_hint: Option<N>,

/// The next queued schedule for this core.
Expand All @@ -107,6 +92,7 @@ struct CoreDescriptor<N> {
/// Schedules in `CoreSchedules` form a queue. `Schedule::end_hint` always pointing to the next
/// item.
#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct QueueDescriptor<N> {
/// First scheduled item, that is not yet active.
first: N,
Expand All @@ -115,6 +101,7 @@ struct QueueDescriptor<N> {
}

#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct WorkState<N> {
/// Assignments with current state.
///
Expand All @@ -126,9 +113,6 @@ struct WorkState<N> {
/// If this is `Some`, then this `CoreState` will be dropped at that block number. If this is
/// `None`, then we will keep serving our core assignments in a circle until a new set of
/// assignments is scheduled.
///
/// Note: This is used to chain schedules: If there is any future schedule for this core you
/// will find it at `end_hint` + 1 block number in `CoreSchedules`.
end_hint: Option<N>,
/// Position in the assignments we are currently in.
///
Expand All @@ -142,6 +126,7 @@ struct WorkState<N> {
}

#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone, Copy))]
struct AssignmentState {
/// Ratio of the core this assignment has.
///
Expand Down Expand Up @@ -222,16 +207,28 @@ pub mod pallet {
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
unimplemented!("TODO: Implement")
//TODO: Implement
T::DbWeight::get().reads_writes(0, 0)
}
}

#[pallet::call]
impl<T: Config> Pallet<T> {}

#[pallet::error]
pub enum Error<T> {
AssignmentsEmpty,
/// Assignments together exceeded 57600.
OverScheduled,
/// assign_core is only allowed to append new assignments at the end of already existing
/// ones.
DisallowedInsert,
}
}

/// Assignments as provided by our `AssignmentProvider` implementation.
#[derive(Encode, Decode, TypeInfo, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum BulkAssignment<OnDemand> {
/// Assignment was an instantaneous core time assignment.
Instantaneous(OnDemand),
Expand Down Expand Up @@ -383,68 +380,60 @@ impl<T: Config> Pallet<T> {
});
}

/// Append another assignment for a core.
///
/// Important only appending is allowed. Meaning, all already existing assignments must have a
/// begin smaller than the one passed here. This restriction exists, because it makes the
/// insertion O(1) and the author could not think of a reason, why this restriction should be
/// causing any problems. Inserting arbitrarily causes a `DispatchError::DisallowedInsert`
/// error. This restriction could easily be lifted if need be and in fact an implementation is
/// available
/// [here](https://github.com/paritytech/polkadot-sdk/pull/1694/commits/c0c23b01fd2830910cde92c11960dad12cdff398#diff-0c85a46e448de79a5452395829986ee8747e17a857c27ab624304987d2dde8baR386).
/// The problem is that insertion complexity then depends on the size of the existing queue,
/// which makes determining weights hard and could lead to issues like overweight blocks (at
/// least in theory).
pub fn assign_core(
core_idx: CoreIndex,
begin: BlockNumberFor<T>,
assignments: Vec<(CoreAssignment, PartsOf57600)>,
end_hint: Option<BlockNumberFor<T>>,
) {
) -> Result<(), DispatchError> {
// There should be at least one assignment
ensure!(assignments.len() > 0usize, Error::<T>::AssignmentsEmpty);

// Check that the total parts between all assignments do not exceed 57600
ensure!(
assignments
.iter()
.map(|assignment| assignment.1)
.sum::<PartsOf57600>() <= PartsOf57600::from(57600u16),
Error::<T>::OverScheduled
);

CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
let new_queue = match core_descriptor.queue {
Some(queue) => {
// Common and expected case is that work is appended at the end.
let mut insert_at = if queue.last <= begin { queue.last } else { queue.first };

// Search insertion point (usually last, thus skipped):
// TODO: Although unexpected, it will most likely be better to just fail. ->
// Get rid of this loop and document that new assignments are only accepted at
// the end.
while insert_at < queue.last {
match CoreSchedules::<T>::get((insert_at, core_idx))
.and_then(|s| s.next_schedule)
{
None => break,
// End reached, we found the insertion point.
Some(p) if p > insert_at && p <= begin => {
insert_at = p;
},
Some(p) if p > insert_at => break,
Some(_) => {
log::error!(
target: LOG_TARGET,
"Cycle in queue detected!"
);
break
},
ensure!(
begin > queue.last,
Error::<T>::DisallowedInsert,
);

CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
if let Some(schedule) = schedule.as_mut() {
debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
schedule.next_schedule = Some(begin);
} else {
defensive!("Queue end entry does not exist?");
}
}
// Overwrite?
if insert_at == begin {
CoreSchedules::<T>::mutate((insert_at, core_idx), |schedule| {
let next_schedule = schedule.as_ref().and_then(|s| s.next_schedule);
*schedule = Some(Schedule { assignments, end_hint, next_schedule });
});
} else {
let old_next =
CoreSchedules::<T>::mutate((insert_at, core_idx), |schedule| {
// Actual insertion, `insert_at` is pointing at our predecessor:
if let Some(schedule) = schedule.as_mut() {
let old_next = schedule.next_schedule;
schedule.next_schedule = Some(begin);
old_next
} else {
None
}
});
CoreSchedules::<T>::insert(
(begin, core_idx),
Schedule { assignments, end_hint, next_schedule: old_next },
);
}
});
CoreSchedules::<T>::insert(
(begin, core_idx),
Schedule { assignments, end_hint, next_schedule: None },
);

QueueDescriptor {
first: sp_std::cmp::min(begin, queue.first),
last: sp_std::cmp::max(begin, queue.last),
first: queue.first,
last: begin,
}
},
None => {
Expand All @@ -457,13 +446,14 @@ impl<T: Config> Pallet<T> {
},
};
core_descriptor.queue = Some(new_queue);
});
Ok(())
})
}
}

// Tests/Invariant:
// - After `assign_core`, Workload is `Some`.
// - end_hint always points to next item in CoreSchedules.
// - Test insertion in the middle, beginning and end.
// - next_schedule always points to next item in CoreSchedules.
// - Test insertion in the middle, beginning and end: Should fail in all cases but the last.
// - Test insertion on empty queue.
// - Test overwrite vs insert.
// - Test overwrite vs insert: Overwrite no longer allowed - should fail with error.
Loading

0 comments on commit dcec120

Please sign in to comment.