diff --git a/src/crdt.rs b/src/crdt.rs index 3c4daf30a281b2..01f2e8a6503121 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1911,7 +1911,7 @@ mod tests { fn tmp_ledger(name: &str) -> String { use std::env; - let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let keypair = Keypair::new(); let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey()); diff --git a/src/drone.rs b/src/drone.rs index 698b0e2d3b2b09..362416ca26e5fd 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -261,7 +261,7 @@ mod tests { fn tmp_ledger_path(name: &str) -> String { use std::env; - let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let keypair = Keypair::new(); format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey()) diff --git a/src/ledger.rs b/src/ledger.rs index 1f558d550f8034..d967f6f815a558 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -550,7 +550,7 @@ mod tests { fn tmp_ledger_path(name: &str) -> String { use std::env; - let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let keypair = Keypair::new(); format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey()) diff --git a/src/packet.rs b/src/packet.rs index dacd0f965d6e08..33ced51cd0b0d0 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -26,7 +26,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; pub const PACKET_DATA_SIZE: usize = 256; pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; -#[derive(Clone, Default, Debug)] +#[derive(Clone, Default, Debug, PartialEq)] #[repr(C)] pub struct Meta { pub size: usize, @@ -63,6 +63,19 @@ impl Default for Packet { } } +pub trait Reset { + // Reset trait is an object that can re-initialize important parts + // of itself, similar to Default, but not necessarily a full clear + // also, we do it in-place. + fn reset(&mut self); +} + +impl Reset for Packet { + fn reset(&mut self) { + self.meta = Meta::default(); + } +} + impl Meta { pub fn addr(&self) -> SocketAddr { if !self.v6 { @@ -113,6 +126,14 @@ impl Default for Packets { } } +impl Reset for Packets { + fn reset(&mut self) { + for i in 0..self.packets.len() { + self.packets[i].reset(); + } + } +} + #[derive(Clone)] pub struct Blob { pub data: [u8; BLOB_SIZE], @@ -140,6 +161,13 @@ impl Default for Blob { } } +impl Reset for Blob { + fn reset(&mut self) { + self.meta = Meta::default(); + self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]); + } +} + #[derive(Debug)] pub enum BlobError { /// the Blob's meta and data are not self-consistent @@ -166,25 +194,35 @@ impl Clone for Recycler { } } -impl Recycler { +impl Recycler { pub fn allocate(&self) -> Arc> { let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); - let x = gc - .pop() - .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))); - - // Only return the item if this recycler is the last reference to it. - // Remove this check once `T` holds a Weak reference back to this - // recycler and implements `Drop`. At the time of this writing, Weak can't - // be passed across threads ('alloc' is a nightly-only API), and so our - // reference-counted recyclables are awkwardly being recycled by hand, - // which allows this race condition to exist. - if Arc::strong_count(&x) > 1 { - warn!("Recycled item still in use. Booting it."); - drop(gc); - self.allocate() - } else { - x + + loop { + if let Some(x) = gc.pop() { + // Only return the item if this recycler is the last reference to it. + // Remove this check once `T` holds a Weak reference back to this + // recycler and implements `Drop`. At the time of this writing, Weak can't + // be passed across threads ('alloc' is a nightly-only API), and so our + // reference-counted recyclables are awkwardly being recycled by hand, + // which allows this race condition to exist. + if Arc::strong_count(&x) >= 1 { + // Commenting out this message, is annoying for known use case of + // validator hanging onto a blob in the window, but also sending it over + // to retransmmit_request + // + // warn!("Recycled item still in use. Booting it."); + continue; + } + + { + let mut w = x.write().unwrap(); + w.reset(); + } + return x; + } else { + return Arc::new(RwLock::new(Default::default())); + } } } pub fn recycle(&self, x: Arc>) { @@ -455,7 +493,8 @@ impl Blob { #[cfg(test)] mod tests { use packet::{ - to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, Recycler, NUM_PACKETS, + to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset, + BLOB_HEADER_SIZE, NUM_PACKETS, }; use request::Request; use std::collections::VecDeque; @@ -474,6 +513,12 @@ mod tests { assert_eq!(r.gc.lock().unwrap().len(), 0); } + impl Reset for u8 { + fn reset(&mut self) { + *self = Default::default(); + } + } + #[test] pub fn test_leaked_recyclable() { // Ensure that the recycler won't return an item @@ -611,6 +656,9 @@ mod tests { b.data_mut()[0] = 1; assert_eq!(b.data()[0], 1); assert_eq!(b.get_index().unwrap(), ::max_value()); + b.reset(); + assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE])); + assert_eq!(b.meta, Meta::default()); } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 918c7c1d0f872e..44884f4c197d51 100755 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -332,7 +332,7 @@ mod tests { fn tmp_ledger(name: &str, mint: &Mint) -> String { use std::env; - let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let keypair = Keypair::new(); let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey());