parallelizes gossip packets receiver with processing of requests
Gossip packet processing is composed of two stages:
  * The first is consuming packets from the socket, deserializing,
    sanitizing and verifying them:
    https://github.com/solana-labs/solana/blob/7f0349b29/gossip/src/cluster_info.rs#L2510-L2521
  * The second is actually processing the requests/messages:
    https://github.com/solana-labs/solana/blob/7f0349b29/gossip/src/cluster_info.rs#L2585-L2605

The former does not acquire any locks and so can be parallelized with
the latter, allowing better pipelining and lower latency in
responding to gossip requests or propagating messages.
behzadnouri committed Jun 3, 2021
commit d53eaa1ea3446e0417d400dfbc7f7fcca1932102
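The change splits these two stages across threads connected by a channel. Below is a minimal, self-contained sketch of that pipeline shape; the RawPacket/VerifiedMessage types, the verify helper, and the single-threaded verification are simplified stand-ins, not the actual Packet/Protocol types or the rayon-based verification used in cluster_info.rs:

    use std::{
        sync::mpsc::{channel, RecvTimeoutError},
        thread,
        time::Duration,
    };

    // Illustrative stand-ins for the real Packet and Protocol types.
    type RawPacket = Vec<u8>;
    type VerifiedMessage = String;

    // Deserialize, sanitize and verify a packet; returns None on failure.
    // This stage takes no locks, so it can overlap with request processing.
    fn verify(packet: RawPacket) -> Option<VerifiedMessage> {
        String::from_utf8(packet).ok()
    }

    fn main() {
        let (socket_sender, socket_receiver) = channel::<RawPacket>();
        let (verified_sender, verified_receiver) = channel::<Vec<VerifiedMessage>>();

        // Stage 1: consume packets from the socket side and verify them.
        let consume = thread::spawn(move || loop {
            match socket_receiver.recv_timeout(Duration::from_secs(1)) {
                Ok(first) => {
                    // Batch whatever else is already queued, then verify.
                    let batch: Vec<_> = std::iter::once(first)
                        .chain(socket_receiver.try_iter())
                        .filter_map(verify)
                        .collect();
                    if verified_sender.send(batch).is_err() {
                        break; // the processing side hung up
                    }
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        });

        // Stage 2: process verified requests/messages (this is where locks,
        // e.g. on the gossip table, would be taken in the real code).
        let listen = thread::spawn(move || {
            for batch in verified_receiver.iter() {
                for msg in batch {
                    println!("processing {}", msg);
                }
            }
        });

        socket_sender.send(b"ping".to_vec()).unwrap();
        drop(socket_sender); // lets both stages drain and exit in order
        consume.join().unwrap();
        listen.join().unwrap();
    }

Dropping the socket-side sender tears the pipeline down stage by stage, which is the same shutdown path the Disconnected match arms in the diff below rely on.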
108 changes: 79 additions & 29 deletions gossip/src/cluster_info.rs
@@ -78,6 +78,7 @@ use {
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError, Sender},
{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard},
},
thread::{sleep, Builder, JoinHandle},
@@ -235,7 +236,7 @@ impl Default for ClusterInfo {
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample)]
struct PruneData {
pub(crate) struct PruneData {
/// Pubkey of the node that sent this prune data
pubkey: Pubkey,
/// Pubkeys of nodes that should be pruned
@@ -329,7 +330,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
#[frozen_abi(digest = "GANv3KVkTYF84kmg1bAuWEZd9MaiYzPquuu13hup3379")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
enum Protocol {
pub(crate) enum Protocol {
/// Gossip protocol messages
PullRequest(CrdsFilter, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>),
@@ -2494,7 +2495,7 @@ impl ClusterInfo {

fn process_packets(
&self,
packets: VecDeque<Packet>,
packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
thread_pool: &ThreadPool,
recycler: &PacketsRecycler,
response_sender: &PacketSender,
@@ -2504,24 +2505,6 @@
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
self.stats
.packets_received_count
.add_relaxed(packets.len() as u64);
let packets: Vec<_> = thread_pool.install(|| {
packets
.into_par_iter()
.filter_map(|packet| {
let protocol: Protocol =
limited_deserialize(&packet.data[..packet.meta.size]).ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify()?;
Some((packet.meta.addr(), protocol))
})
.collect()
});
self.stats
.packets_received_verified_count
.add_relaxed(packets.len() as u64);
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
@@ -2606,23 +2589,64 @@ impl ClusterInfo {
Ok(())
}

// Consumes packets received from the socket, deserializing, sanitizing and
// verifying them and then sending them down the channel for the actual
// handling of requests/messages.
fn run_consume(
@carllin (Contributor), Jun 5, 2021:
nit: rename to run_socket_consume to be more specific, as "consume" is pretty generic

Author (Contributor):
done

&self,
receiver: &PacketReceiver,
sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
thread_pool: &ThreadPool,
) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
let mut packets = VecDeque::from(packets);
for payload in receiver.try_iter() {
packets.extend(payload.packets.iter().cloned());
let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
if excess_count > 0 {
packets.drain(0..excess_count);
self.stats
.gossip_packets_dropped_count
.add_relaxed(excess_count as u64);
}
}
self.stats
.packets_received_count
.add_relaxed(packets.len() as u64);
let verify_packet = |packet: Packet| {
let data = &packet.data[..packet.meta.size];
let protocol: Protocol = limited_deserialize(data).ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify()?;
Some((packet.meta.addr(), protocol))
};
let packets: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
thread_pool.install(|| packets.into_par_iter().filter_map(verify_packet).collect())
};
self.stats
.packets_received_verified_count
.add_relaxed(packets.len() as u64);
Ok(sender.send(packets)?)
}
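A side note on the overflow handling above: when more than MAX_GOSSIP_TRAFFIC packets are queued, the oldest ones are drained from the front so the newest traffic is kept. A minimal sketch of that behavior, using a small stand-in value for MAX_GOSSIP_TRAFFIC (the real constant is much larger):

    use std::collections::VecDeque;

    // Illustrative stand-in for MAX_GOSSIP_TRAFFIC.
    const MAX_GOSSIP_TRAFFIC: usize = 4;

    fn main() {
        // Six packets arrive but only four fit; the two oldest are dropped
        // from the front, mirroring the drain in the consume loop.
        let mut packets: VecDeque<u32> = (0..6).collect();
        let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
        if excess_count > 0 {
            packets.drain(0..excess_count);
            // In the real code the drop count is also added to
            // gossip_packets_dropped_count.
        }
        assert_eq!(packets, VecDeque::from(vec![2, 3, 4, 5]));
    }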

/// Process messages from the network
fn run_listen(
&self,
recycler: &PacketsRecycler,
bank_forks: Option<&RwLock<BankForks>>,
requests_receiver: &PacketReceiver,
receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
response_sender: &PacketSender,
thread_pool: &ThreadPool,
last_print: &mut Instant,
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into();
let mut packets = VecDeque::from(packets);
while let Ok(packet) = requests_receiver.try_recv() {
packets.extend(packet.packets.iter().cloned());
let mut packets = VecDeque::from(receiver.recv_timeout(RECV_TIMEOUT)?);
for payload in receiver.try_iter() {
packets.extend(payload);
let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
if excess_count > 0 {
packets.drain(0..excess_count);
@@ -2659,10 +2683,35 @@ impl ClusterInfo {
Ok(())
}

pub fn listen(
pub(crate) fn consume(
Reviewer (Contributor):
nit: functions in the code base that start threads generally start with start_*_thread, how about start_socket_consume_thread()

Author (Contributor):
done

self: Arc<Self>,
receiver: PacketReceiver,
sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count().min(8))
.thread_name(|i| format!("gossip-consume-{}", i))
.build()
.unwrap();
let run_consume = move || {
while !exit.load(Ordering::Relaxed) {
match self.run_consume(&receiver, &sender, &thread_pool) {
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(err) => error!("gossip consume: {}", err),
Reviewer (Contributor):
Nice, the only other potential error other than the Timeout is Ok(sender.send(packets)?), https://github.com/solana-labs/solana/pull/17647/files#diff-b07406ad1c913d21d39a4163cb34f48c55cef6d472c814419a732f3937ffceddR2631. Hopefully this one doesn't have a chance of being too spammy?

Author (Contributor):
yeah, I guess should be fine.
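For reference on the error discussed above: a send on an std::sync::mpsc channel only fails after the receiving end has been dropped, so the error!() branch should stay quiet in normal operation. A minimal sketch:

    use std::sync::mpsc::channel;

    fn main() {
        let (sender, receiver) = channel::<u64>();
        // While the receiving side is alive, send never fails.
        assert!(sender.send(1).is_ok());
        drop(receiver);
        // Once the listen side hangs up, every send returns SendError; the
        // consume loop would presumably keep logging until the exit flag
        // stops it.
        assert!(sender.send(2).is_err());
    }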

Ok(()) => (),
}
}
};
let thread_name = String::from("gossip-consume");
Reviewer (Contributor):
nit: "gossip-consume" -> "gossip-socket-consume"

Author (Contributor):
thread names in linux are limited to 15 characters, and everything after the 15th character is trimmed when inspecting running processes from shell, which will make it confusing if I use a longer name here.
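For context on that limit: the Linux kernel stores per-thread names in a 16-byte comm buffer (15 visible characters plus a trailing NUL), and that name is what ps -L and top -H display. A small Linux-only sketch for checking what the kernel records; how an over-long name is handled (truncated versus rejected) depends on the Rust std version, so the example sticks to a 14-character name:

    use std::{fs, thread};

    fn main() {
        // "gossip-consume" is 14 characters and fits in the kernel's comm
        // buffer; a longer name such as "gossip-socket-consume" would not be
        // shown in full by ps/top.
        thread::Builder::new()
            .name("gossip-consume".to_string())
            .spawn(|| {
                // /proc/thread-self/comm holds the name the kernel reports.
                let comm = fs::read_to_string("/proc/thread-self/comm")
                    .unwrap_or_default();
                println!("kernel-visible thread name: {}", comm.trim());
            })
            .unwrap()
            .join()
            .unwrap();
    }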

Builder::new().name(thread_name).spawn(run_consume).unwrap()
}

pub(crate) fn listen(
self: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
requests_receiver: PacketReceiver,
requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
response_sender: PacketSender,
should_check_duplicate_instance: bool,
exit: &Arc<AtomicBool>,
@@ -2689,7 +2738,8 @@ impl ClusterInfo {
should_check_duplicate_instance,
) {
match err {
GossipError::RecvTimeoutError(_) => {
GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
GossipError::RecvTimeoutError(RecvTimeoutError::Timeout) => {
let table_size = self.gossip.read().unwrap().crds.len();
debug!(
"{}: run_listen timeout, table size: {}",
6 changes: 6 additions & 0 deletions gossip/src/cluster_info_metrics.rs
@@ -118,6 +118,7 @@ pub(crate) struct GossipStats {
pub(crate) trim_crds_table_failed: Counter,
pub(crate) trim_crds_table_purged_values_count: Counter,
pub(crate) tvu_peers: Counter,
pub(crate) verify_gossip_packets_time: Counter,
}

pub(crate) fn submit_gossip_stats(
@@ -171,6 +172,11 @@ pub(crate) fn submit_gossip_stats(
stats.process_gossip_packets_time.clear(),
i64
),
(
"verify_gossip_packets_time",
stats.verify_gossip_packets_time.clear(),
i64
),
(
"handle_batch_ping_messages_time",
stats.handle_batch_ping_messages_time.clear(),
15 changes: 12 additions & 3 deletions gossip/src/gossip_service.rs
@@ -56,11 +56,15 @@ impl GossipService {
1,
);
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let (consume_sender, listen_receiver) = channel();
let t_consume =
Reviewer (Contributor):
nit t_consume -> t_socket_consume

Author (Contributor):
done

cluster_info
.clone()
.consume(request_receiver, consume_sender, exit.clone());
let t_listen = ClusterInfo::listen(
cluster_info.clone(),
bank_forks.clone(),
request_receiver,
listen_receiver,
response_sender.clone(),
should_check_duplicate_instance,
exit,
@@ -72,7 +76,12 @@
gossip_validators,
exit,
);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
// To work around:
// https://github.com/rust-lang/rust/issues/54267
// responder thread should start after response_sender.clone(). see:
// https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let thread_hdls = vec![t_receiver, t_responder, t_consume, t_listen, t_gossip];
Self { thread_hdls }
}
