Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
actions queue
Browse files Browse the repository at this point in the history
  • Loading branch information
shawntabrizi committed Feb 3, 2021
1 parent a785985 commit b2e9011
Showing 1 changed file with 137 additions and 76 deletions.
213 changes: 137 additions & 76 deletions runtime/parachains/src/paras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_std::result;
#[cfg(feature = "std")]
use sp_std::marker::PhantomData;
use primitives::v1::{
Id as ParaId, ValidationCode, HeadData,
Id as ParaId, ValidationCode, HeadData, SessionIndex,
};
use sp_runtime::traits::One;
use frame_support::{
Expand All @@ -52,6 +52,9 @@ pub trait Config: frame_system::Config + configuration::Config {
+ Into<result::Result<Origin, <Self as Config>::Origin>>;
}

// Wait until the session index is 2 larger then the current index to apply any changes.
const SESSION_DELAY: SessionIndex = 2;

// the two key times necessary to track for every code replacement.
#[derive(Default, Encode, Decode)]
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
Expand Down Expand Up @@ -96,9 +99,7 @@ enum UseCodeAt<N> {
#[derive(PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)]
pub enum ParaLifecycle {
/// Para is new and is onboarding as a Parathread.
OnboardingAsParathread,
/// Para is new and is onboarding as a Parachain.
OnboardingAsParachain,
Onboarding,
/// Para is a Parathread.
Parathread,
/// Para is a Parachain.
Expand All @@ -108,14 +109,12 @@ pub enum ParaLifecycle {
/// Para is a Parachain which is downgrading to a Parathread.
DowngradingToParathread,
/// Parathread is being offboarded.
OutgoingParathread,
/// Parachain is being offboarded.
OutgoingParachain,
Offboarding,
}

impl ParaLifecycle {
pub fn is_onboarding(&self) -> bool {
matches!(self, ParaLifecycle::OnboardingAsParathread | ParaLifecycle::OnboardingAsParachain)
matches!(self, ParaLifecycle::Onboarding)
}

pub fn is_stable(&self) -> bool {
Expand All @@ -131,14 +130,27 @@ impl ParaLifecycle {
}

pub fn is_outgoing(&self) -> bool {
matches!(self, ParaLifecycle::OutgoingParathread | ParaLifecycle::OutgoingParachain)
matches!(self, ParaLifecycle::Offboarding)
}

pub fn is_transitioning(&self) -> bool {
!Self::is_stable(self)
}
}

/// The possible actions that can be applied to a para.
#[derive(PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)]
pub enum ParaAction {
/// New parathread needs to be onboarded.
Onboard,
/// Parathread needs to be upgraded to parachain.
Upgrade,
/// Parachain needs to be downgraded to parathread.
Downgrade,
/// Parathread needs to be offboarded.
Offboard,
}

impl<N: Ord + Copy> ParaPastCodeMeta<N> {
// note a replacement has occurred at a given block number.
fn note_replacement(&mut self, expected_at: N, activated_at: N) {
Expand Down Expand Up @@ -265,6 +277,12 @@ decl_storage! {
UpcomingUpgrades: Vec<ParaId>;
/// Existing Parachains that should downgrade to be a Parathread. Ordered ascending by ParaId.
UpcomingDowngrades: Vec<ParaId>;

/// The queue of actions to execute on a particular session index.
ActionsQueue: map hasher(twox_64_concat) SessionIndex => Vec<(ParaId, ParaAction)>;

/// The current session index.
CurrentSessionIndex get(fn session_index): SessionIndex;
}
add_extra_genesis {
config(paras): Vec<(ParaId, ParaGenesisArgs)>;
Expand Down Expand Up @@ -319,8 +337,9 @@ impl<T: Config> Module<T> {
pub(crate) fn initializer_finalize() { }

/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification<T::BlockNumber>) {
pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification<T::BlockNumber>) {
let now = <frame_system::Module<T>>::block_number();
CurrentSessionIndex::set(notification.session_index);
let mut parachains = Self::clean_up_outgoing(now);
Self::apply_incoming(&mut parachains);
Self::apply_upgrades(&mut parachains);
Expand Down Expand Up @@ -362,42 +381,102 @@ impl<T: Config> Module<T> {
parachains
}

// Apply all actions queued for the given session index.
//
// This function checks that the lifecycle of each parachain matches the expected state transition.
// If the lifecycle is incorrect, this will apply no changes to that para, except for outgoing
// paras, where we will be defensive and offboard them no matter their state.
fn apply_actions_queue(session: SessionIndex) {
let actions = ActionsQueue::take(session);
let mut parachains = <Self as Store>::Parachains::get();
let now = <frame_system::Module<T>>::block_number();

for (para, action) in actions {
match action {
// Onboard a new parathread
ParaAction::Onboard => {
ParaLifecycles::mutate(&para, |state| {
if *state == Some(ParaLifecycle::Onboarding) {
if let Some(genesis_data) = <Self as Store>::UpcomingParasGenesis::take(&para) {
*state = Some(ParaLifecycle::Parathread);
<Self as Store>::Heads::insert(&para, genesis_data.genesis_head);
<Self as Store>::CurrentCode::insert(&para, genesis_data.validation_code);
}
}
});
},
// Upgrade a parathread to a parachain
ParaAction::Upgrade => {
ParaLifecycles::mutate(&para, |state| {
if *state == Some(ParaLifecycle::UpgradingToParachain) {
if let Err(i) = parachains.binary_search(&para) {
parachains.insert(i, para);
}
*state = Some(ParaLifecycle::Parachain);
}
});
},
// Downgrade a parachain to a parathread
ParaAction::Downgrade => {
ParaLifecycles::mutate(&para, |state| {
if *state == Some(ParaLifecycle::DowngradingToParathread) {
if let Ok(i) = parachains.binary_search(&para) {
parachains.remove(i);
}
*state = Some(ParaLifecycle::Parathread);
}
});
},
// Offboard a parathread from the system
ParaAction::Offboard => {
// Warn if there is a state error... but still perform the offboarding to be defensive.
if let Some(state) = ParaLifecycles::get(&para) {
if !state.is_outgoing() {
frame_support::debug::error!(
target: "parachains",
"Outgoing parachain has wrong lifecycle state."
)
}
};

if let Ok(i) = parachains.binary_search(&para) {
parachains.remove(i);
}

<Self as Store>::Heads::remove(&para);
<Self as Store>::FutureCodeUpgrades::remove(&para);
<Self as Store>::FutureCode::remove(&para);
ParaLifecycles::remove(&para);

let removed_code = <Self as Store>::CurrentCode::take(&para);
if let Some(removed_code) = removed_code {
Self::note_past_code(para, now, now, removed_code);
}
},
}
}

// Place the new parachains set in storage.
<Self as Store>::Parachains::put(parachains);
}

/// Applies all incoming paras, updating the parachains list for those that are parachains.
fn apply_incoming(parachains: &mut Vec<ParaId>) {
let upcoming = <Self as Store>::UpcomingParas::take();
for upcoming_para in upcoming {
let state = match ParaLifecycles::get(&upcoming_para) {
Some(ParaLifecycle::OnboardingAsParachain) => ParaLifecycle::OnboardingAsParachain,
Some(ParaLifecycle::OnboardingAsParathread) => ParaLifecycle::OnboardingAsParathread,
_ => continue,
if ParaLifecycles::get(&upcoming_para) != Some(ParaLifecycle::Onboarding) {
continue;
};

let genesis_data = match <Self as Store>::UpcomingParasGenesis::take(&upcoming_para) {
None => continue,
Some(g) => g,
};

let mut onboarded = false;

if genesis_data.parachain {
if let Err(i) = parachains.binary_search(&upcoming_para) {
if state == ParaLifecycle::OnboardingAsParachain {
parachains.insert(i, upcoming_para);
ParaLifecycles::insert(&upcoming_para, ParaLifecycle::Parachain);
onboarded = true;
}
}
} else {
if state == ParaLifecycle::OnboardingAsParathread {
ParaLifecycles::insert(&upcoming_para, ParaLifecycle::Parathread);
onboarded = true
}
}
ParaLifecycles::insert(&upcoming_para, ParaLifecycle::Parathread);

if onboarded {
<Self as Store>::Heads::insert(&upcoming_para, genesis_data.genesis_head);
<Self as Store>::CurrentCode::insert(&upcoming_para, genesis_data.validation_code);
}
<Self as Store>::Heads::insert(&upcoming_para, genesis_data.genesis_head);
<Self as Store>::CurrentCode::insert(&upcoming_para, genesis_data.validation_code);
}
}

Expand Down Expand Up @@ -536,11 +615,8 @@ impl<T: Config> Module<T> {
}
weight = weight.saturating_add(T::DbWeight::get().reads_writes(1, 1));

if genesis.parachain {
ParaLifecycles::insert(&id, ParaLifecycle::OnboardingAsParachain);
} else {
ParaLifecycles::insert(&id, ParaLifecycle::OnboardingAsParathread);
}

ParaLifecycles::insert(&id, ParaLifecycle::Onboarding);
UpcomingParasGenesis::insert(&id, &genesis);
weight = weight.saturating_add(T::DbWeight::get().writes(2));

Expand All @@ -552,8 +628,7 @@ impl<T: Config> Module<T> {
/// Noop if para is already outgoing or not known.
pub(crate) fn schedule_para_cleanup(id: ParaId) -> Weight {
match ParaLifecycles::get(&id) {
Some(ParaLifecycle::OnboardingAsParathread) |
Some(ParaLifecycle::OnboardingAsParachain) => {
Some(ParaLifecycle::Onboarding) => {
UpcomingParas::mutate(|v| {
match v.binary_search(&id) {
Ok(i) => {
Expand All @@ -567,8 +642,9 @@ impl<T: Config> Module<T> {
}
})
},
Some(ParaLifecycle::Parathread) => {
ParaLifecycles::insert(&id, ParaLifecycle::OutgoingParathread);
Some(ParaLifecycle::Parathread) |
Some(ParaLifecycle::Parachain) => {
ParaLifecycles::insert(&id, ParaLifecycle::Offboarding);
OutgoingParas::mutate(|v| {
match v.binary_search(&id) {
Ok(_) => T::DbWeight::get().reads_writes(1, 1),
Expand All @@ -580,18 +656,6 @@ impl<T: Config> Module<T> {
})

},
Some(ParaLifecycle::Parachain) => {
OutgoingParas::mutate(|v| {
match v.binary_search(&id) {
Ok(_) => T::DbWeight::get().reads_writes(1, 0),
Err(i) => {
v.insert(i, id);
ParaLifecycles::insert(&id, ParaLifecycle::OutgoingParachain);
T::DbWeight::get().reads_writes(1, 2)
}
}
})
},
Some(ParaLifecycle::UpgradingToParachain) => {
let upgrade_weight = UpcomingUpgrades::mutate(|v| {
match v.binary_search(&id) {
Expand All @@ -607,7 +671,7 @@ impl<T: Config> Module<T> {
Ok(_) => T::DbWeight::get().reads_writes(1, 0),
Err(i) => {
v.insert(i, id);
ParaLifecycles::insert(&id, ParaLifecycle::OutgoingParathread);
ParaLifecycles::insert(&id, ParaLifecycle::Offboarding);
T::DbWeight::get().reads_writes(1, 2)
}
}
Expand All @@ -629,17 +693,14 @@ impl<T: Config> Module<T> {
Ok(_) => T::DbWeight::get().reads_writes(1, 0),
Err(i) => {
v.insert(i, id);
ParaLifecycles::insert(&id, ParaLifecycle::OutgoingParathread);
ParaLifecycles::insert(&id, ParaLifecycle::Offboarding);
T::DbWeight::get().reads_writes(1, 2)
}
}
});
downgrade_weight.saturating_add(outgoing_weight)
},
None |
Some(ParaLifecycle::OutgoingParathread) |
Some(ParaLifecycle::OutgoingParachain)
=> { T::DbWeight::get().reads(1) },
None | Some(ParaLifecycle::Offboarding) => { T::DbWeight::get().reads(1) },
}
}

Expand Down Expand Up @@ -1408,9 +1469,9 @@ mod tests {
assert_eq!(<Paras as Store>::UpcomingParas::get(), vec![c, b, a]);

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::OnboardingAsParathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Onboarding));

// run to block without session change.
run_to_block(2, None);
Expand All @@ -1419,9 +1480,9 @@ mod tests {
assert_eq!(<Paras as Store>::UpcomingParas::get(), vec![c, b, a]);

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::OnboardingAsParathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Onboarding));


run_to_block(3, Some(vec![3]));
Expand All @@ -1431,8 +1492,8 @@ mod tests {

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Parathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Parachain));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Parachain));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Parathread));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Parathread));

assert_eq!(Paras::current_code(&a), Some(vec![2].into()));
assert_eq!(Paras::current_code(&b), Some(vec![1].into()));
Expand Down Expand Up @@ -1479,9 +1540,9 @@ mod tests {
assert_eq!(<Paras as Store>::UpcomingParas::get(), vec![c, b, a]);

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::OnboardingAsParathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Onboarding));


// run to block without session change.
Expand All @@ -1491,9 +1552,9 @@ mod tests {
assert_eq!(<Paras as Store>::UpcomingParas::get(), vec![c, b, a]);

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::OnboardingAsParathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::OnboardingAsParachain));
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Onboarding));
assert_eq!(ParaLifecycles::get(&c), Some(ParaLifecycle::Onboarding));

Paras::schedule_para_cleanup(c);

Expand All @@ -1506,7 +1567,7 @@ mod tests {

// Lifecycle is tracked correctly
assert_eq!(ParaLifecycles::get(&a), Some(ParaLifecycle::Parathread));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Parachain));
assert_eq!(ParaLifecycles::get(&b), Some(ParaLifecycle::Parathread));
assert_eq!(ParaLifecycles::get(&c), None);

assert_eq!(Paras::current_code(&a), Some(vec![2].into()));
Expand Down

0 comments on commit b2e9011

Please sign in to comment.