Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Merge pull request #166 from garious/refactor-historian
Browse files Browse the repository at this point in the history
TPU-friendly Historian
  • Loading branch information
garious authored May 3, 2018
2 parents 4ad50f0 + 430aeb8 commit a158d74
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 51 deletions.
48 changes: 31 additions & 17 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::VecDeque;
use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
Expand All @@ -34,6 +34,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant,
last_id: Hash,
writer: W,
historian_input: SyncSender<Signal>,
historian: Historian,
entry_info_subscribers: Vec<SocketAddr>,
}
Expand Down Expand Up @@ -78,11 +79,18 @@ pub enum Response {

impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self {
pub fn new(
acc: Accountant,
last_id: Hash,
writer: W,
historian_input: SyncSender<Signal>,
historian: Historian,
) -> Self {
AccountantSkel {
acc,
last_id,
writer,
historian_input,
historian,
entry_info_subscribers: vec![],
}
Expand All @@ -105,7 +113,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

/// Process any Entry items that have been published by the Historian.
pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.historian.receiver.try_recv() {
while let Ok(entry) = self.historian.output.try_recv() {
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
Expand Down Expand Up @@ -214,15 +222,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
// Process the transactions in parallel and then log the successful ones.
for result in self.acc.process_verified_transactions(trs) {
if let Ok(tr) = result {
self.historian
.sender
self.historian_input
.send(Signal::Event(Event::Transaction(tr)))?;
}
}

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian.sender.send(Signal::Tick)?;
self.historian_input.send(Signal::Tick)?;

// Process the remaining requests serially.
let rsps = reqs.into_iter()
Expand Down Expand Up @@ -482,6 +489,7 @@ mod tests {
use std::io::sink;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
Expand Down Expand Up @@ -530,8 +538,9 @@ mod tests {
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
let historian = Historian::new(&mint.last_id(), None);
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
Expand All @@ -545,9 +554,9 @@ mod tests {
assert!(skel.process_packets(req_vers).is_ok());

// Collect the ledger and feed it to a new accountant.
skel.historian.sender.send(Signal::Tick).unwrap();
drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
skel.historian_input.send(Signal::Tick).unwrap();
drop(skel.historian_input);
let entries: Vec<Entry> = skel.historian.output.iter().collect();

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
Expand All @@ -569,11 +578,13 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let historian = Historian::new(&alice.last_id(), Some(30));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
input,
historian,
)));
let _threads = AccountantSkel::serve(&acc, &addr, exit.clone()).unwrap();
Expand Down Expand Up @@ -651,11 +662,13 @@ mod tests {
let starting_balance = 10_000;
let alice = Mint::new(starting_balance);
let acc = Accountant::new(&alice);
let historian = Historian::new(&alice.last_id(), Some(30));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
input,
historian,
)));

Expand Down Expand Up @@ -790,8 +803,9 @@ mod bench {
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
.collect();

let historian = Historian::new(&mint.last_id(), None);
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);

let now = Instant::now();
assert!(skel.process_packets(req_vers).is_ok());
Expand All @@ -800,8 +814,8 @@ mod bench {
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
drop(input);
let entries: Vec<Entry> = skel.historian.output.iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

Expand Down
5 changes: 4 additions & 1 deletion src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ mod tests {
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
Expand All @@ -178,11 +179,13 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let historian = Historian::new(&alice.last_id(), Some(30));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
input,
historian,
)));
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
Expand Down
15 changes: 8 additions & 7 deletions src/bin/historian-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@ use solana::ledger::Block;
use solana::recorder::Signal;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::sync::mpsc::SendError;
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
use std::thread::sleep;
use std::time::Duration;

fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError<Signal>> {
fn create_ledger(input: &SyncSender<Signal>, seed: &Hash) -> Result<(), SendError<Signal>> {
sleep(Duration::from_millis(15));
let keypair = KeyPair::new();
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
let signal0 = Signal::Event(Event::Transaction(tr));
hist.sender.send(signal0)?;
input.send(signal0)?;
sleep(Duration::from_millis(10));
Ok(())
}

fn main() {
let (input, event_receiver) = sync_channel(10);
let seed = Hash::default();
let hist = Historian::new(&seed, Some(10));
create_ledger(&hist, &seed).expect("send error");
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
let hist = Historian::new(event_receiver, &seed, Some(10));
create_ledger(&input, &seed).expect("send error");
drop(input);
let entries: Vec<Entry> = hist.output.iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
Expand Down
5 changes: 4 additions & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::env;
use std::io::{stdin, stdout, Read};
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};

fn print_usage(program: &str, opts: Options) {
Expand Down Expand Up @@ -95,12 +96,14 @@ fn main() {
acc.register_entry_id(&last_id);
}

let historian = Historian::new(&last_id, Some(1000));
let (input, event_receiver) = sync_channel(10_000);
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(
acc,
last_id,
stdout(),
input,
historian,
)));
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
Expand Down
51 changes: 26 additions & 25 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@ use std::thread::{spawn, JoinHandle};
use std::time::Instant;

pub struct Historian {
pub sender: SyncSender<Signal>,
pub receiver: Receiver<Entry>,
pub output: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>,
}

impl Historian {
pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (sender, event_receiver) = sync_channel(10_000);
let (entry_sender, receiver) = sync_channel(10_000);
pub fn new(
event_receiver: Receiver<Signal>,
start_hash: &Hash,
ms_per_tick: Option<u64>,
) -> Self {
let (entry_sender, output) = sync_channel(10_000);
let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian {
sender,
receiver,
thread_hdl,
}
Historian { output, thread_hdl }
}

/// A background thread that will continue tagging received Event messages and
Expand Down Expand Up @@ -59,24 +57,25 @@ mod tests {

#[test]
fn test_historian() {
let (input, event_receiver) = sync_channel(10);
let zero = Hash::default();
let hist = Historian::new(&zero, None);
let hist = Historian::new(event_receiver, &zero, None);

hist.sender.send(Signal::Tick).unwrap();
input.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
hist.sender.send(Signal::Tick).unwrap();
input.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
hist.sender.send(Signal::Tick).unwrap();
input.send(Signal::Tick).unwrap();

let entry0 = hist.receiver.recv().unwrap();
let entry1 = hist.receiver.recv().unwrap();
let entry2 = hist.receiver.recv().unwrap();
let entry0 = hist.output.recv().unwrap();
let entry1 = hist.output.recv().unwrap();
let entry2 = hist.output.recv().unwrap();

assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0);
assert_eq!(entry2.num_hashes, 0);

drop(hist.sender);
drop(input);
assert_eq!(
hist.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
Expand All @@ -87,10 +86,11 @@ mod tests {

#[test]
fn test_historian_closed_sender() {
let (input, event_receiver) = sync_channel(10);
let zero = Hash::default();
let hist = Historian::new(&zero, None);
drop(hist.receiver);
hist.sender.send(Signal::Tick).unwrap();
let hist = Historian::new(event_receiver, &zero, None);
drop(hist.output);
input.send(Signal::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap(),
ExitReason::SendDisconnected
Expand All @@ -99,12 +99,13 @@ mod tests {

#[test]
fn test_ticking_historian() {
let (input, event_receiver) = sync_channel(10);
let zero = Hash::default();
let hist = Historian::new(&zero, Some(20));
let hist = Historian::new(event_receiver, &zero, Some(20));
sleep(Duration::from_millis(300));
hist.sender.send(Signal::Tick).unwrap();
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
input.send(Signal::Tick).unwrap();
drop(input);
let entries: Vec<Entry> = hist.output.iter().collect();
assert!(entries.len() > 1);

// Ensure the ID is not the seed.
Expand Down

0 comments on commit a158d74

Please sign in to comment.