
Rocks db window service #1888

Merged (11 commits) on Nov 25, 2018

src/db_ledger.rs: 76 changes (51 additions, 25 deletions)
@@ -5,13 +5,15 @@
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use entry::Entry;
-use ledger::Block;
use packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use result::{Error, Result};
use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
use serde::de::DeserializeOwned;
use serde::Serialize;
+use solana_sdk::pubkey::Pubkey;
+use std::borrow::Borrow;
use std::io;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};

pub const DB_LEDGER_DIRECTORY: &str = "db_ledger";

@@ -232,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure";
impl DbLedger {
// Opens a Ledger in directory, provides "infinite" window of blobs
pub fn open(ledger_path: &str) -> Result<Self> {
+let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
+
// Use default database options
let mut options = Options::default();
options.create_if_missing(true);
@@ -260,10 +264,25 @@ impl DbLedger {
})
}

-pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: &[SharedBlob]) -> Result<()> {
-let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-self.write_blobs(slot, &blobs)
+pub fn destroy(ledger_path: &str) -> Result<()> {
+let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
+DB::destroy(&Options::default(), &ledger_path)?;
+Ok(())
+}
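Note: `open` and `destroy` both append `DB_LEDGER_DIRECTORY` to the caller's path, so the same parent path works for both. A minimal usage sketch (the path and the surrounding `Result`-returning function are hypothetical):

```rust
// "/tmp/my-ledger" is a placeholder parent directory.
let parent = "/tmp/my-ledger";
let ledger = DbLedger::open(parent)?; // actually opens /tmp/my-ledger/db_ledger
drop(ledger); // close first: per the tests below, destroying an open DB is undefined behavior
DbLedger::destroy(parent)?; // removes /tmp/my-ledger/db_ledger
```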

+pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
+where
+I: IntoIterator,
+I::Item: Borrow<SharedBlob>,
+{
+for b in shared_blobs {
+let bl = b.borrow().read().unwrap();
+let index = bl.index()?;
+let key = DataCf::key(slot, index);
+self.insert_data_blob(&key, &*bl)?;
+}
+
+Ok(())
}
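Since the bound is `I::Item: Borrow<SharedBlob>`, callers may pass owned collections or iterators of references. A sketch, assuming `db_ledger`, `slot`, and `blobs: Vec<SharedBlob>` are in scope inside a `Result`-returning function:

```rust
db_ledger.write_shared_blobs(slot, &blobs)?; // &Vec<SharedBlob> yields &SharedBlob items
db_ledger.write_shared_blobs(slot, blobs)?;  // Vec<SharedBlob> yields owned items (consumes the Vec)
```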

pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
@@ -278,12 +297,20 @@ impl DbLedger {
Ok(())
}

-pub fn write_entries(&mut self, slot: u64, entries: &[Entry]) -> Result<()> {
-let shared_blobs = entries.to_blobs();
-let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-self.write_blobs(slot, &blobs)?;
-Ok(())
+pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
+where
+I: IntoIterator,
+I::Item: Borrow<Entry>,
+{
+let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
+let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
+entry.borrow().to_blob(
+Some(idx as u64),
+Some(Pubkey::default()),
+Some(&default_addr),
+)
+});
+self.write_shared_blobs(slot, shared_blobs)
}
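One consequence of the rewrite: blob indexes come from `enumerate()`, so each call numbers its blobs from 0, and the id/address metadata are placeholders (`Pubkey::default()`, 0.0.0.0:0). A usage sketch, assuming an open `DbLedger` and the test helper from `ledger`:

```rust
let entries = make_tiny_test_entries(4); // Vec<Entry>
db_ledger.write_entries(DEFAULT_SLOT_HEIGHT, &entries)?; // blobs get indexes 0..=3
```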

pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
@@ -421,12 +448,17 @@ impl DbLedger {
}
}

-pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
+pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
+where
+I: IntoIterator,
+I::Item: Borrow<Entry>,
+{
+let mut entries = entries.into_iter();
for ledger_path in ledger_paths {
let mut db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
db_ledger
-.write_entries(DEFAULT_SLOT_HEIGHT, &entries)
+.write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref())
.expect("Expected successful write of genesis entries");
}
}
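Worth noting: `by_ref()` shares one iterator across the loop, so the first ledger path drains all remaining entries and later paths receive an empty iterator. A standalone sketch of that semantics (plain std, nothing from this crate):

```rust
fn main() {
    let mut it = vec![1, 2, 3].into_iter();
    let first: Vec<i32> = it.by_ref().collect(); // drains the shared iterator
    let second: Vec<i32> = it.by_ref().collect(); // already exhausted
    assert_eq!(first, vec![1, 2, 3]);
    assert!(second.is_empty());
}
```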
@@ -435,7 +467,6 @@ pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
mod tests {
use super::*;
use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
-use rocksdb::{Options, DB};

#[test]
fn test_put_get_simple() {
@@ -485,8 +516,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
-DB::destroy(&Options::default(), &ledger_path)
-.expect("Expected successful database destruction");
+DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
@@ -548,8 +578,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
-DB::destroy(&Options::default(), &ledger_path)
-.expect("Expected successful database destruction");
+DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
@@ -591,8 +620,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
-DB::destroy(&Options::default(), &ledger_path)
-.expect("Expected successful database destruction");
+DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
@@ -628,8 +656,7 @@ mod tests {

// Destroying database without closing it first is undefined behavior
drop(ledger);
-DB::destroy(&Options::default(), &ledger_path)
-.expect("Expected successful database destruction");
+DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}

#[test]
@@ -644,7 +671,7 @@ mod tests {
let num_entries = 8;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();

-for (b, i) in shared_blobs.iter().zip(0..num_entries) {
+for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(1 << (i * 8)).unwrap();
}
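The indexes 1, 1 << 8, ..., 1 << 56 each move the set bit one byte leftward, so iterating them in order exercises the big-endian key encoding: lexicographic byte order must agree with numeric order. A standalone illustration:

```rust
fn main() {
    let (a, b) = (1u64 << 8, 1u64 << 16);
    assert!(a.to_be_bytes() < b.to_be_bytes()); // big-endian bytes sort like the numbers
    assert!(a.to_le_bytes() > b.to_le_bytes()); // little-endian keys would sort backwards
}
```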

@@ -668,7 +695,6 @@ mod tests {
db_iterator.next();
}
}
-DB::destroy(&Options::default(), &db_ledger_path)
-.expect("Expected successful database destruction");
+DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
}
src/db_window.rs: 17 changes (10 additions, 7 deletions)
@@ -218,9 +218,10 @@ pub fn retransmit_all_leader_blocks(
for b in dq {
// Check if the blob is from the scheduled leader for its slot. If so,
// add to the retransmit_queue
-let slot = b.read().unwrap().slot()?;
-if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
-add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+if let Ok(slot) = b.read().unwrap().slot() {
+if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
+add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+}
}
}
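The switch from `slot()?` to `if let Ok(slot)` means one malformed blob is skipped rather than aborting retransmit for the whole batch. The pattern in isolation:

```rust
fn main() {
    let raw = ["1", "oops", "3"];
    let mut handled = Vec::new();
    for item in &raw {
        if let Ok(n) = item.parse::<u64>() {
            handled.push(n); // Err items are skipped, not propagated
        }
    }
    assert_eq!(handled, vec![1, 3]);
}
```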

@@ -273,6 +274,9 @@ pub fn process_blob(
let is_coding = blob.read().unwrap().is_coding();

// Check if the blob is in the range of our known leaders. If not, we return.
+// TODO: Need to update slot in broadcast, otherwise this check will fail with
+// leader rotation enabled
+// Github issue: https://github.com/solana-labs/solana/issues/1899.

(Review comment from a Contributor on the TODO above: "how about a github issue instead?")
let slot = blob.read().unwrap().slot()?;
let leader = leader_scheduler.get_leader_for_slot(slot);

@@ -292,12 +296,11 @@
)?;
vec![]
} else {
-let data_key = ErasureCf::key(slot, pix);
+let data_key = DataCf::key(slot, pix);
db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
};

// TODO: Once erasure is fixed, re-add that logic here

for entry in &consumed_entries {
*tick_height += entry.is_tick() as u64;
}
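The `ErasureCf::key` -> `DataCf::key` fix keys data blobs under the data column family. Assuming the key layout concatenates big-endian slot and index (consistent with the `byteorder::BigEndian` import and the ordering test in db_ledger.rs; the exact layout is not shown in this diff), a sketch:

```rust
use byteorder::{BigEndian, ByteOrder};

// Hypothetical reconstruction of a (slot, index) key: 16 bytes, big-endian,
// so RocksDB's lexicographic iteration visits blobs in numeric order.
fn data_key(slot: u64, index: u64) -> Vec<u8> {
    let mut key = vec![0u8; 16];
    BigEndian::write_u64(&mut key[0..8], slot);
    BigEndian::write_u64(&mut key[8..16], index);
    key
}
```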
@@ -529,8 +532,8 @@ mod test {
assert!(gap > 3);
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
-for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) {
-b.write().unwrap().set_index(i * gap).unwrap();
+for (i, b) in shared_blobs.iter().enumerate() {
+b.write().unwrap().set_index(i as u64 * gap).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();