diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index b4dac482472ed0..b2d982f7cdbd5f 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -9,8 +9,10 @@ use solana_core::contact_info::ContactInfo;
 use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo};
 use solana_core::packet::to_packets_chunked;
 use solana_core::retransmit_stage::retransmitter;
+use solana_core::weighted_shuffle::weighted_best;
 use solana_ledger::bank_forks::BankForks;
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
+use solana_ledger::staking_utils;
 use solana_measure::measure::Measure;
 use solana_perf::test_tx::test_tx;
 use solana_runtime::bank::Bank;
@@ -68,6 +70,23 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let batches = to_packets_chunked(&vec![tx; NUM_PACKETS], chunk_size);
     info!("batches: {}", batches.len());
 
+    // Pick the anchor peer for these packets and adopt its identity, so the bench node sits on the critical path.
+    let working_bank = bank_forks.read().unwrap().working_bank();
+    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
+    let stakes = staking_utils::staked_nodes_at_epoch(&working_bank, bank_epoch);
+    let stakes = stakes.map(Arc::new);
+    let (peers, stakes_and_index) = cluster_info
+        .read()
+        .unwrap()
+        .sorted_retransmit_peers_and_stakes(stakes);
+    // The packet seeds are always zero, so every packet picks the same first id.
+    let broadcast_index = weighted_best(&stakes_and_index, batches[0].packets[0].meta.seed);
+    {
+        let mut w_cinfo = cluster_info.write().unwrap();
+        w_cinfo.gossip.set_self(&peers[broadcast_index].id);
+        w_cinfo.insert_self(peers[broadcast_index].clone());
+    }
+
     let retransmitter_handles = retransmitter(
         Arc::new(sockets),
         bank_forks,
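For context on the bench setup above: `weighted_best` takes `(stake, index)` pairs and a 32-byte seed and returns the index that a stake-weighted selection picks first; with the bench's all-zero seeds, the same peer wins for every packet. Below is a minimal, self-contained sketch of that idea (std-only; `weighted_best_sketch` and its hash-based weighting are illustrative assumptions, not the crate's exact algorithm):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Score every (stake, index) pair with a deterministic, seed-dependent key
// scaled by stake, and return the index of the highest-scoring entry.
fn weighted_best_sketch(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> usize {
    stakes_and_index
        .iter()
        .map(|&(stake, index)| {
            // Derive a per-entry pseudo-random key from the seed and index.
            let mut hasher = DefaultHasher::new();
            seed.hash(&mut hasher);
            index.hash(&mut hasher);
            let key = hasher.finish() as u128;
            // Weight the key by stake so higher-staked nodes win more often.
            (key.saturating_mul(stake.max(1) as u128), index)
        })
        .max_by_key(|&(weighted_key, _)| weighted_key)
        .map(|(_, index)| index)
        .unwrap_or(0)
}

fn main() {
    let stakes_and_index = vec![(100, 0), (2_000, 1), (500, 2)];
    // A zeroed seed, as in the bench, makes the winner stable across packets.
    let best = weighted_best_sketch(&stakes_and_index, [0u8; 32]);
    println!("selected peer index: {}", best);
}
```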
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 880b943b6f01b5..4c4bde4c776c7d 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -81,9 +81,7 @@ fn retransmit(
             continue;
         }
         if packet.meta.repair {
-            total_packets -= 1;
             repair_total += 1;
-            continue;
         }
 
         let mut compute_turbine_peers = Measure::start("turbine_start");
@@ -111,7 +109,8 @@ fn retransmit(
         let leader =
             leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
         let mut retransmit_time = Measure::start("retransmit_to");
-        if !packet.meta.forward {
+        // If I am on the critical path for this packet, send it to everyone
+        if my_index % DATA_PLANE_FANOUT == 0 {
             ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?;
             ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?;
         } else {
@@ -274,82 +273,3 @@ impl RetransmitStage {
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::contact_info::ContactInfo;
-    use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
-    use crate::packet::{self, Meta, Packet, Packets};
-    use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
-    use solana_ledger::create_new_tmp_ledger;
-    use solana_net_utils::find_available_port_in_range;
-    use solana_sdk::pubkey::Pubkey;
-
-    #[test]
-    fn test_skip_repair() {
-        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
-        let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
-        let blockstore = Blockstore::open(&ledger_path).unwrap();
-        let opts = ProcessOptions {
-            full_leader_cache: true,
-            ..ProcessOptions::default()
-        };
-        let (bank_forks, _, cached_leader_schedule) =
-            process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();
-        let leader_schedule_cache = Arc::new(cached_leader_schedule);
-        let bank_forks = Arc::new(RwLock::new(bank_forks));
-
-        let mut me = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
-        let port = find_available_port_in_range((8000, 10000)).unwrap();
-        let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
-        // need to make sure tvu and tpu are valid addresses
-        me.tvu_forwards = me_retransmit.local_addr().unwrap();
-        let port = find_available_port_in_range((8000, 10000)).unwrap();
-        me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
-            .unwrap()
-            .local_addr()
-            .unwrap();
-
-        let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
-        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
-        cluster_info.insert_info(me);
-
-        let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
-
-        let (retransmit_sender, retransmit_receiver) = channel();
-        let t_retransmit = retransmitter(
-            retransmit_socket,
-            bank_forks,
-            &leader_schedule_cache,
-            cluster_info,
-            Arc::new(Mutex::new(retransmit_receiver)),
-        );
-        let _thread_hdls = vec![t_retransmit];
-
-        let packets = Packets::new(vec![Packet::default()]);
-        // it should send this over the sockets.
-        retransmit_sender.send(packets).unwrap();
-        let mut packets = Packets::new(vec![]);
-        packet::recv_from(&mut packets, &me_retransmit).unwrap();
-        assert_eq!(packets.packets.len(), 1);
-        assert_eq!(packets.packets[0].meta.repair, false);
-
-        let repair = Packet {
-            meta: Meta {
-                repair: true,
-                ..Meta::default()
-            },
-            ..Packet::default()
-        };
-
-        // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
-        let packets = Packets::new(vec![repair, Packet::default()]);
-        retransmit_sender.send(packets).unwrap();
-        let mut packets = Packets::new(vec![]);
-        packet::recv_from(&mut packets, &me_retransmit).unwrap();
-        assert_eq!(packets.packets.len(), 1);
-        assert_eq!(packets.packets[0].meta.repair, false);
-    }
-}
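The core change above replaces the per-packet `forward` flag with a rule derived from the node's own position in the weighted shuffle. A hedged sketch of that rule follows (`send_to_all` is a stand-in for `ClusterInfo::retransmit_to`, and the fanout value is an assumption for illustration):

```rust
const DATA_PLANE_FANOUT: usize = 200;

// Stand-in for the real socket sends done by ClusterInfo::retransmit_to.
fn send_to_all(label: &str, peer_indices: &[usize], packet: &[u8]) {
    println!("sending {} bytes to {} {:?}", packet.len(), label, peer_indices);
}

// A node whose shuffle index is a multiple of the fanout anchors its
// neighborhood: it feeds both its neighbors and its children. Everyone else
// feeds children only, since their neighbors are covered by the anchor.
fn retransmit_sketch(my_index: usize, neighbors: &[usize], children: &[usize], packet: &[u8]) {
    if my_index % DATA_PLANE_FANOUT == 0 {
        send_to_all("neighbors", neighbors, packet);
        send_to_all("children", children, packet);
    } else {
        send_to_all("children", children, packet);
    }
}

fn main() {
    let packet = [0u8; 64];
    retransmit_sketch(0, &[1, 2, 3], &[200, 201], &packet); // anchor node
    retransmit_sketch(7, &[4, 5, 6], &[1400, 1401], &packet); // non-anchor node
}
```

Note that repair packets are no longer skipped and flow through this same path; the deleted `test_skip_repair` test asserted the old skip behavior, so it goes away with it.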
diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs
index d165ac52a12eed..94fbe4a9412efb 100644
--- a/core/tests/cluster_info.rs
+++ b/core/tests/cluster_info.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::sync::Mutex;
 use std::time::Instant;
 
-type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;
+type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<i32>)>;
 
 fn num_threads() -> usize {
     sys_info::cpu_num().unwrap_or(10) as usize
@@ -29,11 +29,10 @@ fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {
 
 fn retransmit(
     mut shuffled_nodes: Vec<ContactInfo>,
-    senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
+    senders: &HashMap<Pubkey, Sender<i32>>,
     cluster: &ClusterInfo,
     fanout: usize,
     shred: i32,
-    retransmit: bool,
 ) -> i32 {
     let mut seed = [0; 32];
     let mut my_index = 0;
@@ -48,17 +47,18 @@ fn retransmit(
         }
     });
     seed[0..4].copy_from_slice(&shred.to_le_bytes());
+    let retransmit = my_index % fanout == 0;
     let shuffled_indices = (0..shuffled_nodes.len()).collect();
     let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices);
 
     children.into_iter().for_each(|i| {
         let s = senders.get(&shuffled_nodes[i].id).unwrap();
-        let _ = s.send((shred, retransmit));
+        let _ = s.send(shred);
    });
     if retransmit {
         neighbors.into_iter().for_each(|i| {
             let s = senders.get(&shuffled_nodes[i].id).unwrap();
-            let _ = s.send((shred, false));
+            let _ = s.send(shred);
         });
     }
 
@@ -79,8 +79,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
 
     // setup accounts for all nodes (leader has 0 bal)
     let (s, r) = channel();
-    let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
-        Arc::new(Mutex::new(HashMap::new()));
+    let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> = Arc::new(Mutex::new(HashMap::new()));
     senders.lock().unwrap().insert(leader_info.id, s);
     let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
     (0..num_threads).for_each(|_| batches.push(HashMap::new()));
@@ -159,7 +158,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
                 &cluster,
                 fanout,
                 *i,
-                true,
             );
         });
         *layer1_done = true;
@@ -169,7 +167,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
         if recv.len() < shreds_len {
             loop {
                 match r.try_recv() {
-                    Ok((data, retx)) => {
+                    Ok(data) => {
                         if recv.insert(data) {
                             let _ = retransmit(
                                 shuffled_peers[data as usize].clone(),
@@ -177,7 +175,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
                                 &senders,
                                 &cluster,
                                 fanout,
                                 data,
-                                retx,
                             );
                         }
                         if recv.len() == shreds_len {
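Since the retransmit decision is now recomputed locally from each node's shuffle position, the simulation's channels no longer need to carry a flag alongside the shred id. A small sketch of that simplification (values illustrative, not taken from the test):

```rust
use std::sync::mpsc::channel;

fn main() {
    // Before: channel::<(i32, bool)>() -- a shred id plus a retransmit flag.
    // After: the flag is re-derived locally, so a bare i32 suffices.
    let (sender, receiver) = channel::<i32>();

    let fanout = 200;
    let my_index = 400; // this node's position in the weighted shuffle

    sender.send(42).unwrap();
    let shred = receiver.recv().unwrap();
    // Each receiver decides for itself whether it anchors a neighborhood,
    // instead of trusting a boolean shipped with the shred.
    let retransmit = my_index % fanout == 0;
    println!("shred {}: retransmit-to-neighbors = {}", shred, retransmit);
}
```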