From c9d2b9af602da266ba2704d8dc88e979187fe7e6 Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 8 Nov 2018 01:29:27 -0800 Subject: [PATCH 01/11] Add db_window module for windowing functions from RocksDb --- src/db_window.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/db_window.rs b/src/db_window.rs index 298a0a499429ab..78de5a2c2fa505 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -296,7 +296,22 @@ pub fn process_blob( db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())? }; - // TODO: Once erasure is fixed, readd that logic here + // TODO: Once erasure is fixed, add those to the consumed queue as well + /*#[cfg(feature = "erasure")] + { + let window_size = self.window_size(); + if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() { + trace!("{}: erasure::recover failed", id); + } + } + // Check that we can get the entries from the blobs + if let Ok(entries) = reconstruct_entries_from_blobs(vec![consumed_blob_queue]) { + for entry in &entries { + *tick_height += entry.is_tick() as u64; + } + + consume_queue.extend(entries); + }*/ for entry in &consumed_entries { *tick_height += entry.is_tick() as u64; From 73e4e0a69d453ad55e5a8f7a29ffafbc7f8f53aa Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 13 Nov 2018 02:19:58 -0800 Subject: [PATCH 02/11] Replace window with db_window functions in window_service --- src/fullnode.rs | 5 +- src/replicator.rs | 3 +- src/retransmit_stage.rs | 6 +- src/tvu.rs | 13 +- src/window_service.rs | 262 +++++++++++----------------------------- 5 files changed, 90 insertions(+), 199 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index 4feddd3b2ddd8f..79621239129122 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -267,7 +267,6 @@ impl Fullnode { entry_height, *last_entry_id, cluster_info.clone(), - shared_window.clone(), node.sockets .replicate .iter() @@ -282,6 +281,8 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), + //TODO: pass db path as argument + format!("{}/db_ledger", ledger_path), ); let tpu_forwarder = TpuForwarder::new( node.sockets @@ -423,7 +424,6 @@ impl Fullnode { entry_height, last_entry_id, self.cluster_info.clone(), - self.shared_window.clone(), self.replicate_socket .iter() .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) @@ -435,6 +435,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), + format!("{}/db_ledger", self.ledger_path), ); let tpu_forwarder = TpuForwarder::new( self.transaction_sockets diff --git a/src/replicator.rs b/src/replicator.rs index c6a8563fb5ea49..5386ae97ed973d 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -71,6 +71,7 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { impl Replicator { pub fn new( + db_ledger: Arc>, entry_height: u64, max_entry_height: u64, exit: &Arc, @@ -105,8 +106,8 @@ impl Replicator { // todo: pull blobs off the retransmit_receiver and recycle them? 
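        // With this change a caller constructs the shared RocksDB-backed ledger
        // and hands it in, rather than the window owning one. A minimal sketch of
        // that construction (mirroring the bin/replicator.rs change later in this
        // series; `ledger_path` is assumed to name a writable directory):
        //
        //     let db_ledger = Arc::new(RwLock::new(
        //         DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger"),
        //     ));
        //     let t_window = window_service(db_ledger, cluster_info.clone(), /* ... */);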
let (retransmit_sender, retransmit_receiver) = channel();
         let t_window = window_service(
+            db_ledger,
             cluster_info.clone(),
-            shared_window.clone(),
             0,
             entry_height,
             max_entry_height,
diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs
index 9f462336e303fd..5f5aacd7f207dd 100644
--- a/src/retransmit_stage.rs
+++ b/src/retransmit_stage.rs
@@ -2,6 +2,7 @@
 use cluster_info::ClusterInfo;
 use counter::Counter;
+use db_ledger::DbLedger;
 use entry::Entry;
 use leader_scheduler::LeaderScheduler;
@@ -17,7 +18,6 @@
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Duration;
 use streamer::BlobReceiver;
-use window::SharedWindow;
 use window_service::window_service;
 
 fn retransmit(
@@ -81,8 +81,8 @@ pub struct RetransmitStage {
 
 impl RetransmitStage {
     pub fn new(
+        db_ledger: Arc<RwLock<DbLedger>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        window: SharedWindow,
         tick_height: u64,
         entry_height: u64,
         retransmit_socket: Arc<UdpSocket>,
@@ -97,8 +97,8 @@ impl RetransmitStage {
         let (entry_sender, entry_receiver) = channel();
         let done = Arc::new(AtomicBool::new(false));
         let t_window = window_service(
+            db_ledger,
             cluster_info.clone(),
-            window,
             tick_height,
             entry_height,
             0,
diff --git a/src/tvu.rs b/src/tvu.rs
index 45a9da86f406b5..0a088468876e13 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -13,6 +13,8 @@
 use bank::Bank;
 use blob_fetch_stage::BlobFetchStage;
 use cluster_info::ClusterInfo;
+use db_ledger::DbLedger;
+use hash::Hash;
 use ledger_write_stage::LedgerWriteStage;
 use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
 use retransmit_stage::RetransmitStage;
@@ -62,12 +64,18 @@ impl Tvu {
         entry_height: u64,
         last_entry_id: Hash,
         cluster_info: Arc<RwLock<ClusterInfo>>,
-        window: SharedWindow,
         replicate_sockets: Vec<UdpSocket>,
         repair_socket: UdpSocket,
         retransmit_socket: UdpSocket,
         ledger_path: Option<&str>,
+        db_ledger_path: String,
     ) -> Self {
+        // Eventually will be passed into LedgerWriteStage as well, so wrap the object
+        // in an Arc
+        let db_ledger = Arc::new(RwLock::new(
+            DbLedger::open(&db_ledger_path).expect("Expected to be able to open RocksDb ledger"),
+        ));
+
         let exit = Arc::new(AtomicBool::new(false));
 
         let repair_socket = Arc::new(repair_socket);
@@ -76,12 +84,13 @@ impl Tvu {
         blob_sockets.push(repair_socket.clone());
         let (fetch_stage, blob_fetch_receiver) =
             BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
+        //TODO
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
         //then sent to the window, which does the erasure coding reconstruction
         let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
+            db_ledger,
             &cluster_info,
-            window,
             bank.tick_height(),
             entry_height,
             Arc::new(retransmit_socket),
diff --git a/src/window_service.rs b/src/window_service.rs
index abfc9293bb13f0..8a2a2a4b0f54a8 100644
--- a/src/window_service.rs
+++ b/src/window_service.rs
@@ -2,6 +2,8 @@
 //!
use cluster_info::{ClusterInfo, NodeInfo}; use counter::Counter; +use db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT}; +use db_window::*; use entry::EntrySender; use leader_scheduler::LeaderScheduler; @@ -12,6 +14,7 @@ use result::{Error, Result}; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; +use std::borrow::{Borrow, BorrowMut}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::RecvTimeoutError; @@ -19,7 +22,6 @@ use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use streamer::{BlobReceiver, BlobSender}; -use window::{SharedWindow, WindowUtil}; pub const MAX_REPAIR_BACKOFF: usize = 128; @@ -49,103 +51,12 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { thread_rng().gen_range(0, *times as u64) == 0 } -fn add_block_to_retransmit_queue( - b: &SharedBlob, - leader_id: Pubkey, - retransmit_queue: &mut Vec, -) { - let p = b.read().unwrap(); - //TODO this check isn't safe against adverserial packets - //we need to maintain a sequence window - trace!( - "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.index() - .expect("get_index in fn add_block_to_retransmit_queue"), - p.id() - .expect("get_id in trace! fn add_block_to_retransmit_queue"), - p.meta.addr(), - leader_id - ); - if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id { - //TODO - //need to copy the retransmitted blob - //otherwise we get into races with which thread - //should do the recycling - // - let nv = SharedBlob::default(); - { - let mut mnv = nv.write().unwrap(); - let sz = p.meta.size; - mnv.meta.size = sz; - mnv.data[..sz].copy_from_slice(&p.data[..sz]); - } - retransmit_queue.push(nv); - } -} - -fn retransmit_all_leader_blocks( - window: &SharedWindow, - maybe_leader: Option, - dq: &[SharedBlob], - id: &Pubkey, - consumed: u64, - received: u64, - retransmit: &BlobSender, - pending_retransmits: &mut bool, -) -> Result<()> { - let mut retransmit_queue: Vec = Vec::new(); - if let Some(leader) = maybe_leader { - let leader_id = leader.id; - for b in dq { - add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); - } - - if *pending_retransmits { - for w in window - .write() - .expect("Window write failed in retransmit_all_leader_blocks") - .iter_mut() - { - *pending_retransmits = false; - if w.leader_unknown { - if let Some(ref b) = w.data { - add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); - w.leader_unknown = false; - } - } - } - } - submit( - influxdb::Point::new("retransmit-queue") - .add_field( - "count", - influxdb::Value::Integer(retransmit_queue.len() as i64), - ).to_owned(), - ); - } else { - warn!("{}: no leader to retransmit from", id); - } - if !retransmit_queue.is_empty() { - trace!( - "{}: RECV_WINDOW {} {}: retransmit {}", - id, - consumed, - received, - retransmit_queue.len(), - ); - inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); - retransmit.send(retransmit_queue)?; - } - Ok(()) -} - #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn recv_window( - window: &SharedWindow, + db_ledger: &mut DbLedger, id: &Pubkey, cluster_info: &Arc>, - consumed: &mut u64, - received: &mut u64, + leader_scheduler: &LeaderScheduler, tick_height: &mut u64, max_ix: u64, r: &BlobReceiver, @@ -174,28 +85,14 @@ fn recv_window( .to_owned(), ); - trace!( - "{}: RECV_WINDOW {} {}: got packets {}", - id, - 
*consumed, - *received, - dq.len(), - ); - - retransmit_all_leader_blocks( - window, - maybe_leader, - &dq, - id, - *consumed, - *received, - retransmit, - pending_retransmits, - )?; + retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?; let mut pixs = Vec::new(); //send a contiguous set of blocks let mut consume_queue = Vec::new(); + + trace!("{} num blobs received: {}", id, dq.len()); + for b in dq { let (pix, meta_size) = { let p = b.read().unwrap(); @@ -203,51 +100,21 @@ fn recv_window( }; pixs.push(pix); - if !window - .read() - .unwrap() - .blob_idx_in_window(&id, pix, *consumed, received) - { - continue; - } - - // For downloading storage blobs, - // we only want up to a certain index - // then stop - if max_ix != 0 && pix > max_ix { - continue; - } - trace!("{} window pix: {} size: {}", id, pix, meta_size); - window.write().unwrap().process_blob( + process_blob( + leader_scheduler, + db_ledger, id, b, + max_ix, pix, &mut consume_queue, - consumed, tick_height, - leader_unknown, - pending_retransmits, - ); - - // Send a signal when we hit the max entry_height - if max_ix != 0 && *consumed == (max_ix + 1) { - done.store(true, Ordering::Relaxed); - } - } - if log_enabled!(Level::Trace) { - trace!("{}", window.read().unwrap().print(id, *consumed)); - trace!( - "{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", - id, - *consumed, - *received, - consume_queue.len(), - pixs, - duration_as_ms(&now.elapsed()) + done, ); } + if !consume_queue.is_empty() { inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; @@ -257,8 +124,8 @@ fn recv_window( #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn window_service( + db_ledger: Arc>, cluster_info: Arc>, - window: SharedWindow, tick_height: u64, entry_height: u64, max_entry_height: u64, @@ -273,21 +140,17 @@ pub fn window_service( .name("solana-window".to_string()) .spawn(move || { let mut tick_height_ = tick_height; - let mut consumed = entry_height; - let mut received = entry_height; let mut last = entry_height; let mut times = 0; let id = cluster_info.read().unwrap().my_data().id; let mut pending_retransmits = false; trace!("{}: RECV_WINDOW started", id); loop { - // Check if leader rotation was configured if let Err(e) = recv_window( - &window, + db_ledger.write().unwrap().borrow_mut(), &id, &cluster_info, - &mut consumed, - &mut received, + leader_scheduler.read().unwrap().borrow(), &mut tick_height_, max_entry_height, &r, @@ -306,45 +169,62 @@ pub fn window_service( } } - submit( - influxdb::Point::new("window-stage") - .add_field("consumed", influxdb::Value::Integer(consumed as i64)) - .to_owned(), - ); - - if received <= consumed { - trace!( - "{} we have everything received:{} consumed:{}", - id, - received, - consumed + let meta = { + let rlock = db_ledger.read().unwrap(); + + rlock + .meta_cf + .get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + }; + + if let Ok(Some(meta)) = meta { + let received = meta.received; + let consumed = meta.consumed; + + submit( + influxdb::Point::new("window-stage") + .add_field("consumed", influxdb::Value::Integer(consumed as i64)) + .to_owned(), ); - continue; - } - //exponential backoff - if !repair_backoff(&mut last, &mut times, consumed) { - trace!("{} !repair_backoff() times = {}", id, times); - continue; - } - trace!("{} let's repair! 
times = {}", id, times); + // Consumed should never be bigger than received + assert!(consumed <= received); + if received == consumed { + trace!( + "{} we have everything received: {} consumed: {}", + id, + received, + consumed + ); + continue; + } - let mut window = window.write().unwrap(); - let reqs = window.repair( - &cluster_info, - &id, - times, - consumed, - received, - tick_height_, - max_entry_height, - &leader_scheduler, - ); - for (to, req) in reqs { - repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{} repair req send_to({}) error {:?}", id, to, e); - 0 - }); + //exponential backoff + if !repair_backoff(&mut last, &mut times, consumed) { + trace!("{} !repair_backoff() times = {}", id, times); + continue; + } + trace!("{} let's repair! times = {}", id, times); + + let reqs = repair( + DEFAULT_SLOT_HEIGHT, + db_ledger.read().unwrap().borrow(), + &cluster_info, + &id, + times, + tick_height_, + max_entry_height, + &leader_scheduler, + ); + + if let Ok(reqs) = reqs { + for (to, req) in reqs { + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + } } } }).unwrap() From ad2de747bad76dc6d45622cb0218cf807a4285e4 Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 13 Nov 2018 14:23:51 -0800 Subject: [PATCH 03/11] Fix tests --- src/db_window.rs | 7 ++- src/fullnode.rs | 19 ++++-- src/packet.rs | 9 ++- src/replicator.rs | 14 ++++- src/tvu.rs | 13 +++-- src/window_service.rs | 132 +++++++++++------------------------------- tests/multinode.rs | 99 ++++++++++++++++++++++++------- 7 files changed, 160 insertions(+), 133 deletions(-) diff --git a/src/db_window.rs b/src/db_window.rs index 78de5a2c2fa505..659622d56577ff 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -218,9 +218,10 @@ pub fn retransmit_all_leader_blocks( for b in dq { // Check if the blob is from the scheduled leader for its slot. 
If so, // add to the retransmit_queue - let slot = b.read().unwrap().slot()?; - if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) { - add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); + if let Ok(slot) = b.read().unwrap().slot() { + if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) { + add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); + } } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 79621239129122..e34ae48a46635c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,6 +3,7 @@ use bank::Bank; use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; +use db_ledger::DB_LEDGER_DIRECTORY; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; @@ -281,8 +282,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), - //TODO: pass db path as argument - format!("{}/db_ledger", ledger_path), + format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY), ); let tpu_forwarder = TpuForwarder::new( node.sockets @@ -435,7 +435,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), - format!("{}/db_ledger", self.ledger_path), + format!("{}/{}", self.ledger_path, DB_LEDGER_DIRECTORY), ); let tpu_forwarder = TpuForwarder::new( self.transaction_sockets @@ -627,10 +627,12 @@ impl Service for Fullnode { mod tests { use bank::Bank; use cluster_info::Node; + use db_ledger::*; use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType}; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter}; use packet::make_consecutive_blobs; + use rocksdb::{Options, DB}; use service::Service; use signature::{Keypair, KeypairUtil}; use std::cmp; @@ -907,7 +909,7 @@ mod tests { // Create validator identity let num_ending_ticks = 1; - let (mint, validator_ledger_path, genesis_entries) = create_tmp_sample_ledger( + let (mint, validator_ledger_path, mut genesis_entries) = create_tmp_sample_ledger( "test_validator_to_leader_transition", 10_000, num_ending_ticks, @@ -944,6 +946,11 @@ mod tests { ledger_writer.write_entries(&active_set_entries).unwrap(); let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; + // Create RocksDb ledger, write genesis entries to it + let db_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY); + genesis_entries.extend(active_set_entries); + write_entries_to_ledger(&vec![db_ledger_path.clone()], &genesis_entries); + // Set the leader scheduler for the validator let leader_rotation_interval = 10; let num_bootstrap_slots = 2; @@ -1036,6 +1043,8 @@ mod tests { // Shut down t_responder.join().expect("responder thread join"); validator.close().unwrap(); - remove_dir_all(&validator_ledger_path).unwrap(); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destruction"); + let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); } } diff --git a/src/packet.rs b/src/packet.rs index e134308698300d..0ee8cfe208075d 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,6 +2,7 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use counter::Counter; +use db_ledger::DEFAULT_SLOT_HEIGHT; #[cfg(test)] use entry::Entry; #[cfg(test)] @@ -147,10 +148,14 @@ impl fmt::Debug for Blob { //auto derive doesn't support large arrays impl Default for Blob { fn 
default() -> Blob {
-        Blob {
+        let mut blob = Blob {
             data: [0u8; BLOB_SIZE],
             meta: Meta::default(),
-        }
+        };
+
+        blob.set_slot(DEFAULT_SLOT_HEIGHT)
+            .expect("Should be able to set slot in default blob");
+        blob
     }
 }
diff --git a/src/replicator.rs b/src/replicator.rs
index 5386ae97ed973d..ad0338fb4c6489 100644
--- a/src/replicator.rs
+++ b/src/replicator.rs
@@ -1,5 +1,6 @@
 use blob_fetch_stage::BlobFetchStage;
 use cluster_info::{ClusterInfo, Node, NodeInfo};
+use db_ledger::DbLedger;
 use leader_scheduler::LeaderScheduler;
 use ncp::Ncp;
 use service::Service;
@@ -166,12 +167,14 @@ impl Replicator {
 mod tests {
     use client::mk_client;
     use cluster_info::Node;
+    use db_ledger::DbLedger;
     use fullnode::Fullnode;
     use leader_scheduler::LeaderScheduler;
     use ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger};
     use logger;
     use replicator::sample_file;
     use replicator::Replicator;
+    use rocksdb::{Options, DB};
     use signature::{Keypair, KeypairUtil};
     use solana_sdk::hash::Hash;
     use std::fs::File;
@@ -180,7 +183,7 @@ mod tests {
     use std::mem::size_of;
     use std::path::PathBuf;
     use std::sync::atomic::{AtomicBool, Ordering};
-    use std::sync::Arc;
+    use std::sync::{Arc, RwLock};
     use std::thread::sleep;
     use std::time::Duration;
 
@@ -229,7 +232,12 @@ mod tests {
         info!("starting replicator node");
         let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
+        let db_ledger_path = get_tmp_ledger_path("test_replicator_startup");
+        let db_ledger = Arc::new(RwLock::new(
+            DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
+        ));
         let (replicator, _leader_info) = Replicator::new(
+            db_ledger,
             entry_height,
             1,
             &exit,
@@ -266,6 +274,10 @@ mod tests {
         exit.store(true, Ordering::Relaxed);
         replicator.join();
         leader.exit();
+
+        DB::destroy(&Options::default(), &db_ledger_path)
+            .expect("Expected successful database destruction");
+        let _ignored = remove_dir_all(&db_ledger_path);
         let _ignored = remove_dir_all(&leader_ledger_path);
         let _ignored = remove_dir_all(&replicator_ledger_path);
     }
diff --git a/src/tvu.rs b/src/tvu.rs
index 0a088468876e13..22ec00618a3962 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -14,7 +14,6 @@
 use bank::Bank;
 use blob_fetch_stage::BlobFetchStage;
 use cluster_info::ClusterInfo;
 use db_ledger::DbLedger;
-use hash::Hash;
 use ledger_write_stage::LedgerWriteStage;
 use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
 use retransmit_stage::RetransmitStage;
@@ -26,7 +25,6 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread;
 use storage_stage::{StorageStage, StorageState};
-use window::SharedWindow;
 
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum TvuReturnType {
@@ -73,7 +71,7 @@ impl Tvu {
         // Eventually will be passed into LedgerWriteStage as well, so wrap the object
         // in an Arc
         let db_ledger = Arc::new(RwLock::new(
-            DbLedger::open(&db_ledger_path).expect("Expected to be able to open RocksDb ledger"),
+            DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
         ));
 
         let exit = Arc::new(AtomicBool::new(false));
@@ -177,13 +175,16 @@ pub mod tests {
     use cluster_info::{ClusterInfo, Node};
     use entry::Entry;
     use leader_scheduler::LeaderScheduler;
+    use ledger::get_tmp_ledger_path;
     use logger;
     use mint::Mint;
     use ncp::Ncp;
     use packet::SharedBlob;
+    use rocksdb::{Options, DB};
     use service::Service;
     use signature::{Keypair, KeypairUtil};
     use solana_sdk::hash::Hash;
+    use std::fs::remove_dir_all;
     use std::net::UdpSocket;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
@@ -271,6 +272,7 @@ pub mod tests {
         let vote_account_keypair = Arc::new(Keypair::new());
 
         let mut cur_hash = Hash::default();
+        let db_ledger_path = get_tmp_ledger_path("test_replicate");
         let tvu = Tvu::new(
             Arc::new(target1_keypair),
             vote_account_keypair,
             &bank,
             0,
             cur_hash,
             cref1,
-            dr_1.1,
             target1.sockets.replicate,
             target1.sockets.repair,
             target1.sockets.retransmit,
             None,
+            db_ledger_path.clone(),
         );
 
         let mut alice_ref_balance = starting_balance;
@@ -355,5 +357,8 @@ pub mod tests {
         dr_1.0.join().expect("join");
         t_receiver.join().expect("join");
         t_responder.join().expect("join");
+        DB::destroy(&Options::default(), &db_ledger_path)
+            .expect("Expected successful database destruction");
+        let _ignored = remove_dir_all(&db_ledger_path);
     }
 }
diff --git a/src/window_service.rs b/src/window_service.rs
index 8a2a2a4b0f54a8..d4ec1379d0dca8 100644
--- a/src/window_service.rs
+++ b/src/window_service.rs
@@ -1,6 +1,6 @@
 //! The `window_service` provides a thread for maintaining a window (tail of the ledger).
 //!
-use cluster_info::{ClusterInfo, NodeInfo};
+use cluster_info::ClusterInfo;
 use counter::Counter;
 use db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
 use db_window::*;
@@ -8,7 +8,6 @@
 use entry::EntrySender;
 use leader_scheduler::LeaderScheduler;
 use log::Level;
-use packet::SharedBlob;
 use rand::{thread_rng, Rng};
 use result::{Error, Result};
 use solana_metrics::{influxdb, submit};
@@ -16,7 +15,7 @@
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::duration_as_ms;
 use std::borrow::{Borrow, BorrowMut};
 use std::net::UdpSocket;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::{Arc, RwLock};
 use std::thread::{Builder, JoinHandle};
@@ -55,24 +54,17 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
 fn recv_window(
     db_ledger: &mut DbLedger,
     id: &Pubkey,
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
     leader_scheduler: &LeaderScheduler,
     tick_height: &mut u64,
     max_ix: u64,
     r: &BlobReceiver,
     s: &EntrySender,
     retransmit: &BlobSender,
-    pending_retransmits: &mut bool,
     done: &Arc<AtomicBool>,
 ) -> Result<()> {
     let timer = Duration::from_millis(200);
     let mut dq = r.recv_timeout(timer)?;
-    let maybe_leader: Option<NodeInfo> = cluster_info
-        .read()
-        .expect("'cluster_info' read lock in fn recv_window")
-        .leader_data()
-        .cloned();
-    let leader_unknown = maybe_leader.is_none();
+
     while let Ok(mut nq) = r.try_recv() {
         dq.append(&mut nq)
     }
@@ -98,15 +90,15 @@ fn recv_window(
             let p = b.read().unwrap();
             (p.index()?, p.meta.size)
         };
+
         pixs.push(pix);
 
         trace!("{} window pix: {} size: {}", id, pix, meta_size);
 
-        process_blob(
+        let _ = process_blob(
             leader_scheduler,
             db_ledger,
-            id,
-            b,
+            &b,
             max_ix,
             pix,
             &mut consume_queue,
             tick_height,
             done,
         );
     }
 
+    trace!(
+        "Elapsed processing time in recv_window(): {}",
+        duration_as_ms(&now.elapsed())
+    );
+
     if !consume_queue.is_empty() {
         inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
         s.send(consume_queue)?;
@@ -142,21 +139,18 @@ pub fn window_service(
             let mut tick_height_ = tick_height;
             let mut last = entry_height;
             let mut times = 0;
-            let id = cluster_info.read().unwrap().my_data().id;
-            let mut pending_retransmits = false;
+            let id = cluster_info.read().unwrap().id();
             trace!("{}: RECV_WINDOW started", id);
             loop {
                 if let Err(e) = recv_window(
                     db_ledger.write().unwrap().borrow_mut(),
                     &id,
-                    &cluster_info,
leader_scheduler.read().unwrap().borrow(), &mut tick_height_, max_entry_height, &r, &s, &retransmit, - &mut pending_retransmits, &done, ) { match e { @@ -233,18 +227,21 @@ pub fn window_service( #[cfg(test)] mod test { use cluster_info::{ClusterInfo, Node}; + use db_ledger::DbLedger; use entry::Entry; use leader_scheduler::LeaderScheduler; + use ledger::get_tmp_ledger_path; use logger; use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; + use rocksdb::{Options, DB}; use solana_sdk::hash::Hash; + use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer::{blob_receiver, responder}; - use window::default_window; use window_service::{repair_backoff, window_service}; fn get_entries(r: Receiver>, num: &mut usize) { @@ -276,11 +273,14 @@ mod test { let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); let done = Arc::new(AtomicBool::new(false)); + let db_ledger_path = get_tmp_ledger_path("window_send_test"); + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + )); let t_window = window_service( + db_ledger, subs, - win, 0, 0, 0, @@ -324,74 +324,13 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); t_window.join().expect("join"); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destuction"); + let _ignored = remove_dir_all(&db_ledger_path); } #[test] - pub fn window_send_no_leader_test() { - logger::setup(); - let tn = Node::new_localhost(); - let exit = Arc::new(AtomicBool::new(false)); - let cluster_info_me = ClusterInfo::new(tn.info.clone()); - let me_id = cluster_info_me.my_data().id; - let subs = Arc::new(RwLock::new(cluster_info_me)); - - let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); - let (s_window, _r_window) = channel(); - let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); - let done = Arc::new(AtomicBool::new(false)); - let t_window = window_service( - subs.clone(), - win, - 0, - 0, - 0, - r_reader, - s_window, - s_retransmit, - Arc::new(tn.sockets.repair), - // TODO: For now, the window still checks the ClusterInfo for the current leader - // to determine whether to retransmit a block. 
In the future when we rely on - // the LeaderScheduler for retransmits, this test will need to be rewritten - // because a leader should only be unknown in the window when the write stage - // hasn't yet calculated the leaders for slots in the next epoch (on entries - // at heights that are multiples of seed_rotation_interval in LeaderScheduler) - Arc::new(RwLock::new(LeaderScheduler::default())), - done, - ); - let t_responder = { - let (s_responder, r_responder) = channel(); - let blob_sockets: Vec> = - tn.sockets.replicate.into_iter().map(Arc::new).collect(); - let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); - let mut msgs = Vec::new(); - for v in 0..10 { - let i = 9 - v; - let b = SharedBlob::default(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(&me_id).unwrap(); - assert_eq!(i, w.index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.ncp); - } - msgs.push(b); - } - s_responder.send(msgs).expect("send"); - t_responder - }; - - assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err()); - exit.store(true, Ordering::Relaxed); - t_receiver.join().expect("join"); - t_responder.join().expect("join"); - t_window.join().expect("join"); - } - - #[test] - pub fn window_send_late_leader_test() { + pub fn window_send_leader_test2() { logger::setup(); let tn = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); @@ -403,11 +342,14 @@ mod test { let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); let done = Arc::new(AtomicBool::new(false)); + let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test"); + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + )); let t_window = window_service( + db_ledger, subs.clone(), - win, 0, 0, 0, @@ -415,13 +357,7 @@ mod test { s_window, s_retransmit, Arc::new(tn.sockets.repair), - // TODO: For now, the window still checks the ClusterInfo for the current leader - // to determine whether to retransmit a block. 
In the future when we rely on - // the LeaderScheduler for retransmits, this test will need to be rewritten - // becasue a leader should only be unknown in the window when the write stage - // hasn't yet calculated the leaders for slots in the next epoch (on entries - // at heights that are multiples of seed_rotation_interval in LeaderScheduler) - Arc::new(RwLock::new(LeaderScheduler::default())), + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), done, ); let t_responder = { @@ -445,10 +381,7 @@ mod test { } s_responder.send(msgs).expect("send"); - assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err()); - subs.write().unwrap().set_leader(me_id); - let mut msgs1 = Vec::new(); for v in 1..5 { let i = 9 + v; @@ -475,6 +408,9 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); t_window.join().expect("join"); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destuction"); + let _ignored = remove_dir_all(&db_ledger_path); } #[test] diff --git a/tests/multinode.rs b/tests/multinode.rs index 9a68508c175d70..9feaf7088e8757 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -2,13 +2,16 @@ extern crate log; extern crate bincode; extern crate chrono; +extern crate rocksdb; extern crate serde_json; extern crate solana; extern crate solana_sdk; +use rocksdb::{Options, DB}; use solana::blob_fetch_stage::BlobFetchStage; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::contact_info::ContactInfo; +use solana::db_ledger::{write_entries_to_ledger, DB_LEDGER_DIRECTORY}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; @@ -959,7 +962,7 @@ fn test_leader_validator_basic() { // Make a common mint and a genesis entry for both leader + validator ledgers let num_ending_ticks = 1; - let (mint, leader_ledger_path, genesis_entries) = create_tmp_sample_ledger( + let (mint, leader_ledger_path, mut genesis_entries) = create_tmp_sample_ledger( "test_leader_validator_basic", 10_000, num_ending_ticks, @@ -986,6 +989,20 @@ fn test_leader_validator_basic() { make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(&active_set_entries).unwrap(); + // Create RocksDb ledgers, write genesis entries to them + let db_leader_ledger_path = format!("{}/{}", leader_ledger_path, DB_LEDGER_DIRECTORY); + let db_validator_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY); + + // Write the validator entries to the validator database, they + // will have repair the missing leader "active_set_entries" + write_entries_to_ledger(&vec![db_validator_ledger_path.clone()], &genesis_entries); + + // Next write the leader entries to the leader + genesis_entries.extend(active_set_entries); + write_entries_to_ledger(&vec![db_leader_ledger_path.clone()], &genesis_entries); + + let db_ledger_paths = vec![db_leader_ledger_path, db_validator_ledger_path]; + // Create the leader scheduler config let num_bootstrap_slots = 2; let bootstrap_height = num_bootstrap_slots * leader_rotation_interval; @@ -1086,6 +1103,10 @@ fn test_leader_validator_basic() { assert!(min_len >= bootstrap_height); + for path in db_ledger_paths { + DB::destroy(&Options::default(), &path).expect("Expected successful database destruction"); + } + for path in ledger_paths { remove_dir_all(path).unwrap(); } @@ -1287,7 
+1308,7 @@ fn test_full_leader_validator_network() {
 
     // Make a common mint and a genesis entry for both leader + validator's ledgers
     let num_ending_ticks = 1;
-    let (mint, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+    let (mint, bootstrap_leader_ledger_path, mut genesis_entries) = create_tmp_sample_ledger(
         "test_full_leader_validator_network",
         10_000,
         num_ending_ticks,
@@ -1334,8 +1355,21 @@ fn test_full_leader_validator_network() {
             .expect("expected at least one genesis entry")
             .id;
         ledger_writer.write_entries(&bootstrap_entries).unwrap();
+        genesis_entries.extend(bootstrap_entries);
     }
 
+    // Create RocksDb ledger for bootstrap leader, write genesis entries to it
+    let db_bootstrap_leader_ledger_path =
+        format!("{}/{}", bootstrap_leader_ledger_path, DB_LEDGER_DIRECTORY);
+
+    // Write the validator entries to the validator databases.
+    write_entries_to_ledger(
+        &vec![db_bootstrap_leader_ledger_path.clone()],
+        &genesis_entries,
+    );
+
+    let mut db_ledger_paths = vec![db_bootstrap_leader_ledger_path];
+
     // Create the common leader scheduling configuration
     let num_slots_per_epoch = (N + 1) as u64;
     let num_bootstrap_slots = 2;
@@ -1346,28 +1380,20 @@ fn test_full_leader_validator_network() {
         Some(bootstrap_height),
         Some(leader_rotation_interval),
         Some(seed_rotation_interval),
-        Some(leader_rotation_interval),
+        Some(100),
     );
 
     let exit = Arc::new(AtomicBool::new(false));
 
-    // Start the bootstrap leader fullnode
-    let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new(
-        bootstrap_leader_node,
-        &bootstrap_leader_ledger_path,
-        Arc::new(node_keypairs.pop_front().unwrap()),
-        Arc::new(vote_account_keypairs.pop_front().unwrap()),
-        Some(bootstrap_leader_info.ncp),
-        false,
-        LeaderScheduler::new(&leader_scheduler_config),
-        None,
-    )));
-
-    let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()];
-    let mut t_nodes = vec![run_node(
-        bootstrap_leader_info.id,
-        bootstrap_leader,
-        exit.clone(),
-    )];
+    // Postpone starting the leader until after the validators are up and running
+    // to avoid
+    // 1) Scenario where leader rotates before validators can start up
+    // 2) Modifying the leader ledger which validators are going to be copying
+    // during startup
+    let leader_keypair = node_keypairs.pop_front().unwrap();
+    let leader_vote_keypair = vote_account_keypairs.pop_front().unwrap();
+    let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![];
+    let mut t_nodes = vec![];
 
     // Start up the validators
     for kp in node_keypairs.into_iter() {
@@ -1376,6 +1402,15 @@ fn test_full_leader_validator_network() {
             "test_full_leader_validator_network",
         );
         ledger_paths.push(validator_ledger_path.clone());
+
+        // Create RocksDb ledgers, write genesis entries to them
+        let db_validator_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY);
+        db_ledger_paths.push(db_validator_ledger_path.clone());
+
+        // Write the validator entries to the validator database, they
+        // will have to repair the missing leader "active_set_entries"
+        write_entries_to_ledger(&vec![db_validator_ledger_path.clone()], &genesis_entries);
+
         let validator_id = kp.pubkey();
         let validator_node = Node::new_localhost_with_pubkey(validator_id);
         let validator = Arc::new(RwLock::new(Fullnode::new(
@@ -1393,6 +1428,25 @@ fn test_full_leader_validator_network() {
         t_nodes.push(run_node(validator_id, validator, exit.clone()));
     }
 
+    // Start up the bootstrap leader
+    let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new(
+        bootstrap_leader_node,
+        &bootstrap_leader_ledger_path,
+        Arc::new(leader_keypair),
+        Arc::new(leader_vote_keypair),
+        Some(bootstrap_leader_info.ncp),
+        false,
+        LeaderScheduler::new(&leader_scheduler_config),
+        None,
+    )));
+
+    nodes.push(bootstrap_leader.clone());
+    t_nodes.push(run_node(
+        bootstrap_leader_info.id,
+        bootstrap_leader,
+        exit.clone(),
+    ));
+
     // Wait for convergence
     let num_converged = converge(&bootstrap_leader_info, N + 1).len();
     assert_eq!(num_converged, N + 1);
@@ -1495,6 +1549,11 @@ fn test_full_leader_validator_network() {
     }
 
     assert!(shortest.unwrap() >= target_height);
+
+    for path in db_ledger_paths {
+        DB::destroy(&Options::default(), &path).expect("Expected successful database destruction");
+    }
+
     for path in ledger_paths {
         remove_dir_all(path).unwrap();
     }

From 740273929e546c6fa6294072f2e4343ffa3c06f8 Mon Sep 17 00:00:00 2001
From: Carl
Date: Tue, 20 Nov 2018 20:18:37 -0800
Subject: [PATCH 04/11] Make note of change in db_window

---
 src/db_window.rs | 22 ++++------------------
 src/packet.rs    |  9 ++-------
 src/tvu.rs       |  4 ++--
 3 files changed, 8 insertions(+), 27 deletions(-)

diff --git a/src/db_window.rs b/src/db_window.rs
index 659622d56577ff..64ba30e87efbb5 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -274,6 +274,8 @@ pub fn process_blob(
     let is_coding = blob.read().unwrap().is_coding();
 
     // Check if the blob is in the range of our known leaders. If not, we return.
+    // TODO: Need to update slot in broadcast, otherwise this check will fail with
+    // leader rotation enabled
     let slot = blob.read().unwrap().slot()?;
     let leader = leader_scheduler.get_leader_for_slot(slot);
 
@@ -293,27 +295,11 @@ pub fn process_blob(
         )?;
         vec![]
     } else {
-        let data_key = ErasureCf::key(slot, pix);
+        let data_key = DataCf::key(slot, pix);
         db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
     };
 
-    // TODO: Once erasure is fixed, add those to the consumed queue as well
-    /*#[cfg(feature = "erasure")]
-    {
-        let window_size = self.window_size();
-        if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() {
-            trace!("{}: erasure::recover failed", id);
-        }
-    }
-    // Check that we can get the entries from the blobs
-    if let Ok(entries) = reconstruct_entries_from_blobs(vec![consumed_blob_queue]) {
-        for entry in &entries {
-            *tick_height += entry.is_tick() as u64;
-        }
-
-        consume_queue.extend(entries);
-    }*/
-
+    // TODO: Once erasure is fixed, re-add that logic here
     for entry in &consumed_entries {
         *tick_height += entry.is_tick() as u64;
     }
diff --git a/src/packet.rs b/src/packet.rs
index 0ee8cfe208075d..e134308698300d 100644
--- a/src/packet.rs
+++ b/src/packet.rs
@@ -2,7 +2,6 @@
 use bincode::{deserialize, serialize};
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use counter::Counter;
-use db_ledger::DEFAULT_SLOT_HEIGHT;
 #[cfg(test)]
 use entry::Entry;
 #[cfg(test)]
@@ -148,14 +147,10 @@ impl fmt::Debug for Blob {
 //auto derive doesn't support large arrays
 impl Default for Blob {
     fn default() -> Blob {
-        let mut blob = Blob {
+        Blob {
             data: [0u8; BLOB_SIZE],
             meta: Meta::default(),
-        };
-
-        blob.set_slot(DEFAULT_SLOT_HEIGHT)
-            .expect("Should be able to set slot in default blob");
-        blob
+        }
     }
 }
diff --git a/src/tvu.rs b/src/tvu.rs
index 22ec00618a3962..6a225c2df685c3 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -68,8 +68,8 @@ impl Tvu {
         ledger_path: Option<&str>,
         db_ledger_path: String,
     ) -> Self {
-        // Eventually will be passed into LedgerWriteStage as well, so wrap the object
-        // in an Arc
+        // Eventually will be passed into LedgerWriteStage as well to replace the ledger,
+        // so wrap the object in an Arc
         let db_ledger =
Arc::new(RwLock::new( DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), )); From d3aef1029543e294bb0a66dfd4064236f43f2e9c Mon Sep 17 00:00:00 2001 From: Carl Date: Wed, 21 Nov 2018 03:40:57 -0800 Subject: [PATCH 05/11] Create RocksDb ledger in bin/fullnode --- src/bin/fullnode.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 90fa50f087d705..7d0a242ba46f10 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -6,13 +6,17 @@ extern crate log; extern crate serde_json; #[macro_use] extern crate solana; +extern crate rocksdb; extern crate solana_metrics; use clap::{App, Arg}; +use rocksdb::{Options, DB}; use solana::client::mk_client; use solana::cluster_info::{Node, FULLNODE_PORT_RANGE}; +use solana::db_ledger::{write_entries_to_ledger, DB_LEDGER_DIRECTORY}; use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; use solana::leader_scheduler::LeaderScheduler; +use solana::ledger::read_ledger; use solana::logger; use solana::netutil::find_available_port_in_range; use solana::signature::{Keypair, KeypairUtil}; @@ -92,6 +96,17 @@ fn main() { let ledger_path = matches.value_of("ledger").unwrap(); + // Create the RocksDb ledger + let db_ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); + // Destroy old ledger + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destruction"); + let ledger_entries: Vec<_> = read_ledger(ledger_path, true) + .expect("opening ledger") + .map(|entry| entry.unwrap()) + .collect(); + write_entries_to_ledger(&vec![db_ledger_path], &ledger_entries[..]); + // socketaddr that is initial pointer into the network's gossip (ncp) let network = matches .value_of("network") From 1a1c62709aad26159c4d092bac984e8aa5192502 Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 22 Nov 2018 16:23:16 -0800 Subject: [PATCH 06/11] Make db_ledger functions generic --- src/bin/fullnode.rs | 3 ++- src/db_ledger.rs | 43 ++++++++++++++++++++++++++++++------------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 7d0a242ba46f10..9d96334b4bb44b 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -96,7 +96,8 @@ fn main() { let ledger_path = matches.value_of("ledger").unwrap(); - // Create the RocksDb ledger + // Create the RocksDb ledger, eventually will simply repurpose the input + // ledger path as the RocksDb ledger path let db_ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); // Destroy old ledger DB::destroy(&Options::default(), &db_ledger_path) diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 7012880506d57e..19f204de8ac3e4 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -5,13 +5,15 @@ use bincode::{deserialize, serialize}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use entry::Entry; -use ledger::Block; use packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use result::{Error, Result}; use rocksdb::{ColumnFamily, Options, WriteBatch, DB}; use serde::de::DeserializeOwned; use serde::Serialize; +use solana_sdk::pubkey::Pubkey; +use std::borrow::Borrow; use std::io; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; pub const DB_LEDGER_DIRECTORY: &str = "db_ledger"; @@ -260,10 +262,19 @@ impl DbLedger { }) } - pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: &[SharedBlob]) -> Result<()> { - let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); - let blobs: Vec<&Blob> = 
blob_locks.iter().map(|b| &**b).collect(); - self.write_blobs(slot, &blobs) + pub fn write_shared_blobs<'a, I>(&mut self, slot: u64, shared_blobs: I) -> Result<()> + where + I: IntoIterator, + I::Item: Borrow, + { + for b in shared_blobs { + let bl = b.borrow().read().unwrap(); + let index = bl.index()?; + let key = DataCf::key(slot, index); + self.insert_data_blob(&key, &*bl)?; + } + + Ok(()) } pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()> @@ -278,12 +289,15 @@ impl DbLedger { Ok(()) } - pub fn write_entries(&mut self, slot: u64, entries: &[Entry]) -> Result<()> { - let shared_blobs = entries.to_blobs(); - let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); - let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - self.write_blobs(slot, &blobs)?; - Ok(()) + pub fn write_entries<'a, I>(&mut self, slot: u64, entries: I) -> Result<()> + where + I: IntoIterator, + { + let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let shared_blobs = entries.into_iter().zip(0..).map(|(entry, idx)| { + entry.to_blob(Some(idx), Some(Pubkey::default()), Some(&default_addr)) + }); + self.write_shared_blobs(slot, shared_blobs) } pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result> { @@ -421,12 +435,15 @@ impl DbLedger { } } -pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) { +pub fn write_entries_to_ledger<'a, I>(ledger_paths: &[String], entries: I) +where + I: IntoIterator + Copy, +{ for ledger_path in ledger_paths { let mut db_ledger = DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); db_ledger - .write_entries(DEFAULT_SLOT_HEIGHT, &entries) + .write_entries(DEFAULT_SLOT_HEIGHT, entries) .expect("Expected successful write of genesis entries"); } } From ac902881674f5335686d452303d1cfb33088a1fc Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 22 Nov 2018 16:34:40 -0800 Subject: [PATCH 07/11] Add db_ledger to bin/replicator --- src/bin/fullnode.rs | 2 +- src/bin/replicator.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9d96334b4bb44b..33d14ab2b64e6c 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -106,7 +106,7 @@ fn main() { .expect("opening ledger") .map(|entry| entry.unwrap()) .collect(); - write_entries_to_ledger(&vec![db_ledger_path], &ledger_entries[..]); + write_entries_to_ledger(&[db_ledger_path], &ledger_entries[..]); // socketaddr that is initial pointer into the network's gossip (ncp) let network = matches diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index 165fa1c89b8227..e8c4030a7767d1 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -10,6 +10,7 @@ use clap::{App, Arg}; use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; use solana::client::mk_client; use solana::cluster_info::Node; +use solana::db_ledger::{DbLedger, DB_LEDGER_DIRECTORY}; use solana::fullnode::Config; use solana::ledger::LEDGER_DATA_FILE; use solana::logger; @@ -23,7 +24,7 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::path::Path; use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -94,7 +95,14 @@ fn main() { // TODO: ask network what slice we should store let entry_height = 0; + // Create the RocksDb ledger, eventually will simply repurpose the input + // ledger path as the RocksDb ledger path + let 
db_ledger_path = format!("{}/{}", ledger_path.unwrap(), DB_LEDGER_DIRECTORY); + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + )); let (replicator, leader_info) = Replicator::new( + db_ledger, entry_height, 5, &exit, From 00a8b7ea454f2d9718e491c862acd31f38348222 Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 22 Nov 2018 17:10:35 -0800 Subject: [PATCH 08/11] Fix clippy errors --- src/db_ledger.rs | 2 +- src/fullnode.rs | 4 ++-- src/tvu.rs | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 19f204de8ac3e4..e47666d303027d 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -262,7 +262,7 @@ impl DbLedger { }) } - pub fn write_shared_blobs<'a, I>(&mut self, slot: u64, shared_blobs: I) -> Result<()> + pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: I) -> Result<()> where I: IntoIterator, I::Item: Borrow, diff --git a/src/fullnode.rs b/src/fullnode.rs index e34ae48a46635c..752db1bf4ddcce 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -282,7 +282,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), - format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY), + &format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY), ); let tpu_forwarder = TpuForwarder::new( node.sockets @@ -435,7 +435,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), - format!("{}/{}", self.ledger_path, DB_LEDGER_DIRECTORY), + &format!("{}/{}", self.ledger_path, DB_LEDGER_DIRECTORY), ); let tpu_forwarder = TpuForwarder::new( self.transaction_sockets diff --git a/src/tvu.rs b/src/tvu.rs index 6a225c2df685c3..05ed46e11434e8 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -66,12 +66,12 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, - db_ledger_path: String, + db_ledger_path: &str, ) -> Self { // Eventually will be passed into LedgerWriteStage as well to replace the ledger, // so wrap the object in a Arc let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + DbLedger::open(db_ledger_path).expect("Expected to be able to open database ledger"), )); let exit = Arc::new(AtomicBool::new(false)); @@ -284,7 +284,7 @@ pub mod tests { target1.sockets.repair, target1.sockets.retransmit, None, - db_ledger_path.clone(), + &db_ledger_path.clone(), ); let mut alice_ref_balance = starting_balance; From e8b80e4e3cb0c44f21531d35f51da11932812e2e Mon Sep 17 00:00:00 2001 From: Carl Date: Fri, 23 Nov 2018 00:12:48 -0800 Subject: [PATCH 09/11] Use enumerate() where apt --- src/db_ledger.rs | 10 +++++++--- src/db_window.rs | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/db_ledger.rs b/src/db_ledger.rs index e47666d303027d..244c3bd680269e 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -294,8 +294,12 @@ impl DbLedger { I: IntoIterator, { let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let shared_blobs = entries.into_iter().zip(0..).map(|(entry, idx)| { - entry.to_blob(Some(idx), Some(Pubkey::default()), Some(&default_addr)) + let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { + entry.to_blob( + Some(idx as u64), + Some(Pubkey::default()), + Some(&default_addr), + ) }); self.write_shared_blobs(slot, shared_blobs) } @@ -661,7 +665,7 @@ mod tests { let num_entries = 8; let shared_blobs = 
make_tiny_test_entries(num_entries).to_blobs(); - for (b, i) in shared_blobs.iter().zip(0..num_entries) { + for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(1 << (i * 8)).unwrap(); } diff --git a/src/db_window.rs b/src/db_window.rs index 64ba30e87efbb5..1e08164d644948 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -531,8 +531,8 @@ mod test { assert!(gap > 3); let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); - for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) { - b.write().unwrap().set_index(i * gap).unwrap(); + for (i, b) in shared_blobs.iter().enumerate() { + b.write().unwrap().set_index(i as u64 * gap).unwrap(); } let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); From bf3501932d6a7805127dde29ea6dbf96e9395db1 Mon Sep 17 00:00:00 2001 From: Carl Date: Fri, 23 Nov 2018 18:29:57 -0800 Subject: [PATCH 10/11] Encapsulate RocksDb destroy, move opening of RocksDb ledger into fullnode, clean up tests --- src/bin/fullnode.rs | 16 ------ src/bin/replicator.rs | 5 +- src/db_ledger.rs | 39 ++++++++------ src/db_window.rs | 1 + src/fullnode.rs | 118 ++++++++++++++++++++++++++---------------- src/ledger.rs | 18 ++++++- src/tvu.rs | 13 ++--- tests/multinode.rs | 75 +++------------------------ 8 files changed, 128 insertions(+), 157 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 33d14ab2b64e6c..90fa50f087d705 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -6,17 +6,13 @@ extern crate log; extern crate serde_json; #[macro_use] extern crate solana; -extern crate rocksdb; extern crate solana_metrics; use clap::{App, Arg}; -use rocksdb::{Options, DB}; use solana::client::mk_client; use solana::cluster_info::{Node, FULLNODE_PORT_RANGE}; -use solana::db_ledger::{write_entries_to_ledger, DB_LEDGER_DIRECTORY}; use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; use solana::leader_scheduler::LeaderScheduler; -use solana::ledger::read_ledger; use solana::logger; use solana::netutil::find_available_port_in_range; use solana::signature::{Keypair, KeypairUtil}; @@ -96,18 +92,6 @@ fn main() { let ledger_path = matches.value_of("ledger").unwrap(); - // Create the RocksDb ledger, eventually will simply repurpose the input - // ledger path as the RocksDb ledger path - let db_ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); - // Destroy old ledger - DB::destroy(&Options::default(), &db_ledger_path) - .expect("Expected successful database destruction"); - let ledger_entries: Vec<_> = read_ledger(ledger_path, true) - .expect("opening ledger") - .map(|entry| entry.unwrap()) - .collect(); - write_entries_to_ledger(&[db_ledger_path], &ledger_entries[..]); - // socketaddr that is initial pointer into the network's gossip (ncp) let network = matches .value_of("network") diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index e8c4030a7767d1..6a8ba9450106c1 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -10,7 +10,7 @@ use clap::{App, Arg}; use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; use solana::client::mk_client; use solana::cluster_info::Node; -use solana::db_ledger::{DbLedger, DB_LEDGER_DIRECTORY}; +use solana::db_ledger::DbLedger; use solana::fullnode::Config; use solana::ledger::LEDGER_DATA_FILE; use solana::logger; @@ -97,9 +97,8 @@ fn main() { // Create the RocksDb ledger, eventually will simply repurpose the input // 
ledger path as the RocksDb ledger path - let db_ledger_path = format!("{}/{}", ledger_path.unwrap(), DB_LEDGER_DIRECTORY); let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + DbLedger::open(&ledger_path.unwrap()).expect("Expected to be able to open database ledger"), )); let (replicator, leader_info) = Replicator::new( db_ledger, diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 244c3bd680269e..3d0c7ab4310c80 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -234,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure"; impl DbLedger { // Opens a Ledger in directory, provides "infinite" window of blobs pub fn open(ledger_path: &str) -> Result { + let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); + // Use default database options let mut options = Options::default(); options.create_if_missing(true); @@ -262,6 +264,12 @@ impl DbLedger { }) } + pub fn destroy(ledger_path: &str) -> Result<()> { + let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY); + DB::destroy(&Options::default(), &ledger_path)?; + Ok(()) + } + pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: I) -> Result<()> where I: IntoIterator, @@ -289,13 +297,14 @@ impl DbLedger { Ok(()) } - pub fn write_entries<'a, I>(&mut self, slot: u64, entries: I) -> Result<()> + pub fn write_entries(&mut self, slot: u64, entries: I) -> Result<()> where - I: IntoIterator, + I: IntoIterator, + I::Item: Borrow, { let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { - entry.to_blob( + entry.borrow().to_blob( Some(idx as u64), Some(Pubkey::default()), Some(&default_addr), @@ -439,15 +448,17 @@ impl DbLedger { } } -pub fn write_entries_to_ledger<'a, I>(ledger_paths: &[String], entries: I) +pub fn write_entries_to_ledger(ledger_paths: &[&str], entries: I) where - I: IntoIterator + Copy, + I: IntoIterator, + I::Item: Borrow, { + let mut entries = entries.into_iter(); for ledger_path in ledger_paths { let mut db_ledger = DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); db_ledger - .write_entries(DEFAULT_SLOT_HEIGHT, entries) + .write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref()) .expect("Expected successful write of genesis entries"); } } @@ -456,7 +467,6 @@ where mod tests { use super::*; use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; - use rocksdb::{Options, DB}; #[test] fn test_put_get_simple() { @@ -506,8 +516,7 @@ mod tests { // Destroying database without closing it first is undefined behavior drop(ledger); - DB::destroy(&Options::default(), &ledger_path) - .expect("Expected successful database destruction"); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } #[test] @@ -569,8 +578,7 @@ mod tests { // Destroying database without closing it first is undefined behavior drop(ledger); - DB::destroy(&Options::default(), &ledger_path) - .expect("Expected successful database destruction"); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } #[test] @@ -612,8 +620,7 @@ mod tests { // Destroying database without closing it first is undefined behavior drop(ledger); - DB::destroy(&Options::default(), &ledger_path) - .expect("Expected successful database destruction"); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } #[test] @@ -649,8 +656,7 @@ mod tests { // Destroying 
database without closing it first is undefined behavior drop(ledger); - DB::destroy(&Options::default(), &ledger_path) - .expect("Expected successful database destruction"); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } #[test] @@ -689,7 +695,6 @@ mod tests { db_iterator.next(); } } - DB::destroy(&Options::default(), &db_ledger_path) - .expect("Expected successful database destruction"); + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } } diff --git a/src/db_window.rs b/src/db_window.rs index 1e08164d644948..413fe67f07882c 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -276,6 +276,7 @@ pub fn process_blob( // Check if the blob is in the range of our known leaders. If not, we return. // TODO: Need to update slot in broadcast, otherwise this check will fail with // leader rotation enabled + // Github issue: https://github.com/solana-labs/solana/issues/1899. let slot = blob.read().unwrap().slot()?; let leader = leader_scheduler.get_leader_for_slot(slot); diff --git a/src/fullnode.rs b/src/fullnode.rs index 752db1bf4ddcce..9f61681d3ca95d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,7 +3,7 @@ use bank::Bank; use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; -use db_ledger::DB_LEDGER_DIRECTORY; +use db_ledger::{write_entries_to_ledger, DbLedger}; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; @@ -107,6 +107,7 @@ pub struct Fullnode { broadcast_socket: UdpSocket, rpc_addr: SocketAddr, rpc_pubsub_addr: SocketAddr, + db_ledger: Arc>, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -259,6 +260,10 @@ impl Fullnode { .expect("Leader not known after processing bank"); cluster_info.write().unwrap().set_leader(scheduled_leader); + + // Create the RocksDb ledger + let db_ledger = Self::make_db_ledger(ledger_path); + let node_role = if scheduled_leader != keypair.pubkey() { // Start in validator mode. 
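// The db_ledger created above is shared: the same Arc<RwLock<DbLedger>> is
// cloned into the Tvu below and kept on the Fullnode struct so that later
// role transitions can reuse the handle instead of reopening the database.
// A minimal sketch of that sharing pattern (illustrative lines, not part of
// this patch):
//
//     let db_ledger = Self::make_db_ledger(ledger_path); // open RocksDb once
//     let for_tvu = db_ledger.clone();                    // cheap Arc clone
//     assert!(Arc::ptr_eq(&db_ledger, &for_tvu));         // same database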
let tvu = Tvu::new( @@ -282,7 +287,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), - &format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY), + db_ledger.clone(), ); let tpu_forwarder = TpuForwarder::new( node.sockets @@ -353,6 +358,7 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, rpc_addr, rpc_pubsub_addr, + db_ledger, } } @@ -435,7 +441,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), - &format!("{}/{}", self.ledger_path, DB_LEDGER_DIRECTORY), + self.db_ledger.clone(), ); let tpu_forwarder = TpuForwarder::new( self.transaction_sockets @@ -590,6 +596,19 @@ impl Fullnode { ), ) } + + fn make_db_ledger(ledger_path: &str) -> Arc> { + // Destroy any existing instances of the RocksDb ledger + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + let ledger_entries = read_ledger(ledger_path, true) + .expect("opening ledger") + .map(|entry| entry.unwrap()); + + write_entries_to_ledger(&[ledger_path], ledger_entries); + let db = + DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"); + Arc::new(RwLock::new(db)) + } } impl Service for Fullnode { @@ -630,9 +649,8 @@ mod tests { use db_ledger::*; use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType}; use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; - use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter}; + use ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter}; use packet::make_consecutive_blobs; - use rocksdb::{Options, DB}; use service::Service; use signature::{Keypair, KeypairUtil}; use std::cmp; @@ -842,6 +860,13 @@ mod tests { + num_ending_ticks as u64; ledger_writer.write_entries(&active_set_entries).unwrap(); + let validator_ledger_path = + tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition"); + let ledger_paths = vec![ + bootstrap_leader_ledger_path.clone(), + validator_ledger_path.clone(), + ]; + // Create the common leader scheduling configuration let num_slots_per_epoch = 3; let leader_rotation_interval = 5; @@ -858,45 +883,53 @@ mod tests { Some(genesis_tick_height), ); - // Test that a node knows to transition to a validator based on parsing the ledger - let leader_vote_account_keypair = Arc::new(Keypair::new()); - let bootstrap_leader = Fullnode::new( - bootstrap_leader_node, - &bootstrap_leader_ledger_path, - bootstrap_leader_keypair, - leader_vote_account_keypair, - Some(bootstrap_leader_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - None, - ); + { + // Test that a node knows to transition to a validator based on parsing the ledger + let leader_vote_account_keypair = Arc::new(Keypair::new()); + let bootstrap_leader = Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + bootstrap_leader_keypair, + leader_vote_account_keypair, + Some(bootstrap_leader_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + None, + ); - match bootstrap_leader.node_role { - Some(NodeRole::Validator(_)) => (), - _ => { - panic!("Expected bootstrap leader to be a validator"); + match bootstrap_leader.node_role { + Some(NodeRole::Validator(_)) => (), + _ => { + panic!("Expected bootstrap leader to be a validator"); + } } - } - // Test that a node knows to transition to a leader based on parsing the ledger - let validator = Fullnode::new( - validator_node, - &bootstrap_leader_ledger_path, 
- Arc::new(validator_keypair), - Arc::new(validator_vote_account_keypair), - Some(bootstrap_leader_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - None, - ); + // Test that a node knows to transition to a leader based on parsing the ledger + let validator = Fullnode::new( + validator_node, + &validator_ledger_path, + Arc::new(validator_keypair), + Arc::new(validator_vote_account_keypair), + Some(bootstrap_leader_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + None, + ); - match validator.node_role { - Some(NodeRole::Leader(_)) => (), - _ => { - panic!("Expected node to be the leader"); + match validator.node_role { + Some(NodeRole::Leader(_)) => (), + _ => { + panic!("Expected node to be the leader"); + } } + + validator.close().expect("Expected node to close"); + bootstrap_leader.close().expect("Expected node to close"); + } + for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); + let _ignored = remove_dir_all(&path); } - let _ignored = remove_dir_all(&bootstrap_leader_ledger_path); } #[test] @@ -909,7 +942,7 @@ mod tests { // Create validator identity let num_ending_ticks = 1; - let (mint, validator_ledger_path, mut genesis_entries) = create_tmp_sample_ledger( + let (mint, validator_ledger_path, genesis_entries) = create_tmp_sample_ledger( "test_validator_to_leader_transition", 10_000, num_ending_ticks, @@ -946,11 +979,6 @@ mod tests { ledger_writer.write_entries(&active_set_entries).unwrap(); let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; - // Create RocksDb ledger, write genesis entries to it - let db_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY); - genesis_entries.extend(active_set_entries); - write_entries_to_ledger(&vec![db_ledger_path.clone()], &genesis_entries); - // Set the leader scheduler for the validator let leader_rotation_interval = 10; let num_bootstrap_slots = 2; @@ -1043,7 +1071,7 @@ mod tests { // Shut down t_responder.join().expect("responder thread join"); validator.close().unwrap(); - DB::destroy(&Options::default(), &db_ledger_path) + DbLedger::destroy(&validator_ledger_path) .expect("Expected successful database destruction"); let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); } diff --git a/src/ledger.rs b/src/ledger.rs index e4f4f0e37a9da0..578fb62970541e 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -13,7 +13,7 @@ use rayon::prelude::*; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; -use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; +use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::mem::size_of; @@ -638,6 +638,22 @@ pub fn create_tmp_sample_ledger( (mint, path, genesis) } +pub fn tmp_copy_ledger(from: &str, name: &str) -> String { + let tostr = get_tmp_ledger_path(name); + + { + let to = Path::new(&tostr); + let from = Path::new(&from); + + create_dir_all(to).unwrap(); + + copy(from.join("data"), to.join("data")).unwrap(); + copy(from.join("index"), to.join("index")).unwrap(); + } + + tostr +} + pub fn make_tiny_test_entries(num: usize) -> Vec { let zero = Hash::default(); let one = hash(&zero.as_ref()); diff --git a/src/tvu.rs b/src/tvu.rs index 05ed46e11434e8..e5e69c37a0b8fb 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -66,14 +66,8 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: 
UdpSocket, ledger_path: Option<&str>, - db_ledger_path: &str, + db_ledger: Arc>, ) -> Self { - // Eventually will be passed into LedgerWriteStage as well to replace the ledger, - // so wrap the object in a Arc - let db_ledger = Arc::new(RwLock::new( - DbLedger::open(db_ledger_path).expect("Expected to be able to open database ledger"), - )); - let exit = Arc::new(AtomicBool::new(false)); let repair_socket = Arc::new(repair_socket); @@ -173,6 +167,7 @@ pub mod tests { use bank::Bank; use bincode::serialize; use cluster_info::{ClusterInfo, Node}; + use db_ledger::DbLedger; use entry::Entry; use leader_scheduler::LeaderScheduler; use ledger::get_tmp_ledger_path; @@ -273,6 +268,8 @@ pub mod tests { let vote_account_keypair = Arc::new(Keypair::new()); let mut cur_hash = Hash::default(); let db_ledger_path = get_tmp_ledger_path("test_replicate"); + let db_ledger = + DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger"); let tvu = Tvu::new( Arc::new(target1_keypair), vote_account_keypair, @@ -284,7 +281,7 @@ pub mod tests { target1.sockets.repair, target1.sockets.retransmit, None, - &db_ledger_path.clone(), + Arc::new(RwLock::new(db_ledger)), ); let mut alice_ref_balance = starting_balance; diff --git a/tests/multinode.rs b/tests/multinode.rs index 9feaf7088e8757..9caf30e5467a2f 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -2,21 +2,19 @@ extern crate log; extern crate bincode; extern crate chrono; -extern crate rocksdb; extern crate serde_json; extern crate solana; extern crate solana_sdk; -use rocksdb::{Options, DB}; use solana::blob_fetch_stage::BlobFetchStage; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::contact_info::ContactInfo; -use solana::db_ledger::{write_entries_to_ledger, DB_LEDGER_DIRECTORY}; +use solana::db_ledger::DbLedger; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::ledger::{ - create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger, LedgerWindow, + create_tmp_genesis, create_tmp_sample_ledger, read_ledger, tmp_copy_ledger, LedgerWindow, LedgerWriter, }; use solana::logger; @@ -36,9 +34,8 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{duration_as_ms, duration_as_s}; use std::collections::{HashSet, VecDeque}; use std::env; -use std::fs::{copy, create_dir_all, remove_dir_all}; +use std::fs::remove_dir_all; use std::net::UdpSocket; -use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; @@ -113,22 +110,6 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { rv } -fn tmp_copy_ledger(from: &str, name: &str) -> String { - let tostr = get_tmp_ledger_path(name); - - { - let to = Path::new(&tostr); - let from = Path::new(&from); - - create_dir_all(to).unwrap(); - - copy(from.join("data"), to.join("data")).unwrap(); - copy(from.join("index"), to.join("index")).unwrap(); - } - - tostr -} - fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { let mut id = start_hash; let mut num_hashes = 0; @@ -962,7 +943,7 @@ fn test_leader_validator_basic() { // Make a common mint and a genesis entry for both leader + validator ledgers let num_ending_ticks = 1; - let (mint, leader_ledger_path, mut genesis_entries) = create_tmp_sample_ledger( + let (mint, leader_ledger_path, genesis_entries) = create_tmp_sample_ledger( 
"test_leader_validator_basic", 10_000, num_ending_ticks, @@ -989,20 +970,6 @@ fn test_leader_validator_basic() { make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(&active_set_entries).unwrap(); - // Create RocksDb ledgers, write genesis entries to them - let db_leader_ledger_path = format!("{}/{}", leader_ledger_path, DB_LEDGER_DIRECTORY); - let db_validator_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY); - - // Write the validator entries to the validator database, they - // will have repair the missing leader "active_set_entries" - write_entries_to_ledger(&vec![db_validator_ledger_path.clone()], &genesis_entries); - - // Next write the leader entries to the leader - genesis_entries.extend(active_set_entries); - write_entries_to_ledger(&vec![db_leader_ledger_path.clone()], &genesis_entries); - - let db_ledger_paths = vec![db_leader_ledger_path, db_validator_ledger_path]; - // Create the leader scheduler config let num_bootstrap_slots = 2; let bootstrap_height = num_bootstrap_slots * leader_rotation_interval; @@ -1103,11 +1070,8 @@ fn test_leader_validator_basic() { assert!(min_len >= bootstrap_height); - for path in db_ledger_paths { - DB::destroy(&Options::default(), &path).expect("Expected successful database destruction"); - } - for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); } } @@ -1308,7 +1272,7 @@ fn test_full_leader_validator_network() { // Make a common mint and a genesis entry for both leader + validator's ledgers let num_ending_ticks = 1; - let (mint, bootstrap_leader_ledger_path, mut genesis_entries) = create_tmp_sample_ledger( + let (mint, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger( "test_full_leader_validator_network", 10_000, num_ending_ticks, @@ -1355,21 +1319,8 @@ fn test_full_leader_validator_network() { .expect("expected at least one genesis entry") .id; ledger_writer.write_entries(&bootstrap_entries).unwrap(); - genesis_entries.extend(bootstrap_entries); } - // Create RocksDb ledger for bootstrap leader, write genesis entries to them - let db_bootstrap_leader_ledger_path = - format!("{}/{}", bootstrap_leader_ledger_path, DB_LEDGER_DIRECTORY); - - // Write the validator entries to the validator databases. 
- write_entries_to_ledger( - &vec![db_bootstrap_leader_ledger_path.clone()], - &genesis_entries, - ); - - let mut db_ledger_paths = vec![db_bootstrap_leader_ledger_path]; - // Create the common leader scheduling configuration let num_slots_per_epoch = (N + 1) as u64; let num_bootstrap_slots = 2; @@ -1401,15 +1352,8 @@ fn test_full_leader_validator_network() { &bootstrap_leader_ledger_path, "test_full_leader_validator_network", ); - ledger_paths.push(validator_ledger_path.clone()); - - // Create RocksDb ledgers, write genesis entries to them - let db_validator_ledger_path = format!("{}/{}", validator_ledger_path, DB_LEDGER_DIRECTORY); - db_ledger_paths.push(db_validator_ledger_path.clone()); - // Write the validator entries to the validator database, they - // will have repair the missing leader "active_set_entries" - write_entries_to_ledger(&vec![db_validator_ledger_path.clone()], &genesis_entries); + ledger_paths.push(validator_ledger_path.clone()); let validator_id = kp.pubkey(); let validator_node = Node::new_localhost_with_pubkey(validator_id); @@ -1550,11 +1494,8 @@ fn test_full_leader_validator_network() { assert!(shortest.unwrap() >= target_height); - for path in db_ledger_paths { - DB::destroy(&Options::default(), &path).expect("Expected successful database destruction"); - } - for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); } } From 0153f128d832797cbf3b6fdbeeb86504564dc674 Mon Sep 17 00:00:00 2001 From: Carl Date: Sat, 24 Nov 2018 16:37:20 -0800 Subject: [PATCH 11/11] Move db_ledger open from bin/replicator to src/replicator --- src/bin/replicator.rs | 9 +-- src/replicator.rs | 130 ++++++++++++++++++++++-------------------- 2 files changed, 69 insertions(+), 70 deletions(-) diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index 6a8ba9450106c1..165fa1c89b8227 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -10,7 +10,6 @@ use clap::{App, Arg}; use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; use solana::client::mk_client; use solana::cluster_info::Node; -use solana::db_ledger::DbLedger; use solana::fullnode::Config; use solana::ledger::LEDGER_DATA_FILE; use solana::logger; @@ -24,7 +23,7 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::path::Path; use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -95,13 +94,7 @@ fn main() { // TODO: ask network what slice we should store let entry_height = 0; - // Create the RocksDb ledger, eventually will simply repurpose the input - // ledger path as the RocksDb ledger path - let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&ledger_path.unwrap()).expect("Expected to be able to open database ledger"), - )); let (replicator, leader_info) = Replicator::new( - db_ledger, entry_height, 5, &exit, diff --git a/src/replicator.rs b/src/replicator.rs index ad0338fb4c6489..e8c525532cf815 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -72,7 +72,6 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { impl Replicator { pub fn new( - db_ledger: Arc>, entry_height: u64, max_entry_height: u64, exit: &Arc, @@ -106,6 +105,17 @@ impl Replicator { let (entry_window_sender, entry_window_receiver) = channel(); // todo: pull blobs off the retransmit_receiver and recycle them? 
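// Note on the DbLedger::open call that follows (an observation, not a change
// made by this patch): ledger_path is an Option<&str>, and the unwrap() below
// panics when it is None, so every caller of Replicator::new must supply a
// ledger path. A hypothetical variant with a clearer panic message could read:
//
//     let ledger_path = ledger_path.expect("replicator requires a ledger path");
//     let db_ledger = Arc::new(RwLock::new(
//         DbLedger::open(ledger_path)
//             .expect("Expected to be able to open database ledger"),
//     ));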
let (retransmit_sender, retransmit_receiver) = channel(); + + // Create the RocksDb ledger, eventually will simply repurpose the input + // ledger path as the RocksDb ledger path once we replace the ledger with + // RocksDb. Note for now, this ledger will not contain any of the existing entries + // in the ledger located at ledger_path, and will only append on newly received + // entries after being passed to window_service + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&ledger_path.unwrap()) + .expect("Expected to be able to open database ledger"), + )); + let t_window = window_service( db_ledger, cluster_info.clone(), @@ -174,7 +184,6 @@ mod tests { use logger; use replicator::sample_file; use replicator::Replicator; - use rocksdb::{Options, DB}; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::Hash; use std::fs::File; @@ -183,7 +192,7 @@ mod tests { use std::mem::size_of; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -208,76 +217,73 @@ mod tests { let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1); - let leader = Fullnode::new( - leader_node, - &leader_ledger_path, - leader_keypair, - vote_account_keypair, - None, - false, - LeaderScheduler::from_bootstrap_leader(leader_info.id), - None, - ); - - let mut leader_client = mk_client(&leader_info); - - let bob = Keypair::new(); + { + let leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + vote_account_keypair, + None, + false, + LeaderScheduler::from_bootstrap_leader(leader_info.id), + None, + ); - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); + let mut leader_client = mk_client(&leader_info); - let replicator_keypair = Keypair::new(); + let bob = Keypair::new(); - info!("starting replicator node"); - let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); - let db_ledger_path = get_tmp_ledger_path("test_replicator_startup"); - let db_ledger = Arc::new(RwLock::new( - DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), - )); - let (replicator, _leader_info) = Replicator::new( - db_ledger, - entry_height, - 1, - &exit, - Some(replicator_ledger_path), - replicator_node, - Some(network_addr), - done.clone(), - ); + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); - let mut num_entries = 0; - for _ in 0..60 { - match read_ledger(replicator_ledger_path, true) { - Ok(entries) => { - for _ in entries { - num_entries += 1; + let replicator_keypair = Keypair::new(); + + info!("starting replicator node"); + let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); + let (replicator, _leader_info) = Replicator::new( + entry_height, + 1, + &exit, + Some(replicator_ledger_path), + replicator_node, + Some(network_addr), + done.clone(), + ); + + let mut num_entries = 0; + for _ in 0..60 { + match read_ledger(replicator_ledger_path, true) { + Ok(entries) => { + for _ in entries { + num_entries += 1; + } + info!("{} entries", num_entries); + if num_entries > 0 { + break; + } } - info!("{} entries", num_entries); - if num_entries > 0 { - break; + Err(e) => { + info!("error reading ledger: {:?}", e); } } - Err(e) => { - info!("error reading ledger: {:?}", e); - } + 
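                // Each pass polls the ledger that the replicator reconstructs on
                // disk; if no entries have appeared yet, the test sleeps briefly
                // and sends another transfer below so the leader keeps producing
                // entries for the replicator to fetch.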
sleep(Duration::from_millis(300));
                let last_id = leader_client.get_last_id();
                leader_client
                    .transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
                    .unwrap();
            }
            assert_eq!(done.load(Ordering::Relaxed), true);
            assert!(num_entries > 0);
            exit.store(true, Ordering::Relaxed);
            replicator.join();
            leader.exit();
        }
        DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destruction");
        DbLedger::destroy(&replicator_ledger_path)
            .expect("Expected successful database destruction");
        let _ignored = remove_dir_all(&leader_ledger_path);
        let _ignored = remove_dir_all(&replicator_ledger_path);
    }
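Taken together, patches 10 and 11 settle on one lifecycle for the RocksDb ledger:
DbLedger::open appends DB_LEDGER_DIRECTORY to the caller's ledger path, writes go
through write_entries (any IntoIterator whose items implement Borrow<Entry>), and
DbLedger::destroy tears the database down after the handle has been dropped. A
minimal standalone sketch of that lifecycle, assuming get_tmp_ledger_path,
make_tiny_test_entries and DEFAULT_SLOT_HEIGHT are exported as the diffs above
suggest:

    extern crate solana;

    use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
    use solana::ledger::{get_tmp_ledger_path, make_tiny_test_entries};

    fn main() {
        let ledger_path = get_tmp_ledger_path("db_ledger_lifecycle");
        {
            // open() creates the database under <ledger_path>/DB_LEDGER_DIRECTORY
            let mut db_ledger = DbLedger::open(&ledger_path)
                .expect("Expected to be able to open database ledger");
            // &Vec<Entry> iterates as &Entry, which satisfies Borrow<Entry>
            db_ledger
                .write_entries(DEFAULT_SLOT_HEIGHT, &make_tiny_test_entries(10))
                .expect("Expected successful write of entries");
        } // drop the handle first: destroying an open database is undefined behavior
        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
    }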