Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ mod test {
}
}

// TODO jonas: please verify if theres a better way to check if a peer is added
#[test]
fn test_add_and_remove_peer() {
let mut peer1 = create_new_peer();
Expand All @@ -258,23 +257,27 @@ mod test {
.behaviour
.add_address(&peer2.peer_id, peer2.multi_addr);

let peer_num: usize = peer1
let peer1_kbuckets = peer1
.behaviour
.kad
.kbuckets()
.map(|x| x.num_entries())
.sum();
assert_eq!(peer_num, 1);

.find(|entry|
entry.iter().find(|bucket|
*bucket.node.key.preimage() == peer2.peer_id).is_some());

assert!(peer1_kbuckets.is_some());

peer1.behaviour.remove_peer(&peer2.peer_id);

let peer_num: usize = peer1
let peer1_kbuckets = peer1
.behaviour
.kad
.kbuckets()
.map(|x| x.num_entries())
.sum();
assert_eq!(peer_num, 0);
.find(|entry|
entry.iter().find(|bucket|
*bucket.node.key.preimage() == peer2.peer_id).is_some());

assert!(peer1_kbuckets.is_none());
}

#[test]
Expand Down
8 changes: 3 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
//! };
//! ```
//!
use libp2p::{
identity::{ed25519::Keypair, PeerId},
Multiaddr,
};
use std::net::Ipv4Addr;
use libp2p::identity::ed25519::Keypair;
use pchain_types::cryptography::PublicAddress;

use crate::messages::Topic;
Expand All @@ -39,7 +37,7 @@ pub struct Config {
pub listening_port: u16,

/// Bootstrap nodes for the initial connection
pub boot_nodes: Vec<(PeerId, Multiaddr)>,
pub boot_nodes: Vec<(PublicAddress, Ipv4Addr, u16)>,

/// Buffer size of outgoing messages
pub outgoing_msgs_buffer_capacity: usize,
Expand Down
34 changes: 22 additions & 12 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
dns::TokioDnsConfig,
gossipsub::{self, TopicHash},
Multiaddr,
identify, identity, noise,
swarm::{SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
Expand Down Expand Up @@ -89,9 +90,12 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
// 2. Connection to bootstrap nodes
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
swarm
let multiaddr = multi_addr(peer_info.1, peer_info.2);
if let Ok(peer_id) = &conversions::PublicAddress::new(peer_info.0).try_into() {
swarm
.behaviour_mut()
.add_address(&peer_info.0, peer_info.1.clone());
.add_address(peer_id, multiaddr);
}
});
}

Expand Down Expand Up @@ -134,11 +138,10 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
let _ = peer
.handlers
.iter()
.map(|handler| handler(local_public_address, message.clone()));
} else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
peer.handlers.iter()
.for_each(|handler| handler(local_public_address, message.clone()));
}
else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to pulish the message. {:?}", e);
}
}
Expand All @@ -163,13 +166,14 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic
if let Some(topic) = identify_topic(message.topic, public_addr)

if let Some(topic) = identify_topic(message.topic, local_public_address)
{
if let Ok(message) =
deserialize_message(message.data, topic)
{
let _ = peer.handlers.iter().map(|handler| {
handler(local_public_address, message.clone())
let _ = peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
Expand Down Expand Up @@ -221,8 +225,9 @@ async fn build_transport(
}

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, public_addr: PublicAddress) -> Option<Topic> {
config::fullnode_topics(public_addr)
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
//address for fullnode_topics should be the local peer address
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
}
Expand All @@ -241,3 +246,8 @@ fn deserialize_message(data: Vec<u8>, topic: Topic) -> Result<Message, std::io::
DroppedTxns => DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns),
}
}

/// Convert [std::net::Ipv4Addr] and port [u16] into MultiAddr[libp2p::Multiaddr] type
fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}
40 changes: 27 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,41 @@
//!
//! ```no_run
//! use crate::Config;
//! use crate::message_gate::MessageGateChain;
//! use crate::peer:PeerBuilder;
//! use crate::messages::Message;
//!
//!
//! // 1. Build a configuration.
//! let config = Config::default();
//!
//! // 2. Create message gate chain.
//! let gates = MessageGateChain::new()
//! // .append(message_gate1)
//! // .append(message_gate2)
//! // ...
//!
//! let config = Config {
//! keypair,
//! topics_to_subscribe,
//! listening_port,
//! boot_nodes,
//! outgoing_msgs_buffer_capacity, // 8
//! incoming_msgs_buffer_capacity, // 10
//! peer_discovery_interval, // 10
//! kademlia_protocol_name // "/pchain_p2p/1.0.0"
//! };
//!
//! // 2. Create a message handler
//! let (tx, rx) = mpsc::channel();
//! let message_sender = tx.clone();
//! let message_handler = move |msg_orgin: [u8;32], msg: Message| {
//! let _ = message_sender.send((msg_origin, msg));
//! };
//!
//! // 3. Start P2P network.
//! let network = pchain_network::NetworkHandle::start(network_config, subscribe_topics, message_gate_chain).await;
//!
//! let peer = PeerBuilder::new(config)
//! .on_receive_msg(message_handler)
//! .build()
//! .await
//! .unwrap();
//!
//! // 4. Send out messages.
//! network.broadcast_mempool_tx_msg(txn);
//! peer.broadcast_mempool_msg(txn);
//!
//!

// TODO jonas: update example usage above

pub mod behaviour;

Expand Down
6 changes: 4 additions & 2 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl PeerBuilder {
}

pub fn on_receive_msg(
&mut self,
mut self,
handler: impl Fn(PublicAddress, Message) + Send + 'static,
) -> &mut Self {
) -> PeerBuilder {
self.handlers.push(Box::new(handler));
self
}
Expand Down Expand Up @@ -121,6 +121,8 @@ pub(crate) enum EngineCommand {
Publish(Topic, Message),
Shutdown,
}

#[derive(Debug)]
pub enum EngineError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),
Expand Down
Loading