Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
messages::{Message, Topic},
};
use libp2p::{
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, PublishError},
gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, PublishError, TopicHash},
identify,
identity::{
ed25519::{Keypair, PublicKey},
Expand Down Expand Up @@ -150,9 +150,9 @@ impl Behaviour {
self.gossip.publish(topic.hash(), msg)
}

/// Check if the [gossipsub::Message] is subscribed by this peer
pub fn is_subscribed(&self, message: &gossipsub::Message) -> bool {
self.gossip.topics().any(|topic| message.topic.eq(topic))
/// Check if the [gossipsub::topic] is subscribed by this peer
pub fn is_subscribed(&self, topic_hash: &TopicHash) -> bool {
self.gossip.topics().any(|topic| topic_hash.eq(topic))
}
}

Expand Down Expand Up @@ -293,7 +293,7 @@ mod test {
sequence_number: None,
topic: mailbox_topic_hash,
};
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg));
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg.topic));

let hotstuff_rs_msg = gossipsub::Message {
source: None,
Expand All @@ -313,7 +313,7 @@ mod test {
let subscribed_topics: Vec<&MessageTopicHash> = peer.behaviour.gossip.topics().collect();
assert_eq!(subscribed_topics.len(), 2); //including the initial subscribed topic

assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg));
assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg.topic));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg.topic));
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//!
//! ```no_run
//! use crate::Config;
//! use crate::peer::PeerBuilder;
//! use crate::peer::Peer;
//! use crate::messages::Message;
//!
//!
Expand Down
83 changes: 34 additions & 49 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
//!
//! To start a pchain-network peer, users pass a [Config] instance and message handlers into Peer::start().
//! [Config] contains the peer's keypair, or other deployment-specific parameters, such as listening ports, bootstrap nodes etc.
//! Then, the users define the message handlers for processing the [Message].
//! Starting Peer will return a mpsc Sender for delivering PeerCommand to the thread.
//! Users need to define the message handlers for processing the [Message].
//! Starting Peer will return a Sender for delivering PeerCommand to the thread.
//!
//! Example:
//!
Expand Down Expand Up @@ -58,26 +58,38 @@ use crate::{
};



pub struct Peer {
/// Network handle for the [tokio::task] which is the main thread for the p2p network
pub(crate) handle: JoinHandle<()>,

/// mpsc sender for delivering [PeerCommand] to the internal thread, commands are used to
/// publish [Topic] specific messages to the p2p network.
/// publish messages with specific [Topic] to the p2p network.
pub(crate) sender: tokio::sync::mpsc::Sender<PeerCommand>,
}

impl Peer {
/// Constructs a [Peer] from the given configuration and handlers and start the thread for the p2p network
/// Constructs a [Peer] from the given configuration and handlers, starting the thread for the p2p network
/// 1. Load network configuration to set up transport for the P2P network.
/// 2. Establishes connection to the network by adding bootnodes and subscribing to message [Topic].
/// 3. Spawns an asynchronous [tokio] task and enters the main event loop, returning a mpsc Sender used for sending
/// 2. Peer starts listening on the given config address
/// 3. Establishes connection to the network by adding bootnodes and subscribing to message [Topic].
/// 4. Spawns an asynchronous [tokio] task and enters the event handling loop, returning a Sender used for sending
/// [PeerCommand] to the internal thread.
pub async fn start(config: Config, handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) -> Result<Peer, PeerStartError> {
let mut swarm = set_up_transport(&config).await?;
swarm = establish_network_connections(swarm, &config)?;
let (handle, sender) = main_loop(swarm, &config, handlers)?;

let mut swarm = set_up_transport(&config)
.await
.map_err(PeerStartError::SystemConfigError)?;

swarm.listen_on(conversions::multi_addr(
Ipv4Addr::new(0, 0, 0, 0),
config.listening_port
))
.map_err(PeerStartError::UnsupportedAddressError)?;

swarm = establish_network_connections(swarm, &config)
.map_err(PeerStartError::SubscriptionError)?;

let (handle, sender) = start_event_handling(swarm, &config, handlers);
Ok(
Peer {
handle,
Expand Down Expand Up @@ -133,7 +145,7 @@ pub(crate) enum PeerCommand {
}

/// Loads the network configuration from [Config] and build the transport for the P2P network
async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,PeerStartError> {
async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,std::io::Error> {
// Read network configuration
let local_keypair = &config.keypair;
let local_public_address: PublicAddress = local_keypair.public().to_bytes();
Expand All @@ -150,21 +162,14 @@ async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,Pe
&local_keypair,
config.kademlia_protocol_name.clone(),
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
let multiaddr = conversions::multi_addr(
Ipv4Addr::new(0, 0, 0, 0),
config.listening_port
);
swarm.listen_on(multiaddr)?;

let swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Ok(swarm)
}

/// Peer establishes network specific connections:
/// 1. Adds network bootnodes to local routing table
/// 2. Subscribes to network specific message [Topic]
fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &Config) -> Result<libp2p::Swarm<Behaviour>,PeerStartError> {
fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &Config) -> Result<libp2p::Swarm<Behaviour>,gossipsub::SubscriptionError> {
// Connection to bootstrap nodes
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
Expand All @@ -184,8 +189,7 @@ fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &
Ok(swarm)
}

/// Spawns the Main Event loop for p2p network.
/// It waits for:
/// Spawns the Main Event Handling loop for p2p network. It waits for:
/// 1. [NetworkEvent]
/// 2. [Commands](PeerCommand) from application for sending messages or termination
/// 3. a periodic interval to discover peers in the network
Expand All @@ -211,8 +215,8 @@ fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &
/// Upon receiving a (Shutdown)[PeerCommand::Shutdown] command, the process will exit the loop and terminate
/// the thread.
///
fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) ->
Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerCommand>), PeerStartError> {
fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) ->
(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerCommand>) {
// 4. Start p2p networking
let local_public_address = config.keypair.public().to_bytes();
let (sender, mut receiver) =
Expand Down Expand Up @@ -247,12 +251,12 @@ fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handl
match peer_command {
PeerCommand::Publish(topic, message) => {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
if swarm.behaviour().is_subscribed(&topic.clone().hash()) {
// Send it to ourselves if we subscribed to this topic
message_handlers.iter()
.for_each(|handler| handler(local_public_address, message.clone()));
}
else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to publish the message. {:?}", e);
}
}
Expand All @@ -263,7 +267,7 @@ fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handl
}
}

// 3. Forward subscribed messages to Message Handlers when a NetworkEvent is received
// 3. Deliver messages when a NetworkEvent is received
if let Some(event) = event {
match event {
SwarmEvent::Behaviour(NetworkEvent::Gossip(gossipsub::Event::Message {
Expand All @@ -275,7 +279,7 @@ fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handl
conversions::PublicAddress::try_from(*src_peer_id)
{
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
if swarm.behaviour().is_subscribed(&message.topic) {
// Send it to ourselves if we subscribed to this topic
if let Ok(message) =
Message::try_from((message, local_public_address))
Expand Down Expand Up @@ -306,9 +310,7 @@ fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handl
}
});

Ok(
(network_thread_handle, sender)
)
(network_thread_handle, sender)
}


Expand Down Expand Up @@ -343,20 +345,3 @@ pub enum PeerStartError {
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

impl From<std::io::Error> for PeerStartError {
fn from(error: std::io::Error) -> PeerStartError {
PeerStartError::SystemConfigError(error)
}
}

impl From<libp2p::gossipsub::SubscriptionError> for PeerStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> PeerStartError {
PeerStartError::SubscriptionError(error)
}
}

impl From<libp2p::TransportError<std::io::Error>> for PeerStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> PeerStartError {
PeerStartError::UnsupportedAddressError(error)
}
}
37 changes: 30 additions & 7 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,30 +276,31 @@ async fn test_sparse_messaging() {
}

// - Network: Node1
// - Node1: keep sending message to itself only
// - Node1: keep broadcasting subscribed message
// - Node1: keep sending message to itself
#[tokio::test]
async fn test_send_to_self() {
async fn test_send_and_broadcast_to_self() {
let keypair_1 = ed25519::Keypair::generate();
let address_1 = keypair_1.public().to_bytes();

let (node_1, message_receiver_1) = node(
keypair_1,
30013,
vec![],
vec![]
vec![Topic::HotStuffRsBroadcast]
).await;

let mut sending_limit = 10;
let mut sending_tick = tokio::time::interval(Duration::from_secs(1));
let mut receiving_tick = tokio::time::interval(Duration::from_secs(2));

let message = create_sync_req(1);
let broadcast_message = create_sync_req(1);
let send_message = create_sync_req(2);

loop {
tokio::select! {
//broadcast does not send to self
_ = sending_tick.tick() => {
node_1.send_hotstuff_rs_msg(address_1, message.clone());
node_1.broadcast_hotstuff_rs_msg(broadcast_message.clone());
if sending_limit == 0 { break }
sending_limit -= 1;
}
Expand All @@ -308,7 +309,29 @@ async fn test_send_to_self() {
if node1_received.is_ok() {
let (msg_orgin, msg) = node1_received.unwrap();
let msg_vec: Vec<u8> = msg.into();
assert_eq!(msg_vec, message.try_to_vec().unwrap());
assert_eq!(msg_vec, broadcast_message.try_to_vec().unwrap());
assert_eq!(msg_orgin, address_1);
break
}
}
}
}

message_receiver_1.try_iter().next();

loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.send_hotstuff_rs_msg(address_1, send_message.clone());
if sending_limit == 0 { break }
sending_limit -= 1;
}
_ = receiving_tick.tick() => {
let node1_received = message_receiver_1.try_recv();
if node1_received.is_ok() {
let (msg_orgin, msg) = node1_received.unwrap();
let msg_vec: Vec<u8> = msg.into();
assert_eq!(msg_vec, send_message.try_to_vec().unwrap());
assert_eq!(msg_orgin, address_1);
return
}
Expand Down