Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

very dumb leader selection #425

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ fn main() {
let mut opts = Options::new();
opts.optflag("h", "help", "print help");
opts.optopt("l", "", "run with the identity found in FILE", "FILE");
opts.optopt("v", "", "validate; find leader's identity in FILE", "FILE");
opts.optopt(
"t",
"",
"testnet; connect to the network at this gossip entry point",
"HOST:PORT",
);
opts.optopt(
"o",
"",
Expand Down Expand Up @@ -119,14 +124,14 @@ fn main() {
}

let exit = Arc::new(AtomicBool::new(false));
let threads = if matches.opt_present("v") {
let path = matches.opt_str("v").unwrap();
let threads = if matches.opt_present("t") {
let testnet = matches.opt_str("t").unwrap();
eprintln!(
"starting validator... {} using {}",
repl_data.requests_addr, path
"starting validator... {} connecting to {}",
repl_data.requests_addr, testnet
);
let file = File::open(path.clone()).expect(&format!("file not found: {}", path));
let leader = serde_json::from_reader(file).expect("parse");
let taddr = testnet.parse().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testnet_addr?

let entry = ReplicatedData::new_entry_point(taddr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The term entry is already actively used throughout the codebase. Can you find another?

let s = Server::new_validator(
bank,
repl_data.clone(),
Expand All @@ -135,7 +140,7 @@ fn main() {
UdpSocket::bind(repl_data.replicate_addr).unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
UdpSocket::bind(repl_data.repair_addr).unwrap(),
leader,
entry,
exit.clone(),
);
s.thread_hdls
Expand Down
60 changes: 59 additions & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,32 @@ impl Crdt {
blob_sender.send(q)?;
Ok(())
}
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's consistently used TODO or FIXME. Codebase currently uses TODO. If you prefer FIXME, can you change the others?

Copy link
Member Author

@aeyakovenko aeyakovenko Jun 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought FIXME was a higher priority TODO :)

fn top_leader(&self) -> Option<PublicKey> {
let mut table = HashMap::new();
let def = PublicKey::default();
let cur = self.table.values().filter(|x| x.current_leader_id != def);
for v in cur {
let cnt = table.entry(&v.current_leader_id).or_insert(0);
*cnt += 1;
trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt);
}
let mut sorted: Vec<_> = table.iter().collect();
sorted.sort_by_key(|a| a.1);
sorted.last().map(|a| *(*(*a).0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can that be simplified?

Copy link
Member Author

@aeyakovenko aeyakovenko Jun 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe, its borrow checker all the way down. I am not sure it can be without copying the generic array key. and there is no way to sort an iterator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong

}

/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection
/// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fair :)

fn update_leader(&mut self) {
if let Some(lid) = self.top_leader() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id would be a better name since lid is a word. If not specific enough, use leader_id

if self.my_data().current_leader_id != lid {
if self.table.get(&lid).is_some() {
self.set_leader(lid);
}
}
}
}

/// Apply updates that we received from the identity `from`
/// # Arguments
Expand All @@ -577,14 +603,20 @@ impl Crdt {
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || loop {
let start = timestamp();
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
obj.write().unwrap().purge(timestamp());
if exit.load(Ordering::Relaxed) {
return;
}
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
let _ = obj.write().unwrap().update_leader();
let elapsed = timestamp() - start;
if GOSSIP_SLEEP_MILLIS > elapsed {
let left = GOSSIP_SLEEP_MILLIS - elapsed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left is unnecessarily ambiguous. time_left or remaining would be better.

sleep(Duration::from_millis(left));
}
})
.unwrap()
}
Expand Down Expand Up @@ -829,6 +861,7 @@ impl TestNode {
#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE};
use logger;
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
Expand Down Expand Up @@ -1173,4 +1206,29 @@ mod tests {
assert_eq!(blob.get_id().unwrap(), id);
}
}
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection,
/// delete this test after leader selection is correctly implemented
#[test]
fn test_update_leader() {
logger::setup();
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leader0 and leader1 would be consistent with naming elsewhere

let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone());
assert_matches!(crdt.top_leader(), None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_eq

crdt.set_leader(lead.id);
assert_eq!(crdt.top_leader().unwrap(), lead.id);
//add a bunch of nodes with a new leader
for _ in 0..10 {
let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap());
dum.current_leader_id = lead2.id;
crdt.insert(&dum);
}
assert_eq!(crdt.top_leader().unwrap(), lead2.id);
crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, lead.id);
crdt.insert(&lead2);
crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, lead2.id);
}
}
7 changes: 2 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Server {
replicate_socket: UdpSocket,
gossip_listen_socket: UdpSocket,
repair_socket: UdpSocket,
leader_repl_data: ReplicatedData,
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
let bank = Arc::new(bank);
Expand All @@ -143,12 +143,9 @@ impl Server {
thread_hdls.extend(rpu.thread_hdls);

let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write()
.expect("'crdt' write lock in pub fn replicate")
.set_leader(leader_repl_data.id);
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader_repl_data);
.insert(&entry_point);
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
Expand Down