Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Fix SlotMeta is_connected tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Czabaniuk committed Jan 14, 2023
1 parent 2b88401 commit 04d19ab
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 62 deletions.
242 changes: 183 additions & 59 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3325,6 +3325,54 @@ impl Blockstore {
);
Ok(())
}

/// Mark a root `slot` as connected, traverse `slot`'s children and update
/// the children's connected status if appropriate.
///
/// A ledger with a full path of blocks from genesis to the latest root will
/// have all of the rooted blocks marked as connected such that new blocks
/// could also be connected. However, starting from some root (such as from
/// a snapshot) is a valid way to join a cluster. For this case, mark this
/// root as connected such that the node that joined midway through can
/// have their slots considered connected.
// steviez
pub fn set_and_chain_connected_on_root_and_next_slots(&self, root: Slot) -> Result<()> {
let mut root_meta = self
.meta(root)?
.unwrap_or_else(|| SlotMeta::new(root, None));
// If the slot was already connected, there is nothing to do as this slot's
// children are also assumed to be appropriately connected
if root_meta.is_connected() {
return Ok(());
}
info!(
"Marking slot {} and any full children slots as connected",
root
);

// Mark both bits as connected for snapshots slots so that these slots
// behavior more similarly to typical slots that become connected.
root_meta.set_parent_connected();
root_meta.set_connected();
let mut write_batch = self.db.batch()?;
write_batch.put::<cf::SlotMeta>(root_meta.slot, &root_meta)?;

let mut next_slots = VecDeque::from(root_meta.next_slots);
while !next_slots.is_empty() {
let slot = next_slots.pop_front().unwrap();
let mut meta = self.meta(slot)?.unwrap_or_else(|| {
panic!("Slot {} is a child but has no SlotMeta in blockstore", slot)
});

if meta.set_parent_connected() {
next_slots.extend(meta.next_slots.iter());
}
write_batch.put::<cf::SlotMeta>(meta.slot, &meta)?;
}

self.db.write(write_batch)?;
Ok(())
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
Expand Down Expand Up @@ -3702,7 +3750,6 @@ fn handle_chaining_for_slot(

let meta = &slot_meta_entry.new_slot_meta;
let meta_backup = &slot_meta_entry.old_slot_meta;

{
let mut meta_mut = meta.borrow_mut();
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
Expand Down Expand Up @@ -3739,44 +3786,32 @@ fn handle_chaining_for_slot(
}
}

// If this is a newly inserted slot, then we know the children of this slot were not previously
// connected to the trunk of the ledger. Thus if slot.is_connected is now true, we need to
// update all child slots with `is_connected` = true because these children are also now newly
// connected to trunk of the ledger
// If this is a newly completed slot and the parent is connected, then the
// slot is now connected. Mark the slot as connected, and then traverse the
// children to update their parent_connected and connected status.
let should_propagate_is_connected =
is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
&& RefCell::borrow(meta).is_connected();
&& RefCell::borrow(meta).is_parent_connected();

if should_propagate_is_connected {
// slot_function returns a boolean indicating whether to explore the children
// of the input slot
let slot_function = |slot: &mut SlotMeta| {
slot.set_connected();

// We don't want to set the is_connected flag on the children of non-full
// slots
slot.is_full()
};

meta.borrow_mut().set_connected();
traverse_children_mut(
db,
slot,
meta,
working_set,
new_chained_slots,
slot_function,
SlotMeta::set_parent_connected,
)?;
}

Ok(())
}

/// Traverse all the direct and indirect children slots and apply the specified
/// `slot_function`.
/// Traverse all the children (direct and indirect) of `slot_meta`, and apply
/// `slot_function` to each of the children (but not `slot_meta`).
///
/// Arguments:
/// `db`: the blockstore db that stores shreds and their metadata.
/// `slot`: starting slot to traverse.
/// `slot_meta`: the SlotMeta of the above `slot`.
/// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
/// to traverse the graph.
Expand All @@ -3787,7 +3822,6 @@ fn handle_chaining_for_slot(
/// a given slot.
fn traverse_children_mut<F>(
db: &Database,
slot: Slot,
slot_meta: &Rc<RefCell<SlotMeta>>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
passed_visisted_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
Expand All @@ -3796,25 +3830,18 @@ fn traverse_children_mut<F>(
where
F: Fn(&mut SlotMeta) -> bool,
{
let mut next_slots: VecDeque<(u64, Rc<RefCell<SlotMeta>>)> =
vec![(slot, slot_meta.clone())].into();
let slot_meta = slot_meta.borrow();
let mut next_slots: VecDeque<u64> = slot_meta.next_slots.to_vec().into();
while !next_slots.is_empty() {
let (_, current_slot) = next_slots.pop_front().unwrap();
// Check whether we should explore the children of this slot
if slot_function(&mut current_slot.borrow_mut()) {
let current_slot = &RefCell::borrow(&*current_slot);
for next_slot_index in current_slot.next_slots.iter() {
let next_slot = find_slot_meta_else_create(
db,
working_set,
passed_visisted_slots,
*next_slot_index,
)?;
next_slots.push_back((*next_slot_index, next_slot));
}
let slot = next_slots.pop_front().unwrap();
let meta_ref = find_slot_meta_else_create(db, working_set, passed_visisted_slots, slot)?;
let mut meta = meta_ref.borrow_mut();
if slot_function(&mut meta) {
meta.next_slots
.iter()
.for_each(|slot| next_slots.push_back(*slot));
}
}

Ok(())
}

Expand All @@ -3825,15 +3852,14 @@ fn is_orphan(meta: &SlotMeta) -> bool {
}

// 1) Chain current_slot to the previous slot defined by prev_slot_meta
// 2) Determine whether to set the is_connected flag
fn chain_new_slot_to_prev_slot(
prev_slot_meta: &mut SlotMeta,
current_slot: Slot,
current_slot_meta: &mut SlotMeta,
) {
prev_slot_meta.next_slots.push(current_slot);
if prev_slot_meta.is_connected() && prev_slot_meta.is_full() {
current_slot_meta.set_connected();
if prev_slot_meta.is_connected() {
current_slot_meta.set_parent_connected();
}
}

Expand All @@ -3847,8 +3873,8 @@ fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -
// We should signal that there are updates if we extended the chain of consecutive blocks starting
// from block 0, which is true iff:
// 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks
// starting from block 0,
slot_meta.is_connected() &&
// starting from block 0 or from a snapshot, which is rooted by definition
slot_meta.is_parent_connected() &&
// AND either:
// 1) The slot didn't exist in the database before, and now we have a consecutive
// block for that slot
Expand Down Expand Up @@ -4415,7 +4441,7 @@ pub mod tests {
solana_transaction_status::{
InnerInstruction, InnerInstructions, Reward, Rewards, TransactionTokenBalance,
},
std::{thread::Builder, time::Duration},
std::{cmp::Ordering, thread::Builder, time::Duration},
};

// used for tests only
Expand Down Expand Up @@ -5349,7 +5375,7 @@ pub mod tests {
blockstore.insert_shreds(shreds1, None, false).unwrap();
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert!(meta1.next_slots.is_empty());
// Slot 1 is not trunk because slot 0 hasn't been inserted yet
// Slot 1 is not connected because slot 0 hasn't been inserted yet
assert!(!meta1.is_connected());
assert_eq!(meta1.parent_slot, Some(0));
assert_eq!(meta1.last_index, Some(shreds_per_slot as u64 - 1));
Expand All @@ -5361,13 +5387,13 @@ pub mod tests {
blockstore.insert_shreds(shreds2, None, false).unwrap();
let meta2 = blockstore.meta(2).unwrap().unwrap();
assert!(meta2.next_slots.is_empty());
// Slot 2 is not trunk because slot 0 hasn't been inserted yet
// Slot 2 is not connected because slot 0 hasn't been inserted yet
assert!(!meta2.is_connected());
assert_eq!(meta2.parent_slot, Some(1));
assert_eq!(meta2.last_index, Some(shreds_per_slot as u64 - 1));

// Check the first slot again, it should chain to the second slot,
// but still isn't part of the trunk
// but still isn't connected.
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert_eq!(meta1.next_slots, vec![2]);
assert!(!meta1.is_connected());
Expand Down Expand Up @@ -5427,8 +5453,10 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(slot - 1));
}

// Slot 0 is the only connected slot
assert!(!meta.is_connected() || meta.slot == 0);
// None of the slot should be connected, but since slot 0 is
// the special case, it will have parent_connected as true.
assert!(!meta.is_connected());
assert!(!meta.is_parent_connected() || slot == 0);
}

// Write the even slot shreds that we did not earlier
Expand Down Expand Up @@ -5458,6 +5486,7 @@ pub mod tests {
#[test]
#[allow(clippy::cognitive_complexity)]
pub fn test_forward_chaining_is_connected() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

Expand Down Expand Up @@ -5494,14 +5523,14 @@ pub mod tests {
}

// Ensure that each slot has their parent correct
// Additionally, slot 0 should be the only connected slot
if slot == 0 {
assert_eq!(meta.parent_slot, Some(0));
assert!(meta.is_connected());
} else {
assert_eq!(meta.parent_slot, Some(slot - 1));
assert!(!meta.is_connected());
}
// No slots should be connected yet, not even slot 0
// as slot 0 is still not full yet
assert!(!meta.is_connected());

assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
}
Expand All @@ -5515,28 +5544,123 @@ pub mod tests {

for slot in 0..num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();

if slot != num_slots - 1 {
assert_eq!(meta.next_slots, vec![slot + 1]);
} else {
assert!(meta.next_slots.is_empty());
}
if slot <= slot_index + 3 {

if slot < slot_index + 3 {
assert!(meta.is_full());
assert!(meta.is_connected());
} else {
assert!(!meta.is_connected());
}

if slot == 0 {
assert_eq!(meta.parent_slot, Some(0));
} else {
assert_eq!(meta.parent_slot, Some(slot - 1));
}

assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
}
}
}
}

#[test]
fn test_set_and_chain_connected_on_root_and_next_slots() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

// Create enough entries to ensure 5 shreds result
let entries_per_slot = max_ticks_per_n_shreds(5, None);

let mut start_slot = 5;
// Start a chain from a slot not in blockstore, this is the case when
// node starts with no blockstore and downloads a snapshot. In this
// scenario, the slot will be marked connected despite its' parent not
// being connected (or existing) and not being full.
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
let slot_meta5 = blockstore.meta(start_slot).unwrap().unwrap();
assert!(!slot_meta5.is_full());
assert!(slot_meta5.is_parent_connected());
assert!(slot_meta5.is_connected());

let num_slots = 5;
// Insert some new slots and ensure they connect to the root correctly
start_slot += 1;
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
blockstore.insert_shreds(shreds, None, false).unwrap();
for slot in start_slot..start_slot + num_slots {
info!("Evaluating slot {}", slot);
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}

// Chain connected on slots that are already connected, should just noop
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}

// Start another chain that is disconnected from previous chain. But, insert
// a non-full slot and ensure this slot (and its' children) are not marked
// as connected.
start_slot += 2 * num_slots;
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
// Insert all shreds except for the shreds with index > 0 from non_full_slot
let non_full_slot = start_slot + 2;
let (shreds, missing_shreds) = shreds
.into_iter()
.partition(|shred| shred.slot() != non_full_slot || shred.index() == 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
// Chain method hasn't been called yet, so none of these connected yet
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(!meta.is_parent_connected());
assert!(!meta.is_connected());
}
// Now chain from the new starting point
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
match slot.cmp(&non_full_slot) {
Ordering::Less => {
// These are fully connected as expected
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
Ordering::Equal => {
// Parent will be connected, but this slot not connected itself
assert!(meta.is_parent_connected());
assert!(!meta.is_connected());
}
Ordering::Greater => {
// All children are not connected either
assert!(!meta.is_parent_connected());
assert!(!meta.is_connected());
}
}
}

// Insert the missing shreds and ensure all slots connected now
blockstore
.insert_shreds(missing_shreds, None, false)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
}

/*
#[test]
pub fn test_chaining_tree() {
Expand Down
Loading

0 comments on commit 04d19ab

Please sign in to comment.