Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

v1.17: replay: reload tower if set-identity during startup (backport of #35173) #35256

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,28 @@ impl Tower {
}
}

#[cfg(test)]
pub fn new_random(node_pubkey: Pubkey) -> Self {
use rand::Rng;

let mut rng = rand::thread_rng();
let root_slot = rng.gen();
let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot);
let last_vote = VoteStateUpdate::from(
vote_state
.votes
.iter()
.map(|lv| (lv.slot(), lv.confirmation_count()))
.collect::<Vec<_>>(),
);
Self {
node_pubkey,
vote_state,
last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote),
..Tower::default()
}
}

pub fn new_from_bankforks(
bank_forks: &BankForks,
node_pubkey: &Pubkey,
Expand Down
126 changes: 101 additions & 25 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,21 @@ impl ReplayStage {
let _exit = Finalizer::new(exit.clone());
let mut identity_keypair = cluster_info.keypair().clone();
let mut my_pubkey = identity_keypair.pubkey();
if my_pubkey != tower.node_pubkey {
// set-identity was called during the startup procedure, ensure the tower is consistent
// before starting the loop. further calls to set-identity will reload the tower in the loop
let my_old_pubkey = tower.node_pubkey;
tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
warn!(
"Identity changed during startup from {} to {}",
my_old_pubkey, my_pubkey
);
}
let (mut progress, mut heaviest_subtree_fork_choice) =
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
Expand Down Expand Up @@ -947,28 +962,12 @@ impl ReplayStage {
my_pubkey = identity_keypair.pubkey();

// Load the new identity's tower
tower = Tower::restore(tower_storage.as_ref(), &my_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(
root_bank.slot(),
&slot_history,
)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
&my_pubkey,
&vote_account,
)
} else {
error!("Failed to load tower for {}: {}", my_pubkey, err);
std::process::exit(1);
}
});

tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
// Ensure the validator can land votes with the new identity before
// becoming leader
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
Expand Down Expand Up @@ -1118,6 +1117,32 @@ impl ReplayStage {
})
}

fn load_tower(
tower_storage: &dyn TowerStorage,
node_pubkey: &Pubkey,
vote_account: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Tower {
Tower::restore(tower_storage, node_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
node_pubkey,
vote_account,
)
} else {
error!("Failed to load tower for {}: {}", node_pubkey, err);
std::process::exit(1);
}
})
}

fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
Expand Down Expand Up @@ -4086,9 +4111,9 @@ pub(crate) mod tests {
crate::{
consensus::{
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
tower_storage::NullTowerStorage,
tower_storage::{FileTowerStorage, NullTowerStorage},
tree_diff::TreeDiff,
Tower,
Tower, VOTE_THRESHOLD_DEPTH,
},
replay_stage::ReplayStage,
vote_simulator::{self, VoteSimulator},
Expand All @@ -4110,7 +4135,7 @@ pub(crate) mod tests {
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
commitment::BlockCommitment,
commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE},
genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs},
},
solana_sdk::{
Expand All @@ -4134,6 +4159,7 @@ pub(crate) mod tests {
iter,
sync::{atomic::AtomicU64, Arc, RwLock},
},
tempfile::tempdir,
trees::{tr, Tree},
};

Expand Down Expand Up @@ -8327,4 +8353,54 @@ pub(crate) mod tests {
assert_eq!(reset_fork, Some(4));
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
}

#[test]
fn test_tower_load_missing() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_pubkey = Pubkey::new_unique();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE);
assert_eq!(tower.vote_state, expected_tower.vote_state);
assert_eq!(tower.node_pubkey, node_pubkey);
}

#[test]
fn test_tower_load() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_keypair = Keypair::new();
let node_pubkey = node_keypair.pubkey();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;
let expected_tower = Tower::new_random(node_pubkey);
expected_tower.save(&tower_storage, &node_keypair).unwrap();

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
assert_eq!(tower.vote_state, expected_tower.vote_state);
assert_eq!(tower.node_pubkey, expected_tower.node_pubkey);
}
}
18 changes: 18 additions & 0 deletions sdk/program/src/vote/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,24 @@ impl VoteState {
}
}

pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self {
let votes = (1..32)
.map(|x| LandedVote {
latency: 0,
lockout: Lockout::new_with_confirmation_count(
u64::from(x).saturating_add(root_slot),
32_u32.saturating_sub(x),
),
})
.collect();
Self {
node_pubkey,
root_slot: Some(root_slot),
votes,
..VoteState::default()
}
}

pub fn get_authorized_voter(&self, epoch: Epoch) -> Option<Pubkey> {
self.authorized_voters.get_authorized_voter(epoch)
}
Expand Down
Loading