Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Fix SlotMeta is_connected tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Czabaniuk committed Sep 29, 2022
1 parent 264ca90 commit 7fd50bc
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 37 deletions.
114 changes: 82 additions & 32 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3323,6 +3323,47 @@ impl Blockstore {
);
Ok(())
}

/// Mark a root `slot` as connected, traverse `slot`'s children and update
/// the children's connected status if appropriate.
///
/// A ledger with a full path of blocks from genesis to latest root would
/// have all of the rooted blocks marked as connected such that new blocks
/// could also be connected. However, starting from some root (such as from
/// a snapshot) is a valid way to join a cluster. For this case, mark this
/// root as connected such that the node that joined midway through can
/// have their slots considered connected.
pub fn set_and_chain_connected_on_root(&self, slot: Slot) -> Result<()> {
info!("Marking slot {} and any full children slots as connected", slot);

let mut meta = self.meta(slot)?.unwrap_or_else(|| SlotMeta::new(slot, None));
// If the slot was already connected, there is nothing to do as this slot's
// children are also assumed to be appropriately connected
let already_connected = meta.is_connected();
if already_connected {
return Ok(())
}

meta.set_connected(true);
let mut write_batch = self.db.batch()?;
write_batch.put::<cf::SlotMeta>(meta.slot, &meta)?;

let mut next_slots = VecDeque::from(meta.next_slots);
while !next_slots.is_empty() {
let current_slot = next_slots.pop_front().unwrap();
let mut current_meta = self.meta(current_slot)?.unwrap_or_else(||
panic!("Slot {} is a child but has no SlotMeta in blockstore", current_slot)
);

if update_child_from_connected_parent(&mut current_meta) {
next_slots.extend(current_meta.next_slots.iter());
}
write_batch.put::<cf::SlotMeta>(current_meta.slot, &current_meta)?;
}

self.db.write(write_batch)?;
Ok(())
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
Expand Down Expand Up @@ -3737,32 +3778,22 @@ fn handle_chaining_for_slot(
}
}

// If this is a newly inserted slot, then we know the children of this slot were not previously
// If this is a newly completed slot, then we know the children of this slot were not previously
// connected to the trunk of the ledger. Thus if slot.is_connected is now true, we need to
// update all child slots with `is_connected` = true because these children are also now newly
// connected to trunk of the ledger
let should_propagate_is_connected =
is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
&& RefCell::borrow(meta).is_connected;
&& RefCell::borrow(meta).is_parent_connected();

if should_propagate_is_connected {
// slot_function returns a boolean indicating whether to explore the children
// of the input slot
let slot_function = |slot: &mut SlotMeta| {
slot.is_connected = true;

// We don't want to set the is_connected flag on the children of non-full
// slots
slot.is_full()
};

traverse_children_mut(
db,
slot,
meta,
working_set,
new_chained_slots,
slot_function,
update_child_from_connected_parent,
)?;
}

Expand Down Expand Up @@ -3815,21 +3846,35 @@ where
Ok(())
}


// Update the meta for a slot whose parent is known to be connected.
// Returns a bool indicating whether this slot is now connected which would
// indicate that it's children are also in need of update.
fn update_child_from_connected_parent(meta: &mut SlotMeta) -> bool {
meta.set_parent_connected(true);

// If this slot is full, this slot is now connected
let is_full = meta.is_full();
meta.set_connected(is_full);

// Traverse children if we are full
is_full
}

fn is_orphan(meta: &SlotMeta) -> bool {
// If we have no parent, then this is the head of a detached chain of
// slots
meta.parent_slot.is_none()
}

// 1) Chain current_slot to the previous slot defined by prev_slot_meta
// 2) Determine whether to set the is_connected flag
fn chain_new_slot_to_prev_slot(
prev_slot_meta: &mut SlotMeta,
current_slot: Slot,
current_slot_meta: &mut SlotMeta,
) {
prev_slot_meta.next_slots.push(current_slot);
current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full();
current_slot_meta.set_parent_connected(prev_slot_meta.is_connected());
}

fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
Expand All @@ -3842,8 +3887,8 @@ fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -
// We should signal that there are updates if we extended the chain of consecutive blocks starting
// from block 0, which is true iff:
// 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks
// starting from block 0,
slot_meta.is_connected &&
// starting from block 0 or from a snapshot
slot_meta.is_parent_connected() &&
// AND either:
// 1) The slot didn't exist in the database before, and now we have a consecutive
// block for that slot
Expand Down Expand Up @@ -4832,7 +4877,7 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(0));
assert_eq!(meta.last_index, Some(num_shreds - 1));
assert!(meta.next_slots.is_empty());
assert!(meta.is_connected);
assert!(meta.is_connected());
}

#[test]
Expand Down Expand Up @@ -5355,7 +5400,7 @@ pub mod tests {
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert!(meta1.next_slots.is_empty());
// Slot 1 is not trunk because slot 0 hasn't been inserted yet
assert!(!meta1.is_connected);
assert!(!meta1.is_connected());
assert_eq!(meta1.parent_slot, Some(0));
assert_eq!(meta1.last_index, Some(shreds_per_slot as u64 - 1));

Expand All @@ -5367,15 +5412,15 @@ pub mod tests {
let meta2 = blockstore.meta(2).unwrap().unwrap();
assert!(meta2.next_slots.is_empty());
// Slot 2 is not trunk because slot 0 hasn't been inserted yet
assert!(!meta2.is_connected);
assert!(!meta2.is_connected());
assert_eq!(meta2.parent_slot, Some(1));
assert_eq!(meta2.last_index, Some(shreds_per_slot as u64 - 1));

// Check the first slot again, it should chain to the second slot,
// but still isn't part of the trunk
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert_eq!(meta1.next_slots, vec![2]);
assert!(!meta1.is_connected);
assert!(!meta1.is_connected());
assert_eq!(meta1.parent_slot, Some(0));
assert_eq!(meta1.last_index, Some(shreds_per_slot as u64 - 1));

Expand All @@ -5394,12 +5439,13 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(slot - 1));
}
assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
assert!(meta.is_connected);
assert!(meta.is_connected());
}
}

#[test]
fn test_handle_chaining_missing_slots() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

Expand Down Expand Up @@ -5452,10 +5498,11 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(slot - 1));
}

assert!(!meta.is_connected());
if slot == 0 {
assert!(meta.is_connected);
assert!(meta.is_parent_connected());
} else {
assert!(!meta.is_connected);
assert!(!meta.is_parent_connected());
}
}

Expand All @@ -5480,13 +5527,14 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(slot - 1));
}
assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
assert!(meta.is_connected);
assert!(meta.is_connected());
}
}

#[test]
#[allow(clippy::cognitive_complexity)]
pub fn test_forward_chaining_is_connected() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

Expand Down Expand Up @@ -5523,14 +5571,15 @@ pub mod tests {
}

// Ensure that each slot has their parent correct
// Additionally, slot 0 should be the only connected slot
if slot == 0 {
assert_eq!(meta.parent_slot, Some(0));
assert!(meta.is_connected);

} else {
assert_eq!(meta.parent_slot, Some(slot - 1));
assert!(!meta.is_connected);
}
// No slots should be connected yet, not even slot 0
// as slot 0 is still not full yet
assert!(!meta.is_connected());

assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
}
Expand All @@ -5549,10 +5598,11 @@ pub mod tests {
} else {
assert!(meta.next_slots.is_empty());
}
if slot <= slot_index as u64 + 3 {
assert!(meta.is_connected);
if slot < slot_index as u64 + 3 {
assert!(meta.is_full());
assert!(meta.is_connected());
} else {
assert!(!meta.is_connected);
assert!(!meta.is_connected());
}

if slot == 0 {
Expand Down Expand Up @@ -5632,7 +5682,7 @@ pub mod tests {
let slot_meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(slot_meta.consumed, entries_per_slot);
assert_eq!(slot_meta.received, entries_per_slot);
assert!(slot_meta.is_connected);
assert!(slot_meta.is_connected;
let slot_parent = {
if slot == 0 {
0
Expand Down
41 changes: 37 additions & 4 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::shred::{Shred, ShredType},
bitflags::bitflags,
serde::{Deserialize, Deserializer, Serialize, Serializer},
solana_sdk::{
clock::{Slot, UnixTimestamp},
Expand All @@ -11,6 +12,14 @@ use {
},
};

bitflags! {
#[derive(Default, Serialize, Deserialize)]
pub struct ConnectedFlags:u8 {
const CONNECTED = 0b0000_0001;
const PARENT_CONNECTED = 0b1000_0000;
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
// The Meta column family
pub struct SlotMeta {
Expand All @@ -37,9 +46,8 @@ pub struct SlotMeta {
// The list of slots, each of which contains a block that derives
// from this one.
pub next_slots: Vec<Slot>,
// True if this slot is full (consumed == last_index + 1) and if every
// slot that is a parent of this slot is also connected.
pub is_connected: bool,
// True if this slot's parent is connected
pub connected_flags: ConnectedFlags,
// Shreds indices which are marked data complete.
pub completed_data_indexes: BTreeSet<u32>,
}
Expand Down Expand Up @@ -214,6 +222,24 @@ impl SlotMeta {
Some(self.consumed) == self.last_index.map(|ix| ix + 1)
}

pub fn is_connected(&self) -> bool {
self.connected_flags.contains(ConnectedFlags::CONNECTED)
}

pub fn is_parent_connected(&self) -> bool {
self.connected_flags
.contains(ConnectedFlags::PARENT_CONNECTED)
}

pub fn set_connected(&mut self, value: bool) {
self.connected_flags.set(ConnectedFlags::CONNECTED, value);
}

pub fn set_parent_connected(&mut self, value: bool) {
self.connected_flags
.set(ConnectedFlags::PARENT_CONNECTED, value);
}

/// Dangerous. Currently only needed for a local-cluster test
pub fn unset_parent(&mut self) {
self.parent_slot = None;
Expand All @@ -225,10 +251,17 @@ impl SlotMeta {
}

pub(crate) fn new(slot: Slot, parent_slot: Option<Slot>) -> Self {
let connected_flags = if slot == 0 {
// Slot 0 is the start, mark it as having its' parent connected
// such that slot 0 becoming full will be updated as connected
ConnectedFlags::PARENT_CONNECTED
} else {
ConnectedFlags::default()
};
SlotMeta {
slot,
parent_slot,
is_connected: slot == 0,
connected_flags,
..SlotMeta::default()
}
}
Expand Down
6 changes: 5 additions & 1 deletion ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,14 +805,18 @@ pub fn process_blockstore_from_root(
info!("Processing ledger from slot {}...", start_slot);
let now = Instant::now();

// ensure start_slot is rooted for correct replay
// Ensure start_slot is rooted for correct replay; also ensure start_slot and
// qualifying children are marked as connected
if blockstore.is_primary_access() {
blockstore
.mark_slots_as_if_rooted_normally_at_startup(
vec![(bank.slot(), Some(bank.hash()))],
true,
)
.expect("Couldn't mark start_slot as root on startup");
blockstore
.set_and_chain_connected_on_root(bank.slot())
.expect("Couldn't mark start_slot as connected during startup")
} else {
info!(
"Starting slot {} isn't root and won't be updated due to being secondary blockstore access",
Expand Down

0 comments on commit 7fd50bc

Please sign in to comment.