Skip to content

Commit

Permalink
assign_core impl
Browse files Browse the repository at this point in the history
  • Loading branch information
eskimor committed Nov 7, 2023
1 parent a1366e8 commit c0c23b0
Showing 1 changed file with 148 additions and 63 deletions.
211 changes: 148 additions & 63 deletions polkadot/runtime/parachains/src/assigner_bulk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
//! Handles scheduling of bulk core time.
//!
//! Invariant: For efficiency the schedule, starting from the current workload always form a linked
//! list:
//! list.
//!
//! Workload (type `WorkState`) is only `Idle` if `WorkPlan` is empty for that core. Otherwise it
//! is either `Scheduled` pointing to the next item in `WorkPlan` for the queue or `WorkState`
//! where `end_hint` will be pointing to the next item in `WorkPlan`. `end_hint` will only ever be
//! `None` if `WorkPlan` for that core is empty. Each `Schedule` item in `WorkPlan` also always
//! points to the next `Schedule` coming afterwards via `end_hint`, if it exists.
//! `CoreDescriptor` contains pointers to the begin and the end of the list, together with the
//! currently active assignments.
mod mock_helpers;
#[cfg(test)]
Expand Down Expand Up @@ -84,30 +81,40 @@ struct Schedule<N> {
/// assignments is scheduled.
///
/// Note: This is used to chain schedules: If there is any more schedules for this core you
/// will find the next at `end_hint` + 1 block number in `Workplan`.
/// will find the next at `end_hint` + 1 block number in `CoreSchedules`.
end_hint: Option<N>,

/// The next queued schedule for this core.
///
/// Schedules are forming a queue.
next_schedule: Option<N>,
}

/// An instantiated `Schedule`.
/// Descriptor for a core.
///
/// This is the state of assignments currently being served via the `AssignmentProvider` interface,
/// as opposed to `Schedule` which is upcoming not yet served assignments.
#[derive(Encode, Decode, TypeInfo)]
enum CoreState<N> {
/// No work for this core right now and also nothing scheduled.
Idle,
/// No work for this core right now, but work coming up at block `N`.
Scheduled(N),
/// Work is currently performed on this core, details in current `WorkState`.
Working(WorkState),
/// Contains pointers to first and last schedule into `CoreSchedules` for that core and keeps track
/// of the currently active work as well.
#[derive(Encode, Decode, TypeInfo, Default)]
struct CoreDescriptor<N> {
/// Meta data about the queued schedules for this core.
queue: Option<QueueDescriptor<N>>,
/// Currently performed work.
current_work: Option<WorkState<N>>,
}

impl<N> Default for CoreState<N> {
fn default() -> Self {
Self::Idle
}
/// Pointers into `CoreSchedules` for a particular core.
///
/// Schedules in `CoreSchedules` form a queue. `Schedule::end_hint` always pointing to the next
/// item.
#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
struct QueueDescriptor<N> {
/// First scheduled item, that is not yet active.
first: N,
/// Last scheduled item.
last: N,
}

#[derive(Encode, Decode, TypeInfo)]
struct WorkState<N> {
/// Assignments with current state.
///
Expand All @@ -121,7 +128,7 @@ struct WorkState<N> {
/// assignments is scheduled.
///
/// Note: This is used to chain schedules: If there is any future schedule for this core you
/// will find it at `end_hint` + 1 block number in `Workplan`.
/// will find it at `end_hint` + 1 block number in `CoreSchedules`.
end_hint: Option<N>,
/// Position in the assignments we are currently in.
///
Expand Down Expand Up @@ -150,7 +157,7 @@ struct AssignmentState {

impl<N> From<Schedule<N>> for WorkState<N> {
fn from(schedule: Schedule<N>) -> Self {
let Schedule { assignments, end_hint } = schedule;
let Schedule { assignments, end_hint, next_schedule: _ } = schedule;
let step =
if let Some(min_step_assignment) = assignments.iter().min_by(|a, b| a.1.cmp(&b.1)) {
min_step_assignment.1
Expand Down Expand Up @@ -189,34 +196,28 @@ pub mod pallet {
///
/// Assignments as of the given block number. They will go into state once the block number is
/// reached (and replace whatever was in there before).
///
/// Invariants: We assume that this is append only and consumed. In other words new schedules
/// inserted for a core must have a higher block number than all of the already existing
/// schedules.
#[pallet::storage]
pub(super) type Workplan<T: Config> = StorageMap<
pub(super) type CoreSchedules<T: Config> = StorageMap<
_,
Twox256,
(BlockNumberFor<T>, CoreIndex),
Schedule<BlockNumberFor<T>>,
OptionQuery,
>;

/// Latest schedule for the given core.
///
/// Used for updating `end_hint` of that latest schedule based on newly appended schedules.
/// (See described invariant above.)
#[pallet::storage]
pub(super) type LatestCoreSchedule<T: Config> =
StorageMap<_, Twox256, CoreIndex, BlockNumberFor<T>, OptionQuery>;

/// Assignments which are currently active.
///
/// They will be picked from `PendingAssignments` once we reach the scheduled block number in
/// `PendingAssignments`.
#[pallet::storage]
pub(super) type Workload<T: Config> =
StorageMap<_, Twox256, CoreIndex, CoreState<BlockNumberFor<T>>, ValueQuery, GetDefault>;
pub(super) type CoreDescriptors<T: Config> = StorageMap<
_,
Twox256,
CoreIndex,
CoreDescriptor<BlockNumberFor<T>>,
ValueQuery,
GetDefault,
>;

#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
Expand Down Expand Up @@ -272,16 +273,14 @@ impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Self::AssignmentType> {
let now = <frame_system::Pallet<T>>::block_number();

Workload::<T>::mutate(core_idx, |core_state| {
CoreDescriptors::<T>::mutate(core_idx, |core_state| {
Self::ensure_workload(now, core_idx, core_state);

let work_state = match core_state {
CoreState::Working(w) => w,
CoreState::Scheduled(_) => return None,
CoreState::Idle => return None,
let work_state = match &mut core_state.current_work {
None => return None,
Some(w) => w,
};


work_state.pos = work_state.pos % work_state.assignments.len() as u16;
let (a_type, a_state) = &mut work_state
.assignments
Expand Down Expand Up @@ -348,34 +347,120 @@ impl<T: Config> Pallet<T> {
fn ensure_workload(
now: BlockNumberFor<T>,
core_idx: CoreIndex,
workload: &mut CoreState<BlockNumberFor<T>>,
descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
) {
let next_scheduled = match workload {
// Nothing to do here (`WorkPlan` is empty for this core):
CoreState::Idle => return,
CoreState::Scheduled(n) => *n,
CoreState::Working(w) => if let Some(end_hint) = w.end_hint {
end_hint.saturating_add(1u32.into())
} else {
// Nothing to do here (Empty `WorkPlan`).
return
// Workload expired?
if let Some(end_hint) = descriptor.current_work.as_ref().and_then(|w| w.end_hint) {
if end_hint >= now {
descriptor.current_work = None;
}
}

let queue = match descriptor.queue {
// No queue => Nothing to do.
None => return,
Some(q) => q,
};

let next_scheduled = queue.first;

if next_scheduled > now {
// Not yet ready.
return
}

// Workload expired - update to whatever is scheduled or `None` if nothing is:
let update = Workplan::<T>::take((next_scheduled, core_idx));
if let Some(update) = update {
*workload = CoreState::Working(update.into());
} else {
*workload = CoreState::Idle;
}
// Update is needed:
let update = CoreSchedules::<T>::take((next_scheduled, core_idx));
let new_first = update.as_ref().and_then(|u| u.next_schedule);
descriptor.current_work = update.map(|u| u.into());

descriptor.queue = new_first.map(|new_first| {
QueueDescriptor {
first: new_first,
// `last` stays unaffected:
last: queue.last,
}
});
}

pub fn assign_core(
core_idx: CoreIndex,
begin: BlockNumberFor<T>,
assignments: Vec<(CoreAssignment, PartsOf57600)>,
end_hint: Option<BlockNumberFor<T>>,
) {
CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
let new_queue = match core_descriptor.queue {
Some(queue) => {
// Common and expected case is that work is appended at the end.
let mut insert_at = if queue.last <= begin { queue.last } else { queue.first };

// Search insertion point (usually last, thus skipped):
while insert_at < queue.last {
match CoreSchedules::<T>::get((insert_at, core_idx))
.and_then(|s| s.next_schedule)
{
None => break,
// End reached, we found the insertion point.
Some(p) if p > insert_at && p <= begin => {
insert_at = p;
},
Some(p) if p > insert_at => break,
Some(_) => {
log::error!(
target: LOG_TARGET,
"Cycle in queue detected!"
);
break
},
}
}
// Overwrite?
if insert_at == begin {
CoreSchedules::<T>::mutate((insert_at, core_idx), |schedule| {
let next_schedule = schedule.as_ref().and_then(|s| s.next_schedule);
*schedule = Some(Schedule { assignments, end_hint, next_schedule });
});
} else {
let old_next =
CoreSchedules::<T>::mutate((insert_at, core_idx), |schedule| {
// Actual insertion, `insert_at` is pointing at our predecessor:
if let Some(schedule) = schedule.as_mut() {
let old_next = schedule.next_schedule;
schedule.next_schedule = Some(begin);
old_next
} else {
None
}
});
CoreSchedules::<T>::insert(
(begin, core_idx),
Schedule { assignments, end_hint, next_schedule: old_next },
);
}

QueueDescriptor {
first: sp_std::cmp::min(begin, queue.first),
last: sp_std::cmp::max(begin, queue.last),
}
},
None => {
// Queue empty, just insert:
CoreSchedules::<T>::insert(
(begin, core_idx),
Schedule { assignments, end_hint, next_schedule: None },
);
QueueDescriptor { first: begin, last: begin }
},
};
core_descriptor.queue = Some(new_queue);
});
}
}

// Tests/Invariant:
// - After `assign_core`, Workload is `Some`.
// - end_hint always points to next item in Workplan.
// - end_hint always points to next item in CoreSchedules.
// - Test insertion in the middle, beginning and end.
// - Test insertion on empty queue.
// - Test overwrite vs insert.

0 comments on commit c0c23b0

Please sign in to comment.