Skip to content

Commit

Permalink
Merge pull request #2 from aeyakovenko/sakridge/client_experiments
Browse files Browse the repository at this point in the history
Sakridge/client experiments
  • Loading branch information
sakridge authored May 4, 2018
2 parents 09825f8 + 7312e25 commit b6919d3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
28 changes: 10 additions & 18 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::process::exit;
use std::time::Instant;
use untrusted::Input;
use std::time::Duration;
use std::thread::sleep;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <mint.json> | {} [options]\n\n", program);
Expand Down Expand Up @@ -117,6 +118,7 @@ fn main() {
);

let initial_tx_count = acc.transaction_count();
println!("initial count {}", initial_tx_count);

println!("Transfering {} transactions in {} batches", txs, threads);
let now = Instant::now();
Expand All @@ -137,24 +139,14 @@ fn main() {
let mut tx_count;
let mut last_tx_count = 0;
let mut last_count_tripped = 0;
loop {
for _ in 0..5 {
tx_count = acc.transaction_count();
if tx_count > last_tx_count {
duration = now.elapsed();
} else {
if tx_count > 0 {
if last_count_tripped > 1 {
break;
}
last_count_tripped += 1;
}
}
last_tx_count = tx_count;
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("{} tps", tps);
sleep(Duration::new(1,0));
}
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);

let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("Done. {} tps", tps);
}
14 changes: 10 additions & 4 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,19 @@ impl Crdt {
/// (A,B)
/// * A - Address to send to
/// * B - RequestUpdates protocol message
fn gossip_request(&self) -> (SocketAddr, Protocol) {
let n = (Self::random() as usize) % self.table.len();
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
if self.table.len() <= 1 {
return Err(Error::GeneralError);
}
let mut n = (Self::random() as usize) % self.table.len();
while self.table.values().nth(n).unwrap().id == self.me {
n = (Self::random() as usize) % self.table.len();
}
trace!("random {:?} {}", &self.me[0..1], n);
let v = self.table.values().nth(n).unwrap().clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
(v.gossip_addr, req)
Ok((v.gossip_addr, req))
}

/// At random pick a node and try to get updated changes from them
Expand All @@ -274,7 +280,7 @@ impl Crdt {

// Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to`
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request();
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
Expand Down
1 change: 1 addition & 0 deletions src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub enum Error {
AccountingError(accountant::AccountingError),
SendError,
Services,
GeneralError,
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down

0 comments on commit b6919d3

Please sign in to comment.