diff --git a/src/lib.rs b/src/lib.rs index e61b59f88936f6..1c14c9320f91aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod thin_client; pub mod timing; pub mod transaction; pub mod tvu; +pub mod write_stage; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/rpu.rs b/src/rpu.rs index 683d6e787c1d71..d48dc1510f083d 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -3,8 +3,6 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use entry::Entry; -use entry_writer::EntryWriter; use hash::Hash; use packet; use record_stage::RecordStage; @@ -14,12 +12,13 @@ use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; use std::sync::{Arc, Mutex, RwLock}; -use std::thread::{spawn, JoinHandle}; +use std::thread::JoinHandle; use std::time::Duration; use streamer; +use write_stage::WriteStage; pub struct Rpu { bank: Arc, @@ -37,29 +36,6 @@ impl Rpu { } } - fn write_service( - bank: Arc, - exit: Arc, - broadcast: streamer::BlobSender, - blob_recycler: packet::BlobRecycler, - writer: Mutex, - entry_receiver: Receiver, - ) -> JoinHandle<()> { - spawn(move || loop { - let entry_writer = EntryWriter::new(&bank); - let _ = entry_writer.write_and_send_entries( - &broadcast, - &blob_recycler, - &writer, - &entry_receiver, - ); - if exit.load(Ordering::Relaxed) { - info!("broadcat_service exiting"); - break; - } - }) - } - /// Create a UDP microservice that forwards messages the given Rpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -107,11 +83,9 @@ impl Rpu { self.tick_duration, ); - let (broadcast_sender, broadcast_receiver) = channel(); - let t_write = Self::write_service( + let write_stage = WriteStage::new( self.bank.clone(), exit.clone(), - broadcast_sender, blob_recycler.clone(), Mutex::new(writer), record_stage.entry_receiver, @@ -124,7 +98,7 @@ impl Rpu { crdt.clone(), window, blob_recycler.clone(), - broadcast_receiver, + write_stage.blob_receiver, ); let respond_socket = UdpSocket::bind(local.clone())?; @@ -139,7 +113,7 @@ impl Rpu { t_receiver, t_responder, request_stage.thread_hdl, - t_write, + write_stage.thread_hdl, t_gossip, t_listen, t_broadcast, diff --git a/src/tvu.rs b/src/tvu.rs index 1216988f48a78e..d26171d7094588 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -3,8 +3,6 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use entry::Entry; -use entry_writer::EntryWriter; use hash::Hash; use ledger; use packet; @@ -15,11 +13,12 @@ use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; +use write_stage::WriteStage; pub struct Tvu { bank: Arc, @@ -37,23 +36,6 @@ impl Tvu { } } - fn drain_service( - bank: Arc, - exit: Arc, - entry_receiver: Receiver, - ) -> JoinHandle<()> { - spawn(move || { - let entry_writer = EntryWriter::new(&bank); - loop { - let _ = entry_writer.drain_entries(&entry_receiver); - if exit.load(Ordering::Relaxed) { - info!("drain_service exiting"); - break; - } - } - }) - } - /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -193,8 +175,8 @@ impl Tvu { obj.tick_duration, ); - let t_write = - Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); + let write_stage = + WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); let t_responder = streamer::responder( respond_socket, @@ -215,7 +197,7 @@ impl Tvu { t_packet_receiver, t_responder, request_stage.thread_hdl, - t_write, + write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) diff --git a/src/write_stage.rs b/src/write_stage.rs new file mode 100644 index 00000000000000..fd253846774744 --- /dev/null +++ b/src/write_stage.rs @@ -0,0 +1,71 @@ +//! The `write_stage` module implements write stage of the RPU. + +use bank::Bank; +use entry::Entry; +use entry_writer::EntryWriter; +use packet; +use std::io::Write; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn, JoinHandle}; +use streamer; + +pub struct WriteStage { + pub thread_hdl: JoinHandle<()>, + pub blob_receiver: streamer::BlobReceiver, +} + +impl WriteStage { + /// Create a new Rpu that wraps the given Bank. + pub fn new( + bank: Arc, + exit: Arc, + blob_recycler: packet::BlobRecycler, + writer: Mutex, + entry_receiver: Receiver, + ) -> Self { + let (blob_sender, blob_receiver) = channel(); + let thread_hdl = spawn(move || loop { + let entry_writer = EntryWriter::new(&bank); + let _ = entry_writer.write_and_send_entries( + &blob_sender, + &blob_recycler, + &writer, + &entry_receiver, + ); + if exit.load(Ordering::Relaxed) { + info!("broadcat_service exiting"); + break; + } + }); + + WriteStage { + thread_hdl, + blob_receiver, + } + } + + pub fn new_drain( + bank: Arc, + exit: Arc, + entry_receiver: Receiver, + ) -> Self { + let (_blob_sender, blob_receiver) = channel(); + let thread_hdl = spawn(move || { + let entry_writer = EntryWriter::new(&bank); + loop { + let _ = entry_writer.drain_entries(&entry_receiver); + if exit.load(Ordering::Relaxed) { + info!("drain_service exiting"); + break; + } + } + }); + + WriteStage { + thread_hdl, + blob_receiver, + } + } +}