-
Notifications
You must be signed in to change notification settings - Fork 5.5k
very dumb leader selection #425
Changes from 1 commit
43eaf84
cabc9a8
bef45b9
e41fcdd
bbdb8a2
a98f58b
1e2e0d0
304fd21
b800a12
b68903c
a286575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,12 @@ fn main() { | |
| let mut opts = Options::new(); | ||
| opts.optflag("h", "help", "print help"); | ||
| opts.optopt("l", "", "run with the identity found in FILE", "FILE"); | ||
| opts.optopt("v", "", "validate; find leader's identity in FILE", "FILE"); | ||
| opts.optopt( | ||
| "t", | ||
| "", | ||
| "testnet; connec to the network at this gossip entry point", | ||
|
||
| "host:port", | ||
|
||
| ); | ||
| opts.optopt( | ||
| "o", | ||
| "", | ||
|
|
@@ -119,14 +124,14 @@ fn main() { | |
| } | ||
|
|
||
| let exit = Arc::new(AtomicBool::new(false)); | ||
| let threads = if matches.opt_present("v") { | ||
| let path = matches.opt_str("v").unwrap(); | ||
| let threads = if matches.opt_present("t") { | ||
| let testnet = matches.opt_str("t").unwrap(); | ||
| eprintln!( | ||
| "starting validator... {} using {}", | ||
| repl_data.requests_addr, path | ||
| "starting validator... {} connecting to {}", | ||
| repl_data.requests_addr, testnet | ||
| ); | ||
| let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); | ||
| let leader = serde_json::from_reader(file).expect("parse"); | ||
| let taddr = testnet.parse().unwrap(); | ||
|
||
| let entry = ReplicatedData::new_entry_point(taddr); | ||
|
||
| let s = Server::new_validator( | ||
| bank, | ||
| repl_data.clone(), | ||
|
|
@@ -135,7 +140,7 @@ fn main() { | |
| UdpSocket::bind(repl_data.replicate_addr).unwrap(), | ||
| UdpSocket::bind(repl_data.gossip_addr).unwrap(), | ||
| UdpSocket::bind(repl_data.repair_addr).unwrap(), | ||
| leader, | ||
| entry, | ||
| exit.clone(), | ||
| ); | ||
| s.thread_hdls | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -551,6 +551,33 @@ impl Crdt { | |
| blob_sender.send(q)?; | ||
| Ok(()) | ||
| } | ||
| /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection | ||
|
||
| fn top_leader(&self) -> Option<PublicKey> { | ||
| let mut table = HashMap::new(); | ||
| let def = PublicKey::default(); | ||
| let cur = self.table.values().filter(|x| x.current_leader_id != def); | ||
| for v in cur { | ||
| let cnt = table.entry(&v.current_leader_id).or_insert(0); | ||
| //let cnt = table.get_mut(&v.current_leader_id).unwrap(); | ||
| *cnt += 1; | ||
| println!("leader {:?} {}", &v.current_leader_id[..4], *cnt); | ||
| } | ||
| let mut sorted: Vec<_> = table.iter().collect(); | ||
| sorted.sort_by_key(|a| a.1); | ||
| sorted.last().map(|a| *(*(*a).0)) | ||
|
||
| } | ||
|
|
||
| /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection | ||
| /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems fair :) |
||
| fn update_leader(&mut self) { | ||
| if let Some(lid) = self.top_leader() { | ||
|
||
| if self.my_data().current_leader_id != lid { | ||
| if self.table.get(&lid).is_some() { | ||
| self.set_leader(lid); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Apply updates that we received from the identity `from` | ||
| /// # Arguments | ||
|
|
@@ -577,14 +604,20 @@ impl Crdt { | |
| Builder::new() | ||
| .name("solana-gossip".to_string()) | ||
| .spawn(move || loop { | ||
| let start = timestamp(); | ||
| let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); | ||
| obj.write().unwrap().purge(timestamp()); | ||
| if exit.load(Ordering::Relaxed) { | ||
| return; | ||
| } | ||
| //TODO: possibly tune this parameter | ||
| //we saw a deadlock passing an obj.read().unwrap().timeout into sleep | ||
| sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); | ||
| let _ = obj.write().unwrap().update_leader(); | ||
| let elapsed = timestamp() - start; | ||
| if GOSSIP_SLEEP_MILLIS > elapsed { | ||
| let left = GOSSIP_SLEEP_MILLIS - elapsed; | ||
|
||
| sleep(Duration::from_millis(left)); | ||
| } | ||
| }) | ||
| .unwrap() | ||
| } | ||
|
|
@@ -832,6 +865,7 @@ mod tests { | |
| use packet::BlobRecycler; | ||
| use result::Error; | ||
| use signature::{KeyPair, KeyPairUtil}; | ||
| use std::collections::HashMap; | ||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||
| use std::sync::mpsc::channel; | ||
| use std::sync::{Arc, RwLock}; | ||
|
|
@@ -1173,4 +1207,40 @@ mod tests { | |
| assert_eq!(blob.get_id().unwrap(), id); | ||
| } | ||
| } | ||
| /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection, | ||
| /// delete this test after leader selection is correctly implemented | ||
| #[test] | ||
| fn test_update_leader() { | ||
| let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
| let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
|
||
| let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); | ||
| let mut crdt = Crdt::new(me.clone()); | ||
| assert_matches!(crdt.top_leader(), None); | ||
|
||
| crdt.set_leader(lead.id); | ||
| assert_eq!(crdt.top_leader().unwrap(), lead.id); | ||
| //add a bunch of nodes with a new leader | ||
| for _ in 0..10 { | ||
| let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); | ||
| dum.current_leader_id = lead2.id; | ||
| crdt.insert(&dum); | ||
| } | ||
| assert_eq!(crdt.top_leader().unwrap(), lead2.id); | ||
| crdt.update_leader(); | ||
| assert_eq!(crdt.my_data().current_leader_id, lead.id); | ||
| crdt.insert(&lead2); | ||
| crdt.update_leader(); | ||
| assert_eq!(crdt.my_data().current_leader_id, lead2.id); | ||
| } | ||
| #[test] | ||
| fn test_update_leader_pubkeys() { | ||
| let key1 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); | ||
| let key2 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); | ||
| let mut table = HashMap::new(); | ||
| table.entry(&key1.current_leader_id).or_insert(0); | ||
| for _ in 0..1000 { | ||
| let a = table.entry(&key2.current_leader_id).or_insert(0); | ||
| *a += 1; | ||
| } | ||
| assert_eq!(table.len(), 2); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Connect