Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

More modules #206

Merged
merged 4 commits into from
May 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Cargo.lock
/target/
**/*.rs.bk
.cargo
2 changes: 1 addition & 1 deletion src/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ mod tests {
use bincode::serialize;
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use request_stage::Request;
use request::Request;
use std::sync::RwLock;
use transaction::Transaction;
use transaction::test_tx;
Expand Down
15 changes: 6 additions & 9 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The `entry_writer` module helps implement the TPU's write stage.

use accountant::Accountant;
use entry::Entry;
use event_processor::EventProcessor;
use ledger;
use packet;
use request_stage::RequestProcessor;
use request_processor::RequestProcessor;
use result::Result;
use serde_json;
use std::collections::VecDeque;
Expand All @@ -16,25 +16,22 @@ use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
event_processor: &'a EventProcessor,
accountant: &'a Accountant,
request_processor: &'a RequestProcessor,
}

impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
) -> Self {
pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self {
EntryWriter {
event_processor,
accountant,
request_processor,
}
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
self.event_processor.accountant.register_entry_id(&entry.id);
self.accountant.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod mint;
pub mod packet;
pub mod plan;
pub mod recorder;
pub mod request;
pub mod request_processor;
pub mod request_stage;
pub mod result;
pub mod rpu;
Expand Down
99 changes: 99 additions & 0 deletions src/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! The `request` module defines the messages for the thin client.

use bincode::serialize;
use hash::Hash;
use packet;
use packet::SharedPackets;
use signature::PublicKey;
use transaction::Transaction;

#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
Subscribe { subscriptions: Vec<Subscription> },
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}

impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify_plan(),
_ => true,
}
}
}

#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
}

pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
let mut out = vec![];
for rrs in reqs.chunks(packet::NUM_PACKETS) {
let p = r.allocate();
p.write()
.unwrap()
.packets
.resize(rrs.len(), Default::default());
for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) {
let v = serialize(&i).expect("serialize request");
let len = v.len();
o.data[..len].copy_from_slice(&v);
o.meta.size = len;
}
out.push(p);
}
return out;
}

#[cfg(test)]
mod tests {
use bincode::serialize;
use ecdsa;
use packet::{PacketRecycler, NUM_PACKETS};
use request::{to_request_packets, Request};
use transaction::{memfind, test_tx};

#[test]
fn test_layout() {
let tr = test_tx();
let tx = serialize(&tr).unwrap();
let packet = serialize(&Request::Transaction(tr)).unwrap();
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
}

#[test]
fn test_to_packets() {
let tr = Request::Transaction(test_tx());
let re = PacketRecycler::default();
let rv = to_request_packets(&re, vec![tr.clone(); 1]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), 1);

let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);

let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
assert_eq!(rv.len(), 2);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
}
}
232 changes: 232 additions & 0 deletions src/request_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
//! The `request_stage` processes thin client Request messages.

use accountant::Accountant;
use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use request::{EntryInfo, Request, Response, Subscription};
use result::Result;
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use streamer;
use timing;

pub struct RequestProcessor {
accountant: Arc<Accountant>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}

impl RequestProcessor {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: Arc<Accountant>) -> Self {
RequestProcessor {
accountant,
entry_info_subscribers: Mutex::new(vec![]),
}
}

/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.accountant.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}

pub fn process_requests(
&self,
reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}

pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");

// copy subscribers to avoid taking lock while doing io
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}

fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
p.packets
.par_iter()
.map(|x| {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}

// Copy-paste of deserialize_requests() because I can't figure out how to
// route the lifetimes in a generic version.
pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
p.packets
.par_iter()
.map(|x| {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}

/// Split Request list into verified transactions and the rest
fn partition_requests(
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
let mut events = vec![];
let mut reqs = vec![];
for (msg, rsp_addr, verify) in req_vers {
match msg {
Request::Transaction(tr) => {
if verify != 0 {
events.push(Event::Transaction(tr));
}
}
_ => reqs.push((msg, rsp_addr)),
}
}
(events, reqs)
}

fn serialize_response(
resp: Response,
rsp_addr: SocketAddr,
blob_recycler: &packet::BlobRecycler,
) -> Result<packet::SharedBlob> {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
}
Ok(blob)
}

fn serialize_responses(
rsps: Vec<(Response, SocketAddr)>,
blob_recycler: &packet::BlobRecycler,
) -> Result<VecDeque<packet::SharedBlob>> {
let mut blobs = VecDeque::new();
for (resp, rsp_addr) in rsps {
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
}
Ok(blobs)
}

pub fn process_request_packets(
&self,
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>,
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
"@{:?} process start stalled for: {:?}ms batches: {}",
timing::timestamp(),
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let reqs = Self::deserialize_requests(&msgs.read().unwrap());
reqs_len += reqs.len();
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| {
let v = x.0.verify();
v
})
.collect();

debug!("partitioning");
let (events, reqs) = Self::partition_requests(req_vers);
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
let entry = event_processor.process_events(events)?;
entry_sender.send(entry)?;
debug!("done process_events");

debug!("process_requests");
let rsps = self.process_requests(reqs);
debug!("done process_requests");

let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
mms_len,
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
}
Loading