This repository has been archived by the owner on Jan 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Purge old validators #355
Merged
Merged
Purge old validators #355
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,13 @@ use std::sync::{Arc, RwLock}; | |
use std::thread::{sleep, Builder, JoinHandle}; | ||
use std::time::Duration; | ||
use streamer::{BlobReceiver, BlobSender, Window}; | ||
use timing::timestamp; | ||
|
||
/// milliseconds we sleep for between gossip requests | ||
const GOSSIP_SLEEP_MILLIS: u64 = 100; | ||
|
||
/// minimum membership table size before we start purging dead nodes | ||
const MIN_TABLE_SIZE: usize = 2; | ||
|
||
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr { | ||
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); | ||
|
@@ -167,6 +174,7 @@ pub struct Crdt { | |
/// The value of the remote update index that I have last seen | ||
/// This Node will ask external nodes for updates since the value in this list | ||
pub remote: HashMap<PublicKey, u64>, | ||
pub alive: HashMap<PublicKey, u64>, | ||
pub update_index: u64, | ||
pub me: PublicKey, | ||
} | ||
|
@@ -191,6 +199,7 @@ impl Crdt { | |
table: HashMap::new(), | ||
local: HashMap::new(), | ||
remote: HashMap::new(), | ||
alive: HashMap::new(), | ||
me: me.id, | ||
update_index: 1, | ||
}; | ||
|
@@ -235,6 +244,40 @@ impl Crdt { | |
self.table[&v.id].version | ||
); | ||
} | ||
//update the liveness table | ||
let now = timestamp(); | ||
*self.alive.entry(v.id).or_insert(now) = now; | ||
} | ||
|
||
/// purge old validators | ||
/// TODO: we need a robust membership protocol | ||
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf | ||
/// challenging part is that we are on a permissionless network | ||
pub fn purge(&mut self, now: u64) { | ||
if self.table.len() <= MIN_TABLE_SIZE { | ||
return; | ||
} | ||
//wait for 4x as long as it would randomly take to reach our node | ||
//assuming everyone is waiting the same amount of time as this node | ||
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4; | ||
let dead_ids: Vec<PublicKey> = self.alive | ||
.iter() | ||
.filter_map(|(&k, v)| { | ||
if k != self.me && (now - v) > limit { | ||
info!("purge {:?} {}", &k[..4], now - v); | ||
Some(k) | ||
} else { | ||
trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit); | ||
None | ||
} | ||
}) | ||
.collect(); | ||
for id in dead_ids.iter() { | ||
self.alive.remove(id); | ||
self.table.remove(id); | ||
self.remote.remove(id); | ||
self.local.remove(id); | ||
} | ||
} | ||
|
||
pub fn index_blobs( | ||
|
@@ -522,12 +565,13 @@ impl Crdt { | |
.name("solana-gossip".to_string()) | ||
.spawn(move || loop { | ||
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); | ||
obj.write().unwrap().purge(timestamp()); | ||
if exit.load(Ordering::Relaxed) { | ||
return; | ||
} | ||
//TODO: possibly tune this parameter | ||
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep | ||
sleep(Duration::from_millis(100)); | ||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); | ||
}) | ||
.unwrap() | ||
} | ||
|
@@ -572,17 +616,17 @@ impl Crdt { | |
) -> Option<SharedBlob> { | ||
match deserialize(&blob.data[..blob.meta.size]) { | ||
// TODO sigverify these | ||
Ok(Protocol::RequestUpdates(v, reqdata)) => { | ||
Ok(Protocol::RequestUpdates(v, from_rd)) => { | ||
trace!("RequestUpdates {}", v); | ||
let addr = reqdata.gossip_addr; | ||
let addr = from_rd.gossip_addr; | ||
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` | ||
let (from, ups, data) = obj.read() | ||
.expect("'obj' read lock in RequestUpdates") | ||
.get_updates_since(v); | ||
trace!("get updates since response {} {}", v, data.len()); | ||
let len = data.len(); | ||
let rsp = Protocol::ReceiveUpdates(from, ups, data); | ||
obj.write().unwrap().insert(&reqdata); | ||
obj.write().unwrap().insert(&from_rd); | ||
if len < 1 { | ||
let me = obj.read().unwrap(); | ||
trace!( | ||
|
@@ -597,7 +641,7 @@ impl Crdt { | |
"sending updates me {:?} len {} to {:?} {}", | ||
&obj.read().unwrap().me[..4], | ||
len, | ||
&reqdata.id[..4], | ||
&from_rd.id[..4], | ||
addr, | ||
); | ||
Some(r) | ||
|
@@ -746,7 +790,7 @@ impl TestNode { | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; | ||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; | ||
use packet::BlobRecycler; | ||
use result::Error; | ||
use signature::{KeyPair, KeyPairUtil}; | ||
|
@@ -985,6 +1029,43 @@ mod tests { | |
assert!(one && two); | ||
} | ||
|
||
#[test] | ||
fn purge_test() { | ||
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
let mut crdt = Crdt::new(me.clone()); | ||
let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); | ||
assert_ne!(me.id, nxt.id); | ||
crdt.insert(&nxt); | ||
let rv = crdt.gossip_request().unwrap(); | ||
assert_eq!(rv.0, nxt.gossip_addr); | ||
let now = crdt.alive[&nxt.id]; | ||
let len = crdt.table.len() as u64; | ||
crdt.purge(now); | ||
let rv = crdt.gossip_request().unwrap(); | ||
assert_eq!(rv.0, nxt.gossip_addr); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inlining There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it makes really long lines, harder to read for some There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); | ||
let rv = crdt.gossip_request().unwrap(); | ||
assert_eq!(rv.0, nxt.gossip_addr); | ||
|
||
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); | ||
let rv = crdt.gossip_request().unwrap(); | ||
assert_eq!(rv.0, nxt.gossip_addr); | ||
|
||
let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); | ||
assert_ne!(me.id, nxt2.id); | ||
assert_ne!(nxt.id, nxt2.id); | ||
crdt.insert(&nxt2); | ||
let len = crdt.table.len() as u64; | ||
assert!((MIN_TABLE_SIZE as u64) < len); | ||
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); | ||
assert_eq!(len as usize, crdt.table.len()); | ||
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); | ||
let rv = crdt.gossip_request().unwrap(); | ||
assert_eq!(rv.0, nxt.gossip_addr); | ||
assert_eq!(2, crdt.table.len()); | ||
} | ||
|
||
/// test window requests respond with the right blob, and do not overrun | ||
#[test] | ||
fn run_window_request() { | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍