This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit 924d4fb
blockstore: add merkle root to ErasureMeta column family
AshwinSekar committed Oct 24, 2023
1 parent 612e8e8 commit 924d4fb
Showing 5 changed files with 176 additions and 28 deletions.
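At a high level, the commit widens the value stored in the ErasureMeta column family from `ErasureMeta` to a new `MerkleErasureMeta` that also records the FEC set's merkle root, while keeping previously written ledgers readable: reads first try the new layout and, on failure, fall back to the legacy layout and convert it. The sketch below illustrates that read path with simplified stand-in types (trimmed fields, a raw `[u8; 32]` in place of `Hash`); it is not the crate's actual code, which is in the `blockstore_db.rs` changes further down.

```rust
use bincode::deserialize;
use serde::{Deserialize, Serialize};

// Simplified stand-ins for the real blockstore_meta types.
#[derive(Serialize, Deserialize)]
struct LegacyErasureMeta {
    set_index: u64,
    first_coding_index: u64,
    size: u64, // the old, unused size field
}

#[derive(Serialize, Deserialize)]
struct MerkleErasureMeta {
    set_index: u64,
    first_coding_index: u64,
    merkle_root: [u8; 32],
}

impl From<LegacyErasureMeta> for MerkleErasureMeta {
    fn from(old: LegacyErasureMeta) -> Self {
        MerkleErasureMeta {
            set_index: old.set_index,
            first_coding_index: old.first_coding_index,
            // Entries written before the change carry no merkle root,
            // so they surface as the default (all-zero) hash.
            merkle_root: [0u8; 32],
        }
    }
}

/// Try the new layout first; if that fails, fall back to the legacy
/// layout and convert it into the new representation.
fn read_erasure_meta(bytes: &[u8]) -> bincode::Result<MerkleErasureMeta> {
    deserialize::<MerkleErasureMeta>(bytes)
        .or_else(|_| deserialize::<LegacyErasureMeta>(bytes).map(MerkleErasureMeta::from))
}
```

Because the two layouts serialize to different lengths under bincode's fixed-width encoding, a successful parse of the new layout can never be a misread legacy entry.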
4 changes: 2 additions & 2 deletions gossip/src/duplicate_shred.rs
@@ -3,7 +3,7 @@ use {
itertools::Itertools,
solana_ledger::{
blockstore::BlockstoreError,
blockstore_meta::{DuplicateSlotProof, ErasureMeta},
blockstore_meta::{DuplicateSlotProof, MerkleErasureMeta},
shred::{self, Shred, ShredType},
},
solana_sdk::{
@@ -141,7 +141,7 @@ where
// a part of the same fec set. Further work to enhance detection is planned in
// https://github.com/solana-labs/solana/issues/33037
if shred1.fec_set_index() == shred2.fec_set_index()
&& !ErasureMeta::check_erasure_consistency(shred1, shred2)
&& !MerkleErasureMeta::check_erasure_consistency(shred1, shred2)
{
return Ok(());
}
72 changes: 61 additions & 11 deletions ledger/src/blockstore.rs
@@ -457,8 +457,8 @@ impl Blockstore {
false
}

fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get(erasure_set.store_key())
fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result<Option<MerkleErasureMeta>> {
self.erasure_meta_cf.get_new_or_old(erasure_set.store_key())
}

/// Check whether the specified slot is an orphan slot which does not
@@ -594,7 +594,7 @@
fn get_recovery_data_shreds<'a>(
index: &'a Index,
slot: Slot,
erasure_meta: &'a ErasureMeta,
erasure_meta: &'a MerkleErasureMeta,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
data_cf: &'a LedgerColumn<cf::ShredData>,
) -> impl Iterator<Item = Shred> + 'a {
@@ -619,7 +619,7 @@
fn get_recovery_coding_shreds<'a>(
index: &'a Index,
slot: Slot,
erasure_meta: &'a ErasureMeta,
erasure_meta: &'a MerkleErasureMeta,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
code_cf: &'a LedgerColumn<cf::ShredCode>,
) -> impl Iterator<Item = Shred> + 'a {
@@ -643,7 +643,7 @@

fn recover_shreds(
index: &Index,
erasure_meta: &ErasureMeta,
erasure_meta: &MerkleErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
recovered_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
@@ -677,7 +677,7 @@

fn submit_metrics(
slot: Slot,
erasure_meta: &ErasureMeta,
erasure_meta: &MerkleErasureMeta,
attempted: bool,
status: String,
recovered: usize,
@@ -725,7 +725,7 @@

fn try_shred_recovery(
&self,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &HashMap<ErasureSetId, MerkleErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
reed_solomon_cache: &ReedSolomonCache,
@@ -1164,7 +1164,7 @@
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &mut HashMap<ErasureSetId, MerkleErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
@@ -1202,7 +1202,7 @@
let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
self.erasure_meta(erasure_set)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
.unwrap_or_else(|| MerkleErasureMeta::from_coding_shred(&shred).unwrap())
});

if !erasure_meta.check_coding_shred(&shred) {
@@ -1275,7 +1275,7 @@
&self,
shred: &Shred,
slot: Slot,
erasure_meta: &ErasureMeta,
erasure_meta: &MerkleErasureMeta,
just_received_shreds: &HashMap<ShredId, Shred>,
) -> Option<Vec<u8>> {
// Search for the shred which set the initial erasure config, either inserted,
@@ -1340,7 +1340,7 @@
fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
erasure_metas: &mut HashMap<ErasureSetId, MerkleErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
@@ -7504,6 +7504,56 @@ pub mod tests {
assert_eq!(meta.fee, index1_slot * 1000);
}

#[test]
#[allow(deprecated)]
fn test_erasure_meta_migration() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let erasure_meta_cf = &blockstore.erasure_meta_cf;

let config = ErasureConfig {
num_data: 1,
num_coding: 17,
};
let erasure_meta_old = ErasureMeta {
set_index: 5,
first_coding_index: 8,
config,
__unused_size: 0,
};

erasure_meta_cf
.put_old_type((100, 5), &erasure_meta_old)
.unwrap();

let erasure_meta = erasure_meta_cf.get_new_or_old((100, 5)).unwrap().unwrap();
assert_eq!(erasure_meta.set_index(), erasure_meta_old.set_index);
assert_eq!(
erasure_meta.first_coding_index(),
erasure_meta_old.first_coding_index
);
assert_eq!(erasure_meta.config(), erasure_meta_old.config);
assert_eq!(erasure_meta.merkle_root(), Hash::default());

let erasure_meta_new = ErasureMeta {
set_index: 3,
first_coding_index: 2,
config,
__unused_size: 0,
}
.into();
erasure_meta_cf.put((101, 3), &erasure_meta_new).unwrap();

let erasure_meta = erasure_meta_cf.get_new_or_old((101, 3)).unwrap().unwrap();
assert_eq!(erasure_meta.set_index(), erasure_meta_new.set_index());
assert_eq!(
erasure_meta.first_coding_index(),
erasure_meta_new.first_coding_index()
);
assert_eq!(erasure_meta.config(), erasure_meta_new.config());
assert_eq!(erasure_meta.merkle_root(), Hash::default());
}

#[test]
fn test_get_transaction_status() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
63 changes: 60 additions & 3 deletions ledger/src/blockstore_db.rs
@@ -214,13 +214,13 @@ pub mod columns {
///
/// This column family stores ErasureMeta which includes metadata about
/// dropped network packets (or erasures) that can be used to recover
/// missing data shreds.
/// missing data shreds. For merkle shreds, it also stores the merkle root.
///
/// Its index type is `crate::shred::ErasureSetId`, which consists of a Slot ID
/// and a FEC (Forward Error Correction) set index.
///
/// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u64)`
/// * value type: [`blockstore_meta::ErasureMeta`]
/// * value type: [`blockstore_meta::MerkleErasureMeta`]
pub struct ErasureMeta;
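The `(Slot, fec_set_index)` index gives every FEC set its own row. As a rough illustration (not this crate's actual `Column` implementation, which may differ in detail), such a composite index can be packed into a fixed-width big-endian key so RocksDB orders entries by slot first, then by FEC set index:

```rust
// Illustrative only: pack a (Slot, fec_set_index) pair into a 16-byte,
// big-endian key so keys sort by slot, then by FEC set index.
type Slot = u64;

fn erasure_set_key(slot: Slot, fec_set_index: u64) -> [u8; 16] {
    let mut key = [0u8; 16];
    key[..8].copy_from_slice(&slot.to_be_bytes());
    key[8..].copy_from_slice(&fec_set_index.to_be_bytes());
    key
}

fn erasure_set_index(key: &[u8; 16]) -> (Slot, u64) {
    let slot = Slot::from_be_bytes(key[..8].try_into().unwrap());
    let fec_set_index = u64::from_be_bytes(key[8..].try_into().unwrap());
    (slot, fec_set_index)
}
```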

#[derive(Debug)]
@@ -802,6 +802,12 @@ pub trait ColumnIndexDeprecation: Column {
}
}

/// Helper trait to transition a column between bincode formats
pub trait BincodeTypeTransition: Column + TypedColumn {
/// This should have a different serialized size than the TypedColumn::Type
type OldType: Serialize + DeserializeOwned + Into<Self::Type>;
}
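The note about serialized size is what makes the transition safe: a reader distinguishes the two formats purely by whether the bytes parse as the new type. A small sanity check of that invariant, written against simplified stand-in layouts rather than the real `blockstore_meta` types:

```rust
use bincode::serialized_size;
use serde::Serialize;

// Stand-in layouts mirroring the rough shape of the old and new metadata.
#[derive(Serialize, Default)]
struct OldLayout {
    set_index: u64,
    first_coding_index: u64,
    size: u64,
    num_data: u64,
    num_coding: u64,
}

#[derive(Serialize, Default)]
struct NewLayout {
    set_index: u64,
    first_coding_index: u64,
    num_data: u64,
    num_coding: u64,
    merkle_root: [u8; 32],
}

fn main() -> bincode::Result<()> {
    let old_len = serialized_size(&OldLayout::default())?;
    let new_len = serialized_size(&NewLayout::default())?;
    // If these were equal, the try-new-then-fall-back read path could
    // misinterpret bytes written in one format as the other.
    assert_ne!(old_len, new_len);
    println!("old: {old_len} bytes, new: {new_len} bytes");
    Ok(())
}
```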

impl Column for columns::TransactionStatus {
type Index = (Signature, Slot);

@@ -1216,7 +1222,11 @@ impl ColumnName for columns::ErasureMeta {
const NAME: &'static str = ERASURE_META_CF;
}
impl TypedColumn for columns::ErasureMeta {
type Type = blockstore_meta::ErasureMeta;
type Type = blockstore_meta::MerkleErasureMeta;
}
impl BincodeTypeTransition for columns::ErasureMeta {
#[allow(deprecated)]
type OldType = blockstore_meta::ErasureMeta;
}

impl SlotColumn for columns::OptimisticSlots {}
@@ -1796,6 +1806,42 @@ where
}
}

impl<C> LedgerColumn<C>
where
C: BincodeTypeTransition + ColumnName,
{
/// Read the column in either the `C::Type` or `C::OldType` format.
pub fn get_new_or_old(&self, key: C::Index) -> Result<Option<C::Type>> {
self.get_new_or_old_raw(&C::key(key))
}

pub fn get_new_or_old_raw(&self, key: &[u8]) -> Result<Option<C::Type>> {
let mut result = Ok(None);
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? {
let value = if let Ok(value) = deserialize::<C::Type>(pinnable_slice.as_ref()) {
value
} else {
deserialize::<C::OldType>(pinnable_slice.as_ref())?.into()
};
result = Ok(Some(value))
}

if let Some(op_start_instant) = is_perf_enabled {
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}
result
}
}

impl<'a> WriteBatch<'a> {
pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
self.write_batch
@@ -2236,6 +2282,17 @@ pub mod tests {
}
}

impl<C> LedgerColumn<C>
where
C: BincodeTypeTransition + ColumnName,
{
pub fn put_old_type(&self, key: C::Index, value: &C::OldType) -> Result<()> {
let serialized_value = serialize(value)?;
self.backend
.put_cf(self.handle(), &C::key(key), &serialized_value)
}
}

impl<C> LedgerColumn<C>
where
C: ColumnIndexDeprecation + ColumnName,
64 changes: 52 additions & 12 deletions ledger/src/blockstore_meta.rs
@@ -118,24 +118,38 @@ pub struct ShredIndex {
index: BTreeSet<u64>,
}

#[deprecated = "Use MerkleErasureMeta"]
#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
/// Erasure coding information
pub struct ErasureMeta {
/// Which erasure set in the slot this is
set_index: u64,
pub(crate) set_index: u64,
/// First coding index in the FEC set
first_coding_index: u64,
pub(crate) first_coding_index: u64,
/// Size of shards in this erasure set
#[serde(rename = "size")]
__unused_size: usize,
pub(crate) __unused_size: usize,
/// Erasure configuration for this erasure set
pub(crate) config: ErasureConfig,
}

#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
/// Erasure coding information for merkle shreds
pub struct MerkleErasureMeta {
/// Which erasure set in the slot this is
set_index: u64,
/// First coding index in the FEC set
first_coding_index: u64,
/// Erasure configuration for this erasure set
config: ErasureConfig,
/// Merkle root for this FEC set
merkle_root: Hash,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub(crate) struct ErasureConfig {
num_data: usize,
num_coding: usize,
pub(crate) num_data: usize,
pub(crate) num_coding: usize,
}

#[derive(Deserialize, Serialize)]
@@ -321,7 +335,7 @@ impl SlotMeta {
}
}

impl ErasureMeta {
impl MerkleErasureMeta {
pub(crate) fn from_coding_shred(shred: &Shred) -> Option<Self> {
match shred.shred_type() {
ShredType::Data => None,
@@ -331,11 +345,12 @@ num_coding: usize::from(shred.num_coding_shreds().ok()?),
num_coding: usize::from(shred.num_coding_shreds().ok()?),
};
let first_coding_index = u64::from(shred.first_coding_index()?);
let erasure_meta = ErasureMeta {
let merkle_root = Hash::default();
let erasure_meta = MerkleErasureMeta {
set_index: u64::from(shred.fec_set_index()),
config,
first_coding_index,
__unused_size: 0,
merkle_root,
};
Some(erasure_meta)
}
@@ -345,10 +360,9 @@
// Returns true if the erasure fields on the shred
// are consistent with the erasure-meta.
pub(crate) fn check_coding_shred(&self, shred: &Shred) -> bool {
let Some(mut other) = Self::from_coding_shred(shred) else {
let Some(other) = Self::from_coding_shred(shred) else {
return false;
};
other.__unused_size = self.__unused_size;
self == &other
}

@@ -394,6 +408,32 @@
StillNeed(num_needed)
}
}

#[cfg(test)]
pub(crate) fn set_index(&self) -> u64 {
self.set_index
}

#[cfg(test)]
pub(crate) fn first_coding_index(&self) -> u64 {
self.first_coding_index
}

#[cfg(test)]
pub(crate) fn merkle_root(&self) -> Hash {
self.merkle_root
}
}

impl From<ErasureMeta> for MerkleErasureMeta {
fn from(erasure_meta: ErasureMeta) -> MerkleErasureMeta {
MerkleErasureMeta {
set_index: erasure_meta.set_index,
first_coding_index: erasure_meta.first_coding_index,
config: erasure_meta.config,
merkle_root: Hash::default(),
}
}
}

impl DuplicateSlotProof {
@@ -511,11 +551,11 @@
num_data: 8,
num_coding: 16,
};
let e_meta = ErasureMeta {
let e_meta = MerkleErasureMeta {
set_index,
first_coding_index: set_index,
config: erasure_config,
__unused_size: 0,
merkle_root: Hash::default(),
};
let mut rng = thread_rng();
let mut index = Index::new(0);