Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! - TryFrom<[PublicAddress]> for [PeerId]

use libp2p::identity::{self, ed25519, DecodingError, OtherVariantError, PeerId};
use libp2p::Multiaddr;
use std::net::Ipv4Addr;

/// PublicAddress(PublicAddress) is wrapper around [PublicAddress](pchain_types::cryptography::PublicAddress).
pub struct PublicAddress(pchain_types::cryptography::PublicAddress);
Expand Down Expand Up @@ -64,6 +66,11 @@ impl From<DecodingError> for ConversionError {
}
}

/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
pub fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}

#[cfg(test)]

mod test {
Expand Down
91 changes: 44 additions & 47 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@
//! the thread.
//!

use borsh::BorshDeserialize;
use futures::StreamExt;
use tokio::task::JoinHandle;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
dns::TokioDnsConfig,
gossipsub::{self, TopicHash},
Multiaddr,
gossipsub,
identify, identity, noise,
swarm::{SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
Expand All @@ -51,16 +50,16 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, NetworkEvent},
config, conversions,
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
messages::{DroppedTxnMessage, Message, Topic},
peer::{EngineCommand, EngineError, Peer, PeerBuilder},
conversions,
messages::{Message, Topic},
peer::EngineCommand,
config::Config,
};

/// [start] p2p networking peer and return the handle [Peer] of this process.
pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
let config = peer.config;

/// [start] p2p networking peer and return the handle of this process.
pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>)
-> Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<EngineCommand>), EngineStartError> {
let local_keypair = config.keypair;
let local_public_address: PublicAddress = local_keypair.public().to_bytes();
let local_peer_id = identity::Keypair::from(local_keypair.clone())
Expand All @@ -78,7 +77,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
let multiaddr = multi_addr(
let multiaddr = conversions::multi_addr(
Ipv4Addr::new(0, 0, 0, 0),
config.listening_port
);
Expand All @@ -87,7 +86,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
// 2. Connection to bootstrap nodes
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
let multiaddr = multi_addr(peer_info.1, peer_info.2);
let multiaddr = conversions::multi_addr(peer_info.1, peer_info.2);
if let Ok(peer_id) = &conversions::PublicAddress::new(peer_info.0).try_into() {
swarm
.behaviour_mut()
Expand Down Expand Up @@ -136,11 +135,11 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
peer.handlers.iter()
message_handlers.iter()
.for_each(|handler| handler(local_public_address, message.clone()));
}
else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to pulish the message. {:?}", e);
log::debug!("Failed to publish the message. {:?}", e);
}
}
EngineCommand::Shutdown => {
Expand All @@ -164,17 +163,13 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic

if let Some(topic) = identify_topic(message.topic, local_public_address)
if let Ok(pchain_message) =
Message::try_from(message)
{
if let Ok(message) =
deserialize_message(message.data, topic)
{
peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
message_handlers.iter().for_each(|handler| {
handler(public_addr, pchain_message.clone())
});
}
}
}
}
Expand All @@ -197,10 +192,9 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
}
});

Ok(Peer {
engine: network_thread_handle,
to_engine: sender,
})
Ok(
(network_thread_handle, sender)
)
}

async fn build_transport(
Expand All @@ -222,29 +216,32 @@ async fn build_transport(
.boxed())
}

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
#[derive(Debug)]
pub enum EngineStartError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),

/// Failed to subscribe to a topic on gossipsub
SubscriptionError(libp2p::gossipsub::SubscriptionError),

/// Swarm failed to listen on an unsupported address
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

/// Deserialize [libp2p::gossipsub::Message] into [crate::messages::Message]
fn deserialize_message(data: Vec<u8>, topic: Topic) -> Result<Message, std::io::Error> {
let mut data = data.as_slice();
impl From<std::io::Error> for EngineStartError {
fn from(error: std::io::Error) -> EngineStartError {
EngineStartError::SystemConfigError(error)
}
}

match topic {
HotStuffRsBroadcast | HotStuffRsSend(_) => {
hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)
}
Mempool => {
pchain_types::blockchain::TransactionV1::deserialize(&mut data).map(Message::Mempool)
}
DroppedTxns => DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns),
impl From<libp2p::gossipsub::SubscriptionError> for EngineStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> EngineStartError {
EngineStartError::SubscriptionError(error)
}
}

/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
impl From<libp2p::TransportError<std::io::Error>> for EngineStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> EngineStartError {
EngineStartError::UnsupportedAddressError(error)
}
}
26 changes: 24 additions & 2 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//! in the pchain-network.
//!

use borsh::BorshSerialize;
use borsh::{BorshSerialize, BorshDeserialize};
use libp2p::gossipsub::IdentTopic;
use pchain_types::{
blockchain::TransactionV1,
Expand Down Expand Up @@ -51,7 +51,7 @@ impl From<Topic> for IdentTopic {
}

/// [Message] are structured messages that are sent between ParallelChain Network Peers.
#[derive(Clone)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum Message {
HotStuffRs(hotstuff_rs::messages::Message),
Mempool(TransactionV1),
Expand All @@ -68,6 +68,28 @@ impl From<Message> for Vec<u8> {
}
}

impl TryFrom<libp2p::gossipsub::Message> for Message {
type Error = std::io::Error;

fn try_from(message: libp2p::gossipsub::Message) -> Result<Message, std::io::Error> {
let (topic_hash, data) = (message.topic, message.data);
match topic_hash.as_str() {
"consensus" => {
hotstuff_rs::messages::Message::deserialize(&mut data.as_slice()).map(Message::HotStuffRs)
}
"mempool" => {
pchain_types::blockchain::TransactionV1::deserialize(&mut data.as_slice()).map(Message::Mempool)
}
"droppedTx" => {
DroppedTxnMessage::deserialize(&mut data.as_slice()).map(Message::DroppedTxns)
}
_ => {
hotstuff_rs::messages::Message::deserialize(&mut data.as_slice()).map(Message::HotStuffRs)
}
}
}
}

/// [DroppedTxnMessage] defines message content for [Message::DroppedTxns].
#[derive(Clone, borsh::BorshSerialize, borsh::BorshDeserialize)]
pub enum DroppedTxnMessage {
Expand Down
68 changes: 9 additions & 59 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,6 @@ use tokio::task::JoinHandle;
use crate::config::Config;
use crate::messages::{DroppedTxnMessage, Message, Topic};

/// The builder struct for constructing a [Peer].
pub struct PeerBuilder {
/// Configurations of the peer
pub config: Config,

/// Message handler to process received messages from the network
pub handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>,
}

impl PeerBuilder {
pub fn new(config: Config) -> PeerBuilder {
PeerBuilder {
config,
handlers: vec![],
}
}

pub fn on_receive_msg(
mut self,
handler: impl Fn(PublicAddress, Message) + Send + 'static,
) -> PeerBuilder {
self.handlers.push(Box::new(handler));
self
}

/// Constructs a [Peer] from the given configuration and handlers, and start the thread for the p2p network.
pub async fn build(self) -> Result<Peer, EngineError> {
crate::engine::start(self).await
}
}

pub struct Peer {
/// Network handle for the [tokio::task] which is the main thread for the
Expand All @@ -76,6 +46,15 @@ pub struct Peer {
}

impl Peer {
/// Constructs a [Peer] from the given configuration and handlers, and start the thread for the p2p network.
pub async fn start(config: Config, handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) -> Peer {
let (engine, to_engine) = crate::engine::start(config, handlers).await.unwrap();
Peer {
engine,
to_engine,
}
}

pub fn broadcast_mempool_msg(&self, txn: TransactionV1) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
Topic::Mempool,
Expand Down Expand Up @@ -122,32 +101,3 @@ pub(crate) enum EngineCommand {
Shutdown,
}

#[derive(Debug)]
pub enum EngineError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),

/// Failed to subscribe to a topic on gossipsub
SubscriptionError(libp2p::gossipsub::SubscriptionError),

/// Swarm failed to listen on an unsupported address
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

impl From<std::io::Error> for EngineError {
fn from(error: std::io::Error) -> EngineError {
EngineError::SystemConfigError(error)
}
}

impl From<libp2p::gossipsub::SubscriptionError> for EngineError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> EngineError {
EngineError::SubscriptionError(error)
}
}

impl From<libp2p::TransportError<std::io::Error>> for EngineError {
fn from(error: libp2p::TransportError<std::io::Error>) -> EngineError {
EngineError::UnsupportedAddressError(error)
}
}
12 changes: 5 additions & 7 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::{net::Ipv4Addr, sync::mpsc, time::Duration};
use borsh::BorshSerialize;
use hotstuff_rs::messages::SyncRequest;
use libp2p::identity::ed25519::{Keypair, self};
use pchain_network::peer::PeerBuilder;
use pchain_network::peer::Peer;
use pchain_network::{
config::Config,
messages::{Topic, Message},
peer::Peer,
};
use pchain_types::{blockchain::TransactionV1, cryptography::PublicAddress};

Expand Down Expand Up @@ -440,11 +439,10 @@ pub async fn node(
let _ = message_sender.send((msg_origin, msg));
};

let peer = PeerBuilder::new(config)
.on_receive_msg(message_handler)
.build()
.await
.unwrap();
let mut message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>> = vec![];
message_handlers.push(Box::new(message_handler));

let peer = Peer::start(config, message_handlers).await;

(peer, rx)
}