Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add logger #40

Merged
merged 5 commits into from
Mar 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Accountant {
mod tests {
use super::*;
use event::{generate_keypair, get_pubkey};
use historian::ExitReason;
use logger::ExitReason;

#[test]
fn test_accountant() {
Expand Down
170 changes: 30 additions & 140 deletions src/historian.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
//! The `historian` crate provides a microservice for generating a Proof-of-History.
//! It logs Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event.
//! The resulting stream of entries represents ordered events in time.
//! It manages a thread containing a Proof-of-History Logger.

use std::thread::JoinHandle;
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, SyncSender};
use std::time::{Duration, SystemTime};
use log::{hash, hash_event, Entry, Sha256Hash};
use event::{get_signature, verify_event, Event, Signature};
use std::time::Instant;
use log::{hash, Entry, Sha256Hash};
use logger::{verify_event_and_reserve_signature, ExitReason, Logger};
use event::{Event, Signature};
use serde::Serialize;
use std::fmt::Debug;

Expand All @@ -21,121 +18,13 @@ pub struct Historian<T> {
pub signatures: HashSet<Signature>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}
fn log_event<T: Serialize + Clone + Debug>(
sender: &SyncSender<Entry<T>>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
event: Event<T>,
) -> Result<(), (Entry<T>, ExitReason)> {
*end_hash = hash_event(end_hash, &event);
let entry = Entry {
end_hash: *end_hash,
num_hashes: *num_hashes,
event,
};
if let Err(_) = sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
*num_hashes = 0;
Ok(())
}

fn verify_event_and_reserve_signature<T: Serialize>(
signatures: &mut HashSet<Signature>,
event: &Event<T>,
) -> bool {
if !verify_event(&event) {
return false;
}
if let Some(sig) = get_signature(&event) {
if signatures.contains(&sig) {
return false;
}
signatures.insert(sig);
}
true
}

fn log_events<T: Serialize + Clone + Debug>(
receiver: &Receiver<Event<T>>,
sender: &SyncSender<Entry<T>>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
epoch: SystemTime,
num_ticks: &mut u64,
ms_per_tick: Option<u64>,
) -> Result<(), (Entry<T>, ExitReason)> {
use std::sync::mpsc::TryRecvError;
loop {
if let Some(ms) = ms_per_tick {
let now = SystemTime::now();
if now > epoch + Duration::from_millis((*num_ticks + 1) * ms) {
log_event(sender, num_hashes, end_hash, Event::Tick)?;
*num_ticks += 1;
}
}
match receiver.try_recv() {
Ok(event) => {
log_event(sender, num_hashes, end_hash, event)?;
}
Err(TryRecvError::Empty) => {
return Ok(());
}
Err(TryRecvError::Disconnected) => {
let entry = Entry {
end_hash: *end_hash,
num_hashes: *num_hashes,
event: Event::Tick,
};
return Err((entry, ExitReason::RecvDisconnected));
}
}
}
}

/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn create_logger<T: 'static + Serialize + Clone + Debug + Send>(
start_hash: Sha256Hash,
ms_per_tick: Option<u64>,
receiver: Receiver<Event<T>>,
sender: SyncSender<Entry<T>>,
) -> JoinHandle<(Entry<T>, ExitReason)> {
use std::thread;
thread::spawn(move || {
let mut end_hash = start_hash;
let mut num_hashes = 0;
let mut num_ticks = 0;
let epoch = SystemTime::now();
loop {
if let Err(err) = log_events(
&receiver,
&sender,
&mut num_hashes,
&mut end_hash,
epoch,
&mut num_ticks,
ms_per_tick,
) {
return err;
}
end_hash = hash(&end_hash);
num_hashes += 1;
}
})
}

impl<T: 'static + Serialize + Clone + Debug + Send> Historian<T> {
pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option<u64>) -> Self {
use std::sync::mpsc::sync_channel;
let (sender, event_receiver) = sync_channel(1000);
let (entry_sender, receiver) = sync_channel(1000);
let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender);
let thread_hdl =
Historian::create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender);
let signatures = HashSet::new();
Historian {
sender,
Expand All @@ -144,9 +33,32 @@ impl<T: 'static + Serialize + Clone + Debug + Send> Historian<T> {
signatures,
}
}

pub fn verify_event(self: &mut Self, event: &Event<T>) -> bool {
return verify_event_and_reserve_signature(&mut self.signatures, event);
}

/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
fn create_logger(
start_hash: Sha256Hash,
ms_per_tick: Option<u64>,
receiver: Receiver<Event<T>>,
sender: SyncSender<Entry<T>>,
) -> JoinHandle<(Entry<T>, ExitReason)> {
use std::thread;
thread::spawn(move || {
let mut logger = Logger::new(receiver, sender, start_hash);
let now = Instant::now();
loop {
if let Err(err) = logger.log_events(now, ms_per_tick) {
return err;
}
logger.end_hash = hash(&logger.end_hash);
logger.num_hashes += 1;
}
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -210,26 +122,4 @@ mod tests {
assert!(entries.len() > 1);
assert!(verify_slice(&entries, &zero));
}

#[test]
fn test_bad_event_signature() {
let keypair = generate_keypair();
let sig = sign_claim_data(&hash(b"hello, world"), &keypair);
let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig);
let mut sigs = HashSet::new();
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!sigs.contains(&sig));
}

#[test]
fn test_duplicate_event_signature() {
let keypair = generate_keypair();
let to = get_pubkey(&keypair);
let data = &hash(b"hello, world");
let sig = sign_claim_data(data, &keypair);
let event0 = Event::new_claim(to, data, sig);
let mut sigs = HashSet::new();
assert!(verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod log;
pub mod logger;
pub mod event;
pub mod historian;
pub mod accountant;
Expand Down
135 changes: 135 additions & 0 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! The `logger` crate provides an object for generating a Proof-of-History.
//! It logs Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event.
//! The resulting stream of entries represents ordered events in time.

use std::collections::HashSet;
use std::sync::mpsc::{Receiver, SyncSender};
use std::time::{Duration, Instant};
use log::{hash_event, Entry, Sha256Hash};
use event::{get_signature, verify_event, Event, Signature};
use serde::Serialize;
use std::fmt::Debug;

#[derive(Debug, PartialEq, Eq)]
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}

pub struct Logger<T> {
pub sender: SyncSender<Entry<T>>,
pub receiver: Receiver<Event<T>>,
pub end_hash: Sha256Hash,
pub num_hashes: u64,
pub num_ticks: u64,
}

pub fn verify_event_and_reserve_signature<T: Serialize>(
signatures: &mut HashSet<Signature>,
event: &Event<T>,
) -> bool {
if !verify_event(&event) {
return false;
}
if let Some(sig) = get_signature(&event) {
if signatures.contains(&sig) {
return false;
}
signatures.insert(sig);
}
true
}

impl<T: Serialize + Clone + Debug> Logger<T> {
pub fn new(
receiver: Receiver<Event<T>>,
sender: SyncSender<Entry<T>>,
start_hash: Sha256Hash,
) -> Self {
Logger {
receiver,
sender,
end_hash: start_hash,
num_hashes: 0,
num_ticks: 0,
}
}

pub fn log_event(&mut self, event: Event<T>) -> Result<(), (Entry<T>, ExitReason)> {
self.end_hash = hash_event(&self.end_hash, &event);
let entry = Entry {
end_hash: self.end_hash,
num_hashes: self.num_hashes,
event,
};
if let Err(_) = self.sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
self.num_hashes = 0;
Ok(())
}

pub fn log_events(
&mut self,
epoch: Instant,
ms_per_tick: Option<u64>,
) -> Result<(), (Entry<T>, ExitReason)> {
use std::sync::mpsc::TryRecvError;
loop {
if let Some(ms) = ms_per_tick {
if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) {
self.log_event(Event::Tick)?;
self.num_ticks += 1;
}
}
match self.receiver.try_recv() {
Ok(event) => {
self.log_event(event)?;
}
Err(TryRecvError::Empty) => {
return Ok(());
}
Err(TryRecvError::Disconnected) => {
let entry = Entry {
end_hash: self.end_hash,
num_hashes: self.num_hashes,
event: Event::Tick,
};
return Err((entry, ExitReason::RecvDisconnected));
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use log::*;
use event::*;

#[test]
fn test_bad_event_signature() {
let keypair = generate_keypair();
let sig = sign_claim_data(&hash(b"hello, world"), &keypair);
let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig);
let mut sigs = HashSet::new();
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!sigs.contains(&sig));
}

#[test]
fn test_duplicate_event_signature() {
let keypair = generate_keypair();
let to = get_pubkey(&keypair);
let data = &hash(b"hello, world");
let sig = sign_claim_data(data, &keypair);
let event0 = Event::new_claim(to, data, sig);
let mut sigs = HashSet::new();
assert!(verify_event_and_reserve_signature(&mut sigs, &event0));
assert!(!verify_event_and_reserve_signature(&mut sigs, &event0));
}
}