From a76f6af9b5fd203705d0e7f4993785e9fc587de9 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 12:31:52 -0700 Subject: [PATCH 1/5] purge validators we havent seen for a long time --- src/crdt.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index b7e6ef591cb796..ceeb69ea443eec 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -32,6 +32,9 @@ use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender, Window}; +use timing::timestamp; + +const GOSSIP_SLEEP_MILLIS: u64 = 100; pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); @@ -167,6 +170,7 @@ pub struct Crdt { /// The value of the remote update index that I have last seen /// This Node will ask external nodes for updates since the value in this list pub remote: HashMap, + pub alive: HashMap, pub update_index: u64, pub me: PublicKey, } @@ -191,6 +195,7 @@ impl Crdt { table: HashMap::new(), local: HashMap::new(), remote: HashMap::new(), + alive: HashMap::new(), me: me.id, update_index: 1, }; @@ -235,6 +240,35 @@ impl Crdt { self.table[&v.id].version ); } + //update the liveness table + let now = timestamp(); + *self.alive.entry(v.id).or_insert(now) = now; + } + + /// purge old validators + /// TODO: we need a robust membership protocol + /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf + /// challenging part is that we are on a permissionless network + pub fn purge(&mut self, now: u64) { + //wait for 4x as long as it would randomly take to reach our node + //assuming everyone is waiting the same amount of time as this node + let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; + let purge_set: Vec = self.alive + .iter() + .filter_map(|(k, v)| { + if *k != self.me && (now - v) > limit { + Some((*k).clone()) + } else { + None + } + }) + .collect(); + for p in purge_set.iter() { + self.alive.remove(p); + self.table.remove(p); + self.remote.remove(p); + self.local.remove(p); + } } pub fn index_blobs( @@ -527,7 +561,7 @@ impl Crdt { } //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); }) .unwrap() } @@ -572,9 +606,9 @@ impl Crdt { ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { // TODO sigverify these - Ok(Protocol::RequestUpdates(v, reqdata)) => { + Ok(Protocol::RequestUpdates(v, from_rd)) => { trace!("RequestUpdates {}", v); - let addr = reqdata.gossip_addr; + let addr = from_rd.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read() .expect("'obj' read lock in RequestUpdates") @@ -582,7 +616,7 @@ impl Crdt { trace!("get updates since response {} {}", v, data.len()); let len = data.len(); let rsp = Protocol::ReceiveUpdates(from, ups, data); - obj.write().unwrap().insert(&reqdata); + obj.write().unwrap().insert(&from_rd); if len < 1 { let me = obj.read().unwrap(); trace!( @@ -597,7 +631,7 @@ impl Crdt { "sending updates me {:?} len {} to {:?} {}", &obj.read().unwrap().me[..4], len, - &reqdata.id[..4], + &from_rd.id[..4], addr, ); Some(r) @@ -746,7 +780,7 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; + use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS}; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; @@ -985,6 +1019,28 @@ mod tests { assert!(one && two); } + #[test] + fn purge_test() { + let me = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let mut crdt = Crdt::new(me.clone()); + let nxt = ReplicatedData::new_entry_point("127.0.0.2:1234".parse().unwrap()); + crdt.insert(&nxt); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + let now = crdt.alive[&nxt.id]; + crdt.purge(now); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request(); + assert_matches!(rv, Err(Error::CrdtTooSmall)); + } + /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request() { From 10fbd8d0b7d008fe01368ad956fe467ac97928a3 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 12:37:33 -0700 Subject: [PATCH 2/5] tests pass --- src/crdt.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index ceeb69ea443eec..8d79d2de1236f5 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -257,8 +257,10 @@ impl Crdt { .iter() .filter_map(|(k, v)| { if *k != self.me && (now - v) > limit { + trace!("purging {:?} {}", &k[..4], v); Some((*k).clone()) } else { + trace!("purge skipped {:?} {}", &k[..4], v); None } }) @@ -1021,22 +1023,24 @@ mod tests { #[test] fn purge_test() { - let me = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()); - let nxt = ReplicatedData::new_entry_point("127.0.0.2:1234".parse().unwrap()); + let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt.id); crdt.insert(&nxt); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.gossip_addr); let now = crdt.alive[&nxt.id]; + let len = crdt.table.len() as u64; crdt.purge(now); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.gossip_addr); - crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.gossip_addr); - crdt.purge(now + GOSSIP_SLEEP_MILLIS * 4 + 1); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtTooSmall)); } From 688aa49f683750f6ec0834c2ee3687a85539e6ac Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 22:03:49 -0700 Subject: [PATCH 3/5] min table size before purge --- src/crdt.rs | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 8d79d2de1236f5..b11423e33fc6f1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -34,8 +34,12 @@ use std::time::Duration; use streamer::{BlobReceiver, BlobSender, Window}; use timing::timestamp; +/// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; +/// minimum membership table size before we start purging dead nodes +const MIN_TABLE_SIZE: usize = 2; + pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); if let Some(addrstr) = optstr { @@ -250,26 +254,29 @@ impl Crdt { /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf /// challenging part is that we are on a permissionless network pub fn purge(&mut self, now: u64) { + if self.table.len() <= MIN_TABLE_SIZE { + return; + } //wait for 4x as long as it would randomly take to reach our node //assuming everyone is waiting the same amount of time as this node let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; - let purge_set: Vec = self.alive + let dead_ids: Vec = self.alive .iter() - .filter_map(|(k, v)| { - if *k != self.me && (now - v) > limit { - trace!("purging {:?} {}", &k[..4], v); - Some((*k).clone()) + .filter_map(|(&k, v)| { + if k != self.me && (now - v) > limit { + info!("purge {:?} {}", &k[..4], now - v); + Some(k) } else { - trace!("purge skipped {:?} {}", &k[..4], v); + trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit); None } }) .collect(); - for p in purge_set.iter() { - self.alive.remove(p); - self.table.remove(p); - self.remote.remove(p); - self.local.remove(p); + for id in dead_ids.iter() { + self.alive.remove(id); + self.table.remove(id); + self.remote.remove(id); + self.local.remove(id); } } @@ -1041,8 +1048,19 @@ mod tests { assert_eq!(rv.0, nxt.gossip_addr); crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); - let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + + let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + assert_ne!(me.id, nxt2.id); + assert_ne!(nxt.id, nxt2.id); + crdt.insert(&nxt2); + let len = crdt.table.len() as u64; + assert_eq!(3, len); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); + let rv = crdt.gossip_request().unwrap(); + assert_eq!(rv.0, nxt.gossip_addr); + assert_eq!(2, crdt.table.len()); } /// test window requests respond with the right blob, and do not overrun From 96a240f8b8215cffd0bf289f914ae11d9a08c41d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 22:06:53 -0700 Subject: [PATCH 4/5] purger --- src/crdt.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/crdt.rs b/src/crdt.rs index b11423e33fc6f1..bc986df41a9e65 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -565,6 +565,7 @@ impl Crdt { .name("solana-gossip".to_string()) .spawn(move || loop { let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); + obj.write().unwrap().purge(timestamp()); if exit.load(Ordering::Relaxed) { return; } From 6f6bc327c007c02ace641b29c6b16084e5621fda Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 14 Jun 2018 22:15:08 -0700 Subject: [PATCH 5/5] make sure we test large tables --- src/crdt.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index bc986df41a9e65..a29dead3569406 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -790,7 +790,7 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS}; + use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; @@ -1057,7 +1057,9 @@ mod tests { assert_ne!(nxt.id, nxt2.id); crdt.insert(&nxt2); let len = crdt.table.len() as u64; - assert_eq!(3, len); + assert!((MIN_TABLE_SIZE as u64) < len); + crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); + assert_eq!(len as usize, crdt.table.len()); crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.gossip_addr);