Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Scheduler to Support Relay Chain Block Number Provider #6362

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,13 @@ parameter_types! {
#[cfg(not(feature = "runtime-benchmarks"))]
parameter_types! {
pub const MaxScheduledPerBlock: u32 = 50;
pub const MaxScheduledBlocks: u32 = 50;
}

#[cfg(feature = "runtime-benchmarks")]
parameter_types! {
pub const MaxScheduledPerBlock: u32 = 200;
pub const MaxScheduledBlocks: u32 = 200;
}

impl pallet_scheduler::Config for Runtime {
Expand All @@ -622,6 +624,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = EqualOrGreatestRootCmp;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
3 changes: 3 additions & 0 deletions polkadot/runtime/rococo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ parameter_types! {
pub MaximumSchedulerWeight: Weight = Perbill::from_percent(80) *
BlockWeights::get().max_block;
pub const MaxScheduledPerBlock: u32 = 50;
pub const MaxScheduledBlocks: u32 = 50;
pub const NoPreimagePostponement: Option<u32> = Some(10);
}

Expand Down Expand Up @@ -331,6 +332,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = OriginPrivilegeCmp;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
3 changes: 3 additions & 0 deletions polkadot/runtime/westend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ parameter_types! {
pub MaximumSchedulerWeight: frame_support::weights::Weight = Perbill::from_percent(80) *
BlockWeights::get().max_block;
pub const MaxScheduledPerBlock: u32 = 50;
pub const MaxScheduledBlocks: u32 = 50;
pub const NoPreimagePostponement: Option<u32> = Some(10);
}

Expand All @@ -247,6 +248,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = frame_support::traits::EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
5 changes: 5 additions & 0 deletions substrate/bin/node/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = pallet_scheduler::weights::SubstrateWeight<Runtime>;
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
#[cfg(feature = "runtime-benchmarks")]
type MaxScheduledBlocks = ConstU32<512>;
#[cfg(not(feature = "runtime-benchmarks"))]
type MaxScheduledBlocks = ConstU32<50>;
}

impl pallet_glutton::Config for Runtime {
Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/democracy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ impl pallet_scheduler::Config for Test {
type WeightInfo = ();
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = ();
type BlockNumberProvider = frame_system::Pallet<Test>;
type MaxScheduledBlocks = ConstU32<100>;
}

#[derive_impl(pallet_balances::config_preludes::TestDefaultConfig)]
Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/referenda/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl pallet_scheduler::Config for Test {
type WeightInfo = ();
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Test>;
type MaxScheduledBlocks = ConstU32<100>;
}
#[derive_impl(pallet_balances::config_preludes::TestDefaultConfig)]
impl pallet_balances::Config for Test {
Expand Down
70 changes: 50 additions & 20 deletions substrate/frame/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use frame_system::{
use scale_info::TypeInfo;
use sp_io::hashing::blake2_256;
use sp_runtime::{
traits::{BadOrigin, Dispatchable, One, Saturating, Zero},
traits::{BadOrigin, BlockNumberProvider, Dispatchable, One, Saturating, Zero},
BoundedVec, DispatchError, RuntimeDebug,
};

Expand Down Expand Up @@ -292,6 +292,13 @@ pub mod pallet {

/// The preimage provider with which we look up call hashes to get the call.
type Preimages: QueryPreimage<H = Self::Hashing> + StorePreimage;

/// Provider for the block number. Normally this is the `frame_system` pallet.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally in what case? Parachain or relay/solo?

type BlockNumberProvider: BlockNumberProvider<BlockNumber = BlockNumberFor<Self>>;
gupnik marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum number of blocks that can be scheduled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any hints on how to configure this? Parachain teams will read this and not know what number to put.

#[pallet::constant]
type MaxScheduledBlocks: Get<u32>;
}

#[pallet::storage]
Expand Down Expand Up @@ -325,6 +332,11 @@ pub mod pallet {
pub(crate) type Lookup<T: Config> =
StorageMap<_, Twox64Concat, TaskName, TaskAddress<BlockNumberFor<T>>>;

/// The queue of block numbers that have scheduled agendas.
#[pallet::storage]
pub(crate) type Queue<T: Config> =
StorageValue<_, BoundedVec<BlockNumberFor<T>, T::MaxScheduledBlocks>, ValueQuery>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know if one vector is enough? I think the referenda pallet creates an alarm for each ref...


/// Events type.
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
Expand Down Expand Up @@ -376,7 +388,8 @@ pub mod pallet {
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
/// Execute the scheduled calls
fn on_initialize(now: BlockNumberFor<T>) -> Weight {
fn on_initialize(_do_not_use_local_block_number: BlockNumberFor<T>) -> Weight {
let now = T::BlockNumberProvider::current_block_number();
let mut weight_counter = WeightMeter::with_limit(T::MaximumWeight::get());
Self::service_agendas(&mut weight_counter, now, u32::max_value());
weight_counter.consumed()
Expand Down Expand Up @@ -889,7 +902,7 @@ impl<T: Config> Pallet<T> {
fn resolve_time(
when: DispatchTime<BlockNumberFor<T>>,
) -> Result<BlockNumberFor<T>, DispatchError> {
let now = frame_system::Pallet::<T>::block_number();
let now = T::BlockNumberProvider::current_block_number();

let when = match when {
DispatchTime::At(x) => x,
Expand Down Expand Up @@ -926,17 +939,23 @@ impl<T: Config> Pallet<T> {
let mut agenda = Agenda::<T>::get(when);
let index = if (agenda.len() as u32) < T::MaxScheduledPerBlock::get() {
// will always succeed due to the above check.
let _ = agenda.try_push(Some(what));
let _ = agenda.try_push(Some(what.clone()));
agenda.len() as u32 - 1
} else {
if let Some(hole_index) = agenda.iter().position(|i| i.is_none()) {
agenda[hole_index] = Some(what);
agenda[hole_index] = Some(what.clone());
hole_index as u32
} else {
return Err((DispatchError::Exhausted, what))
}
};
Agenda::<T>::insert(when, agenda);
Queue::<T>::mutate(|q| {
if let Err(index) = q.binary_search_by_key(&when, |x| *x) {
q.try_insert(index, when).map_err(|_| (DispatchError::Exhausted, what))?;
}
Ok(())
})?;
Ok(index)
}

Expand All @@ -952,6 +971,11 @@ impl<T: Config> Pallet<T> {
Some(_) => {},
None => {
Agenda::<T>::remove(when);
Queue::<T>::mutate(|q| {
if let Ok(index) = q.binary_search_by_key(&when, |x| *x) {
q.remove(index);
}
});
},
}
}
Expand Down Expand Up @@ -1157,24 +1181,30 @@ impl<T: Config> Pallet<T> {
return
}

let mut incomplete_since = now + One::one();
let mut when = IncompleteSince::<T>::take().unwrap_or(now);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why would not it work with IncompleteSince, without the block Queue?
How we determine the MaxScheduledBlocks bound?
With the IncompleteSince we iterate over blocks that might have no task to execute and this might make a situation with many incomplete blocks even worth. But probably not too much? One more read?
Both solutions need a strategy for a situation when there are two many tasks that can not be completed and the task queue only grow. If such strategy not yet in place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the IncompleteSince we iterate over blocks that might have no task to execute and this might make a situation with many incomplete blocks even worth. But probably not too much? One more read?

Yes, but then this becomes unbounded in case too many blocks are skipped. The idea behind using the Queue is to bound this to a sufficient number.

How we determine the MaxScheduledBlocks bound?

This should be determined similar to the existing MaxScheduledPerBlock?

Both solutions need a strategy for a situation when there are two many tasks that can not be completed and the task queue only grow. If such strategy not yet in place.

There is already a retry mechanism and the task is purged if the retry count is exceeded (even if failed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue not only bounds how many blocks gonna be processed from the past. It bounds for how many blocks we can schedule. If the number is 50, we can schedule only 50 jobs with distinct schedule time.

The MaxScheduledPerBlock for me seems simpler to define. Because the block size its exiting constrain the system have. But how many distinct schedule time points you can have is something new.

Retries work in case if a certain task fails while it's function call is being executed (not the scheduler fail). I meant a case when there are many (or few but too heavy) overdue tasks (task_block < now), so that the scheduler never (or needs too many time) to complete them and exist such overdue state to start processing tasks in time. Do we handle such case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue not only bounds how many blocks gonna be processed from the past. It bounds for how many blocks we can schedule. If the number is 50, we can schedule only 50 jobs with distinct schedule time

Indeed, I do not find it quite comfortable to run a for loop with IncompleteSince when there could be an unknown number of blocks passed between the successive runs. You could always keep the MaxScheduledBlocks on the higher side that would give you a similar experience?

I meant a case when there are many (or few but too heavy) overdue tasks (task_block < now), so that the scheduler never (or needs too many time) to complete them and exist such overdue state to start processing tasks in time. Do we handle such case?

But this stays as an issue even in the current implementation? The change here just makes it bounded, so that the scheduling itself is blocked in such a case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can put a quite big bound on the MaxScheduledBlocks, it is just a vec of block numbers.

Copy link
Contributor

@gui1117 gui1117 Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, indeed it is bad for PoV, as it is read every block.

The situation we want to fix is when the scheduler is using the relay chain block, and the parachain doesn't execute often.

(1) Maybe in this case the scheduler should use a different block provider with less granularity like relay chain block / 100 so that when doing IncompleteSince it increments with a step of 100 relay chain block until it arrives to now.

(2) Or otherwise we can have a more complex structure for the queue. We cut the vector in chunck of 100 blocks.
So we have a double map with first key is block number / 100 and second key is block number % 100, the value is a vector of length at most 100.

But still if the parachains wake up every month it can be not good. But at this point they should use (1).

EDIT: I agree we can also just ignore this situation with a MaxStaleTaskAge parameter. IMO it is fine. And people can do (1) if their parachain executes too much rarely

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added MaxStaleTaskAge as suggested. Thanks both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, indeed it is bad for PoV, as it is read every block.

Also the task scheduling is affected.

The situation we want to fix is when the scheduler is using the relay chain block, and the parachain doesn't execute often.

I think we have next cases today/planned soon:

  1. Relay Chain with scheduler working with local block provider. No concerns. The new Queue is even redundant;
  2. Parachain with scheduler working with local block provider. Same as (1);
  3. Parachain with scheduler working with Relay Chain block provider;
    3.1 runs scheduler on every second RC block, same as (1);
    3.2 RC or Parachain for some reason is not producing blocks for 2 hours, we have 1200 blocks to iterate through.

We have a problem with (3.2) case only. On the current version (without Queue) it will eventually handle the overdue blocks (we can even calculate how many blocks it will take, lets say if there is no tasks scheduled in that period). With the Queue such situation as (3.2) gonna be handled well, but with a cost.

I would look into numbers, if with the current version we can handle 2 hours of overdue in some reasonable time (lets say 10 blocks), then I think we are fine even with current solution, we just need tests for it. If not, may be we can introduce the Queue in a way that it can be disabled for (1) and (2) cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked that we currently use Scheduler only for the Governance related pallets. I think the related tasks should be better eventually processed than dropped if too old. So MaxStaleTaskAge should be at least optional.

Copy link
Member

@ggwpez ggwpez Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the referenda pallet creates an alarm for every ref to check the voting turnout.

We have a problem with (3.2) case only. On the current version (without Queue) it will eventually handle the overdue blocks (we can even calculate how many blocks it will take, lets say if there is no tasks scheduled in that period).

Depends on how many blocks are produced. I guess when we assume that the parachain will produce blocks at least as fast as it can advance the scheduler then yes.
Playing devils advocate here since there could be parachains that only produce one block every two hours, which would get stuck without ever catching up the IncompleteSince.

Conceptually, I believe that a priority Queue is the right data structure. We try to evaluate an ordered list of tasks by their order. It is exactly what a priority queue is good at. The issue with implementing this as a Vector is obviously the PoV.

Maybe we can implement the Queue as a B Tree? Then we can get the next task in log reads and insert in log writes. And it allows us to do exactly what we want: get the next pending task. It could be PoV optimized by using chunks as well.
To me it just seems that most of the pain here is that we are using the wrong data structure for the job.

let mut executed = 0;
let queue = Queue::<T>::get();
let end_index = match queue.binary_search_by_key(&now, |x| *x) {
Ok(end_index) => end_index.saturating_add(1),
Err(end_index) => {
if end_index == 0 {
return;
}
end_index
},
};

let max_items = T::MaxScheduledPerBlock::get();
let mut count_down = max;
let service_agenda_base_weight = T::WeightInfo::service_agenda_base(max_items);
while count_down > 0 && when <= now && weight.can_consume(service_agenda_base_weight) {
if !Self::service_agenda(weight, &mut executed, now, when, u32::max_value()) {
incomplete_since = incomplete_since.min(when);
let mut index = 0;
while index < end_index {
let when = queue[index];
let mut executed = 0;
if !Self::service_agenda(weight, &mut executed, now, when, max) {
break;
}
when.saturating_inc();
count_down.saturating_dec();
}
incomplete_since = incomplete_since.min(when);
if incomplete_since <= now {
IncompleteSince::<T>::put(incomplete_since);
index.saturating_inc();
}

Queue::<T>::mutate(|queue| {
queue.drain(0..index);
});
}

/// Returns `true` if the agenda was fully completed, `false` if it should be revisited at a
Expand Down
8 changes: 8 additions & 0 deletions substrate/frame/scheduler/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ impl Config for Test {
type WeightInfo = TestWeightInfo;
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Self>;
type MaxScheduledBlocks = ConstU32<20>;
}

pub type LoggerCall = logger::Call<Test>;
Expand All @@ -244,6 +246,12 @@ pub fn run_to_block(n: u64) {
}
}

pub fn go_to_block(n: u64) {
System::set_block_number(n);
Scheduler::on_initialize(n);
Scheduler::on_finalize(n);
}

pub fn root() -> OriginCaller {
system::RawOrigin::Root.into()
}
42 changes: 38 additions & 4 deletions substrate/frame/scheduler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,7 @@ fn on_initialize_weight_is_correct() {
));

// Will include the named periodic only
System::set_block_number(1);
assert_eq!(
Scheduler::on_initialize(1),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1648,6 +1649,7 @@ fn on_initialize_weight_is_correct() {
assert_eq!(logger::log(), vec![(root(), 2600u32)]);

// Will include anon and anon periodic
System::set_block_number(2);
assert_eq!(
Scheduler::on_initialize(2),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1663,6 +1665,7 @@ fn on_initialize_weight_is_correct() {
assert_eq!(logger::log(), vec![(root(), 2600u32), (root(), 69u32), (root(), 42u32)]);

// Will include named only
System::set_block_number(3);
assert_eq!(
Scheduler::on_initialize(3),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1678,11 +1681,9 @@ fn on_initialize_weight_is_correct() {
);

// Will contain none
System::set_block_number(4);
let actual_weight = Scheduler::on_initialize(4);
assert_eq!(
actual_weight,
TestWeightInfo::service_agendas_base() + TestWeightInfo::service_agenda_base(0)
);
assert_eq!(actual_weight, TestWeightInfo::service_agendas_base());
});
}

Expand Down Expand Up @@ -3035,3 +3036,36 @@ fn unavailable_call_is_detected() {
assert!(!Preimage::is_requested(&hash));
});
}

#[test]
fn basic_scheduling_with_skipped_blocks_works() {
new_test_ext().execute_with(|| {
// Call to schedule
let call =
RuntimeCall::Logger(LoggerCall::log { i: 42, weight: Weight::from_parts(10, 0) });

// BaseCallFilter should be implemented to accept `Logger::log` runtime call which is
// implemented for `BaseFilter` in the mock runtime
assert!(!<Test as frame_system::Config>::BaseCallFilter::contains(&call));

// Schedule call to be executed at the 4th block
assert_ok!(Scheduler::do_schedule(
DispatchTime::At(4),
None,
127,
root(),
Preimage::bound(call).unwrap()
));

// `log` runtime call should not have executed yet
go_to_block(3);
assert!(logger::log().is_empty());

go_to_block(6);
// `log` runtime call should have executed at block 4
assert_eq!(logger::log(), vec![(root(), 42u32)]);

go_to_block(100);
assert_eq!(logger::log(), vec![(root(), 42u32)]);
});
}
Loading