diff --git a/src/crdt.rs b/src/crdt.rs index b7e6ef591cb796..a29dead3569406 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -32,6 +32,13 @@ use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender, Window}; +use timing::timestamp; + +/// milliseconds we sleep for between gossip requests +const GOSSIP_SLEEP_MILLIS: u64 = 100; + +/// minimum membership table size before we start purging dead nodes +const MIN_TABLE_SIZE: usize = 2; pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); @@ -167,6 +174,7 @@ pub struct Crdt { /// The value of the remote update index that I have last seen /// This Node will ask external nodes for updates since the value in this list pub remote: HashMap<PublicKey, u64>, + pub alive: HashMap<PublicKey, u64>, pub update_index: u64, pub me: PublicKey, } @@ -191,6 +199,7 @@ impl Crdt { table: HashMap::new(), local: HashMap::new(), remote: HashMap::new(), + alive: HashMap::new(), me: me.id, update_index: 1, }; @@ -235,6 +244,40 @@ impl Crdt { self.table[&v.id].version ); } + //update the liveness table + let now = timestamp(); + *self.alive.entry(v.id).or_insert(now) = now; + } + + /// purge old validators + /// TODO: we need a robust membership protocol + /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf + /// challenging part is that we are on a permissionless network + pub fn purge(&mut self, now: u64) { + if self.table.len() <= MIN_TABLE_SIZE { + return; + } + //wait for 4x as long as it would randomly take to reach our node + //assuming everyone is waiting the same amount of time as this node + let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; + let dead_ids: Vec<PublicKey> = self.alive + .iter() + .filter_map(|(&k, v)| { + if k != self.me && (now - v) > limit { + info!("purge {:?} {}", &k[..4], now - v); + Some(k) + } else { + trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit); + None + 
} + }) + .collect(); + for id in dead_ids.iter() { + self.alive.remove(id); + self.table.remove(id); + self.remote.remove(id); + self.local.remove(id); + } } pub fn index_blobs( @@ -522,12 +565,13 @@ impl Crdt { .name("solana-gossip".to_string()) .spawn(move || loop { let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); + obj.write().unwrap().purge(timestamp()); if exit.load(Ordering::Relaxed) { return; } //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); }) .unwrap() } @@ -572,9 +616,9 @@ impl Crdt { ) -> Option<SharedBlob> { match deserialize(&blob.data[..blob.meta.size]) { // TODO sigverify these - Ok(Protocol::RequestUpdates(v, reqdata)) => { + Ok(Protocol::RequestUpdates(v, from_rd)) => { trace!("RequestUpdates {}", v); - let addr = reqdata.gossip_addr; + let addr = from_rd.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read() .expect("'obj' read lock in RequestUpdates") trace!("get updates since response {} {}", v, data.len()); let len = data.len(); let rsp = Protocol::ReceiveUpdates(from, ups, data); - obj.write().unwrap().insert(&reqdata); + obj.write().unwrap().insert(&from_rd); if len < 1 { let me = obj.read().unwrap(); trace!( @@ -597,7 +641,7 @@ "sending updates me {:?} len {} to {:?} {}", &obj.read().unwrap().me[..4], len, - &reqdata.id[..4], + &from_rd.id[..4], addr, ); Some(r) } @@ -746,7 +790,7 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; + use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; @@ -985,6 +1029,43 @@ mod tests { assert!(one && two); } + #[test] + fn purge_test() { + let me = 
ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mut crdt = Crdt::new(me.clone()); + let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt.id); + crdt.insert(&nxt); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + let now = crdt.alive[&nxt.id]; + let len = crdt.table.len() as u64; + crdt.purge(now); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt2.id); + assert_ne!(nxt.id, nxt2.id); + crdt.insert(&nxt2); + let len = crdt.table.len() as u64; + assert!((MIN_TABLE_SIZE as u64) < len); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); + assert_eq!(len as usize, crdt.table.len()); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(2, crdt.table.len()); + } + /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request() {