-
Notifications
You must be signed in to change notification settings - Fork 4.5k
very dumb leader selection #425
Changes from 6 commits
43eaf84
cabc9a8
bef45b9
e41fcdd
bbdb8a2
a98f58b
1e2e0d0
304fd21
b800a12
b68903c
a286575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,12 @@ fn main() { | |
let mut opts = Options::new(); | ||
opts.optflag("h", "help", "print help"); | ||
opts.optopt("l", "", "run with the identity found in FILE", "FILE"); | ||
opts.optopt("v", "", "validate; find leader's identity in FILE", "FILE"); | ||
opts.optopt( | ||
"t", | ||
"", | ||
"testnet; connect to the network at this gossip entry point", | ||
"HOST:PORT", | ||
); | ||
opts.optopt( | ||
"o", | ||
"", | ||
|
@@ -119,14 +124,14 @@ fn main() { | |
} | ||
|
||
let exit = Arc::new(AtomicBool::new(false)); | ||
let threads = if matches.opt_present("v") { | ||
let path = matches.opt_str("v").unwrap(); | ||
let threads = if matches.opt_present("t") { | ||
let testnet = matches.opt_str("t").unwrap(); | ||
eprintln!( | ||
"starting validator... {} using {}", | ||
repl_data.requests_addr, path | ||
"starting validator... {} connecting to {}", | ||
repl_data.requests_addr, testnet | ||
); | ||
let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); | ||
let leader = serde_json::from_reader(file).expect("parse"); | ||
let taddr = testnet.parse().unwrap(); | ||
let entry = ReplicatedData::new_entry_point(taddr); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The term |
||
let s = Server::new_validator( | ||
bank, | ||
repl_data.clone(), | ||
|
@@ -135,7 +140,7 @@ fn main() { | |
UdpSocket::bind(repl_data.replicate_addr).unwrap(), | ||
UdpSocket::bind(repl_data.gossip_addr).unwrap(), | ||
UdpSocket::bind(repl_data.repair_addr).unwrap(), | ||
leader, | ||
entry, | ||
exit.clone(), | ||
); | ||
s.thread_hdls | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -551,6 +551,32 @@ impl Crdt { | |
blob_sender.send(q)?; | ||
Ok(()) | ||
} | ||
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's consistently use TODO or FIXME. Codebase currently uses TODO. If you prefer FIXME, can you change the others? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought FIXME was a higher priority TODO :) |
||
fn top_leader(&self) -> Option<PublicKey> { | ||
let mut table = HashMap::new(); | ||
let def = PublicKey::default(); | ||
let cur = self.table.values().filter(|x| x.current_leader_id != def); | ||
for v in cur { | ||
let cnt = table.entry(&v.current_leader_id).or_insert(0); | ||
*cnt += 1; | ||
trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt); | ||
} | ||
let mut sorted: Vec<_> = table.iter().collect(); | ||
sorted.sort_by_key(|a| a.1); | ||
sorted.last().map(|a| *(*(*a).0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can that be simplified? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hehe, it's borrow checker all the way down. I am not sure it can be without copying the generic array key, and there is no way to sort an iterator. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was wrong |
||
} | ||
|
||
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection | ||
/// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems fair :) |
||
fn update_leader(&mut self) { | ||
if let Some(lid) = self.top_leader() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if self.my_data().current_leader_id != lid { | ||
if self.table.get(&lid).is_some() { | ||
self.set_leader(lid); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Apply updates that we received from the identity `from` | ||
/// # Arguments | ||
|
@@ -577,14 +603,20 @@ impl Crdt { | |
Builder::new() | ||
.name("solana-gossip".to_string()) | ||
.spawn(move || loop { | ||
let start = timestamp(); | ||
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); | ||
obj.write().unwrap().purge(timestamp()); | ||
if exit.load(Ordering::Relaxed) { | ||
return; | ||
} | ||
//TODO: possibly tune this parameter | ||
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep | ||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); | ||
let _ = obj.write().unwrap().update_leader(); | ||
let elapsed = timestamp() - start; | ||
if GOSSIP_SLEEP_MILLIS > elapsed { | ||
let left = GOSSIP_SLEEP_MILLIS - elapsed; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
sleep(Duration::from_millis(left)); | ||
} | ||
}) | ||
.unwrap() | ||
} | ||
|
@@ -829,6 +861,7 @@ impl TestNode { | |
#[cfg(test)] | ||
mod tests { | ||
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; | ||
use logger; | ||
use packet::BlobRecycler; | ||
use result::Error; | ||
use signature::{KeyPair, KeyPairUtil}; | ||
|
@@ -1173,4 +1206,29 @@ mod tests { | |
assert_eq!(blob.get_id().unwrap(), id); | ||
} | ||
} | ||
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection, | ||
/// delete this test after leader selection is correctly implemented | ||
#[test] | ||
fn test_update_leader() { | ||
logger::setup(); | ||
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
let mut crdt = Crdt::new(me.clone()); | ||
assert_matches!(crdt.top_leader(), None); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
crdt.set_leader(lead.id); | ||
assert_eq!(crdt.top_leader().unwrap(), lead.id); | ||
//add a bunch of nodes with a new leader | ||
for _ in 0..10 { | ||
let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); | ||
dum.current_leader_id = lead2.id; | ||
crdt.insert(&dum); | ||
} | ||
assert_eq!(crdt.top_leader().unwrap(), lead2.id); | ||
crdt.update_leader(); | ||
assert_eq!(crdt.my_data().current_leader_id, lead.id); | ||
crdt.insert(&lead2); | ||
crdt.update_leader(); | ||
assert_eq!(crdt.my_data().current_leader_id, lead2.id); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`testnet_addr`?