Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

initialize recycled data #967

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ mod tests {

fn tmp_ledger(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();

let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());
Expand Down
2 changes: 1 addition & 1 deletion src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ mod tests {

fn tmp_ledger_path(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();

format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
Expand Down
2 changes: 1 addition & 1 deletion src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ mod tests {

fn tmp_ledger_path(name: &str) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a function get_out_dir() -> String which can share this logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, eventually I'd like to collapse temp_dir stuff to a single implementation. I had to hurry up and throw this in because for some reason cargo stopped offering "OUT_DIR" in some of my test runs (like cargo watch -x test, maybe)...

let keypair = Keypair::new();

format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey())
Expand Down
86 changes: 67 additions & 19 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
pub const PACKET_DATA_SIZE: usize = 256;
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;

#[derive(Clone, Default, Debug)]
#[derive(Clone, Default, Debug, PartialEq)]
#[repr(C)]
pub struct Meta {
pub size: usize,
Expand Down Expand Up @@ -63,6 +63,19 @@ impl Default for Packet {
}
}

pub trait Reset {
// Reset trait is an object that can re-initialize important parts
// of itself, similar to Default, but not necessarily a full clear
// also, we do it in-place.
fn reset(&mut self);
}

impl Reset for Packet {
fn reset(&mut self) {
self.meta = Meta::default();
}
}

impl Meta {
pub fn addr(&self) -> SocketAddr {
if !self.v6 {
Expand Down Expand Up @@ -113,6 +126,14 @@ impl Default for Packets {
}
}

impl Reset for Packets {
fn reset(&mut self) {
for i in 0..self.packets.len() {
self.packets[i].reset();
}
}
}

#[derive(Clone)]
pub struct Blob {
pub data: [u8; BLOB_SIZE],
Expand Down Expand Up @@ -140,6 +161,13 @@ impl Default for Blob {
}
}

impl Reset for Blob {
fn reset(&mut self) {
self.meta = Meta::default();
self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: probably non-optimal memset

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

annoyingly doesn't seem like rust has a safe one at the moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's a better memset() look like?

Copy link
Contributor

@sakridge sakridge Aug 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rust-lang/rfcs#2067

unsafe {
    ptr::write_bytes(self.slice.as_mut_ptr(), 0, self.slice.len() - 1);
}

I assume this is doing an allocation with a slice of 0 bytes and then doing a memcpy, but allocation is not needed just zero fill with simd movs.

}
}

#[derive(Debug)]
pub enum BlobError {
/// the Blob's meta and data are not self-consistent
Expand All @@ -166,25 +194,35 @@ impl<T: Default> Clone for Recycler<T> {
}
}

impl<T: Default> Recycler<T> {
impl<T: Default + Reset> Recycler<T> {
pub fn allocate(&self) -> Arc<RwLock<T>> {
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
let x = gc
.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())));

// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
// be passed across threads ('alloc' is a nightly-only API), and so our
// reference-counted recyclables are awkwardly being recycled by hand,
// which allows this race condition to exist.
if Arc::strong_count(&x) > 1 {
warn!("Recycled item still in use. Booting it.");
drop(gc);
self.allocate()
} else {
x

loop {
Copy link
Contributor Author

@rob-solana rob-solana Aug 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to drop recursion here when I was using default::Default in my initial SWAG at this; I learned that Rust apparently isn't smart about tail recursion, and in our tests (where a lot of strong-ref'd blobs are in the recycler) 64k per stack frame was being allocated, we recursed WINDOW_SIZE frames, and it completely blew up (no stack backtrace or nothin').

I prefer the loop form, though. I'm not a Haskell guy ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably LLVM that's the problem, not Rust. Haskell's GHC had to go add another calling convention to LLVM to allow for tail recursion. The iterative version is fine my me. If you want to write a test for that, looks like thread::Builder lets you control the thread's stack size:

let builder = thread::Builder::new()
                  .name("kaboom".into())
                  .stack_size(32 * 1024 * 1024); // 32MB of stack space

    let handler = builder.spawn(|| {
        // stack-intensive operations
    }).unwrap();

    handler.join()  // <---   assert this returns an Err

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 on iterative version :-)

if let Some(x) = gc.pop() {
// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
// be passed across threads ('alloc' is a nightly-only API), and so our
// reference-counted recyclables are awkwardly being recycled by hand,
// which allows this race condition to exist.
if Arc::strong_count(&x) >= 1 {
// Commenting out this message, is annoying for known use case of
// validator hanging onto a blob in the window, but also sending it over
// to retransmmit_request
//
// warn!("Recycled item still in use. Booting it.");
continue;
}

{
let mut w = x.write().unwrap();
w.reset();
}
return x;
} else {
return Arc::new(RwLock::new(Default::default()));
}
}
}
pub fn recycle(&self, x: Arc<RwLock<T>>) {
Expand Down Expand Up @@ -455,7 +493,8 @@ impl Blob {
#[cfg(test)]
mod tests {
use packet::{
to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, Recycler, NUM_PACKETS,
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset,
BLOB_HEADER_SIZE, NUM_PACKETS,
};
use request::Request;
use std::collections::VecDeque;
Expand All @@ -474,6 +513,12 @@ mod tests {
assert_eq!(r.gc.lock().unwrap().len(), 0);
}

impl Reset for u8 {
fn reset(&mut self) {
*self = Default::default();
}
}

#[test]
pub fn test_leaked_recyclable() {
// Ensure that the recycler won't return an item
Expand Down Expand Up @@ -611,6 +656,9 @@ mod tests {
b.data_mut()[0] = 1;
assert_eq!(b.data()[0], 1);
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
b.reset();
assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE]));
assert_eq!(b.meta, Meta::default());
}

}
2 changes: 1 addition & 1 deletion src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mod tests {

fn tmp_ledger(name: &str, mint: &Mint) -> String {
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let keypair = Keypair::new();

let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());
Expand Down