Update Scheduler to Support Relay Chain Block Number Provider #6362
@@ -107,7 +107,7 @@ use frame_system::{
 use scale_info::TypeInfo;
 use sp_io::hashing::blake2_256;
 use sp_runtime::{
-	traits::{BadOrigin, Dispatchable, One, Saturating, Zero},
+	traits::{BadOrigin, BlockNumberProvider, Dispatchable, One, Saturating, Zero},
 	BoundedVec, DispatchError, RuntimeDebug,
 };
@@ -292,6 +292,13 @@ pub mod pallet {

 		/// The preimage provider with which we look up call hashes to get the call.
 		type Preimages: QueryPreimage<H = Self::Hashing> + StorePreimage;

+		/// Provider for the block number. Normally this is the `frame_system` pallet.
+		type BlockNumberProvider: BlockNumberProvider<BlockNumber = BlockNumberFor<Self>>;
+
+		/// The maximum number of blocks that can be scheduled.
+		#[pallet::constant]
+		type MaxScheduledBlocks: Get<u32>;
 	}

 	#[pallet::storage]

Review comment on `MaxScheduledBlocks`: Any hints on how to configure this? Parachain teams will read this and not know what number to put.
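As a possible answer to the question above, here is a minimal sketch of how a Cumulus-based parachain runtime could wire these two items. `RelaychainDataProvider` is the relay chain block number provider from `cumulus-pallet-parachain-system`; the `Runtime` name, the elided Config items, and the bound of 512 are illustrative assumptions, not values taken from this PR, and the fragment does not compile on its own.

```rust
// Hypothetical parachain runtime fragment; only the two new items are shown.
impl pallet_scheduler::Config for Runtime {
	// ... the remaining associated types are elided ...

	// Use the relay chain block number tracked by `cumulus-pallet-parachain-system`
	// instead of the parachain's own `frame_system` block number. This assumes the
	// parachain's block number type is also `u32`, as required by the
	// `BlockNumber = BlockNumberFor<Self>` bound on the Config item.
	type BlockNumberProvider = cumulus_pallet_parachain_system::RelaychainDataProvider<Runtime>;

	// Upper bound on how many distinct block numbers may hold a non-empty agenda,
	// i.e. the capacity of the `Queue` storage value. 512 is purely illustrative.
	type MaxScheduledBlocks = frame_support::traits::ConstU32<512>;
}
```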
@@ -325,6 +332,11 @@ pub mod pallet {
 	pub(crate) type Lookup<T: Config> =
 		StorageMap<_, Twox64Concat, TaskName, TaskAddress<BlockNumberFor<T>>>;

+	/// The queue of block numbers that have scheduled agendas.
+	#[pallet::storage]
+	pub(crate) type Queue<T: Config> =
+		StorageValue<_, BoundedVec<BlockNumberFor<T>, T::MaxScheduledBlocks>, ValueQuery>;
+
 	/// Events type.
 	#[pallet::event]
 	#[pallet::generate_deposit(pub(super) fn deposit_event)]

Review comment on `Queue`: Do we know if one vector is enough? I think the referenda pallet creates an alarm for each ref...
@@ -376,7 +388,8 @@ pub mod pallet {
 	#[pallet::hooks]
 	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
 		/// Execute the scheduled calls
-		fn on_initialize(now: BlockNumberFor<T>) -> Weight {
+		fn on_initialize(_do_not_use_local_block_number: BlockNumberFor<T>) -> Weight {
+			let now = T::BlockNumberProvider::current_block_number();
 			let mut weight_counter = WeightMeter::with_limit(T::MaximumWeight::get());
 			Self::service_agendas(&mut weight_counter, now, u32::max_value());
 			weight_counter.consumed()
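Because `on_initialize` now ignores the block number handed in by `frame_system` and asks the configured provider instead, a test can drive the scheduler from an entirely external clock. A minimal sketch, assuming `sp_runtime` and a `std` test environment; the type and helper names below are hypothetical, not part of this PR.

```rust
use sp_runtime::traits::BlockNumberProvider;
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical external "clock" a mock runtime could plug in as
/// `type BlockNumberProvider = MockRelayBlockProvider;`.
static MOCK_RELAY_BLOCK: AtomicU32 = AtomicU32::new(1);

pub struct MockRelayBlockProvider;

impl BlockNumberProvider for MockRelayBlockProvider {
	type BlockNumber = u32;

	fn current_block_number() -> u32 {
		MOCK_RELAY_BLOCK.load(Ordering::Relaxed)
	}
}

/// Jump the mock relay chain forward, e.g. to simulate a parachain that only
/// produces a block every few relay chain blocks.
pub fn advance_relay_block(by: u32) {
	MOCK_RELAY_BLOCK.fetch_add(by, Ordering::Relaxed);
}
```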
@@ -889,7 +902,7 @@ impl<T: Config> Pallet<T> {
 	fn resolve_time(
 		when: DispatchTime<BlockNumberFor<T>>,
 	) -> Result<BlockNumberFor<T>, DispatchError> {
-		let now = frame_system::Pallet::<T>::block_number();
+		let now = T::BlockNumberProvider::current_block_number();

 		let when = match when {
 			DispatchTime::At(x) => x,
@@ -926,17 +939,23 @@ impl<T: Config> Pallet<T> {
 		let mut agenda = Agenda::<T>::get(when);
 		let index = if (agenda.len() as u32) < T::MaxScheduledPerBlock::get() {
 			// will always succeed due to the above check.
-			let _ = agenda.try_push(Some(what));
+			let _ = agenda.try_push(Some(what.clone()));
 			agenda.len() as u32 - 1
 		} else {
 			if let Some(hole_index) = agenda.iter().position(|i| i.is_none()) {
-				agenda[hole_index] = Some(what);
+				agenda[hole_index] = Some(what.clone());
 				hole_index as u32
 			} else {
 				return Err((DispatchError::Exhausted, what))
 			}
 		};
 		Agenda::<T>::insert(when, agenda);
+		Queue::<T>::mutate(|q| {
+			if let Err(index) = q.binary_search_by_key(&when, |x| *x) {
+				q.try_insert(index, when).map_err(|_| (DispatchError::Exhausted, what))?;
+			}
+			Ok(())
+		})?;
 		Ok(index)
 	}
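The `Queue::<T>::mutate` block above keeps the queue sorted and duplicate-free by binary-searching before inserting, and fails with `Exhausted` once the bound is reached. Below is a standalone sketch of that invariant (and of the removal done when an agenda empties, see the next hunk), using a plain `Vec<u32>` and a `max_len` argument in place of the `BoundedVec` and `MaxScheduledBlocks`.

```rust
/// Standalone illustration of the `Queue` invariant: a sorted, duplicate-free
/// list of block numbers with a hard capacity.
fn queue_insert(queue: &mut Vec<u32>, when: u32, max_len: usize) -> Result<(), &'static str> {
	match queue.binary_search(&when) {
		// Already queued: a second task scheduled for the same block adds nothing here.
		Ok(_) => Ok(()),
		Err(pos) => {
			if queue.len() >= max_len {
				// Mirrors `DispatchError::Exhausted` when `try_insert` fails.
				return Err("queue exhausted");
			}
			queue.insert(pos, when);
			Ok(())
		},
	}
}

/// Counterpart of the removal performed once an agenda becomes empty.
fn queue_remove(queue: &mut Vec<u32>, when: u32) {
	if let Ok(pos) = queue.binary_search(&when) {
		queue.remove(pos);
	}
}

fn main() {
	let mut queue = Vec::new();
	for when in [30, 10, 20, 10] {
		queue_insert(&mut queue, when, 4).unwrap();
	}
	assert_eq!(queue, vec![10, 20, 30]); // sorted, no duplicate 10
	queue_remove(&mut queue, 20);
	assert_eq!(queue, vec![10, 30]);
}
```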
@@ -952,6 +971,11 @@ impl<T: Config> Pallet<T> {
 			Some(_) => {},
 			None => {
 				Agenda::<T>::remove(when);
+				Queue::<T>::mutate(|q| {
+					if let Ok(index) = q.binary_search_by_key(&when, |x| *x) {
+						q.remove(index);
+					}
+				});
 			},
 		}
 	}
@@ -1157,24 +1181,30 @@ impl<T: Config> Pallet<T> {
 			return
 		}

-		let mut incomplete_since = now + One::one();
-		let mut when = IncompleteSince::<T>::take().unwrap_or(now);
Review thread on dropping `IncompleteSince` in favour of the bounded `Queue`:

- Can you explain why it would not work with `IncompleteSince`?
- Yes, but then this becomes unbounded in case too many blocks are skipped. The idea behind using the … / This should be determined similarly to the existing … / There is already a retry mechanism, and the task is purged if the retry count is exceeded (even if it failed).
- Retries work in case a certain task fails while its function call is being executed (not when the scheduler itself fails). I meant the case when there are many (or few but too heavy) overdue tasks (task_block < now), so that the scheduler never completes them, or needs too much time to complete them and exit the overdue state to start processing tasks on time. Do we handle such a case?
- Indeed, I do not find it quite comfortable to run a …
- But this stays an issue even in the current implementation? The change here just makes it bounded, so that the scheduling itself is blocked in such a case.
- Maybe we can put a quite big bound on the …
- I see, indeed it is bad for PoV, as it is read every block. The situation we want to fix is when the scheduler is using the relay chain block and the parachain doesn't execute often. (1) Maybe in this case the scheduler should use a different block provider with less granularity, like … (2) Or otherwise we can have a more complex structure for the queue: we cut the vector into chunks of 100 blocks. But still, if the parachain wakes up every month it can be not good; at that point they should use (1). EDIT: I agree we can also just ignore this situation with a …
- Added …
- Also the task scheduling is affected. I think we have the following cases today/planned soon: … We have a problem with case (3.2) only. On the current version (without …) I would look into numbers: if with the current version we can handle 2 hours of overdue work in some reasonable time (let's say 10 blocks), then I think we are fine even with the current solution, we just need tests for it. If not, maybe we can introduce the …
- I just checked that we currently use …
- Yes, the referenda pallet creates an alarm for every ref to check the voting turnout.
- Depends on how many blocks are produced. I guess when we assume that the parachain will produce blocks at least as fast as it can advance the scheduler, then yes. Conceptually, I believe that a priority queue is the right data structure: we try to evaluate an ordered list of tasks by their order, which is exactly what a priority queue is good at. The issue with implementing this as a vector is obviously the PoV. Maybe we can implement the queue as a B-tree? Then we can get the next task in log reads and insert in log writes, and it allows us to do exactly what we want: get the next pending task. It could be PoV-optimized by using chunks as well.
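A standalone sketch of the chunking idea raised in the thread above (a hypothetical alternative, not what this PR implements): block numbers are grouped into fixed-size chunks so that servicing only reads the chunks that can contain due entries, instead of one ever-growing vector. A `BTreeMap` stands in for a `StorageMap` keyed by chunk index; the diff hunk itself continues below.

```rust
use std::collections::BTreeMap;

const CHUNK_SIZE: u32 = 100;

/// chunk index (when / CHUNK_SIZE) -> sorted block numbers in that chunk
type ChunkedQueue = BTreeMap<u32, Vec<u32>>;

fn insert(queue: &mut ChunkedQueue, when: u32) {
	let chunk = queue.entry(when / CHUNK_SIZE).or_default();
	if let Err(pos) = chunk.binary_search(&when) {
		chunk.insert(pos, when);
	}
}

/// Collect every queued block number that is due at `now`, draining the chunks
/// it came from (the analogue of `Queue::mutate(|q| q.drain(0..index))`).
fn take_due(queue: &mut ChunkedQueue, now: u32) -> Vec<u32> {
	let mut due = Vec::new();
	let chunk_indices: Vec<u32> = queue.range(..=now / CHUNK_SIZE).map(|(i, _)| *i).collect();
	for i in chunk_indices {
		let chunk = queue.get_mut(&i).expect("index taken from the map above");
		let split = chunk.partition_point(|&w| w <= now);
		due.extend(chunk.drain(..split));
		if chunk.is_empty() {
			queue.remove(&i);
		}
	}
	due
}

fn main() {
	let mut queue = ChunkedQueue::new();
	for when in [5, 150, 420, 99] {
		insert(&mut queue, when);
	}
	assert_eq!(take_due(&mut queue, 160), vec![5, 99, 150]);
	assert_eq!(take_due(&mut queue, 1000), vec![420]);
}
```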
-		let mut executed = 0;
-
-		let max_items = T::MaxScheduledPerBlock::get();
-		let mut count_down = max;
-		let service_agenda_base_weight = T::WeightInfo::service_agenda_base(max_items);
-		while count_down > 0 && when <= now && weight.can_consume(service_agenda_base_weight) {
-			if !Self::service_agenda(weight, &mut executed, now, when, u32::max_value()) {
-				incomplete_since = incomplete_since.min(when);
+		let queue = Queue::<T>::get();
+		let end_index = match queue.binary_search_by_key(&now, |x| *x) {
+			Ok(end_index) => end_index.saturating_add(1),
+			Err(end_index) => {
+				if end_index == 0 {
+					return;
+				}
+				end_index
+			},
+		};
+
+		let mut index = 0;
+		while index < end_index {
+			let when = queue[index];
+			let mut executed = 0;
+			if !Self::service_agenda(weight, &mut executed, now, when, max) {
+				break;
 			}
-			when.saturating_inc();
-			count_down.saturating_dec();
+			index.saturating_inc();
 		}
-		incomplete_since = incomplete_since.min(when);
-		if incomplete_since <= now {
-			IncompleteSince::<T>::put(incomplete_since);
-		}
+
+		Queue::<T>::mutate(|queue| {
+			queue.drain(0..index);
+		});
 	}

 	/// Returns `true` if the agenda was fully completed, `false` if it should be revisited at a
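For illustration, a standalone sketch of the reworked servicing flow above: find the due prefix of the sorted queue with a binary search, service agendas until the weight budget runs out, then drain only the fully processed prefix so the remainder is retried at the next block. Here `service_agenda` is a stand-in closure; in the pallet it is the existing method of the same name with a weight meter.

```rust
/// `queue` is the sorted list of block numbers that have agendas, `now` is the
/// provider's block number, and `service_agenda` reports whether the agenda for
/// one block could be fully processed within the remaining weight.
fn service_agendas(queue: &mut Vec<u32>, now: u32, mut service_agenda: impl FnMut(u32) -> bool) {
	// Everything at positions `0..end_index` is due (block number <= now).
	let end_index = match queue.binary_search(&now) {
		Ok(i) => i + 1,
		Err(0) => return, // nothing due yet
		Err(i) => i,
	};

	let mut index = 0;
	while index < end_index {
		if !service_agenda(queue[index]) {
			// Out of weight: stop here; unprocessed entries stay queued.
			break;
		}
		index += 1;
	}

	// Drop only the fully processed prefix, mirroring `Queue::mutate(.. drain(0..index) ..)`.
	queue.drain(0..index);
}

fn main() {
	let mut queue = vec![10, 20, 30, 50];
	let mut budget = 2; // pretend only two agendas fit in this block's weight
	service_agendas(&mut queue, 30, |_when| {
		if budget == 0 {
			return false;
		}
		budget -= 1;
		true
	});
	// 10 and 20 were serviced; 30 ran out of budget and is retried next block.
	assert_eq!(queue, vec![30, 50]);
}
```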
Review comment on the doc comment "Normally this is the `frame_system` pallet": Normally in what case? Parachain or relay/solo?