From ac7b56201ec2f9ec1f58e5c6271c506a513d3c1b Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sat, 21 Oct 2023 04:49:28 +0000 Subject: [PATCH 1/5] add merkle root meta column to blockstore --- ledger/src/blockstore.rs | 435 +++++++++++++++++++++- ledger/src/blockstore/blockstore_purge.rs | 8 + ledger/src/blockstore_db.rs | 51 +++ ledger/src/blockstore_meta.rs | 35 ++ ledger/src/shred.rs | 4 + 5 files changed, 532 insertions(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 28c463646c43c7..f8245df1858d51 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -215,6 +215,7 @@ pub struct Blockstore { bank_hash_cf: LedgerColumn, optimistic_slots_cf: LedgerColumn, max_root: AtomicU64, + merkle_root_meta_cf: LedgerColumn, insert_shreds_lock: Mutex<()>, new_shreds_signals: Mutex>>, completed_slots_senders: Mutex>, @@ -315,6 +316,7 @@ impl Blockstore { let program_costs_cf = db.column(); let bank_hash_cf = db.column(); let optimistic_slots_cf = db.column(); + let merkle_root_meta_cf = db.column(); let db = Arc::new(db); @@ -352,6 +354,7 @@ impl Blockstore { program_costs_cf, bank_hash_cf, optimistic_slots_cf, + merkle_root_meta_cf, new_shreds_signals: Mutex::default(), completed_slots_senders: Mutex::default(), shred_timing_point_sender: None, @@ -711,6 +714,7 @@ impl Blockstore { self.program_costs_cf.submit_rocksdb_cf_metrics(); self.bank_hash_cf.submit_rocksdb_cf_metrics(); self.optimistic_slots_cf.submit_rocksdb_cf_metrics(); + self.merkle_root_meta_cf.submit_rocksdb_cf_metrics(); } fn try_shred_recovery( @@ -831,6 +835,7 @@ impl Blockstore { let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); let mut duplicate_shreds = vec![]; @@ -850,6 +855,7 @@ impl Blockstore { match self.check_insert_data_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -887,6 +893,7 @@ impl Blockstore { self.check_insert_coding_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_inserted_shreds, @@ -933,6 +940,7 @@ impl Blockstore { match self.check_insert_data_shred( shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -998,6 +1006,10 @@ impl Blockstore { write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch.put::(erasure_set.store_key(), &merkle_root_meta)?; + } + for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { write_batch.put::(slot, &index_working_set_entry.index)?; @@ -1155,6 +1167,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1194,6 +1207,11 @@ impl Blockstore { .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); + let _merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { + self.merkle_root_meta(erasure_set) + .expect("Expect database get to succeed") + .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) + }); if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; @@ -1331,6 +1349,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1356,6 +1375,12 @@ impl Blockstore { ); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); + let erasure_set = shred.erasure_set(); + let _merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { + self.merkle_root_meta(erasure_set) + .expect("Expect database get to succeed") + .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) + }); if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -1390,7 +1415,6 @@ impl Blockstore { } } - let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), @@ -3183,6 +3207,10 @@ impl Blockstore { .unwrap_or(false) } + fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { + self.merkle_root_meta_cf.get(erasure_set.store_key()) + } + pub fn insert_optimistic_slot( &self, slot: Slot, @@ -6719,6 +6747,408 @@ pub mod tests { ),); } + #[test] + fn test_merkle_root_metas_coding() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let slot = 1; + let index = 12; + let fec_set_index = 11; + let coding_shred = Shred::new_from_parity_shard( + slot, + index, + &[], // parity_shard + fec_set_index, + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + assert!(blockstore.check_insert_coding_shred( + coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Code, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.store_key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let new_coding_shred = Shred::new_from_parity_shard( + slot, + index + 1, + &[], // parity_shard + fec_set_index, + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index as u64 + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_coding_shred = Shred::new_from_parity_shard( + slot, + new_index, + &[], // parity_shard + fec_set_index + 30, // fec_set_index + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index as u64 + ); + } + + #[test] + fn test_merkle_root_metas_data() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let slot = 1; + let index = 12; + let fec_set_index = 11; + let data_shred = Shred::new_from_data( + slot, + index, + 1, // parent_offset + &[1, 1, 1], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index, + ); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut slot_meta_working_set = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + blockstore + .check_insert_data_shred( + data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Data, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.store_key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let new_data_shred = Shred::new_from_data( + slot, + index + 1, + 1, // parent_offset + &[2, 2, 2], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index, + ); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index as u64 + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_data_shred = Shred::new_from_data( + slot, + new_index, + 1, // parent_offset + &[3, 3, 3], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index + 30, + ); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index as u64 + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index as u64 + ); + } + #[test] fn test_check_insert_coding_shred() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -6737,6 +7167,7 @@ pub mod tests { ); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); @@ -6744,6 +7175,7 @@ pub mod tests { assert!(blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -6759,6 +7191,7 @@ pub mod tests { assert!(!blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 9669f8bd305a00..f6b3662ed19e28 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -220,6 +220,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); match purge_type { PurgeType::Exact => { @@ -329,6 +333,10 @@ impl Blockstore { .db .delete_file_in_range_cf::(from_slot, to_slot) .is_ok() + & self + .db + .delete_file_in_range_cf::(from_slot, to_slot) + .is_ok() } /// Returns true if the special columns, TransactionStatus and diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index b65df82ee00c9e..627e35e88ec82e 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -2,6 +2,7 @@ pub use rocksdb::Direction as IteratorDirection; use { crate::{ blockstore_meta, + blockstore_meta::MerkleRootMeta, blockstore_metrics::{ maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf, BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET, @@ -103,6 +104,8 @@ const BLOCK_HEIGHT_CF: &str = "block_height"; const PROGRAM_COSTS_CF: &str = "program_costs"; /// Column family for optimistic slots const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots"; +/// Column family for merkle roots +const MERKLE_ROOT_CF: &str = "merkle_root_meta"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -339,6 +342,19 @@ pub mod columns { /// * value type: [`blockstore_meta::OptimisticSlotMetaVersioned`] pub struct OptimisticSlots; + #[derive(Debug)] + /// The merkle root meta column + /// + /// Each merkle shred is part of a merkle tree for + /// its FEC set. This column stores that merkle root and associated + /// meta information about the first shred received. + /// + /// Its index type is (Slot, FEC) set index. + /// + /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u64)` + /// * value type: [`blockstore_meta::MerkleRootMeta`]` + pub struct MerkleRootMeta; + // When adding a new column ... // - Add struct below and implement `Column` and `ColumnName` traits // - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns() @@ -474,6 +490,7 @@ impl Rocks { new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), ] } @@ -501,6 +518,7 @@ impl Rocks { BlockHeight::NAME, ProgramCosts::NAME, OptimisticSlots::NAME, + MerkleRootMeta::NAME, ] } @@ -1227,6 +1245,39 @@ impl TypedColumn for columns::OptimisticSlots { type Type = blockstore_meta::OptimisticSlotMetaVersioned; } +impl Column for columns::MerkleRootMeta { + type Index = (Slot, u64); + + fn index(key: &[u8]) -> (Slot, u64) { + let slot = BigEndian::read_u64(&key[..8]); + let set_index = BigEndian::read_u64(&key[8..]); + + (slot, set_index) + } + + fn key((slot, set_index): (Slot, u64)) -> Vec { + let mut key = vec![0; 16]; + BigEndian::write_u64(&mut key[..8], slot); + BigEndian::write_u64(&mut key[8..], set_index); + key + } + + fn slot(index: Self::Index) -> Slot { + index.0 + } + + fn as_index(slot: Slot) -> Self::Index { + (slot, 0) + } +} + +impl ColumnName for columns::MerkleRootMeta { + const NAME: &'static str = MERKLE_ROOT_CF; +} +impl TypedColumn for columns::MerkleRootMeta { + type Type = MerkleRootMeta; +} + #[derive(Debug)] pub struct Database { backend: Arc, diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 79954ee96b6d04..f69f70d1ce215a 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -138,6 +138,16 @@ pub(crate) struct ErasureConfig { num_coding: usize, } +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct MerkleRootMeta { + /// The merkle root + merkle_root: Hash, + /// The first received shred index + first_received_shred_index: u64, + /// The shred type of the first received shred + first_received_shred_type: ShredType, +} + #[derive(Deserialize, Serialize)] pub struct DuplicateSlotProof { #[serde(with = "serde_bytes")] @@ -396,6 +406,31 @@ impl ErasureMeta { } } +impl MerkleRootMeta { + pub(crate) fn from_shred(shred: &Shred) -> Self { + Self { + merkle_root: shred.merkle_root().unwrap_or_default(), + first_received_shred_index: u64::from(shred.index()), + first_received_shred_type: shred.shred_type(), + } + } + + #[cfg(test)] + pub(crate) fn merkle_root(&self) -> Hash { + self.merkle_root + } + + #[cfg(test)] + pub(crate) fn first_received_shred_index(&self) -> u64 { + self.first_received_shred_index + } + + #[cfg(test)] + pub(crate) fn first_received_shred_type(&self) -> ShredType { + self.first_received_shred_type + } +} + impl DuplicateSlotProof { pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { DuplicateSlotProof { shred1, shred2 } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 5fda160e29b976..50056cd3ab1e33 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -551,6 +551,10 @@ impl Shred { Self::ShredData(_) => Err(Error::InvalidShredType), } } + + pub fn merkle_root(&self) -> Option { + layout::get_merkle_root(self.payload()) + } } // Helper methods to extract pieces of the shred from the payload From 45f504a4cb9e3a2a171b93115333f21ecb517204 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 9 Nov 2023 17:02:15 +0000 Subject: [PATCH 2/5] pr feedback: remove write/reads to column --- ledger/src/blockstore.rs | 429 ---------------------------------- ledger/src/blockstore_db.rs | 14 +- ledger/src/blockstore_meta.rs | 25 -- ledger/src/shred.rs | 4 - 4 files changed, 7 insertions(+), 465 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f8245df1858d51..9cfc28fa9dd651 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -835,7 +835,6 @@ impl Blockstore { let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); - let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); let mut duplicate_shreds = vec![]; @@ -855,7 +854,6 @@ impl Blockstore { match self.check_insert_data_shred( shred, &mut erasure_metas, - &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -893,7 +891,6 @@ impl Blockstore { self.check_insert_coding_shred( shred, &mut erasure_metas, - &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_inserted_shreds, @@ -940,7 +937,6 @@ impl Blockstore { match self.check_insert_data_shred( shred.clone(), &mut erasure_metas, - &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -1006,10 +1002,6 @@ impl Blockstore { write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } - for (erasure_set, merkle_root_meta) in merkle_root_metas { - write_batch.put::(erasure_set.store_key(), &merkle_root_meta)?; - } - for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { write_batch.put::(slot, &index_working_set_entry.index)?; @@ -1167,7 +1159,6 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, - merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1207,11 +1198,6 @@ impl Blockstore { .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); - let _merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { - self.merkle_root_meta(erasure_set) - .expect("Expect database get to succeed") - .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) - }); if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; @@ -1349,7 +1335,6 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, - merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1376,11 +1361,6 @@ impl Blockstore { let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); let erasure_set = shred.erasure_set(); - let _merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { - self.merkle_root_meta(erasure_set) - .expect("Expect database get to succeed") - .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) - }); if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -3207,10 +3187,6 @@ impl Blockstore { .unwrap_or(false) } - fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { - self.merkle_root_meta_cf.get(erasure_set.store_key()) - } - pub fn insert_optimistic_slot( &self, slot: Slot, @@ -6747,408 +6723,6 @@ pub mod tests { ),); } - #[test] - fn test_merkle_root_metas_coding() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - let slot = 1; - let index = 12; - let fec_set_index = 11; - let coding_shred = Shred::new_from_parity_shard( - slot, - index, - &[], // parity_shard - fec_set_index, - 11, // num_data_shreds - 11, // num_coding_shreds - 8, // position - 0, // version - ); - - let mut erasure_metas = HashMap::new(); - let mut merkle_root_metas = HashMap::new(); - let mut index_working_set = HashMap::new(); - let mut just_received_shreds = HashMap::new(); - let mut write_batch = blockstore.db.batch().unwrap(); - let mut index_meta_time_us = 0; - assert!(blockstore.check_insert_coding_shred( - coding_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - &mut vec![], - false, - ShredSource::Turbine, - &mut BlockstoreInsertionMetrics::default(), - )); - - assert_eq!(merkle_root_metas.len(), 1); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .merkle_root(), - coding_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .first_received_shred_type(), - ShredType::Code, - ); - - for (erasure_set, merkle_root_meta) in merkle_root_metas { - write_batch - .put::(erasure_set.store_key(), &merkle_root_meta) - .unwrap(); - } - blockstore.db.write(write_batch).unwrap(); - - // Add a shred with different merkle root and index - let new_coding_shred = Shred::new_from_parity_shard( - slot, - index + 1, - &[], // parity_shard - fec_set_index, - 11, // num_data_shreds - 11, // num_coding_shreds - 8, // position - 0, // version - ); - - erasure_metas.clear(); - index_working_set.clear(); - just_received_shreds.clear(); - let mut merkle_root_metas = HashMap::new(); - let mut write_batch = blockstore.db.batch().unwrap(); - - assert!(blockstore.check_insert_coding_shred( - new_coding_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - &mut vec![], - false, - ShredSource::Turbine, - &mut BlockstoreInsertionMetrics::default(), - )); - - // Verify that we still have the merkle root meta from the original shred - assert_eq!(merkle_root_metas.len(), 1); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .merkle_root(), - coding_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - - // Blockstore should also have the merkle root meta of the original shred - assert_eq!( - blockstore - .merkle_root_meta(coding_shred.erasure_set()) - .unwrap() - .unwrap() - .merkle_root(), - coding_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - blockstore - .merkle_root_meta(coding_shred.erasure_set()) - .unwrap() - .unwrap() - .first_received_shred_index(), - index as u64 - ); - - // Add a shred from different fec set - let new_index = fec_set_index + 31; - let new_coding_shred = Shred::new_from_parity_shard( - slot, - new_index, - &[], // parity_shard - fec_set_index + 30, // fec_set_index - 11, // num_data_shreds - 11, // num_coding_shreds - 8, // position - 0, // version - ); - - assert!(blockstore.check_insert_coding_shred( - new_coding_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - &mut vec![], - false, - ShredSource::Turbine, - &mut BlockstoreInsertionMetrics::default(), - )); - - // Verify that we still have the merkle root meta for the original shred - // and the new shred - assert_eq!(merkle_root_metas.len(), 2); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .merkle_root(), - coding_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&coding_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - assert_eq!( - merkle_root_metas - .get(&new_coding_shred.erasure_set()) - .unwrap() - .merkle_root(), - new_coding_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&new_coding_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - new_index as u64 - ); - } - - #[test] - fn test_merkle_root_metas_data() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - let slot = 1; - let index = 12; - let fec_set_index = 11; - let data_shred = Shred::new_from_data( - slot, - index, - 1, // parent_offset - &[1, 1, 1], // data - ShredFlags::empty(), - 0, // reference_tick, - 0, // version - fec_set_index, - ); - - let mut erasure_metas = HashMap::new(); - let mut merkle_root_metas = HashMap::new(); - let mut index_working_set = HashMap::new(); - let mut just_received_shreds = HashMap::new(); - let mut slot_meta_working_set = HashMap::new(); - let mut write_batch = blockstore.db.batch().unwrap(); - let mut index_meta_time_us = 0; - blockstore - .check_insert_data_shred( - data_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - false, - &mut vec![], - None, - ShredSource::Turbine, - ) - .unwrap(); - - assert_eq!(merkle_root_metas.len(), 1); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .merkle_root(), - data_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .first_received_shred_type(), - ShredType::Data, - ); - - for (erasure_set, merkle_root_meta) in merkle_root_metas { - write_batch - .put::(erasure_set.store_key(), &merkle_root_meta) - .unwrap(); - } - blockstore.db.write(write_batch).unwrap(); - - // Add a shred with different merkle root and index - let new_data_shred = Shred::new_from_data( - slot, - index + 1, - 1, // parent_offset - &[2, 2, 2], // data - ShredFlags::empty(), - 0, // reference_tick, - 0, // version - fec_set_index, - ); - - erasure_metas.clear(); - index_working_set.clear(); - just_received_shreds.clear(); - let mut merkle_root_metas = HashMap::new(); - let mut write_batch = blockstore.db.batch().unwrap(); - - blockstore - .check_insert_data_shred( - new_data_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - false, - &mut vec![], - None, - ShredSource::Turbine, - ) - .unwrap(); - - // Verify that we still have the merkle root meta from the original shred - assert_eq!(merkle_root_metas.len(), 1); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .merkle_root(), - data_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - - // Blockstore should also have the merkle root meta of the original shred - assert_eq!( - blockstore - .merkle_root_meta(data_shred.erasure_set()) - .unwrap() - .unwrap() - .merkle_root(), - data_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - blockstore - .merkle_root_meta(data_shred.erasure_set()) - .unwrap() - .unwrap() - .first_received_shred_index(), - index as u64 - ); - - // Add a shred from different fec set - let new_index = fec_set_index + 31; - let new_data_shred = Shred::new_from_data( - slot, - new_index, - 1, // parent_offset - &[3, 3, 3], // data - ShredFlags::empty(), - 0, // reference_tick, - 0, // version - fec_set_index + 30, - ); - - blockstore - .check_insert_data_shred( - new_data_shred.clone(), - &mut erasure_metas, - &mut merkle_root_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_received_shreds, - &mut index_meta_time_us, - false, - &mut vec![], - None, - ShredSource::Turbine, - ) - .unwrap(); - - // Verify that we still have the merkle root meta for the original shred - // and the new shred - assert_eq!(merkle_root_metas.len(), 2); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .merkle_root(), - data_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&data_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - index as u64 - ); - assert_eq!( - merkle_root_metas - .get(&new_data_shred.erasure_set()) - .unwrap() - .merkle_root(), - new_data_shred.merkle_root().unwrap_or_default() - ); - assert_eq!( - merkle_root_metas - .get(&new_data_shred.erasure_set()) - .unwrap() - .first_received_shred_index(), - new_index as u64 - ); - } - #[test] fn test_check_insert_coding_shred() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -7167,7 +6741,6 @@ pub mod tests { ); let mut erasure_metas = HashMap::new(); - let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); @@ -7175,7 +6748,6 @@ pub mod tests { assert!(blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, - &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -7191,7 +6763,6 @@ pub mod tests { assert!(!blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, - &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 627e35e88ec82e..408e53b0837c01 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -105,7 +105,7 @@ const PROGRAM_COSTS_CF: &str = "program_costs"; /// Column family for optimistic slots const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots"; /// Column family for merkle roots -const MERKLE_ROOT_CF: &str = "merkle_root_meta"; +const MERKLE_ROOT_META_CF: &str = "merkle_root_meta"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -349,7 +349,7 @@ pub mod columns { /// its FEC set. This column stores that merkle root and associated /// meta information about the first shred received. /// - /// Its index type is (Slot, FEC) set index. + /// Its index type is (Slot, fec_set_index). /// /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u64)` /// * value type: [`blockstore_meta::MerkleRootMeta`]` @@ -1250,15 +1250,15 @@ impl Column for columns::MerkleRootMeta { fn index(key: &[u8]) -> (Slot, u64) { let slot = BigEndian::read_u64(&key[..8]); - let set_index = BigEndian::read_u64(&key[8..]); + let fec_set_index = BigEndian::read_u64(&key[8..]); - (slot, set_index) + (slot, fec_set_index) } - fn key((slot, set_index): (Slot, u64)) -> Vec { + fn key((slot, fec_set_index): (Slot, u64)) -> Vec { let mut key = vec![0; 16]; BigEndian::write_u64(&mut key[..8], slot); - BigEndian::write_u64(&mut key[8..], set_index); + BigEndian::write_u64(&mut key[8..], fec_set_index); key } @@ -1272,7 +1272,7 @@ impl Column for columns::MerkleRootMeta { } impl ColumnName for columns::MerkleRootMeta { - const NAME: &'static str = MERKLE_ROOT_CF; + const NAME: &'static str = MERKLE_ROOT_META_CF; } impl TypedColumn for columns::MerkleRootMeta { type Type = MerkleRootMeta; diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index f69f70d1ce215a..77595a870ca7e1 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -406,31 +406,6 @@ impl ErasureMeta { } } -impl MerkleRootMeta { - pub(crate) fn from_shred(shred: &Shred) -> Self { - Self { - merkle_root: shred.merkle_root().unwrap_or_default(), - first_received_shred_index: u64::from(shred.index()), - first_received_shred_type: shred.shred_type(), - } - } - - #[cfg(test)] - pub(crate) fn merkle_root(&self) -> Hash { - self.merkle_root - } - - #[cfg(test)] - pub(crate) fn first_received_shred_index(&self) -> u64 { - self.first_received_shred_index - } - - #[cfg(test)] - pub(crate) fn first_received_shred_type(&self) -> ShredType { - self.first_received_shred_type - } -} - impl DuplicateSlotProof { pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { DuplicateSlotProof { shred1, shred2 } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 50056cd3ab1e33..5fda160e29b976 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -551,10 +551,6 @@ impl Shred { Self::ShredData(_) => Err(Error::InvalidShredType), } } - - pub fn merkle_root(&self) -> Option { - layout::get_merkle_root(self.payload()) - } } // Helper methods to extract pieces of the shred from the payload From fe0e45e87252f0a79ae40665c119deec6cf2e54b Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 9 Nov 2023 21:07:59 +0000 Subject: [PATCH 3/5] pr feedback: u64 -> u32 + revert --- ledger/src/blockstore.rs | 2 +- ledger/src/blockstore_meta.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 9cfc28fa9dd651..3010a65be7f90c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1360,7 +1360,6 @@ impl Blockstore { ); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); - let erasure_set = shred.erasure_set(); if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -1395,6 +1394,7 @@ impl Blockstore { } } + let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 77595a870ca7e1..41a16c9ae3fee3 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -143,7 +143,7 @@ pub struct MerkleRootMeta { /// The merkle root merkle_root: Hash, /// The first received shred index - first_received_shred_index: u64, + first_received_shred_index: u32, /// The shred type of the first received shred first_received_shred_type: ShredType, } From 247d494c7652329f87dfadcb901f28d3069bcb2e Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 9 Nov 2023 21:13:21 +0000 Subject: [PATCH 4/5] pr feedback: fec_set_index u32, use Self::Index --- ledger/src/blockstore_db.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 408e53b0837c01..d4f9e107da2118 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -351,7 +351,7 @@ pub mod columns { /// /// Its index type is (Slot, fec_set_index). /// - /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u64)` + /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u32)` /// * value type: [`blockstore_meta::MerkleRootMeta`]` pub struct MerkleRootMeta; @@ -1246,24 +1246,24 @@ impl TypedColumn for columns::OptimisticSlots { } impl Column for columns::MerkleRootMeta { - type Index = (Slot, u64); + type Index = (Slot, /*fec_set_index:*/ u32); - fn index(key: &[u8]) -> (Slot, u64) { + fn index(key: &[u8]) -> Self::Index { let slot = BigEndian::read_u64(&key[..8]); - let fec_set_index = BigEndian::read_u64(&key[8..]); + let fec_set_index = BigEndian::read_u32(&key[8..]); (slot, fec_set_index) } - fn key((slot, fec_set_index): (Slot, u64)) -> Vec { + fn key((slot, fec_set_index): Self::Index) -> Vec { let mut key = vec![0; 16]; BigEndian::write_u64(&mut key[..8], slot); - BigEndian::write_u64(&mut key[8..], fec_set_index); + BigEndian::write_u32(&mut key[8..], fec_set_index); key } - fn slot(index: Self::Index) -> Slot { - index.0 + fn slot((slot, _fec_set_index): Self::Index) -> Slot { + slot } fn as_index(slot: Slot) -> Self::Index { From f17b5d2178fe5cc751b722e1689e0fc54a7dc750 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Fri, 10 Nov 2023 16:38:45 +0000 Subject: [PATCH 5/5] pr feedback: key size 16 -> 12 --- ledger/src/blockstore_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index d4f9e107da2118..0b2b14445539d6 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1256,7 +1256,7 @@ impl Column for columns::MerkleRootMeta { } fn key((slot, fec_set_index): Self::Index) -> Vec { - let mut key = vec![0; 16]; + let mut key = vec![0; 12]; BigEndian::write_u64(&mut key[..8], slot); BigEndian::write_u32(&mut key[8..], fec_set_index); key