diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 00000000000000..756c7dc24eaf24 --- /dev/null +++ b/.clippy.toml @@ -0,0 +1 @@ +too-many-arguments-threshold = 9 diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 89b5122903a9f7..1b7e5e87c995ef 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -134,7 +134,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let bank = Arc::new(Bank::new(&mint)); let verified_setup: Vec<_> = - to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx) + to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx) .into_iter() .map(|x| { let len = (*x).read().unwrap().packets.len(); @@ -144,16 +144,12 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let verified_setup_len = verified_setup.len(); verified_sender.send(verified_setup).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_setup_len, &signal_receiver, num_src_accounts); - let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192) + let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192) .into_iter() .map(|x| { let len = (*x).read().unwrap().packets.len(); @@ -163,12 +159,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let verified_len = verified.len(); verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_len, &signal_receiver, tx); }); @@ -201,7 +193,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) { bencher.iter(move || { let bank = Arc::new(Bank::new(&mint)); - let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx) + let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx) .into_iter() .map(|x| { let len = (*x).read().unwrap().packets.len(); @@ -210,12 +202,8 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) { .collect(); let verified_len = verified.len(); verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_len, &signal_receiver, tx); }); diff --git a/ci/test-nightly.sh b/ci/test-nightly.sh index aed09ac080db0d..49fb291583a492 100755 --- a/ci/test-nightly.sh +++ b/ci/test-nightly.sh @@ -13,7 +13,7 @@ _() { _ cargo build --verbose --features unstable _ cargo test --verbose --features unstable - +_ cargo clippy -- --deny=warnings exit 0 diff --git a/src/bank.rs b/src/bank.rs index 3c8ca651a3d2b6..b9ecf743c7e330 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -120,11 +120,7 @@ impl Bank { /// Commit funds to the `payment.to` party. fn apply_payment(&self, payment: &Payment, balances: &mut HashMap) { - if balances.contains_key(&payment.to) { - *balances.get_mut(&payment.to).unwrap() += payment.tokens; - } else { - balances.insert(payment.to, payment.tokens); - } + *balances.entry(payment.to).or_insert(0) += payment.tokens; } /// Return the last entry ID registered. @@ -511,7 +507,7 @@ impl Bank { let bals = self.balances .read() .expect("'balances' read lock in get_balance"); - bals.get(pubkey).map(|x| *x).unwrap_or(0) + bals.get(pubkey).cloned().unwrap_or(0) } pub fn transaction_count(&self) -> usize { diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 4616d45809ad46..c8b7f6de357834 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -40,7 +40,7 @@ impl BankingStage { .name("solana-banking-stage".to_string()) .spawn(move || loop { if let Err(e) = Self::process_packets( - bank.clone(), + &bank.clone(), &verified_receiver, &signal_sender, &packet_recycler, @@ -72,7 +72,7 @@ impl BankingStage { /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. pub fn process_packets( - bank: Arc, + bank: &Arc, verified_receiver: &Receiver)>>, signal_sender: &Sender, packet_recycler: &PacketRecycler, diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 24f561d1be869c..2a7c88922e18a2 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -35,10 +35,10 @@ use std::time::Duration; use std::time::Instant; fn sample_tx_count( - exit: Arc, - maxes: Arc>>, + exit: &Arc, + maxes: &Arc>>, first_count: u64, - v: NodeInfo, + v: &NodeInfo, sample_period: u64, ) { let mut client = mk_client(&v); @@ -76,9 +76,9 @@ fn sample_tx_count( fn generate_and_send_txs( client: &mut ThinClient, - tx_clients: &Vec, + tx_clients: &[ThinClient], id: &Mint, - keypairs: &Vec, + keypairs: &[KeyPair], leader: &NodeInfo, txs: i64, last_id: &mut Hash, @@ -129,7 +129,7 @@ fn generate_and_send_txs( leader.contact_info.tpu ); for tx in txs { - client.transfer_signed(tx.clone()).unwrap(); + client.transfer_signed(tx).unwrap(); } }); println!( @@ -199,7 +199,7 @@ fn main() { let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { - leader = read_leader(l.to_string()).node_info; + leader = read_leader(l).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); leader = NodeInfo::new_leader(&server_addr); @@ -207,7 +207,7 @@ fn main() { let id: Mint; if let Some(m) = matches.value_of("mint") { - id = read_mint(m.to_string()).expect("client mint"); + id = read_mint(m).expect("client mint"); } else { eprintln!("No mint found!"); exit(1); @@ -225,12 +225,12 @@ fn main() { time_sec = s.to_string().parse().expect("integer"); } - let mut drone_addr = leader.contact_info.tpu.clone(); + let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(9900); let signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; - let validators = converge(&leader, signal.clone(), num_nodes, &mut c_threads); + let validators = converge(&leader, &signal.clone(), num_nodes, &mut c_threads); assert_eq!(validators.len(), num_nodes); let mut client = mk_client(&leader); @@ -241,7 +241,7 @@ fn main() { if starting_balance < txs { let airdrop_amount = txs - starting_balance; println!("Airdropping {:?} tokens", airdrop_amount); - let _airdrop = request_airdrop(&drone_addr, &id, airdrop_amount as u64).unwrap(); + request_airdrop(&drone_addr, &id, airdrop_amount as u64).unwrap(); // TODO: return airdrop Result from Drone sleep(Duration::from_millis(100)); @@ -282,13 +282,13 @@ fn main() { Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_tx_count(exit, maxes, first_count, v, sample_period); + sample_tx_count(&exit, &maxes, first_count, &v, sample_period); }) .unwrap() }) .collect(); - let clients = (0..threads).map(|_| mk_client(&leader)).collect(); + let clients: Vec<_> = (0..threads).map(|_| mk_client(&leader)).collect(); // generate and send transactions for the specified duration let time = Duration::new(time_sec / 2, 0); @@ -385,7 +385,7 @@ fn spy_node() -> (NodeInfo, UdpSocket) { fn converge( leader: &NodeInfo, - exit: Arc, + exit: &Arc, num_nodes: usize, threads: &mut Vec>, ) -> Vec { @@ -399,7 +399,7 @@ fn converge( let window = default_window(); let gossip_send_socket = udp_random_bind(8000, 10000, 5).unwrap(); let ncp = Ncp::new( - spy_ref.clone(), + &spy_ref.clone(), window.clone(), spy_gossip, gossip_send_socket, @@ -428,13 +428,13 @@ fn converge( rv } -fn read_leader(path: String) -> Config { - let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); - serde_json::from_reader(file).expect(&format!("failed to parse {}", path)) +fn read_leader(path: &str) -> Config { + let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); + serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) } -fn read_mint(path: String) -> Result> { - let file = File::open(path.clone())?; +fn read_mint(path: &str) -> Result> { + let file = File::open(path.to_string())?; let mint = serde_json::from_reader(file)?; Ok(mint) } diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 76b3c93414d376..f19f7a802a10e2 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -62,7 +62,7 @@ fn main() { let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { - leader = read_leader(l.to_string()).node_info; + leader = read_leader(l).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); leader = NodeInfo::new_leader(&server_addr); @@ -70,7 +70,7 @@ fn main() { let mint: Mint; if let Some(m) = matches.value_of("mint") { - mint = read_mint(m.to_string()).expect("client mint"); + mint = read_mint(m).expect("client mint"); } else { eprintln!("No mint found!"); exit(1); @@ -148,13 +148,13 @@ fn main() { }); tokio::run(done); } -fn read_leader(path: String) -> Config { - let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); - serde_json::from_reader(file).expect(&format!("failed to parse {}", path)) +fn read_leader(path: &str) -> Config { + let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); + serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) } -fn read_mint(path: String) -> Result> { - let file = File::open(path.clone())?; +fn read_mint(path: &str) -> Result> { + let file = File::open(path.to_string())?; let mint = serde_json::from_reader(file)?; Ok(mint) } diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 556241592e9bbf..c100922b70acbd 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -82,7 +82,7 @@ fn main() -> () { None, ) } else { - node.data.leader_id = node.data.id.clone(); + node.data.leader_id = node.data.id; let outfile = if let Some(o) = matches.value_of("output") { OutFile::Path(o.to_string()) diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 97be24a9a6bab2..9864696421d73e 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -66,9 +66,9 @@ impl Default for WalletConfig { fn default() -> WalletConfig { let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); WalletConfig { - leader: NodeInfo::new_leader(&default_addr.clone()), + leader: NodeInfo::new_leader(&default_addr), id: Mint::new(0), - drone_addr: default_addr.clone(), + drone_addr: default_addr, command: WalletCommand::Balance, } } @@ -143,7 +143,7 @@ fn parse_args() -> Result> { let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { - leader = read_leader(l.to_string()).node_info; + leader = read_leader(l).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); leader = NodeInfo::new_leader(&server_addr); @@ -151,26 +151,26 @@ fn parse_args() -> Result> { let id: Mint; if let Some(m) = matches.value_of("mint") { - id = read_mint(m.to_string())?; + id = read_mint(m)?; } else { eprintln!("No mint found!"); exit(1); }; - let mut drone_addr = leader.contact_info.tpu.clone(); + let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(9900); let command = match matches.subcommand() { ("airdrop", Some(airdrop_matches)) => { - let mut tokens: i64 = id.tokens; - if airdrop_matches.is_present("tokens") { - tokens = airdrop_matches.value_of("tokens").unwrap().parse()?; - } + let tokens = if airdrop_matches.is_present("tokens") { + airdrop_matches.value_of("tokens").unwrap().parse()? + } else { + id.tokens + }; Ok(WalletCommand::AirDrop(tokens)) } ("pay", Some(pay_matches)) => { - let to: PublicKey; - if pay_matches.is_present("to") { + let to = if pay_matches.is_present("to") { let pubkey_vec = bs58::decode(pay_matches.value_of("to").unwrap()) .into_vec() .expect("base58-encoded public key"); @@ -179,14 +179,17 @@ fn parse_args() -> Result> { display_actions(); Err(WalletError::BadParameter("Invalid public key".to_string()))?; } - to = PublicKey::clone_from_slice(&pubkey_vec); + PublicKey::clone_from_slice(&pubkey_vec) } else { - to = id.pubkey(); - } - let mut tokens: i64 = id.tokens; - if pay_matches.is_present("tokens") { - tokens = pay_matches.value_of("tokens").unwrap().parse()?; - } + id.pubkey() + }; + + let tokens = if pay_matches.is_present("tokens") { + pay_matches.value_of("tokens").unwrap().parse()? + } else { + id.tokens + }; + Ok(WalletCommand::Pay(tokens, to)) } ("confirm", Some(confirm_matches)) => { @@ -250,7 +253,7 @@ fn process_command( WalletCommand::AirDrop(tokens) => { println!("Airdrop requested..."); println!("Airdropping {:?} tokens", tokens); - let _airdrop = request_airdrop(&config.drone_addr, &config.id, tokens as u64)?; + request_airdrop(&config.drone_addr, &config.id, tokens as u64)?; // TODO: return airdrop Result from Drone sleep(Duration::from_millis(100)); println!( @@ -277,23 +280,23 @@ fn process_command( } fn display_actions() { - println!(""); + println!(); println!("Commands:"); println!(" address Get your public key"); println!(" balance Get your account balance"); println!(" airdrop Request a batch of tokens"); println!(" pay Send tokens to a public key"); println!(" confirm Confirm your last payment by signature"); - println!(""); + println!(); } -fn read_leader(path: String) -> Config { - let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); - serde_json::from_reader(file).expect(&format!("failed to parse {}", path)) +fn read_leader(path: &str) -> Config { + let file = File::open(path.to_string()).unwrap_or_else(|_| panic!("file not found: {}", path)); + serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) } -fn read_mint(path: String) -> Result> { - let file = File::open(path.clone())?; +fn read_mint(path: &str) -> Result> { + let file = File::open(path.to_string())?; let mint = serde_json::from_reader(file)?; Ok(mint) } diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index c02c0836ee7188..b7ef06124a70ba 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -18,14 +18,14 @@ impl BlobFetchStage { pub fn new( socket: UdpSocket, exit: Arc, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { Self::new_multi_socket(vec![socket], exit, blob_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { let (blob_sender, blob_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index fd235387978daa..ad9156163909d6 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -159,7 +159,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { // Return u32 b/c the weighted sampling API from rand::distributions // only takes u32 for weights - if weighted_vote >= std::u32::MAX as f64 { + if weighted_vote >= f64::from(std::u32::MAX) { return std::u32::MAX; } @@ -173,7 +173,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { - if options.len() < 1 { + if options.is_empty() { Err(CrdtError::TooSmall)?; } diff --git a/src/crdt.rs b/src/crdt.rs index d241cc59634f67..d8d42b4e8092b3 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -56,7 +56,7 @@ pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); if let Some(addrstr) = optstr { if let Ok(port) = addrstr.parse() { - let mut addr = daddr.clone(); + let mut addr = daddr; addr.set_port(port); addr } else if let Ok(addr) = addrstr.parse() { @@ -173,12 +173,12 @@ impl NodeInfo { make_debug_id(&self.id) } fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { - let mut nxt_addr = addr.clone(); + let mut nxt_addr = *addr; nxt_addr.set_port(addr.port() + nxt); nxt_addr } pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self { - let transactions_addr = bind_addr.clone(); + let transactions_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 2); let requests_addr = Self::next_port(&bind_addr, 3); @@ -201,9 +201,9 @@ impl NodeInfo { NodeInfo::new( PublicKey::default(), gossip_addr, - daddr.clone(), - daddr.clone(), - daddr.clone(), + daddr, + daddr, + daddr, daddr, ) } @@ -341,19 +341,19 @@ impl Crdt { fn update_leader_liveness(&mut self) { //TODO: (leaders should vote) //until then we pet their liveness every time we see some votes from anyone - let ld = self.leader_data().map(|x| x.id.clone()); + let ld = self.leader_data().map(|x| x.id); trace!("leader_id {:?}", ld); if let Some(leader_id) = ld { self.update_liveness(leader_id); } } - pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) { + pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) { static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE); inc_counter!(COUNTER_VOTE, votes.len()); - if votes.len() > 0 { + if !votes.is_empty() { info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); } - for v in &votes { + for v in votes { self.insert_vote(&v.0, &v.1, v.2); } } @@ -371,7 +371,7 @@ impl Crdt { ); self.update_index += 1; - let _ = self.table.insert(v.id.clone(), v.clone()); + let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE); inc_counter!(COUNTER_UPDATE, 1); @@ -449,7 +449,7 @@ impl Crdt { static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE); inc_counter!(COUNTER_PURGE, dead_ids.len()); - for id in dead_ids.iter() { + for id in &dead_ids { self.alive.remove(id); self.table.remove(id); self.remote.remove(id); @@ -461,11 +461,7 @@ impl Crdt { } } - pub fn index_blobs( - me: &NodeInfo, - blobs: &Vec, - receive_index: &mut u64, - ) -> Result<()> { + pub fn index_blobs(me: &NodeInfo, blobs: &[SharedBlob], receive_index: &mut u64) -> Result<()> { // enumerate all the blobs, those are the indices trace!("{:x}: INDEX_BLOBS {}", me.debug_id(), blobs.len()); for (i, b) in blobs.iter().enumerate() { @@ -518,13 +514,13 @@ impl Crdt { /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( me: &NodeInfo, - broadcast_table: &Vec, + broadcast_table: &[NodeInfo], window: &Window, s: &UdpSocket, transmit_index: &mut u64, received_index: u64, ) -> Result<()> { - if broadcast_table.len() < 1 { + if broadcast_table.is_empty() { warn!("{:x}:not enough peers in crdt table", me.debug_id()); Err(CrdtError::TooSmall)?; } @@ -676,7 +672,7 @@ impl Crdt { Err(CrdtError::TooSmall)?; } let n = (Self::random() as usize) % valid.len(); - let addr = valid[n].contact_info.ncp.clone(); + let addr = valid[n].contact_info.ncp; let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; Ok((addr, out)) @@ -768,7 +764,7 @@ impl Crdt { } let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); let my_id = self.debug_id(); - for x in sorted.iter() { + for x in &sorted { trace!( "{:x}: sorted leaders {:x} votes: {}", my_id, @@ -784,10 +780,8 @@ impl Crdt { /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet fn update_leader(&mut self) { if let Some(leader_id) = self.top_leader() { - if self.my_data().leader_id != leader_id { - if self.table.get(&leader_id).is_some() { - self.set_leader(leader_id); - } + if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() { + self.set_leader(leader_id); } } } @@ -822,7 +816,9 @@ impl Crdt { continue; } - let liveness_entry = self.external_liveness.entry(*pk).or_insert(HashMap::new()); + let liveness_entry = self.external_liveness + .entry(*pk) + .or_insert_with(HashMap::new); let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index); if *external_remote_index > peer_index { liveness_entry.insert(from, *external_remote_index); @@ -854,7 +850,7 @@ impl Crdt { obj.write().unwrap().purge(timestamp()); //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - let _ = obj.write().unwrap().update_leader(); + obj.write().unwrap().update_leader(); let elapsed = timestamp() - start; if GOSSIP_SLEEP_MILLIS > elapsed { let time_left = GOSSIP_SLEEP_MILLIS - elapsed; @@ -948,10 +944,7 @@ impl Crdt { let me = obj.read().unwrap(); // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); - let external_liveness = me.remote - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); + let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect(); drop(me); trace!("get updates since response {} {}", v, data.len()); let len = data.len(); @@ -1091,6 +1084,12 @@ pub struct TestNode { pub sockets: Sockets, } +impl Default for TestNode { + fn default() -> Self { + Self::new() + } +} + impl TestNode { pub fn new() -> Self { let pubkey = KeyPair::new().pubkey(); @@ -1115,7 +1114,7 @@ impl TestNode { repair.local_addr().unwrap(), ); TestNode { - data: data, + data, sockets: Sockets { gossip, gossip_send, @@ -1130,19 +1129,19 @@ impl TestNode { } } pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode { - let mut local_gossip_addr = bind_addr.clone(); + let mut local_gossip_addr = bind_addr; local_gossip_addr.set_port(data.contact_info.ncp.port()); - let mut local_replicate_addr = bind_addr.clone(); + let mut local_replicate_addr = bind_addr; local_replicate_addr.set_port(data.contact_info.tvu.port()); - let mut local_requests_addr = bind_addr.clone(); + let mut local_requests_addr = bind_addr; local_requests_addr.set_port(data.contact_info.rpu.port()); - let mut local_transactions_addr = bind_addr.clone(); + let mut local_transactions_addr = bind_addr; local_transactions_addr.set_port(data.contact_info.tpu.port()); - let mut local_repair_addr = bind_addr.clone(); + let mut local_repair_addr = bind_addr; local_repair_addr.set_port(data.contact_info.tvu_window.port()); let transaction = UdpSocket::bind(local_transactions_addr).unwrap(); @@ -1160,7 +1159,7 @@ impl TestNode { let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); TestNode { - data: data, + data, sockets: Sockets { gossip, gossip_send, @@ -1300,7 +1299,7 @@ mod tests { }; sleep(Duration::from_millis(100)); let votes = vec![(d.id.clone(), vote_new_version_old_addrs, Hash::default())]; - crdt.insert_votes(votes); + crdt.insert_votes(&votes); let updated = crdt.alive[&leader.id]; //should be accepted, since the update is for the same address field as the one we know assert_eq!(crdt.table[&d.id].version, 1); diff --git a/src/drone.rs b/src/drone.rs index 37a8bf4a5780c5..d8e7736c970548 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -17,7 +17,7 @@ use transaction::Transaction; pub const TIME_SLICE: u64 = 60; pub const REQUEST_CAP: u64 = 1_000_000; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum DroneRequest { GetAirdrop { airdrop_request_amount: u64, @@ -112,7 +112,7 @@ impl Drone { airdrop_request_amount, client_public_key, } => { - request_amount = airdrop_request_amount.clone(); + request_amount = airdrop_request_amount; tx = Transaction::new( &self.mint_keypair, client_public_key, @@ -136,7 +136,7 @@ impl Drone { ) .to_owned(), ); - client.transfer_signed(tx) + client.transfer_signed(&tx) } else { Err(Error::new(ErrorKind::Other, "token limit reached")) } diff --git a/src/entry.rs b/src/entry.rs index 704c246943dda8..dc04b07dbe12ec 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -144,7 +144,7 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) - /// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`. pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec) -> Entry { - assert!(num_hashes > 0 || transactions.len() == 0); + assert!(num_hashes > 0 || transactions.is_empty()); Entry { num_hashes, id: next_hash(start_hash, num_hashes, &transactions), diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index bac6be8d8a385c..c1731f2c3af865 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -18,14 +18,14 @@ impl FetchStage { pub fn new( socket: UdpSocket, exit: Arc, - packet_recycler: PacketRecycler, + packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { Self::new_multi_socket(vec![socket], exit, packet_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - packet_recycler: PacketRecycler, + packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { let (packet_sender, packet_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/fullnode.rs b/src/fullnode.rs index 3df3fd75c7201b..ab896202ca19b3 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -99,7 +99,7 @@ impl FullNode { "starting... local gossip address: {} (advertising {})", local_gossip_addr, node.data.contact_info.ncp ); - let requests_addr = node.data.contact_info.rpu.clone(); + let requests_addr = node.data.contact_info.rpu; let exit = Arc::new(AtomicBool::new(false)); if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); @@ -112,7 +112,7 @@ impl FullNode { entry_height, Some(ledger_tail), node, - network_entry_point, + &network_entry_point, exit.clone(), ); info!( @@ -121,7 +121,7 @@ impl FullNode { ); server } else { - node.data.leader_id = node.data.id.clone(); + node.data.leader_id = node.data.id; let outfile_for_leader: Box = match outfile_for_leader { Some(OutFile::Path(file)) => Box::new( OpenOptions::new() @@ -208,7 +208,7 @@ impl FullNode { let bank = Arc::new(bank); let mut thread_hdls = vec![]; let rpu = Rpu::new( - bank.clone(), + &bank.clone(), node.sockets.requests, node.sockets.respond, exit.clone(), @@ -218,18 +218,18 @@ impl FullNode { let blob_recycler = BlobRecycler::default(); let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); let (tpu, blob_receiver) = Tpu::new( - bank.clone(), - crdt.clone(), + &bank.clone(), + &crdt.clone(), tick_duration, node.sockets.transaction, - blob_recycler.clone(), + &blob_recycler.clone(), exit.clone(), writer, ); thread_hdls.extend(tpu.thread_hdls()); let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); let ncp = Ncp::new( - crdt.clone(), + &crdt.clone(), window.clone(), node.sockets.gossip, node.sockets.gossip_send, @@ -285,13 +285,13 @@ impl FullNode { entry_height: u64, ledger_tail: Option>, node: TestNode, - entry_point: NodeInfo, + entry_point: &NodeInfo, exit: Arc, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; let rpu = Rpu::new( - bank.clone(), + &bank.clone(), node.sockets.requests, node.sockets.respond, exit.clone(), @@ -308,7 +308,7 @@ impl FullNode { let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); let ncp = Ncp::new( - crdt.clone(), + &crdt.clone(), window.clone(), node.sockets.gossip, node.sockets.gossip_send, @@ -367,7 +367,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(kp, bank, 0, None, tn, entry, exit); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit); v.close().unwrap(); } } diff --git a/src/logger.rs b/src/logger.rs index 120e3ff82412ea..b6ff42a8c11002 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -9,6 +9,6 @@ static INIT: Once = ONCE_INIT; /// Setup function that is only run once, even if called multiple times. pub fn setup() { INIT.call_once(|| { - let _ = env_logger::init(); + env_logger::init(); }); } diff --git a/src/metrics.rs b/src/metrics.rs index 05261137a036cf..c502722ae981aa 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -36,10 +36,11 @@ impl InfluxDbMetricsWriter { } fn build_client() -> Option { - let host = env::var("INFLUX_HOST").unwrap_or("https://metrics.solana.com:8086".to_string()); - let db = env::var("INFLUX_DATABASE").unwrap_or("scratch".to_string()); - let username = env::var("INFLUX_USERNAME").unwrap_or("scratch_writer".to_string()); - let password = env::var("INFLUX_PASSWORD").unwrap_or("topsecret".to_string()); + let host = env::var("INFLUX_HOST") + .unwrap_or_else(|_| "https://metrics.solana.com:8086".to_string()); + let db = env::var("INFLUX_DATABASE").unwrap_or_else(|_| "scratch".to_string()); + let username = env::var("INFLUX_USERNAME").unwrap_or_else(|_| "scratch_writer".to_string()); + let password = env::var("INFLUX_PASSWORD").unwrap_or_else(|_| "topsecret".to_string()); debug!("InfluxDB host={} db={} username={}", host, db, username); let mut client = influxdb::Client::new_with_option(host, db, None) @@ -80,13 +81,13 @@ impl Default for MetricsAgent { impl MetricsAgent { fn new(writer: Arc, write_frequency: Duration) -> Self { let (sender, receiver) = channel::(); - thread::spawn(move || Self::run(receiver, writer, write_frequency)); + thread::spawn(move || Self::run(&receiver, &writer, write_frequency)); MetricsAgent { sender } } fn run( - receiver: Receiver, - writer: Arc, + receiver: &Receiver, + writer: &Arc, write_frequency: Duration, ) { trace!("run: enter"); @@ -120,13 +121,11 @@ impl MetricsAgent { } let now = Instant::now(); - if now.duration_since(last_write_time) >= write_frequency { - if !points.is_empty() { - debug!("run: writing {} points", points.len()); - writer.write(points); - points = Vec::new(); - last_write_time = now; - } + if now.duration_since(last_write_time) >= write_frequency && !points.is_empty() { + debug!("run: writing {} points", points.len()); + writer.write(points); + points = Vec::new(); + last_write_time = now; } } trace!("run: exit"); diff --git a/src/nat.rs b/src/nat.rs index 3ca30a7efc4abe..7481dd075c49bb 100644 --- a/src/nat.rs +++ b/src/nat.rs @@ -92,7 +92,7 @@ pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPa // // TODO: Remove the |sender| socket and deal with the downstream changes to // the UDP signalling - let mut local_addr_sender = local_addr.clone(); + let mut local_addr_sender = local_addr; local_addr_sender.set_port(public_addr.port()); UdpSocket::bind(local_addr_sender).unwrap() }; diff --git a/src/ncp.rs b/src/ncp.rs index 7e8d05aa57aa0c..e03b7b5df1d714 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -18,7 +18,7 @@ pub struct Ncp { impl Ncp { pub fn new( - crdt: Arc>, + crdt: &Arc>, window: Arc>>>, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, @@ -93,7 +93,7 @@ mod tests { let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new( - c.clone(), + &c.clone(), w, tn.sockets.gossip, tn.sockets.gossip_send, diff --git a/src/packet.rs b/src/packet.rs index 037ac51e33d4dd..33c6ffc3a77eb4 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -240,7 +240,7 @@ impl Packets { pub fn to_packets_chunked( r: &PacketRecycler, - xs: Vec, + xs: &[T], chunks: usize, ) -> Vec { let mut out = vec![]; @@ -258,10 +258,10 @@ pub fn to_packets_chunked( } out.push(p); } - return out; + out } -pub fn to_packets(r: &PacketRecycler, xs: Vec) -> Vec { +pub fn to_packets(r: &PacketRecycler, xs: &[T]) -> Vec { to_packets_chunked(r, xs, NUM_PACKETS) } @@ -347,7 +347,7 @@ impl Blob { } pub fn is_coding(&self) -> bool { - return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0; + (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 } pub fn set_coding(&mut self) -> Result<()> { @@ -524,15 +524,15 @@ mod tests { fn test_to_packets() { let tx = Request::GetTransactionCount; let re = PacketRecycler::default(); - let rv = to_packets(&re, vec![tx.clone(); 1]); + let rv = to_packets(&re, &vec![tx.clone(); 1]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), 1); - let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS]); + let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS + 1]); + let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); assert_eq!(rv[1].read().unwrap().packets.len(), 1); diff --git a/src/record_stage.rs b/src/record_stage.rs index 4f8651a7b2c440..a2aaa069908c1a 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -32,7 +32,7 @@ impl RecordStage { start_hash: &Hash, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = start_hash.clone(); + let start_hash = *start_hash; let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) @@ -52,7 +52,7 @@ impl RecordStage { tick_duration: Duration, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = start_hash.clone(); + let start_hash = *start_hash; let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) @@ -60,13 +60,14 @@ impl RecordStage { let mut recorder = Recorder::new(start_hash); let start_time = Instant::now(); loop { - if let Err(_) = Self::try_process_signals( + if Self::try_process_signals( &mut recorder, start_time, tick_duration, &signal_receiver, &entry_sender, - ) { + ).is_err() + { return; } recorder.hash(); diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index bb2cd8e231342e..8c2d8c087fe5b0 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -66,7 +66,7 @@ impl ReplicateStage { let shared_blob = blob_recycler.allocate(); let (vote, addr) = { let mut wcrdt = crdt.write().unwrap(); - wcrdt.insert_votes(votes); + wcrdt.insert_votes(&votes); //TODO: doesn't seem like there is a synchronous call to get height and id info!("replicate_stage {} {:?}", height, &last_id[..8]); wcrdt.new_vote(height, last_id) diff --git a/src/request.rs b/src/request.rs index a1a006c38b2788..96c84b80ad67e6 100644 --- a/src/request.rs +++ b/src/request.rs @@ -4,7 +4,7 @@ use hash::Hash; use signature::{PublicKey, Signature}; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum Request { GetBalance { key: PublicKey }, GetLastId, diff --git a/src/rpu.rs b/src/rpu.rs index ca5ac7d6a2ad72..af8b52ca93a3be 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -41,7 +41,7 @@ pub struct Rpu { impl Rpu { pub fn new( - bank: Arc, + bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket, exit: Arc, diff --git a/src/sigverify.rs b/src/sigverify.rs index c4bfeb6bc975f1..ffe7698e09c8e0 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -59,13 +59,14 @@ fn verify_packet(packet: &Packet) -> u8 { ).is_ok() as u8 } -fn batch_size(batches: &Vec) -> usize { +fn batch_size(batches: &[SharedPackets]) -> usize { batches .iter() .map(|p| p.read().unwrap().packets.len()) .sum() } +#[cfg_attr(feature = "cargo-clippy", allow(ptr_arg))] #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { use rayon::prelude::*; diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index e90ea238f6384b..57ffa020e76a9a 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -17,27 +17,27 @@ use std::time::Instant; use streamer::{self, PacketReceiver}; use timing; +pub type VerifiedPackets = Vec<(SharedPackets, Vec)>; + pub struct SigVerifyStage { thread_hdls: Vec>, } impl SigVerifyStage { - pub fn new( - packet_receiver: Receiver, - ) -> (Self, Receiver)>>) { + pub fn new(packet_receiver: Receiver) -> (Self, Receiver) { let (verified_sender, verified_receiver) = channel(); let thread_hdls = Self::verifier_services(packet_receiver, verified_sender); (SigVerifyStage { thread_hdls }, verified_receiver) } - fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> { + fn verify_batch(batch: Vec) -> VerifiedPackets { let r = sigverify::ed25519_verify(&batch); batch.into_iter().zip(r).collect() } fn verifier( recvr: &Arc>, - sendr: &Arc)>>>>, + sendr: &Arc>>, ) -> Result<()> { let (batch, len) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; @@ -74,7 +74,7 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: Arc>, - verified_sender: Arc)>>>>, + verified_sender: Arc>>, ) -> JoinHandle<()> { spawn(move || loop { if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) { @@ -89,7 +89,7 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: PacketReceiver, - verified_sender: Sender)>>, + verified_sender: Sender, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); let receiver = Arc::new(Mutex::new(packet_receiver)); diff --git a/src/streamer.rs b/src/streamer.rs index d2cbda902b311e..cb6c5192854fff 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -170,7 +170,7 @@ fn find_next_missing( let reqs: Vec<_> = (*consumed..*received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; - if let &None = &window[i] { + if window[i].is_none() { let val = crdt.read().unwrap().window_index_request(pix as u64); if let Ok((to, req)) = val { return Some((to, req)); @@ -223,7 +223,7 @@ fn repair_window( let reqs = find_next_missing(locked_window, crdt, consumed, received)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); - if reqs.len() > 0 { + if !reqs.is_empty() { static mut COUNTER_REPAIR: Counter = create_counter!("streamer-repair_window-repair", LOG_RATE); inc_counter!(COUNTER_REPAIR, reqs.len()); @@ -249,6 +249,150 @@ fn repair_window( Ok(()) } +fn retransmit_all_leader_blocks( + maybe_leader: Option, + dq: &mut SharedBlobs, + debug_id: u64, + recycler: &BlobRecycler, + consumed: &mut u64, + received: &mut u64, + retransmit: &BlobSender, +) -> Result<()> { + let mut retransmit_queue = VecDeque::new(); + if let Some(leader) = maybe_leader { + for b in dq { + let p = b.read().expect("'b' read lock in fn recv_window"); + //TODO this check isn't safe against adverserial packets + //we need to maintain a sequence window + let leader_id = leader.id; + trace!( + "idx: {} addr: {:?} id: {:?} leader: {:?}", + p.get_index().expect("get_index in fn recv_window"), + p.get_id().expect("get_id in trace! fn recv_window"), + p.meta.addr(), + leader_id + ); + if p.get_id().expect("get_id in fn recv_window") == leader_id { + //TODO + //need to copy the retransmitted blob + //otherwise we get into races with which thread + //should do the recycling + // + //a better abstraction would be to recycle when the blob + //is dropped via a weakref to the recycler + let nv = recycler.allocate(); + { + let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); + let sz = p.meta.size; + mnv.meta.size = sz; + mnv.data[..sz].copy_from_slice(&p.data[..sz]); + } + retransmit_queue.push_back(nv); + } + } + } else { + warn!("{:x}: no leader to retransmit from", debug_id); + } + if !retransmit_queue.is_empty() { + debug!( + "{:x}: RECV_WINDOW {} {}: retransmit {}", + debug_id, + *consumed, + *received, + retransmit_queue.len(), + ); + static mut COUNTER_RETRANSMIT: Counter = + create_counter!("streamer-recv_window-retransmit", LOG_RATE); + inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); + retransmit.send(retransmit_queue)?; + } + Ok(()) +} + +fn process_blob( + b: SharedBlob, + pix: u64, + w: usize, + consume_queue: &mut SharedBlobs, + locked_window: &Window, + debug_id: u64, + recycler: &BlobRecycler, + consumed: &mut u64, +) { + let mut window = locked_window.write().unwrap(); + + // Search the window for old blobs in the window + // of consumed to received and clear any old ones + for ix in *consumed..(pix + 1) { + let k = (ix % WINDOW_SIZE) as usize; + if let Some(b) = &mut window[k] { + if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { + continue; + } + } + if let Some(b) = mem::replace(&mut window[k], None) { + recycler.recycle(b); + } + } + + // Insert the new blob into the window + // spot should be free because we cleared it above + if window[w].is_none() { + window[w] = Some(b); + } else if let Some(cblob) = &window[w] { + if cblob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("{:x}: overrun blob at index {:}", debug_id, w); + } else { + debug!("{:x}: duplicate blob at index {:}", debug_id, w); + } + } + loop { + let k = (*consumed % WINDOW_SIZE) as usize; + trace!("k: {} consumed: {}", k, *consumed); + + if window[k].is_none() { + break; + } + let mut is_coding = false; + if let Some(ref cblob) = window[k] { + let cblob_r = cblob + .read() + .expect("blob read lock for flogs streamer::window"); + if cblob_r.get_index().unwrap() < *consumed { + break; + } + if cblob_r.is_coding() { + is_coding = true; + } + } + if !is_coding { + consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); + *consumed += 1; + } else { + #[cfg(feature = "erasure")] + { + let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); + let coding_end = block_start + erasure::NUM_CODED as u64; + // We've received all this block's data blobs, go and null out the window now + for j in block_start..*consumed { + if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { + recycler.recycle(b); + } + } + for j in *consumed..coding_end { + window[(j % WINDOW_SIZE) as usize] = None; + } + + *consumed += erasure::MAX_MISSING as u64; + debug!( + "skipping processing coding blob k: {} consumed: {}", + k, *consumed + ); + } + } + } +} + fn recv_window( debug_id: u64, locked_window: &Window, @@ -278,57 +422,17 @@ fn recv_window( *received, dq.len(), ); - { - //retransmit all leader blocks - let mut retransmit_queue = VecDeque::new(); - if let Some(leader) = maybe_leader { - for b in &dq { - let p = b.read().expect("'b' read lock in fn recv_window"); - //TODO this check isn't safe against adverserial packets - //we need to maintain a sequence window - let leader_id = leader.id; - trace!( - "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index().expect("get_index in fn recv_window"), - p.get_id().expect("get_id in trace! fn recv_window"), - p.meta.addr(), - leader_id - ); - if p.get_id().expect("get_id in fn recv_window") == leader_id { - //TODO - //need to copy the retransmitted blob - //otherwise we get into races with which thread - //should do the recycling - // - //a better abstraction would be to recycle when the blob - //is dropped via a weakref to the recycler - let nv = recycler.allocate(); - { - let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); - let sz = p.meta.size; - mnv.meta.size = sz; - mnv.data[..sz].copy_from_slice(&p.data[..sz]); - } - retransmit_queue.push_back(nv); - } - } - } else { - warn!("{:x}: no leader to retransmit from", debug_id); - } - if !retransmit_queue.is_empty() { - debug!( - "{:x}: RECV_WINDOW {} {}: retransmit {}", - debug_id, - *consumed, - *received, - retransmit_queue.len(), - ); - static mut COUNTER_RETRANSMIT: Counter = - create_counter!("streamer-recv_window-retransmit", LOG_RATE); - inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); - retransmit.send(retransmit_queue)?; - } - } + + retransmit_all_leader_blocks( + maybe_leader, + &mut dq, + debug_id, + recycler, + consumed, + received, + retransmit, + )?; + //send a contiguous set of blocks let mut consume_queue = VecDeque::new(); while let Some(b) = dq.pop_front() { @@ -353,82 +457,17 @@ fn recv_window( //if we get different blocks at the same index //that is a network failure/attack trace!("window w: {} size: {}", w, meta_size); - { - let mut window = locked_window.write().unwrap(); - - // Search the window for old blobs in the window - // of consumed to received and clear any old ones - for ix in *consumed..(pix + 1) { - let k = (ix % WINDOW_SIZE) as usize; - if let Some(b) = &mut window[k] { - if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { - continue; - } - } - if let Some(b) = mem::replace(&mut window[k], None) { - recycler.recycle(b); - } - } - // Insert the new blob into the window - // spot should be free because we cleared it above - if window[w].is_none() { - window[w] = Some(b); - } else if let Some(cblob) = &window[w] { - if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun blob at index {:}", debug_id, w); - } else { - debug!("{:x}: duplicate blob at index {:}", debug_id, w); - } - } - loop { - let k = (*consumed % WINDOW_SIZE) as usize; - trace!("k: {} consumed: {}", k, *consumed); - - if window[k].is_none() { - break; - } - let mut is_coding = false; - if let &Some(ref cblob) = &window[k] { - let cblob_r = cblob - .read() - .expect("blob read lock for flogs streamer::window"); - if cblob_r.get_index().unwrap() < *consumed { - break; - } - if cblob_r.is_coding() { - is_coding = true; - } - } - if !is_coding { - consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); - *consumed += 1; - } else { - #[cfg(feature = "erasure")] - { - let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); - let coding_end = block_start + erasure::NUM_CODED as u64; - // We've received all this block's data blobs, go and null out the window now - for j in block_start..*consumed { - if let Some(b) = - mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) - { - recycler.recycle(b); - } - } - for j in *consumed..coding_end { - window[(j % WINDOW_SIZE) as usize] = None; - } - - *consumed += erasure::MAX_MISSING as u64; - debug!( - "skipping processing coding blob k: {} consumed: {}", - k, *consumed - ); - } - } - } - } + process_blob( + b, + pix, + w, + &mut consume_queue, + locked_window, + debug_id, + recycler, + consumed, + ); } print_window(debug_id, locked_window, *consumed); trace!("sending consume_queue.len: {}", consume_queue.len()); @@ -461,16 +500,14 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { "_" } else if v.is_none() { "0" - } else { - if let &Some(ref cblob) = &v { - if cblob.read().unwrap().is_coding() { - "C" - } else { - "1" - } + } else if let Some(ref cblob) = v { + if cblob.read().unwrap().is_coding() { + "C" } else { - "0" + "1" } + } else { + "0" } }) .collect(); @@ -575,7 +612,7 @@ pub fn window( fn broadcast( me: &NodeInfo, - broadcast_table: &Vec, + broadcast_table: &[NodeInfo], window: &Window, recycler: &BlobRecycler, r: &BlobReceiver, diff --git a/src/thin_client.rs b/src/thin_client.rs index e40e4cf379b8fd..10d7b7373990af 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -39,7 +39,7 @@ impl ThinClient { transactions_addr: SocketAddr, transactions_socket: UdpSocket, ) -> Self { - let client = ThinClient { + ThinClient { requests_addr, requests_socket, transactions_addr, @@ -48,8 +48,7 @@ impl ThinClient { transaction_count: 0, balances: HashMap::new(), signature_status: false, - }; - client + } } pub fn recv_response(&self) -> io::Result { @@ -60,8 +59,8 @@ impl ThinClient { deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) } - pub fn process_response(&mut self, resp: Response) { - match resp { + pub fn process_response(&mut self, resp: &Response) { + match *resp { Response::Balance { key, val } => { trace!("Response balance {:?} {:?}", key, val); self.balances.insert(key, val); @@ -76,13 +75,10 @@ impl ThinClient { } Response::SignatureStatus { signature_status } => { self.signature_status = signature_status; - match signature_status { - true => { - trace!("Response found signature"); - } - false => { - trace!("Response signature not found"); - } + if signature_status { + trace!("Response found signature"); + } else { + trace!("Response signature not found"); } } } @@ -90,7 +86,7 @@ impl ThinClient { /// Send a signed Transaction to the server for processing. This method /// does not wait for a response. - pub fn transfer_signed(&self, tx: Transaction) -> io::Result { + pub fn transfer_signed(&self, tx: &Transaction) -> io::Result { let data = serialize(&tx).expect("serialize Transaction in pub fn transfer_signed"); self.transactions_socket .send_to(&data, &self.transactions_addr) @@ -107,7 +103,7 @@ impl ThinClient { let now = Instant::now(); let tx = Transaction::new(keypair, to, n, *last_id); let sig = tx.sig; - let result = self.transfer_signed(tx).map(|_| sig); + let result = self.transfer_signed(&tx).map(|_| sig); metrics::submit( influxdb::Point::new("thinclient") .add_tag("op", influxdb::Value::String("transfer".to_string())) @@ -137,12 +133,12 @@ impl ThinClient { if let Response::Balance { key, .. } = &resp { done = key == pubkey; } - self.process_response(resp); + self.process_response(&resp); } self.balances .get(pubkey) - .map(|x| *x) - .ok_or(io::Error::new(io::ErrorKind::Other, "nokey")) + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey")) } /// Request the transaction count. If the response packet is dropped by the network, @@ -160,10 +156,10 @@ impl ThinClient { if let Ok(resp) = self.recv_response() { info!("recv_response {:?}", resp); - if let &Response::TransactionCount { .. } = &resp { + if let Response::TransactionCount { .. } = resp { done = true; } - self.process_response(resp); + self.process_response(&resp); } } self.transaction_count @@ -184,10 +180,10 @@ impl ThinClient { match self.recv_response() { Ok(resp) => { - if let &Response::LastId { .. } = &resp { + if let Response::LastId { .. } = resp { done = true; } - self.process_response(resp); + self.process_response(&resp); } Err(e) => { debug!("thin_client get_last_id error: {}", e); @@ -232,10 +228,10 @@ impl ThinClient { .expect("buffer error in pub fn get_last_id"); if let Ok(resp) = self.recv_response() { - if let &Response::SignatureStatus { .. } = &resp { + if let Response::SignatureStatus { .. } = resp { done = true; } - self.process_response(resp); + self.process_response(&resp); } } metrics::submit( @@ -355,7 +351,7 @@ mod tests { let tx = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); - let _sig = client.transfer_signed(tx).unwrap(); + let _sig = client.transfer_signed(&tx).unwrap(); let last_id = client.get_last_id(); @@ -364,7 +360,7 @@ mod tests { contract.tokens = 502; contract.plan = Plan::Budget(Budget::new_payment(502, bob_pubkey)); } - let _sig = client.transfer_signed(tr2).unwrap(); + let _sig = client.transfer_signed(&tr2).unwrap(); let balance = client.poll_get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); diff --git a/src/timing.rs b/src/timing.rs index 3ae7bf8c78f1b3..4c7eeae4f89b05 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -3,20 +3,20 @@ use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; pub fn duration_as_us(d: &Duration) -> u64 { - return (d.as_secs() * 1000 * 1000) + (d.subsec_nanos() as u64 / 1_000); + (d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000) } pub fn duration_as_ms(d: &Duration) -> u64 { - return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000); + (d.as_secs() * 1000) + (u64::from(d.subsec_nanos()) / 1_000_000) } pub fn duration_as_s(d: &Duration) -> f32 { - return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0); + d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0) } pub fn timestamp() -> u64 { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("create timestamp in timing"); - return duration_as_ms(&now); + duration_as_ms(&now) } diff --git a/src/tpu.rs b/src/tpu.rs index c0d14313c1ba0e..10a21c0b22db9e 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -52,18 +52,18 @@ pub struct Tpu { impl Tpu { pub fn new( - bank: Arc, - crdt: Arc>, + bank: &Arc, + crdt: &Arc>, tick_duration: Option, transactions_socket: UdpSocket, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, exit: Arc, writer: W, ) -> (Self, BlobReceiver) { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_socket, exit, packet_recycler.clone()); + FetchStage::new(transactions_socket, exit, &packet_recycler.clone()); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); diff --git a/src/tvu.rs b/src/tvu.rs index 4469bf73f93cde..cf1d79ce011da8 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -83,17 +83,17 @@ impl Tvu { let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], exit, - blob_recycler.clone(), + &blob_recycler.clone(), ); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let (window_stage, blob_window_receiver) = WindowStage::new( - crdt.clone(), + &crdt.clone(), window, entry_height, retransmit_socket, - blob_recycler.clone(), + &blob_recycler.clone(), blob_fetch_receiver, ); @@ -161,7 +161,7 @@ pub mod tests { ) -> Result<(Ncp, Window)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?; + let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; Ok((ncp, window)) } /// Test that message sent from leader to target1 and replicated to target2 diff --git a/src/voting.rs b/src/voting.rs index a32d202dc5365a..56bb10a5cd4df4 100644 --- a/src/voting.rs +++ b/src/voting.rs @@ -3,17 +3,15 @@ use hash::Hash; use signature::PublicKey; use transaction::{Instruction, Vote}; -pub fn entries_to_votes(entries: &Vec) -> Vec<(PublicKey, Vote, Hash)> { +pub fn entries_to_votes(entries: &[Entry]) -> Vec<(PublicKey, Vote, Hash)> { entries .iter() .flat_map(|entry| { let vs: Vec<(PublicKey, Vote, Hash)> = entry .transactions .iter() - .filter_map(|tx| match &tx.instruction { - &Instruction::NewVote(ref vote) => { - Some((tx.from.clone(), vote.clone(), tx.last_id.clone())) - } + .filter_map(|tx| match tx.instruction { + Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)), _ => None, }) .collect(); diff --git a/src/window_stage.rs b/src/window_stage.rs index 74a0853de2a42f..244740d6ca0191 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -15,11 +15,11 @@ pub struct WindowStage { impl WindowStage { pub fn new( - crdt: Arc>, + crdt: &Arc>, window: Window, entry_height: u64, retransmit_socket: UdpSocket, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { let (retransmit_sender, retransmit_receiver) = channel(); diff --git a/src/write_stage.rs b/src/write_stage.rs index 9e805623a28121..e0f62d06111f88 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -35,7 +35,7 @@ impl WriteStage { ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let votes = entries_to_votes(&entries); - crdt.write().unwrap().insert_votes(votes); + crdt.write().unwrap().insert_votes(&votes); entry_writer.write_and_register_entries(&entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 77ce210599568d..6dc58bac457aef 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -21,7 +21,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new( - c.clone(), + &c.clone(), w, tn.sockets.gossip, tn.sockets.gossip_send, diff --git a/tests/multinode.rs b/tests/multinode.rs index b0979eb14f324e..be18f6cafa2ab7 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -35,7 +35,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); let ncp = Ncp::new( - spy_ref.clone(), + &spy_ref.clone(), spy_window, spy.sockets.gossip, spy.sockets.gossip_send,