-
Notifications
You must be signed in to change notification settings - Fork 4.5k
initialize recycled data #967
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; | |
pub const PACKET_DATA_SIZE: usize = 256; | ||
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; | ||
|
||
#[derive(Clone, Default, Debug)] | ||
#[derive(Clone, Default, Debug, PartialEq)] | ||
#[repr(C)] | ||
pub struct Meta { | ||
pub size: usize, | ||
|
@@ -63,6 +63,19 @@ impl Default for Packet { | |
} | ||
} | ||
|
||
pub trait Reset { | ||
// Reset trait is an object that can re-initialize important parts | ||
// of itself, similar to Default, but not necessarily a full clear | ||
// also, we do it in-place. | ||
fn reset(&mut self); | ||
} | ||
|
||
impl Reset for Packet { | ||
fn reset(&mut self) { | ||
self.meta = Meta::default(); | ||
} | ||
} | ||
|
||
impl Meta { | ||
pub fn addr(&self) -> SocketAddr { | ||
if !self.v6 { | ||
|
@@ -113,6 +126,14 @@ impl Default for Packets { | |
} | ||
} | ||
|
||
impl Reset for Packets { | ||
fn reset(&mut self) { | ||
for i in 0..self.packets.len() { | ||
self.packets[i].reset(); | ||
} | ||
} | ||
} | ||
|
||
#[derive(Clone)] | ||
pub struct Blob { | ||
pub data: [u8; BLOB_SIZE], | ||
|
@@ -140,6 +161,13 @@ impl Default for Blob { | |
} | ||
} | ||
|
||
impl Reset for Blob { | ||
fn reset(&mut self) { | ||
self.meta = Meta::default(); | ||
self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: probably non-optimal memset There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. annoyingly doesn't seem like rust has a safe one at the moment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's a better memset() look like? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I assume this is doing an allocation with a slice of 0 bytes and then doing a memcpy, but allocation is not needed just zero fill with simd movs. |
||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub enum BlobError { | ||
/// the Blob's meta and data are not self-consistent | ||
|
@@ -166,25 +194,35 @@ impl<T: Default> Clone for Recycler<T> { | |
} | ||
} | ||
|
||
impl<T: Default> Recycler<T> { | ||
impl<T: Default + Reset> Recycler<T> { | ||
pub fn allocate(&self) -> Arc<RwLock<T>> { | ||
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); | ||
let x = gc | ||
.pop() | ||
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))); | ||
|
||
// Only return the item if this recycler is the last reference to it. | ||
// Remove this check once `T` holds a Weak reference back to this | ||
// recycler and implements `Drop`. At the time of this writing, Weak can't | ||
// be passed across threads ('alloc' is a nightly-only API), and so our | ||
// reference-counted recyclables are awkwardly being recycled by hand, | ||
// which allows this race condition to exist. | ||
if Arc::strong_count(&x) > 1 { | ||
warn!("Recycled item still in use. Booting it."); | ||
drop(gc); | ||
self.allocate() | ||
} else { | ||
x | ||
|
||
loop { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to drop recursion here when I was using default::Default in my initial SWAG at this; I learned that Rust apparently isn't smart about tail recursion, and in our tests (where a lot of strong-ref'd blobs are in the recycler) 64k per stack frame was being allocated, we recursed WINDOW_SIZE frames, and it completely blew up (no stack backtrace or nothin'). I prefer the loop form, though. I'm not a Haskell guy ;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's probably LLVM that's the problem, not Rust. Haskell's GHC had to go add another calling convention to LLVM to allow for tail recursion. The iterative version is fine my me. If you want to write a test for that, looks like let builder = thread::Builder::new()
.name("kaboom".into())
.stack_size(32 * 1024 * 1024); // 32MB of stack space
let handler = builder.spawn(|| {
// stack-intensive operations
}).unwrap();
handler.join() // <--- assert this returns an Err There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 on iterative version :-) |
||
if let Some(x) = gc.pop() { | ||
// Only return the item if this recycler is the last reference to it. | ||
// Remove this check once `T` holds a Weak reference back to this | ||
// recycler and implements `Drop`. At the time of this writing, Weak can't | ||
// be passed across threads ('alloc' is a nightly-only API), and so our | ||
// reference-counted recyclables are awkwardly being recycled by hand, | ||
// which allows this race condition to exist. | ||
if Arc::strong_count(&x) >= 1 { | ||
// Commenting out this message, is annoying for known use case of | ||
// validator hanging onto a blob in the window, but also sending it over | ||
// to retransmmit_request | ||
// | ||
// warn!("Recycled item still in use. Booting it."); | ||
continue; | ||
} | ||
|
||
{ | ||
let mut w = x.write().unwrap(); | ||
w.reset(); | ||
} | ||
return x; | ||
} else { | ||
return Arc::new(RwLock::new(Default::default())); | ||
} | ||
} | ||
} | ||
pub fn recycle(&self, x: Arc<RwLock<T>>) { | ||
|
@@ -455,7 +493,8 @@ impl Blob { | |
#[cfg(test)] | ||
mod tests { | ||
use packet::{ | ||
to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, Recycler, NUM_PACKETS, | ||
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset, | ||
BLOB_HEADER_SIZE, NUM_PACKETS, | ||
}; | ||
use request::Request; | ||
use std::collections::VecDeque; | ||
|
@@ -474,6 +513,12 @@ mod tests { | |
assert_eq!(r.gc.lock().unwrap().len(), 0); | ||
} | ||
|
||
impl Reset for u8 { | ||
fn reset(&mut self) { | ||
*self = Default::default(); | ||
} | ||
} | ||
|
||
#[test] | ||
pub fn test_leaked_recyclable() { | ||
// Ensure that the recycler won't return an item | ||
|
@@ -611,6 +656,9 @@ mod tests { | |
b.data_mut()[0] = 1; | ||
assert_eq!(b.data()[0], 1); | ||
assert_eq!(b.get_index().unwrap(), <u64>::max_value()); | ||
b.reset(); | ||
assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE])); | ||
assert_eq!(b.meta, Meta::default()); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be a function
get_out_dir() -> String
which can share this logic?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, eventually I'd like to collapse temp_dir stuff to a single implementation. I had to hurry up and throw this in because for some reason cargo stopped offering "OUT_DIR" in some of my test runs (like cargo watch -x test, maybe)...