Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
implement tryfrom gossip_message to pchain_message and moved definiti…
…ons to conversions
  • Loading branch information
ong-jonas committed Oct 19, 2023
commit cf2ab28bd881438c88661c8589f29cbd12a82f30
7 changes: 7 additions & 0 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! - TryFrom<[PublicAddress]> for [PeerId]

use libp2p::identity::{self, ed25519, DecodingError, OtherVariantError, PeerId};
use libp2p::Multiaddr;
use std::net::Ipv4Addr;

/// PublicAddress(PublicAddress) is wrapper around [PublicAddress](pchain_types::cryptography::PublicAddress).
pub struct PublicAddress(pchain_types::cryptography::PublicAddress);
Expand Down Expand Up @@ -64,6 +66,11 @@ impl From<DecodingError> for ConversionError {
}
}

/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
pub fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}

#[cfg(test)]

mod test {
Expand Down
56 changes: 11 additions & 45 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@
//! the thread.
//!

use borsh::BorshDeserialize;
use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
dns::TokioDnsConfig,
gossipsub::{self, TopicHash},
Multiaddr,
gossipsub,
identify, identity, noise,
swarm::{SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
Expand All @@ -51,9 +49,8 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, NetworkEvent},
config, conversions,
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
messages::{DroppedTxnMessage, Message, Topic},
conversions,
messages::{Message, Topic},
peer::{EngineCommand, EngineError, Peer, PeerBuilder},
};

Expand All @@ -78,7 +75,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
let multiaddr = multi_addr(
let multiaddr = conversions::multi_addr(
Ipv4Addr::new(0, 0, 0, 0),
config.listening_port
);
Expand All @@ -87,7 +84,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
// 2. Connection to bootstrap nodes
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
let multiaddr = multi_addr(peer_info.1, peer_info.2);
let multiaddr = conversions::multi_addr(peer_info.1, peer_info.2);
if let Ok(peer_id) = &conversions::PublicAddress::new(peer_info.0).try_into() {
swarm
.behaviour_mut()
Expand Down Expand Up @@ -164,17 +161,13 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic

if let Some(topic) = identify_topic(message.topic, local_public_address)
if let Ok(pchain_message) =
Message::try_from(message)
{
if let Ok(message) =
deserialize_message(message.data, topic)
{
peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
peer.handlers.iter().for_each(|handler| {
handler(public_addr, pchain_message.clone())
});
}
}
}
}
Expand Down Expand Up @@ -221,30 +214,3 @@ async fn build_transport(
.timeout(std::time::Duration::from_secs(20))
.boxed())
}

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
}

/// Deserialize [libp2p::gossipsub::Message] into [crate::messages::Message]
fn deserialize_message(data: Vec<u8>, topic: Topic) -> Result<Message, std::io::Error> {
let mut data = data.as_slice();

match topic {
HotStuffRsBroadcast | HotStuffRsSend(_) => {
hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)
}
Mempool => {
pchain_types::blockchain::TransactionV1::deserialize(&mut data).map(Message::Mempool)
}
DroppedTxns => DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns),
}
}

/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}
26 changes: 24 additions & 2 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//! in the pchain-network.
//!

use borsh::BorshSerialize;
use borsh::{BorshSerialize, BorshDeserialize};
use libp2p::gossipsub::IdentTopic;
use pchain_types::{
blockchain::TransactionV1,
Expand Down Expand Up @@ -51,7 +51,7 @@ impl From<Topic> for IdentTopic {
}

/// [Message] are structured messages that are sent between ParallelChain Network Peers.
#[derive(Clone)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum Message {
HotStuffRs(hotstuff_rs::messages::Message),
Mempool(TransactionV1),
Expand All @@ -68,6 +68,28 @@ impl From<Message> for Vec<u8> {
}
}

impl TryFrom<libp2p::gossipsub::Message> for Message {
type Error = std::io::Error;

fn try_from(message: libp2p::gossipsub::Message) -> Result<Message, std::io::Error> {
let (topic_hash, data) = (message.topic, message.data);
match topic_hash.as_str() {
"consensus" => {
hotstuff_rs::messages::Message::deserialize(&mut data.as_slice()).map(Message::HotStuffRs)
}
"mempool" => {
pchain_types::blockchain::TransactionV1::deserialize(&mut data.as_slice()).map(Message::Mempool)
}
"droppedTx" => {
DroppedTxnMessage::deserialize(&mut data.as_slice()).map(Message::DroppedTxns)
}
_ => {
hotstuff_rs::messages::Message::deserialize(&mut data.as_slice()).map(Message::HotStuffRs)
}
}
}
}

/// [DroppedTxnMessage] defines message content for [Message::DroppedTxns].
#[derive(Clone, borsh::BorshSerialize, borsh::BorshDeserialize)]
pub enum DroppedTxnMessage {
Expand Down