Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Purge old validators #355

Merged
merged 5 commits into from
Jun 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender, Window};
use timing::timestamp;

/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;

/// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2;

pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
Expand Down Expand Up @@ -167,6 +174,7 @@ pub struct Crdt {
/// The value of the remote update index that I have last seen
/// This Node will ask external nodes for updates since the value in this list
pub remote: HashMap<PublicKey, u64>,
pub alive: HashMap<PublicKey, u64>,
pub update_index: u64,
pub me: PublicKey,
}
Expand All @@ -191,6 +199,7 @@ impl Crdt {
table: HashMap::new(),
local: HashMap::new(),
remote: HashMap::new(),
alive: HashMap::new(),
me: me.id,
update_index: 1,
};
Expand Down Expand Up @@ -235,6 +244,40 @@ impl Crdt {
self.table[&v.id].version
);
}
//update the liveness table
let now = timestamp();
*self.alive.entry(v.id).or_insert(now) = now;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

/// purge old validators
/// TODO: we need a robust membership protocol
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
/// challenging part is that we are on a permissionless network
pub fn purge(&mut self, now: u64) {
if self.table.len() <= MIN_TABLE_SIZE {
return;
}
//wait for 4x as long as it would randomly take to reach our node
//assuming everyone is waiting the same amount of time as this node
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
let dead_ids: Vec<PublicKey> = self.alive
.iter()
.filter_map(|(&k, v)| {
if k != self.me && (now - v) > limit {
info!("purge {:?} {}", &k[..4], now - v);
Some(k)
} else {
trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit);
None
}
})
.collect();
for id in dead_ids.iter() {
self.alive.remove(id);
self.table.remove(id);
self.remote.remove(id);
self.local.remove(id);
}
}

pub fn index_blobs(
Expand Down Expand Up @@ -522,12 +565,13 @@ impl Crdt {
.name("solana-gossip".to_string())
.spawn(move || loop {
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
obj.write().unwrap().purge(timestamp());
if exit.load(Ordering::Relaxed) {
return;
}
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
sleep(Duration::from_millis(100));
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
})
.unwrap()
}
Expand Down Expand Up @@ -572,17 +616,17 @@ impl Crdt {
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
// TODO sigverify these
Ok(Protocol::RequestUpdates(v, reqdata)) => {
Ok(Protocol::RequestUpdates(v, from_rd)) => {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
let addr = from_rd.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data);
obj.write().unwrap().insert(&reqdata);
obj.write().unwrap().insert(&from_rd);
if len < 1 {
let me = obj.read().unwrap();
trace!(
Expand All @@ -597,7 +641,7 @@ impl Crdt {
"sending updates me {:?} len {} to {:?} {}",
&obj.read().unwrap().me[..4],
len,
&reqdata.id[..4],
&from_rd.id[..4],
addr,
);
Some(r)
Expand Down Expand Up @@ -746,7 +790,7 @@ impl TestNode {

#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE};
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
Expand Down Expand Up @@ -985,6 +1029,43 @@ mod tests {
assert!(one && two);
}

#[test]
fn purge_test() {
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone());
let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id);
crdt.insert(&nxt);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr);
let now = crdt.alive[&nxt.id];
let len = crdt.table.len() as u64;
crdt.purge(now);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlining rv on all 4 assertions would make this test easier to read

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes really long lines, harder to read for some

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr);

crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr);

let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt2.id);
assert_ne!(nxt.id, nxt2.id);
crdt.insert(&nxt2);
let len = crdt.table.len() as u64;
assert!((MIN_TABLE_SIZE as u64) < len);
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4);
assert_eq!(len as usize, crdt.table.len());
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1);
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr);
assert_eq!(2, crdt.table.len());
}

/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request() {
Expand Down