diff --git a/Cargo.lock b/Cargo.lock index e5064b19a7829..52d44251fe5e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1811,6 +1811,7 @@ name = "frame-support" version = "3.0.0" dependencies = [ "bitflags", + "env_logger 0.8.3", "frame-metadata", "frame-support-procedural", "frame-system", diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs index 448867b25cb17..2075d75833bc9 100644 --- a/bin/node/runtime/src/lib.rs +++ b/bin/node/runtime/src/lib.rs @@ -471,6 +471,7 @@ parameter_types! { pub const SlashDeferDuration: pallet_staking::EraIndex = 24 * 7; // 1/4 the bonding duration. pub const RewardCurve: &'static PiecewiseLinear<'static> = &REWARD_CURVE; pub const MaxNominatorRewardedPerValidator: u32 = 256; + pub SlashTaskWeight: Weight = RuntimeBlockWeights::get().max_block / 2; } impl pallet_staking::Config for Runtime { @@ -496,6 +497,7 @@ impl pallet_staking::Config for Runtime { type NextNewSession = Session; type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type ElectionProvider = ElectionProviderMultiPhase; + type TaskExecutor = frame_support::executor::SinglePassExecutor, SlashTaskWeight>; type WeightInfo = pallet_staking::weights::SubstrateWeight; } diff --git a/frame/babe/src/mock.rs b/frame/babe/src/mock.rs index 39831eceb75ba..00e9f21fa95c6 100644 --- a/frame/babe/src/mock.rs +++ b/frame/babe/src/mock.rs @@ -211,6 +211,7 @@ impl pallet_staking::Config for Test { type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type NextNewSession = Session; type ElectionProvider = onchain::OnChainSequentialPhragmen; + type TaskExecutor = frame_support::executor::SinglePassExecutor>; type WeightInfo = (); } diff --git a/frame/grandpa/src/mock.rs b/frame/grandpa/src/mock.rs index d59d0d19d0e87..ac293a7bd7c3d 100644 --- a/frame/grandpa/src/mock.rs +++ b/frame/grandpa/src/mock.rs @@ -217,6 +217,7 @@ impl pallet_staking::Config for Test { type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type NextNewSession = Session; type ElectionProvider = onchain::OnChainSequentialPhragmen; + type TaskExecutor = frame_support::executor::SinglePassExecutor>; type WeightInfo = (); } diff --git a/frame/offences/benchmarking/src/mock.rs b/frame/offences/benchmarking/src/mock.rs index a0a09e0fbb897..6bd512657e0b1 100644 --- a/frame/offences/benchmarking/src/mock.rs +++ b/frame/offences/benchmarking/src/mock.rs @@ -176,6 +176,7 @@ impl pallet_staking::Config for Test { type NextNewSession = Session; type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type ElectionProvider = onchain::OnChainSequentialPhragmen; + type TaskExecutor = frame_support::executor::SinglePassExecutor>; type WeightInfo = (); } diff --git a/frame/session/benchmarking/src/mock.rs b/frame/session/benchmarking/src/mock.rs index cf2fa8a07cfe0..ffb9624328211 100644 --- a/frame/session/benchmarking/src/mock.rs +++ b/frame/session/benchmarking/src/mock.rs @@ -181,6 +181,7 @@ impl pallet_staking::Config for Test { type NextNewSession = Session; type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type ElectionProvider = onchain::OnChainSequentialPhragmen; + type TaskExecutor = frame_support::executor::SinglePassExecutor>; type WeightInfo = (); } diff --git a/frame/staking/Cargo.toml b/frame/staking/Cargo.toml index 908e361e667e3..e2cfb5fc83043 100644 --- a/frame/staking/Cargo.toml +++ b/frame/staking/Cargo.toml @@ -17,6 +17,7 @@ static_assertions = "1.1.0" serde = { version = "1.0.101", optional = true } codec = { package = "parity-scale-codec", version = "2.0.0", default-features = false, features = ["derive"] } sp-std = { version = "3.0.0", default-features = false, path = "../../primitives/std" } +sp-core = { version = "3.0.0", path = "../../primitives/core", default-features = false } sp-io ={ version = "3.0.0", default-features = false, path = "../../primitives/io" } sp-runtime = { version = "3.0.0", default-features = false, path = "../../primitives/runtime" } sp-staking = { version = "3.0.0", default-features = false, path = "../../primitives/staking" } @@ -36,7 +37,6 @@ rand_chacha = { version = "0.2", default-features = false, optional = true } [dev-dependencies] sp-storage = { version = "3.0.0", path = "../../primitives/storage" } sp-tracing = { version = "3.0.0", path = "../../primitives/tracing" } -sp-core = { version = "3.0.0", path = "../../primitives/core" } pallet-balances = { version = "3.0.0", path = "../balances" } pallet-timestamp = { version = "3.0.0", path = "../timestamp" } pallet-staking-reward-curve = { version = "3.0.0", path = "../staking/reward-curve" } @@ -54,6 +54,7 @@ std = [ "codec/std", "sp-std/std", "sp-io/std", + "sp-core/std", "frame-support/std", "sp-runtime/std", "sp-staking/std", diff --git a/frame/staking/fuzzer/src/mock.rs b/frame/staking/fuzzer/src/mock.rs index 11d810a26e175..a5dc4086d7b48 100644 --- a/frame/staking/fuzzer/src/mock.rs +++ b/frame/staking/fuzzer/src/mock.rs @@ -196,5 +196,6 @@ impl pallet_staking::Config for Test { type UnsignedPriority = (); type OffchainSolutionWeightLimit = (); type WeightInfo = (); + type TaskExecutor = frame_support::executor::SinglePassExecutor>; type ElectionProvider = MockElectionProvider; } diff --git a/frame/staking/src/benchmarking.rs b/frame/staking/src/benchmarking.rs index 1d8a5c1fd6451..f0b4e0bf74601 100644 --- a/frame/staking/src/benchmarking.rs +++ b/frame/staking/src/benchmarking.rs @@ -247,7 +247,7 @@ benchmarks! { // all nominators now should be nominating our validator... for n in nominator_stashes.iter() { - assert!(Nominators::::get(n).unwrap().targets.contains(&stash)); + assert!(Nominators::::get(n).unwrap().targets.iter().any(|(t, _)| t == &stash)); } // we need the unlookuped version of the nominator stash for the kick. @@ -260,7 +260,7 @@ benchmarks! { verify { // all nominators now should *not* be nominating our validator... for n in nominator_stashes.iter() { - assert!(!Nominators::::get(n).unwrap().targets.contains(&stash)); + assert!(!Nominators::::get(n).unwrap().targets.iter().any(|(t, _)| t == &stash)); } } diff --git a/frame/staking/src/lib.rs b/frame/staking/src/lib.rs index c28dbc87bccdd..b8a2aa7177f39 100644 --- a/frame/staking/src/lib.rs +++ b/frame/staking/src/lib.rs @@ -276,8 +276,8 @@ pub mod testing_utils; #[cfg(any(feature = "runtime-benchmarks", test))] pub mod benchmarking; -pub mod slashing; pub mod inflation; +pub mod slashing; pub mod weights; use sp_std::{ @@ -577,7 +577,7 @@ impl StakingLedger where #[derive(PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)] pub struct Nominations { /// The targets of nomination. - pub targets: Vec, + pub targets: Vec<(AccountId, bool)>, /// The era the nominations were submitted. /// /// Except for initial nominations which are considered submitted at era 0. @@ -778,6 +778,15 @@ pub trait Config: frame_system::Config + SendTransactionTypes> { /// See [Era payout](./index.html#era-payout). type EraPayout: EraPayout>; + /// The task executor of this pallet. + /// + /// The type of the task, the weight quota and executor can all be configured from the runtime. + type TaskExecutor: + executor::StoredExecutor> + + executor::StorageValueShim> + + frame_support::dispatch::Parameter + + Default; + /// Something that can estimate the next session change, accurately or as a best effort guess. type NextNewSession: EstimateNextNewSession; @@ -830,6 +839,128 @@ impl Default for Releases { } } +use frame_support::{CloneNoBound, PartialEqNoBound, EqNoBound, executor::{self, StoredExecutor}}; +/// A task that needs to be stored and executed per slashed validator. +#[derive(Encode, Decode, CloneNoBound, PartialEqNoBound, EqNoBound)] +pub struct SlashTask { + /// The slashed validator. + slashed: T::AccountId, + /// The prefix that must be maintained for all keys. + prefix: Vec, + /// The last iterated key of their nominators. + last_key: Vec, +} + +impl SlashTask { + /// Creates a new instance of `Self` with proper prefix and `last_key`. + /// + /// Will always create a task that begins from the first nominator and iterates all of them. If, + /// for whatever reason, one needs to start mid-way from a specific key, then create `Self` + /// manually. + fn new(slashed: T::AccountId) -> Self { + use frame_support::storage::generator::StorageMap; + Self { + slashed, + prefix: >::prefix_hash(), + last_key: >::prefix_hash(), + } + } +} + +impl Default for SlashTask +where + T::AccountId: Default, +{ + fn default() -> Self { + Self { + slashed: Default::default(), + last_key: Default::default(), + prefix: Default::default(), + } + } +} + +impl sp_std::fmt::Debug for SlashTask { + #[cfg(feature = "std")] + fn fmt(&self, f: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result { + write!(f, "SlashTask({:?}, {:?})", self.slashed, sp_core::hexdisplay::HexDisplay::from(&self.last_key)) + } + + #[cfg(not(feature = "std"))] + fn fmt(&self, f: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result { + write!(f, "SlashTask({:?}, {:?})", self.slashed, self.last_key) + } +} + +impl executor::RuntimeTask for SlashTask { + fn execute(mut self, max_weight: Weight) -> (Option, Weight) { + let weight_per_iteration = ::DbWeight::get().reads_writes(1, 1); + let max_iterations = max_weight / weight_per_iteration; + let mut iterated: Weight = 0; + let mut needs_more_iterations = false; + log!( + debug, + "executing SlashTask with {} cap, resulting in {} iterations at most.", + max_weight, + max_iterations + ); + + while let Some(next_key) = sp_io::storage::next_key(self.last_key.as_ref()) { + if !next_key.starts_with(&self.prefix) { + // The keys are done. Break + break; + } + + log!(trace, "[{}] next-key = {:?} ", iterated, next_key); + match frame_support::storage::unhashed::get::>(&next_key) { + Some(mut next_nominations) => { + next_nominations.targets.iter_mut().for_each(|(target, active)| { + if target == &self.slashed { + *active = false; + } + }); + + frame_support::storage::unhashed::put(&next_key, &next_nominations); + + iterated = iterated.saturating_add(1); + self.last_key = next_key; + } + None => { + // we end up here on the last nominator, of a corrupt data. Better to log it. + log!(warn, "an un-decodable nomination detected during SlashTask."); + break; + } + }; + + if iterated >= max_iterations { + // we will brake out because a `next_key` exist, but we no longer have weight. This + // task is not done yet. + needs_more_iterations = true; + break; + } + + // TODO: 4 cases need to be considered: 1- a key is added before the current + // cursor: don't care. 2- a key is removed after the current cursor: don't care. + // 3- a key is added after the current cursor: worth having a test, but no + // biggie. 4- a key is removed after the current cursor: worth having a test, + // but no biggie. 5- the current key is deleted: then the next_key must be + // updated. + // TODO: also need a test to ensure that this iterates over all nominators, skipping + // the first one seems likely. + } + + let consumed_weight = weight_per_iteration.saturating_mul(iterated); + // we are guaranteed that `max_iterations * weight_per_iter <= max_weight`, given `iterated + // <= max_iterations`, thus `consumed_weight <= max_weight`. + debug_assert!(iterated <= max_iterations); + debug_assert!(consumed_weight <= max_weight); + + let leftover_task = if needs_more_iterations { Some(self) } else { None }; + + (leftover_task, consumed_weight) + } +} + decl_storage! { trait Store for Module as Staking { /// Number of eras to keep in history. @@ -986,6 +1117,9 @@ decl_storage! { /// The earliest era for which we have a pending, unapplied slash. EarliestUnappliedSlash: Option; + /// Task scheduler for chill tasks + SlashTaskExecutor get(fn slash_task_executor): T::TaskExecutor; + /// The last planned session scheduled by the session pallet. /// /// This is basically in sync with the call to [`SessionManager::new_session`]. @@ -1075,6 +1209,41 @@ pub mod migrations { T::DbWeight::get().writes(6 + 1) } } + + #[derive(Decode)] + struct OldNominations { + targets: Vec, + submitted_in: EraIndex, + suppressed: bool, + } + + pub fn migrate_to_reversable_slash() -> frame_support::weights::Weight { + Nominators::::translate::, _>(|_, nomination| { + let targets = nomination + .targets + .iter() + .map(|target| { + ( + target.clone(), + >::get(&target).map_or(true, |spans| { + nomination.submitted_in >= spans.last_nonzero_slash() + }), + /* active if no slashing span, or if submitted after the last nonzero + * slash. */ + ) + }) + .collect::>(); + Some(Nominations { + targets, + suppressed: nomination.suppressed, + submitted_in: nomination.submitted_in, + }) + // TODO: any reason to keep the submitted_in around? I don't think so. Remove it now or + // in a later migration. + }); + + T::BlockWeights::get().max_block + } } decl_event!( @@ -1182,6 +1351,26 @@ decl_module! { fn deposit_event() = default; + fn on_initialize(_now: T::BlockNumber) -> Weight { + let task_weight = if >::decode_len().unwrap_or_default() > 0 { + let task_weight = >::mutate(|e| e.execute()); + // each execution of the executor must consume less weight than its quota. + debug_assert!( + task_weight <= ::Quota::get(), + "expected a maximum weight consumption of {}, got {}", + task_weight, + ::Quota::get(), + ); + // The additional weight of reading the tasks, and writing back the result. + T::DbWeight::get().reads_writes(1, 1).saturating_add(task_weight) + } else { + T::DbWeight::get().reads(1) + }; + + let finalize_weight = T::DbWeight::get().reads(1); + task_weight.saturating_add(finalize_weight) + } + fn on_runtime_upgrade() -> Weight { if StorageVersion::get() == Releases::V5_0_0 { migrations::v6::migrate::() @@ -1190,11 +1379,6 @@ decl_module! { } } - fn on_initialize(_now: T::BlockNumber) -> Weight { - // just return the weight of the on_finalize. - T::DbWeight::get().reads(1) - } - fn on_finalize() { // Set the start of the first era. if let Some(mut active_era) = Self::active_era() { @@ -1510,16 +1694,19 @@ decl_module! { ensure!(!targets.is_empty(), Error::::EmptyTargets); ensure!(targets.len() <= T::MAX_NOMINATIONS as usize, Error::::TooManyTargets); - let old = Nominators::::get(stash).map_or_else(Vec::new, |x| x.targets); + let old = Nominators::::get(stash).map_or_else( + Vec::new, + |x| x.targets.into_iter().map(|(x, _)| x).collect::>(), + ); let targets = targets.into_iter() .map(|t| T::Lookup::lookup(t).map_err(DispatchError::from)) .map(|n| n.and_then(|n| if old.contains(&n) || !Validators::::get(&n).blocked { - Ok(n) + Ok((n, true)) // all newly submitted nominations are active again TODO: test for this. } else { Err(Error::::BadTarget.into()) })) - .collect::, _>>()?; + .collect::, _>>()?; let nominations = Nominations { targets, @@ -1917,7 +2104,7 @@ decl_module! { .into_iter() { Nominators::::mutate(&nom_stash, |maybe_nom| if let Some(ref mut nom) = maybe_nom { - if let Some(pos) = nom.targets.iter().position(|v| v == stash) { + if let Some(pos) = nom.targets.iter().position(|(v, _)| v == stash) { nom.targets.swap_remove(pos); Self::deposit_event(RawEvent::Kicked(nom_stash.clone(), stash.clone())); } @@ -2506,20 +2693,14 @@ impl Module { all_voters.push(self_vote); } - // collect all slashing spans into a BTreeMap for further queries. - let slashing_spans = >::iter().collect::>(); - for (nominator, nominations) in >::iter() { - let Nominations { submitted_in, mut targets, suppressed: _ } = nominations; - - // Filter out nomination targets which were nominated before the most recent - // slashing span. - targets.retain(|stash| { - slashing_spans - .get(stash) - .map_or(true, |spans| submitted_in >= spans.last_nonzero_slash()) - }); + let Nominations { targets: full_targets, suppressed: _, submitted_in: _ } = nominations; + // just filter out those who are not active. + let targets = full_targets + .into_iter() + .filter_map(|(target, active)| if active { Some(target) } else { None }) + .collect::>(); let vote_weight = weight_of(&nominator); all_voters.push((nominator, vote_weight, targets)) } @@ -2646,7 +2827,11 @@ impl frame_election_provider_support::ElectionDataProvider>::insert( v, - Nominations { targets: t, submitted_in: 0, suppressed: false }, + Nominations { + targets: t.into_iter().map(|t| (t, true)).collect::>(), + submitted_in: 0, + suppressed: false, + }, ); }); } @@ -2835,6 +3020,12 @@ where continue } + // if this is a non-zero slash, schedule tasks to chill their nominations. + if !slash_fraction.is_zero() { + let task = SlashTask::new(stash.clone()); + >::append(task); + } + let unapplied = slashing::compute_slash::(slashing::SlashParams { stash, slash: *slash_fraction, diff --git a/frame/staking/src/mock.rs b/frame/staking/src/mock.rs index 188eda801095e..1b76b257c7229 100644 --- a/frame/staking/src/mock.rs +++ b/frame/staking/src/mock.rs @@ -216,6 +216,8 @@ parameter_types! { pub const BondingDuration: EraIndex = 3; pub const RewardCurve: &'static PiecewiseLinear<'static> = &I_NPOS; pub const MaxNominatorRewardedPerValidator: u32 = 64; + pub static SlashTaskWeight: Weight = + ::DbWeight::get().reads_writes(1, 1) * 4; } thread_local! { @@ -258,6 +260,7 @@ impl Config for Test { type NextNewSession = Session; type MaxNominatorRewardedPerValidator = MaxNominatorRewardedPerValidator; type ElectionProvider = onchain::OnChainSequentialPhragmen; + type TaskExecutor = executor::SinglePassExecutor, SlashTaskWeight>; type WeightInfo = (); } @@ -350,6 +353,10 @@ impl ExtBuilder { self.has_stakers = has; self } + pub fn slash_task_weight(self, weight: Weight) -> Self { + SlashTaskWeight::set(weight); + self + } pub fn initialize_first_session(mut self, init: bool) -> Self { self.initialize_first_session = init; self diff --git a/frame/staking/src/tests.rs b/frame/staking/src/tests.rs index 7a3ec19f8af2f..c7215338d2836 100644 --- a/frame/staking/src/tests.rs +++ b/frame/staking/src/tests.rs @@ -111,7 +111,7 @@ fn basic_setup_works() { Staking::ledger(100), Some(StakingLedger { stash: 101, total: 500, active: 500, unlocking: vec![], claimed_rewards: vec![] }) ); - assert_eq!(Staking::nominators(101).unwrap().targets, vec![11, 21]); + assert_eq!(Staking::nominators(101).unwrap().targets, vec![(11, true), (21, true)]); assert_eq!( Staking::eras_stakers(Staking::active_era().unwrap().index, 11), @@ -381,7 +381,7 @@ fn blocking_and_kicking_works() { // attempt to nominate from 100/101... assert_ok!(Staking::nominate(Origin::signed(100), vec![11])); // should have worked since we're already nominated them - assert_eq!(Nominators::::get(&101).unwrap().targets, vec![11]); + assert_eq!(Nominators::::get(&101).unwrap().targets, vec![(11, true)]); // kick the nominator assert_ok!(Staking::kick(Origin::signed(10), vec![101])); // should have been kicked now @@ -3885,9 +3885,9 @@ mod election_data_provider { } #[test] - fn voters_exclude_slashed() { + fn voters_exclude_inactive() { ExtBuilder::default().build().execute_with(|| { - assert_eq!(Staking::nominators(101).unwrap().targets, vec![11, 21]); + assert_eq!(Staking::nominators(101).unwrap().targets, vec![(11, true), (21, true)]); assert_eq!( >::voters(None) .unwrap() @@ -3899,11 +3899,11 @@ mod election_data_provider { vec![11, 21] ); - start_active_era(1); - add_slash(&11); + >::mutate(101, |maybe_nom| { + maybe_nom.as_mut().unwrap().targets[0].1 = false + }); // 11 is gone. - start_active_era(2); assert_eq!( >::voters(None) .unwrap() @@ -3972,3 +3972,240 @@ mod election_data_provider { }) } } + +#[test] +fn re_nomination_actives_votes() { + ExtBuilder::default().build().execute_with(|| { + >::mutate(101, |maybe_nom| { + maybe_nom.as_mut().unwrap().targets[0].1 = false + }); + assert_eq!(Staking::nominators(101).unwrap().targets, vec![(11, false), (21, true)]); + + // resubmit and it is back + assert_ok!(Staking::nominate(Origin::signed(100), vec![11, 21])); + assert_eq!(Staking::nominators(101).unwrap().targets, vec![(11, true), (21, true)]); + }) +} + +mod slash_task_executor { + use super::*; + use frame_support::storage::generator::StorageMap; + use frame_support::executor::StoredExecutor; + + #[test] + fn wrong_starting_key_is_noop() { + ExtBuilder::default().build().execute_with(|| { + // a task with wrong starting key. + let wrong_key = >::prefix_hash(); + let task = SlashTask:: { + slashed: 11, + prefix: >::prefix_hash(), + last_key: wrong_key, + }; + + >::mutate(|e| e.add_task(task)); + assert_eq!(Staking::slash_task_executor().count(), 1); + + let prev_nominators = >::iter().collect::>(); + >::mutate(|e| e.execute()); + + // task removed, nothing else notable changed. + assert_eq!(Staking::slash_task_executor().count(), 0); + assert_eq!(>::iter().collect::>(), prev_nominators); + }) + } + + #[test] + fn storage_prefix_next_key_works() { + ExtBuilder::default().build_and_execute(|| { + // when building a task for slashing events, we expect calling next key on + // `prefix_hash()` of `>` to yield the key of the first nominator. This + // test ensures that this assumption is help by frame-support. + let base_nominators_key = >::prefix_hash(); + let (first_nominator, _) = >::iter().collect::>()[0]; + let first_nominator_key = >::storage_map_final_key(first_nominator); + assert_eq!( + sp_io::storage::next_key(&base_nominators_key).unwrap(), + first_nominator_key + ); + assert!(matches!( + frame_support::storage::unhashed::get::>( + &first_nominator_key + ), + Some(_), + )); + + // when having just one nominator, + assert_eq!(>::iter().count(), 1); + // calling next_key on that nominator again will give a key which is no longer + // decodable. + let next_key = sp_io::storage::next_key(&first_nominator_key).unwrap(); + assert!(matches!( + frame_support::storage::unhashed::get::>(&next_key), + None, + )); + }) + } + + #[test] + fn basic_task_work() { + ExtBuilder::default().build_and_execute(|| { + assert_eq!(>::iter().count(), 1); + run_to_block(1); + assert_eq!(Staking::slash_task_executor().count(), 0); + add_slash(&11); + + // task is added after slash. + assert_eq!(Staking::slash_task_executor().count(), 1); + assert_eq!( + Staking::slash_task_executor().tasks(), + vec![SlashTask { + slashed: 11, + last_key: >::prefix_hash(), + prefix: >::prefix_hash() + }] + ); + + // but nothing is executed. Both votes are `true`. + assert_eq!( + >::iter().map(|(who, n)| (who, n.targets)).collect::>(), + vec![(101, vec![(11, true), (21, true)])], + ); + + // next block, all of them are executed. + run_to_block(2); + + assert_eq!(Staking::slash_task_executor().count(), 0); + assert_eq!( + >::iter().map(|(who, n)| (who, n.targets)).collect::>(), + vec![(101, vec![(11, false), (21, true)])], + ); + }); + } + + #[test] + fn execute_over_multiple_blocks() { + ExtBuilder::default() + .slash_task_weight( + ::DbWeight::get().reads_writes(1, 1) * 3, + ) + // we allow 10 iterations per execution. + .build_and_execute(|| { + let count_applied_tasks = || { + // those who are still actively voting for 11. + let unapplied = + >::iter().filter(|(_, n)| n.targets[0].1).count(); + // those who got chilled. + let applied = + >::iter().filter(|(_, n)| !n.targets[0].1).count(); + + // assert on the fly that the votes for 21 never change + assert!(>::iter().all(|(_, n)| n.targets[1].1)); + + (applied, unapplied) + }; + + // remove the default nominator in a rather harsh way. + >::remove(101); + assert_eq!(>::iter().count(), 0); + + // bond 10 fresh nominators, all voting for 11 + (0..10).for_each(|n| bond_nominator(1000 + n, 2000 + n, 100, vec![11, 21])); + + assert_eq!(Staking::slash_task_executor().count(), 0); + run_to_block(1); + add_slash(&11); + assert_eq!(Staking::slash_task_executor().count(), 1); + + // 3 votes chilled + run_to_block(2); + assert_eq!(count_applied_tasks(), (3, 7)); + + // 3 votes chilled + run_to_block(3); + assert_eq!(count_applied_tasks(), (6, 4)); + + // 3 votes chilled + run_to_block(4); + assert_eq!(count_applied_tasks(), (9, 1)); + assert_eq!(Staking::slash_task_executor().count(), 1); + + // 3 votes chilled + run_to_block(5); + assert_eq!(count_applied_tasks(), (10, 0)); + assert_eq!(Staking::slash_task_executor().count(), 0); + }) + } + + #[test] + fn overlapping_slash() { + ExtBuilder::default() + .slash_task_weight( + ::DbWeight::get().reads_writes(1, 1) * 3, + ) + // we allow 10 iterations per execution. + .build_and_execute(|| { + let count_applied_chilled_for_validator_index = |at: usize| { + // those who got chilled. + let applied = + >::iter().filter(|(_, n)| !n.targets[at].1).count(); + // those who are still actively voting for 11. + let unapplied = + >::iter().filter(|(_, n)| n.targets[at].1).count(); + + (applied, unapplied) + }; + + // remove the default nominator in a rather harsh way. + >::remove(101); + assert_eq!(>::iter().count(), 0); + + // bond 10 fresh nominators, all voting for 11 + (0..10).for_each(|n| bond_nominator(1000 + n, 2000 + n, 100, vec![11, 21])); + + assert_eq!(Staking::slash_task_executor().count(), 0); + run_to_block(1); + add_slash(&11); + assert_eq!(Staking::slash_task_executor().count(), 1); + assert_eq!(count_applied_chilled_for_validator_index(0), (0, 10)); + + // 3 votes chilled + run_to_block(2); + assert_eq!(count_applied_chilled_for_validator_index(0), (3, 7)); + + // 3 votes chilled + run_to_block(3); + assert_eq!(count_applied_chilled_for_validator_index(0), (6, 4)); + + // a new slash coming! + add_slash(&21); + assert_eq!(Staking::slash_task_executor().count(), 2); + + // 3 votes chilled from the first task, none from the second. + run_to_block(4); + assert_eq!(count_applied_chilled_for_validator_index(0), (9, 1)); + assert_eq!(count_applied_chilled_for_validator_index(1), (0, 10)); + assert_eq!(Staking::slash_task_executor().count(), 2); + + // 1 votes chilled form the first task and removed, 2 task chilled form the second. + run_to_block(5); + assert_eq!(count_applied_chilled_for_validator_index(0), (10, 0)); + assert_eq!(count_applied_chilled_for_validator_index(1), (2, 8)); + assert_eq!(Staking::slash_task_executor().count(), 1); + + // second task progresses as normal. + run_to_block(6); + assert_eq!(count_applied_chilled_for_validator_index(1), (5, 5)); + assert_eq!(Staking::slash_task_executor().count(), 1); + + run_to_block(7); + assert_eq!(count_applied_chilled_for_validator_index(1), (8, 2)); + assert_eq!(Staking::slash_task_executor().count(), 1); + + // all done. + run_to_block(8); + assert_eq!(count_applied_chilled_for_validator_index(1), (10, 0)); + assert_eq!(Staking::slash_task_executor().count(), 0); + }) + } +} diff --git a/frame/support/Cargo.toml b/frame/support/Cargo.toml index 7b1179122b973..faeb5a1813950 100644 --- a/frame/support/Cargo.toml +++ b/frame/support/Cargo.toml @@ -37,6 +37,7 @@ log = { version = "0.4.14", default-features = false } pretty_assertions = "0.6.1" frame-system = { version = "3.0.0", path = "../system" } parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } +env_logger = "0.8.0" [features] default = ["std"] diff --git a/frame/support/src/executor.rs b/frame/support/src/executor.rs new file mode 100644 index 0000000000000..60e7a86beed21 --- /dev/null +++ b/frame/support/src/executor.rs @@ -0,0 +1,567 @@ +// This file is part of Substrate. +// Copyright (C) 2021 Parity Technologies (UK) Ltd. + +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sp_std::prelude::*; +use crate::{weights::Weight, traits::Get, storage}; +use codec::{Encode, Decode}; +use sp_runtime::traits::Zero; +use crate::{RuntimeDebugNoBound, PartialEqNoBound, EqNoBound, CloneNoBound}; + +const LOG_TARGET: &'static str = "runtime::task_executor"; + +/// A task that can be stored in storage and executed at some later time. +/// +/// This trait itself does not make any assumptions about *when* the task is executed. As far as +/// this trait is concerned, it can be now, all at once. It can be executed as mandatory work in +/// `on_initialize` or `on_finalize`, or in some other low priority circumstance (e.g. on_idle). +/// +/// If the type implementing this trait is generic over `` then one needs to derive +/// [`CloneNoBound`], [`PartialEqNoBound`], [`EqNoBound`], [`RuntimeDebugNoBound`], as opposed to +/// their normal counterparts, and implement `Default` manually. This is to prevent `T` to be +/// bounded to these traits. +pub trait RuntimeTask: + Sized + Clone + Default + Encode + Decode + PartialEq + Eq + sp_std::fmt::Debug + codec::EncodeLike +{ + /// Execute the task while consuming self. The task must not most consume more than `max_weight` + /// under any circumstance. Consuming *less* than `max_weight` is allowed. + /// + /// A tuple is returned, where the items are as follows: + /// 1. Option, where `None` means that this task is now complete (and shall not be kept + /// in storage anymore), and `Some(_)` indicating that this task is not yet complete, and + /// should be executed at a later time. + /// 2. The actual amount of weight that was consumed. Must always be less than `max_weight`. + /// parameter. + /// + /// It is critically important for a task to only return a non-zero consumed weight **ONLY if it + /// _actually did something_**. If a positive weight is returned, then an executor could + /// interpret this as a task that could use another execution slot, and continue the execution + /// potentially for numerous iterations. + fn execute(self, max_weight: Weight) -> (Option, Weight); + + /// The leftover weight that this task expects to execute, if any. + #[cfg(test)] + fn leftover(&self) -> Weight; +} + +#[cfg(any(test, feature = "std"))] +impl RuntimeTask for () { + fn execute(self, _: Weight) -> (Option, Weight) { + (None, 0) + } + #[cfg(test)] + fn leftover(&self) -> Weight { + 0 + } +} + +/// Common trait for a executor that is stored as a storage item. +pub trait StoredExecutor: codec::FullCodec { + /// The task type used by this executor. + type Task: RuntimeTask; + + /// Something that can define how much weight quote this executor is allowed to use per + /// execution. + type Quota: Get; + + /// Execute all tasks based on an unspecified strategy, consuming at most `Self::Quota` and + /// returning the actual amount of weight consumed. + /// + /// The returned weight must take into account the cost of internal operations of the + /// implementation, such as scheduling, as well. But, it DOES NOT take into account any + /// potential storage operations that needed to be performed to fetch `Self` from storage. + /// + /// A sensible patter of using an implementation of this trait is therefore: + /// + /// ```ignore + /// let mut consumed = >::mutate(|e| e.execute()); + /// consumed += ; + /// ``` + // TODO: while the work that this does itself is pretty negligible, must benchmark it anyhow and + // take it into account. + fn execute(&mut self) -> Weight; + + /// Create a new (empty) instance of [`Self`]. + fn new() -> Self; + + /// Add a new task to the internal state. + fn add_task(&mut self, task: Self::Task); + + /// Remove all tasks, without executing any of them. + fn clear(&mut self); + + /// Removes the first task that is equal to `task`. + fn remove(&mut self, task: Self::Task); + + /// Returns the number of current tasks. + fn count(&self) -> usize; + + /// Return a vector of all tasks. + #[cfg(any(test, feature = "std"))] + fn tasks(&self) -> Vec; + // TODO: providing an iter() might also be good. +} + +#[cfg(any(test, feature = "std"))] +impl StoredExecutor for () { + type Task = (); + type Quota = (); + + fn execute(&mut self) -> Weight { + unreachable!() + } + fn new() -> Self { + unreachable!() + } + fn add_task(&mut self, _: Self::Task) { + unreachable!() + } + fn clear(&mut self) { + unreachable!() + } + fn remove(&mut self, _: Self::Task) { + unreachable!() + } + fn count(&self) -> usize { + unreachable!() + } + fn tasks(&self) -> Vec { + unreachable!() + } +} + +/// An executor that only tries to execute a single pass on a given list of tasks in each +/// execution. +/// +/// This is suitable for homogenous tasks. Otherwise, if among a the inner task queue one of the +/// intermediate ones fails to consume any weight, it is sensible to re-try all the previous ones +/// again as well. +#[derive(Encode, Decode, RuntimeDebugNoBound, PartialEqNoBound, EqNoBound, CloneNoBound)] +pub struct SinglePassExecutor = ()> { + /// The queue of tasks. + pub(crate) tasks: Vec, + _marker: sp_std::marker::PhantomData, +} + +// TODO: can't we just have a DefaultNoBound as well? then we can ditch this. +impl> Default for SinglePassExecutor { + fn default() -> Self { + Self { tasks: vec![], _marker: sp_std::marker::PhantomData } + } +} + +impl> StoredExecutor for SinglePassExecutor { + type Task = Task; + type Quota = Quota; + + fn new() -> Self { + Self { tasks: vec![], _marker: Default::default() } + } + + fn add_task(&mut self, task: Task) { + self.tasks.push(task) + } + + fn clear(&mut self) { + self.tasks.clear() + } + + fn remove(&mut self, task: Task) { + let maybe_index = self.tasks.iter().position(|t| t == &task); + if let Some(index) = maybe_index { + self.tasks.remove(index); + } + } + + fn count(&self) -> usize { + self.tasks.len() + } + + #[cfg(any(test, feature = "std"))] + fn tasks(&self) -> Vec { + self.tasks.clone() + } + + fn execute(&mut self) -> Weight { + let max_weight = Self::Quota::get(); + let (next_tasks, consumed) = single_pass::(self.tasks.as_ref(), max_weight); + self.tasks = next_tasks; + consumed + } +} + +macro_rules! impl_append_decode_len_shim { + ($executor:ident) => { + // I broke the seal.. forgive me @bkchr. + impl storage::private::Sealed for $executor + where + Task: RuntimeTask, + Quota: Get, + {} + impl storage::StorageAppend for $executor + where + Task: RuntimeTask, + Quota: Get, + {} + impl storage::StorageDecodeLength for $executor + where + Task: RuntimeTask, + Quota: Get, + {} + impl codec::DecodeLength for $executor + where + Task: RuntimeTask, + Quota: Get, + { + fn len(mut self_encoded: &[u8]) -> Result { + use sp_std::convert::TryFrom; + // `SinglePassExecutor` stored just a `Vec`, thus the length is at the + // beginning in `Compact` form. + usize::try_from(u32::from(codec::Compact::::decode(&mut self_encoded)?)) + .map_err(|_| "Failed convert decoded size into usize.".into()) + } + } + }; +} + +impl_append_decode_len_shim!(SinglePassExecutor); + +/// Aggregator trait to indicate an executor with task `Task` has `decode_len` and `append`. +pub trait StorageValueShim: + codec::DecodeLength + + storage::StorageDecodeLength + + storage::private::Sealed + + storage::StorageAppend +{} + +impl StorageValueShim for S where + S: codec::DecodeLength + + storage::StorageDecodeLength + + storage::private::Sealed + + storage::StorageAppend, + Task: RuntimeTask +{} + +/// Make a single pass over some tasks, returning a new set of tasks that remain un-finished, along +/// the consumed weight. +/// +/// This is useful for different scheduling strategies. +pub(crate) fn single_pass(tasks: &[T], max_weight: Weight) -> (Vec, Weight) { + // just a tiny optimization for this edge case + if tasks.is_empty() || max_weight.is_zero() { + return (tasks.to_vec(), Zero::zero()); + } + + let mut leftover_weight = max_weight; + let next_tasks = tasks + .iter() + .cloned() + .filter_map(|task| { + if leftover_weight.is_zero() { + return Some(task); + } + + let (maybe_leftover_task, consumed) = task.execute(leftover_weight); + leftover_weight = leftover_weight.saturating_sub(consumed); + maybe_leftover_task + }) + .collect::>(); + + log::debug!( + target: LOG_TARGET, + "executed a single pass.\nPrev tasks = {:?}\nNext tasks = {:?}", + tasks, + next_tasks, + ); + + (next_tasks, max_weight.saturating_sub(leftover_weight)) +} + +#[cfg(test)] +mod tests { + use super::*; + + crate::parameter_types! { + static Quota: Weight = 10; + } + + /// A test task. + #[derive(Clone, Encode, Decode, Default, PartialEq, Eq, Debug)] + struct Task { + /// The amount of weight that this task will consume. + weight: Weight, + /// If set to a non-zero number, the in the first `half` time this task is `execute`ed, it + /// will only consume `self.weight / 2`, and decrement `self.half`. Once `self.half` is + /// zero, it will try to consume the whole `self.weight`. + half: u8, + /// If set to `true`, it will only consume some weight upon `execute` IFF it can consume the + /// entire `self.weight`. In other words, this if set to true, this task will have a + /// all-or-none execution. + /// + /// Note that if combined with `self.half > 0`, this behavior is changed and in that case + /// only half of `self.weight` is the subject; if it can consume all of `self.weight / 2`, + /// it will, else it will consume nothing. + greedy: bool, + } + + struct TaskBuilder { + half: u8, + greedy: bool, + } + + impl Default for TaskBuilder { + fn default() -> Self { + Self { half: 0, greedy: true } + } + } + + impl TaskBuilder { + fn half(mut self, half: u8) -> Self { + self.half = half; + self + } + + fn greedy(mut self, greedy: bool) -> Self { + self.greedy = greedy; + self + } + + fn build(self, weight: Weight) -> Task { + Task { weight, greedy: self.greedy, half: self.half } + } + } + + impl Task { + /// Should be called after `self.weight` has been reduce to reflect the update of an + /// execution, to determine of this task should live or not. + fn maybe_destroy(self) -> Option { + if self.weight > 0 { + Some(self) + } else { + None + } + } + + /// Should consume `amount` of `Self`'s weight, capping it at `max_weight`. + fn consume(mut self, amount: Weight, max_weight: Weight) -> (Option, Weight) { + let consumed = if self.greedy { + if amount > max_weight { + // we are greedy and we need more than max_weight, consume all of it. + self.weight -= max_weight; + max_weight + } else { + // we are greedy and max_weight is enough. Destroy self. + self.weight -= amount; + amount + } + } else { + if amount > max_weight { + // we are not greedy and max_weight is not enough, thus noop. + 0 + } else { + // we are not greedy and max_weight is enough, thus destroy self. + self.weight -= amount; + amount + } + }; + + (self.maybe_destroy(), consumed) + } + } + + impl RuntimeTask for Task { + fn execute(mut self, max_weight: Weight) -> (Option, Weight) { + let weight_needed = self.weight; + match self.half { + 0 => { + // at this point we try and consume as much as possible. + self.consume(weight_needed, max_weight) + } + _ => { + // try and consume either half of your needed weight, or all of the available, + // if it is less. + self.half -= 1; + self.consume(weight_needed / 2, max_weight) + } + } + } + + fn leftover(&self) -> Weight { + self.weight + } + } + + fn remaining_weights_of>( + executor: &E, + ) -> Vec { + executor.tasks().iter().map(|t| t.leftover()).collect::>() + } + + #[test] + fn shim_works() { + macro_rules! shim_test { + ($executor: ident) => { + sp_io::TestExternalities::default().execute_with(|| { + $executor::append(TaskBuilder::default().build(10)); + $executor::append(TaskBuilder::default().build(20)); + + assert_eq!($executor::decode_len().unwrap(), 2); + $executor::append(TaskBuilder::default().build(30)); + assert_eq!($executor::decode_len().unwrap(), 3); + + // without the shim + assert_eq!($executor::get().unwrap().count(), 3); + assert_eq!( + remaining_weights_of(&$executor::get().unwrap()), + vec![10, 20, 30], + ); + }); + }; + } + // a representation of a single-pass executor that is stored as a storage value. + crate::generate_storage_alias!( + DoNotCareAtAll, TestStoredSinglePassExecutor => Value> + ); + + shim_test!(TestStoredSinglePassExecutor); + } + + #[test] + fn single_pass_less_weight_than_than_single_task() { + // execute a series of tasks with less weight per block for single task. + Quota::set(7); + let mut executor = SinglePassExecutor::::new(); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + + assert_eq!(executor.execute(), 7); + assert_eq!(remaining_weights_of(&executor), vec![3, 10, 10]); + + assert_eq!(executor.execute(), 7); + assert_eq!(remaining_weights_of(&executor), vec![6, 10]); + + assert_eq!(executor.execute(), 7); + assert_eq!(remaining_weights_of(&executor), vec![9]); + + assert_eq!(executor.execute(), 7); + assert_eq!(remaining_weights_of(&executor), vec![2]); + + assert_eq!(executor.execute(), 2); + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + + // noop + assert_eq!(executor.execute(), 0); + } + + #[test] + fn single_pass_more_weight_than_than_single_task() { + // execute a series of tasks with less weight per block for single task. + let mut executor = SinglePassExecutor::::new(); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + + Quota::set(12); + assert_eq!(executor.execute(), 12); + assert_eq!(remaining_weights_of(&executor), vec![8, 10]); + + assert_eq!(executor.execute(), 12); + assert_eq!(remaining_weights_of(&executor), vec![6]); + + assert_eq!(executor.execute(), 6); + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + + // noop + assert_eq!(executor.execute(), 0); + } + + #[test] + fn single_pass_equal_weight_to_single_task() { + // execute a series of tasks with less weight per block for single task. + let mut executor = SinglePassExecutor::::new(); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + + Quota::set(10); + assert_eq!(executor.execute(), 10); + assert_eq!(remaining_weights_of(&executor), vec![10, 10]); + + assert_eq!(executor.execute(), 10); + assert_eq!(remaining_weights_of(&executor), vec![10]); + + assert_eq!(executor.execute(), 10); + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + + // noop + assert_eq!(executor.execute(), 0); + } + + #[test] + fn where_additional_pass_is_useful() { + // This is an example where a single pass execution is sab-par. + let _ = env_logger::try_init(); + let mut executor = SinglePassExecutor::::new(); + executor.add_task(TaskBuilder::default().half(1).greedy(false).build(30)); + executor.add_task(TaskBuilder::default().half(1).greedy(false).build(20)); + executor.add_task(TaskBuilder::default().half(1).greedy(false).build(10)); + + // first batch, we consume 15 + 10 + 5 = 30. We have 6 leftover, and the last 5 could have + // been consumed, but nothing we can do. + Quota::set(36); + assert_eq!(executor.execute(), 30); + assert_eq!(remaining_weights_of(&executor), vec![15, 10, 5]); + } + + #[test] + fn empty_executor_is_noop() { + fn with_executor>(mut executor: E) { + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + + Quota::set(0); + assert_eq!(executor.execute(), 0); + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + + assert_eq!(executor.execute(), 0); + assert_eq!(remaining_weights_of(&executor), Vec::::new()); + } + + with_executor(SinglePassExecutor::::new()); + } + + #[test] + fn no_weight_allowed_is_noop() { + fn with_executor>(mut executor: E) { + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + executor.add_task(TaskBuilder::default().build(10)); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + + Quota::set(0); + assert_eq!(executor.execute(), 0); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + + assert_eq!(executor.execute(), 0); + assert_eq!(remaining_weights_of(&executor), vec![10, 10, 10]); + } + + with_executor(SinglePassExecutor::::new()); + } +} diff --git a/frame/support/src/lib.rs b/frame/support/src/lib.rs index 362c4c5a0a73b..afcf2a971f4cb 100644 --- a/frame/support/src/lib.rs +++ b/frame/support/src/lib.rs @@ -68,10 +68,11 @@ pub mod error; pub mod traits; pub mod weights; pub mod instances; +pub mod executor; pub use self::hash::{ Twox256, Twox128, Blake2_256, Blake2_128, Identity, Twox64Concat, Blake2_128Concat, Hashable, - StorageHasher, ReversibleStorageHasher + StorageHasher, ReversibleStorageHasher, }; pub use self::storage::{ StorageValue, StorageMap, StorageDoubleMap, StoragePrefixedMap, IterableStorageMap, diff --git a/frame/support/src/storage/mod.rs b/frame/support/src/storage/mod.rs index e00a3fe831829..febbc43c5b7b2 100644 --- a/frame/support/src/storage/mod.rs +++ b/frame/support/src/storage/mod.rs @@ -804,7 +804,7 @@ pub trait StorageDecodeLength: private::Sealed + codec::DecodeLength { /// Provides `Sealed` trait to prevent implementing trait `StorageAppend` & `StorageDecodeLength` /// outside of this crate. -mod private { +pub(crate) mod private { use super::*; pub trait Sealed {}