Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge latest changes
  • Loading branch information
AgeManning committed Dec 6, 2023
commit 2a94685a5fa0487b617ab48c8c0f4f77de998bb9
6 changes: 5 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
## 0.46.1 - unreleased
- Adds scoring for slow peers and introduces a message to inform the application of slow peers.

- Adds metrics for priority and non-priority queue lengths.

- Removes the control pool and sends control messages on demand.

- Implement publish and forward message dropping.

- Implement backpressure by diferentiating between priority and non priority messages.
- Implement backpressure by differentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)

Expand Down
56 changes: 47 additions & 9 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{gossip_promises::GossipPromises, types::Graft};
use crate::{
handler::{Handler, HandlerEvent, HandlerIn},
types::Prune,
};
use crate::{mcache::MessageCache, types::IWant};
use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, FailedMessages, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -1309,9 +1319,23 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
if sender
.iwant(IWant {
message_ids: iwant_ids_vec,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
Expand Down Expand Up @@ -2523,7 +2547,7 @@ where

tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());

for peer in to_msg_peers {
for peer_id in to_msg_peers {
let mut peer_message_ids = message_ids.clone();

if peer_message_ids.len() > self.config.max_ihave_length() {
Expand All @@ -2537,12 +2561,26 @@ where
// send an IHAVE message
let sender = self
.handler_send_queues
.get_mut(&peer)
.get_mut(&peer_id)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
if sender
.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&peer_id);
}
// Increment failed message count
self.failed_messages
.entry(peer_id)
.or_default()
.non_priority += 1;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ fn test_join() {
)
.unwrap();
peers.push(peer);
let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len());
let sender = RpcSender::new(gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(random_peer, sender);
receivers.insert(random_peer, receiver);
Expand Down
28 changes: 10 additions & 18 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,28 +634,20 @@ impl RpcSender {

/// Send a `RpcOut::IHave` message to the `RpcReceiver`
/// this is low priority and if queue is full the message is dropped.
pub(crate) fn ihave(&mut self, ihave: IHave) {
if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) {
let rpc = err.into_inner();
tracing::trace!(
"IHAVE message {:?} to peer {} dropped, queue is full",
rpc,
self.peer_id
);
}
#[must_use]
pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), ()> {
self.non_priority
.try_send(RpcOut::IHave(ihave))
.map_err(|_| ())
}

/// Send a `RpcOut::IHave` message to the `RpcReceiver`
/// this is low priority and if queue is full the message is dropped.
pub(crate) fn iwant(&mut self, iwant: IWant) {
if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) {
let rpc = err.into_inner();
tracing::trace!(
"IWANT message {:?} to peer {} dropped, queue is full",
rpc,
self.peer_id
);
}
#[must_use]
pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), ()> {
self.non_priority
.try_send(RpcOut::IWant(iwant))
.map_err(|_| ())
}

/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.