Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Reuse entry_writer from solana-genesis #512

Merged
merged 7 commits into from
Jul 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/bin/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,28 @@ extern crate serde_json;
extern crate solana;

use atty::{is, Stream};
use solana::entry_writer::EntryWriter;
use solana::mint::Mint;
use std::io::{stdin, Read};
use std::error;
use std::io::{stdin, stdout, Read};
use std::process::exit;
use std::sync::Mutex;

fn main() {
fn main() -> Result<(), Box<error::Error>> {
if is(Stream::Stdin) {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
}

let mut buffer = String::new();
let num_bytes = stdin().read_to_string(&mut buffer).unwrap();
let num_bytes = stdin().read_to_string(&mut buffer)?;
if num_bytes == 0 {
eprintln!("empty file on stdin, expected a json file");
exit(1);
}

let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
exit(1);
});
for x in mint.create_entries() {
let serialized = serde_json::to_string(&x).unwrap_or_else(|e| {
eprintln!("failed to serialize: {}", e);
exit(1);
});
println!("{}", serialized);
}
let mint: Mint = serde_json::from_str(&buffer)?;
let writer = Mutex::new(stdout());
EntryWriter::write_entries(&writer, &mint.create_entries())?;
Ok(())
}
86 changes: 53 additions & 33 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use packet::BlobRecycler;
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::sink;
use std::io::Write;
use std::io::{self, sink, Write};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::time::Duration;
use streamer::BlobSender;

Expand All @@ -26,59 +25,76 @@ impl<'a> EntryWriter<'a> {
EntryWriter { bank }
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
fn write_entry<W: Write>(writer: &Mutex<W>, entry: &Entry) -> io::Result<()> {
let serialized = serde_json::to_string(&entry).unwrap();
writeln!(writer.lock().unwrap(), "{}", serialized)
}

pub fn write_entries<W: Write>(writer: &Mutex<W>, entries: &[Entry]) -> io::Result<()> {
for entry in entries {
Self::write_entry(writer, entry)?;
}
Ok(())
}

fn write_and_register_entry<W: Write>(
&self,
writer: &Mutex<W>,
entry: &Entry,
) -> io::Result<()> {
trace!("write_and_register_entry entry");
if !entry.has_more {
self.bank.register_entry_id(&entry.id);
}
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
).expect("writeln! in fn write_entry");
Self::write_entry(&writer, entry)
}

fn write_entries<W: Write>(
fn write_and_register_entries<W: Write>(
&self,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
entries: &[Entry],
) -> io::Result<()> {
for entry in entries {
self.write_and_register_entry(writer, &entry)?;
}
Ok(())
}

fn recv_entries(entry_receiver: &Receiver<Entry>) -> Result<Vec<Entry>> {
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_entry(writer, &entry);
l.push(entry);
let mut entries = vec![entry];
while let Ok(entry) = entry_receiver.try_recv() {
self.write_entry(writer, &entry);
l.push(entry);
entries.push(entry);
}
Ok(l)
Ok(entries)
}

    /// Process any Entry items that have been published by the Historian.
    /// Continuously broadcast blobs of entries out.
pub fn write_and_send_entries<W: Write>(
&self,
broadcast: &BlobSender,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<()> {
let mut q = VecDeque::new();
let list = self.write_entries(writer, entry_receiver)?;
trace!("New blobs? {}", list.len());
list.to_blobs(blob_recycler, &mut q);
if !q.is_empty() {
trace!("broadcasting {}", q.len());
broadcast.send(q)?;
let entries = Self::recv_entries(entry_receiver)?;
self.write_and_register_entries(writer, &entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
Ok(())
}

    /// Process any Entry items that have been published by the Historian,
    /// registering them with the bank but discarding the serialized output
    /// (writes to a sink; nothing is broadcast).
pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> {
self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?;
let entries = Self::recv_entries(entry_receiver)?;
self.write_and_register_entries(&Mutex::new(sink()), &entries)?;
Ok(())
}
}
Expand Down Expand Up @@ -111,14 +127,18 @@ mod tests {
assert!(entries[0].has_more);
assert!(!entries[1].has_more);

// Verify that write_entry doesn't register the first entries after a split.
// Verify that write_and_register_entry doesn't register the first entries after a split.
assert_eq!(bank.last_id(), mint.last_id());
let writer = Mutex::new(sink());
entry_writer.write_entry(&writer, &entries[0]);
entry_writer
.write_and_register_entry(&writer, &entries[0])
.unwrap();
assert_eq!(bank.last_id(), mint.last_id());

// Verify that write_entry registers the final entry after a split.
entry_writer.write_entry(&writer, &entries[1]);
// Verify that write_and_register_entry registers the final entry after a split.
entry_writer
.write_and_register_entry(&writer, &entries[1])
.unwrap();
assert_eq!(bank.last_id(), entries[1].id);
}
}