Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Inform application layer of slow peer
  • Loading branch information
AgeManning committed Dec 4, 2023
commit cc9cb89b79f763b6bb893b21916344f91dd924c4
39 changes: 39 additions & 0 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFi
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::ExpiredMessages;
use crate::types::{
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
Expand Down Expand Up @@ -147,6 +148,11 @@ pub enum Event {
},
/// A peer that does not support gossipsub has connected.
GossipsubNotSupported { peer_id: PeerId },
/// A peer is not able to download messages in time.
SlowPeer {
peer_id: PeerId,
expired_messages: ExpiredMessages,
},
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
Expand Down Expand Up @@ -338,6 +344,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,

/// Tracks the numbers of failed messages per peer-id.
expired_messages: HashMap<PeerId, ExpiredMessages>,
}

impl<D, F> Behaviour<D, F>
Expand Down Expand Up @@ -478,6 +487,7 @@ where
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
expired_messages: Default::default(),
})
}
}
Expand Down Expand Up @@ -2444,6 +2454,16 @@ where
// shift the memcache
self.mcache.shift();

// Report expired messages
for (peer_id, expired_messages) in self.expired_messages.drain() {
self.events
.push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
expired_messages,
}));
}
self.expired_messages.shrink_to_fit();

tracing::debug!("Completed Heartbeat");
if let Some(metrics) = self.metrics.as_mut() {
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
Expand Down Expand Up @@ -3133,6 +3153,25 @@ where
HandlerEvent::MessageDropped(rpc) => {
// TODO:
// * Build scoring logic to handle peers that are dropping messages

// Keep track of expired messages for the application layer.
match rpc {
RpcOut::Publish { .. } => {
self.expired_messages
.entry(propagation_source)
.or_default()
.increment_publish();
}
RpcOut::Forward { .. } => {
self.expired_messages
.entry(propagation_source)
.or_default()
.increment_forward();
}
_ => {} //
}

// Record metrics on the failure.
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ pub type Rpc = self::types::Rpc;

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
pub use self::types::ExpiredMessages;
26 changes: 26 additions & 0 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,32 @@ use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// The kind of message a peer is unable to download in time.
#[derive(Clone, Debug, Default)]
pub struct ExpiredMessages {
/// The number of publish messages that failed to be published in a heartbeat.
pub publish: usize,
/// The number of forward messages that failed to be published in a heartbeat.
pub forward: usize,
}

impl ExpiredMessages {
/// Increments the number of expired publish messages.
pub fn increment_publish(&mut self) {
self.publish += 1;
}

/// Increments the number of expired forward messages.
pub fn increment_forward(&mut self) {
self.forward += 1;
}

/// Provides the total expired messages.
pub fn total(&self) -> usize {
self.publish + self.forward
}
}

#[derive(Debug)]
/// Validation kinds from the application for received messages.
pub enum MessageAcceptance {
Expand Down