Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
peers publish messages to themselves if they are subscribed
  • Loading branch information
ong-jonas committed Oct 24, 2023
commit a739d946dd0e8e89b93f5c84f9b457e6e9e9ff9d
12 changes: 6 additions & 6 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
messages::{Message, Topic},
};
use libp2p::{
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, PublishError},
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, PublishError, TopicHash},
identify,
identity::{
ed25519::{Keypair, PublicKey},
Expand Down Expand Up @@ -151,8 +151,8 @@ impl Behaviour {
}

/// Check if the [gossipsub::Message] is subscribed by this peer
pub fn is_subscribed(&self, message: &gossipsub::Message) -> bool {
self.gossip.topics().any(|topic| message.topic.eq(topic))
pub fn is_subscribed(&self, topic_hash: &TopicHash) -> bool {
self.gossip.topics().any(|topic| topic_hash.eq(topic))
}
}

Expand Down Expand Up @@ -293,7 +293,7 @@ mod test {
sequence_number: None,
topic: mailbox_topic_hash,
};
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg));
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg.topic));

let hotstuff_rs_msg = gossipsub::Message {
source: None,
Expand All @@ -313,7 +313,7 @@ mod test {
let subscribed_topics: Vec<&MessageTopicHash> = peer.behaviour.gossip.topics().collect();
assert_eq!(subscribed_topics.len(), 2); //including the initial subscribed topic

assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg));
assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg.topic));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg.topic));
}
}
10 changes: 5 additions & 5 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, me
match peer_command {
PeerCommand::Publish(topic, message) => {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
if swarm.behaviour().is_subscribed(&topic.clone().hash()) {
// Send it to ourselves if we subscribed to this topic
message_handlers.iter()
.for_each(|handler| handler(local_public_address, message.clone()));
}
else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to publish the message. {:?}", e);
}
}
Expand All @@ -267,7 +267,7 @@ fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, me
}
}

// 3. Forward subscribed messages to Message Handlers when a NetworkEvent is received
// 3. Deliver messages when a NetworkEvent is received
if let Some(event) = event {
match event {
SwarmEvent::Behaviour(NetworkEvent::Gossip(gossipsub::Event::Message {
Expand All @@ -279,7 +279,7 @@ fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, me
conversions::PublicAddress::try_from(*src_peer_id)
{
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
if swarm.behaviour().is_subscribed(&message.topic) {
// Send it to ourselves if we subscribed to this topic
if let Ok(message) =
Message::try_from((message, local_public_address))
Expand Down