Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
990251c
mailbox feature
ong-jonas Oct 19, 2023
2396d8d
merge refactor into feature
ong-jonas Oct 27, 2023
8e73502
message ID no longer overridden
ong-jonas Oct 27, 2023
28d9103
removed engine.rs
ong-jonas Oct 27, 2023
d55849b
renamed SMR topics to hotstuff_rs
ong-jonas Oct 27, 2023
7da376a
added mailbox subscription feature
ong-jonas Oct 27, 2023
043800a
update network tests
ong-jonas Oct 27, 2023
dd322db
removed droppedtx message type
ong-jonas Oct 30, 2023
cde45ef
added different message handlers and updated hotstuff dependency to 0.3
ong-jonas Oct 30, 2023
0f945a7
updated config keypair to pchain type
ong-jonas Oct 31, 2023
0879474
added helper function in network tests
ong-jonas Oct 31, 2023
17673c0
mempool messages accepts V1 or V2
ong-jonas Nov 1, 2023
4a7c7aa
Update lib.rs
Nov 1, 2023
ef789f2
Fix: serialize TransactionV1 and TransactionV2 separately in messages.rs
Nov 1, 2023
725d99b
Update libp2p deprecated package
Nov 1, 2023
30d2dd4
update message serialization and handler channels
ong-jonas Nov 8, 2023
994834c
update message handlers to FnMut
Nov 9, 2023
f01c511
FIX: removed deprecated dependencies
ong-jonas Nov 16, 2023
54ffb64
REFACTOR: removed with_dns, added nodelay(true)
ong-jonas Nov 16, 2023
29ea7c7
REFACTOR: try_from to filter_gossipsub_messages, Vec<msg_handlers> to…
ong-jonas Nov 21, 2023
6b3543d
deps: update dependency to hotstuff_rs
ong-jonas Nov 30, 2023
5a7c61e
refactor: update pchain-types dependency version
ong-jonas Dec 1, 2023
8556c0f
Merge branch 'v0.5' into mailbox/feature
ong-jonas Dec 5, 2023
5b379a9
REFACTOR: documentation update to reflect single message handler
ong-jonas Dec 5, 2023
f66e4c2
refactor: moved is_close_peer function to peer.rs and updated PeerSta…
ong-jonas Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
mailbox feature
  • Loading branch information
ong-jonas committed Oct 19, 2023
commit 990251c6f0beb4cf418823e7cd552248f16ba351
21 changes: 15 additions & 6 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,23 @@ impl Behaviour {
Ok(())
}

/// Unsubscribe from [Topic]
pub fn unsubscribe(&mut self, topics: Vec<Topic>) -> Result<(), gossipsub::PublishError> {
for topic in topics {
self.gossip.unsubscribe(&topic.into())?;
}

Ok(())
}

/// Publish a [Message] to peers subscribed to the [Topic]
pub fn publish(&mut self, topic: Topic, msg: Message) -> Result<MessageId, PublishError> {
self.gossip.publish(topic.hash(), msg)
}

/// Check if the [gossipsub::Message] is subscribed by this peer
pub fn is_subscribed(&self, message: &gossipsub::Message) -> bool {
self.gossip.topics().any(|topic| message.topic.eq(topic))
/// Check if the [gossipsub::TopicHash] is subscribed by this peer
pub fn is_subscribed(&self, topic_hash: &gossipsub::TopicHash) -> bool {
self.gossip.topics().any(|topic| topic_hash.eq(topic))
}
}

Expand Down Expand Up @@ -293,7 +302,7 @@ mod test {
sequence_number: None,
topic: mailbox_topic_hash,
};
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg));
assert!(peer.behaviour.is_subscribed(&mailbox_topic_msg.topic));

let hotstuff_rs_msg = gossipsub::Message {
source: None,
Expand All @@ -313,7 +322,7 @@ mod test {
let subscribed_topics: Vec<&MessageTopicHash> = peer.behaviour.gossip.topics().collect();
assert_eq!(subscribed_topics.len(), 2); //including the initial subscribed topic

assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg));
assert!(peer.behaviour.is_subscribed(&hotstuff_rs_msg.topic));
assert!(!peer.behaviour.is_subscribed(&unsubscribed_msg.topic));
}
}
10 changes: 0 additions & 10 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,3 @@ pub struct Config {
/// Protocol name to communicate with other Kademlia nodes
pub kademlia_protocol_name: String,
}

// Returns a complete list of accepted topics in pchain-network
pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
vec![
Topic::HotStuffRsBroadcast,
Topic::HotStuffRsSend(public_address),
Topic::Mempool,
Topic::DroppedTxns,
]
}
81 changes: 54 additions & 27 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
dns::TokioDnsConfig,
gossipsub::{self, TopicHash},
gossipsub::{self},
kad::KBucketKey,
Multiaddr,
identify, identity, noise,
swarm::{SwarmBuilder, SwarmEvent},
Expand All @@ -51,8 +52,7 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, NetworkEvent},
config, conversions,
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
conversions,
messages::{DroppedTxnMessage, Message, Topic},
peer::{EngineCommand, EngineError, Peer, PeerBuilder},
};
Expand Down Expand Up @@ -162,19 +162,15 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
conversions::PublicAddress::try_from(*src_peer_id)
{
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
if swarm.behaviour().is_subscribed(&message.topic) {
// Send it to ourselves if we subscribed to this topic

if let Some(topic) = identify_topic(message.topic, local_public_address)
if let Ok(message) =
deserialize_message(message)
{
if let Ok(message) =
deserialize_message(message.data, topic)
{
peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
}
}
Expand All @@ -187,9 +183,29 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
info.listen_addrs.iter().for_each(|a| {
swarm.behaviour_mut().add_address(&peer_id, a.clone());
});

// Subscribe to mailbox topic [HotStuffRsSend(PublicAddress)]
if let Ok(addr) = conversions::PublicAddress::try_from(peer_id) {
let public_addr: PublicAddress = addr.into();
let topic = Topic::HotStuffRsSend(public_addr);
if !swarm.behaviour().is_subscribed(&topic.clone().hash())
&& is_close_peer(&local_peer_id, &peer_id)
{
let _ = swarm.behaviour_mut().subscribe(vec![topic]);
}
}
}
SwarmEvent::ConnectionClosed { peer_id, .. } => {
swarm.behaviour_mut().remove_peer(&peer_id);

// Unsubscribe from mailbox topic [HotStuffRsSend(PublicAddress)]
if let Ok(addr) = conversions::PublicAddress::try_from(peer_id) {
let public_addr: PublicAddress = addr.into();
let topic = Topic::HotStuffRsSend(public_addr);
if swarm.behaviour().is_subscribed(&topic.clone().hash()) {
let _ = swarm.behaviour_mut().unsubscribe(vec![topic]);
}
}
}
_ => {}
}
Expand Down Expand Up @@ -222,29 +238,40 @@ async fn build_transport(
.boxed())
}

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
}

/// Deserialize [libp2p::gossipsub::Message] into [crate::messages::Message]
fn deserialize_message(data: Vec<u8>, topic: Topic) -> Result<Message, std::io::Error> {
let mut data = data.as_slice();
fn deserialize_message(msg: gossipsub::Message) -> Result<Message, std::io::Error> {
let mut data = msg.data.as_slice();
let topic_hash = msg.topic.as_str();

match topic {
HotStuffRsBroadcast | HotStuffRsSend(_) => {
match topic_hash {
"consensus" => {
hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)
}
Mempool => {
"mempool" => {
pchain_types::blockchain::TransactionV1::deserialize(&mut data).map(Message::Mempool)
}
DroppedTxns => DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns),
"droppedTx" => {
DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns)
},
_ => {
hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)
},
}
}

/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}

/// Check the distance between 2 peers. Subscribe to new peer's mailbox topic
/// if the distance is below 255
fn is_close_peer(peer_1: &PeerId, peer_2: &PeerId) -> bool {
let peer_1_key = KBucketKey::from(*peer_1);
let peer_2_key = KBucketKey::from(*peer_2);
// returns the distance in base2 logarithm ranging from 0 - 256
let distance = KBucketKey::distance(&peer_1_key, &peer_2_key)
.ilog2()
.unwrap_or(0);
distance < 255
}
50 changes: 40 additions & 10 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{net::Ipv4Addr, sync::mpsc, time::Duration};

use borsh::BorshSerialize;
use hotstuff_rs::messages::SyncRequest;
use libp2p::identity::ed25519::{Keypair, self};
use libp2p::identity::{ed25519::{Keypair, self}, PublicKey};
use libp2p::kad::KBucketKey;
use pchain_network::peer::PeerBuilder;
use pchain_network::{
config::Config,
Expand Down Expand Up @@ -139,18 +140,26 @@ async fn test_send_to() {
}

// - Network: Node1, Node2, Node3
// - Node2 and Node3 are close peers to subscribe to each other's mailbox
// - Node1: set Node2 and Node3 as bootnode, keep sending message to Node2 only
// - Node2: set Node1 and Node3 as bootnode, listens to subscribed topics
// - Node3: set Node1 and Node2 as bootnode, should not process any message
// - Node3: set Node1 and Node2 as bootnode, should process message meant for Node2
#[tokio::test]
async fn test_send_to_only_specific_receiver() {
async fn test_mailbox_subscription() {

let keypair_1 = ed25519::Keypair::generate();
let address_1 = keypair_1.public().to_bytes();

let keypair_2 = ed25519::Keypair::generate();
let address_2 = keypair_2.public().to_bytes();

let keypair_3 = ed25519::Keypair::generate();
let mut keypair_3 = ed25519::Keypair::generate();

// generate new node 3 until it is a close peer of node 2
while !is_close_peer(keypair_2.public(), keypair_3.public()) {
keypair_3 = ed25519::Keypair::generate();
}

let address_3 = keypair_3.public().to_bytes();

let (node_1, _message_receiver_1) = node(
Expand Down Expand Up @@ -179,26 +188,33 @@ async fn test_send_to_only_specific_receiver() {
)
.await;

let mut sending_limit = 10;
let mut sending_limit = 20;
let mut sending_tick = tokio::time::interval(Duration::from_secs(1));
let mut receiving_tick = tokio::time::interval(Duration::from_secs(2));

let message = create_sync_req(1);

loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.send_hotstuff_rs_msg(address_2, create_sync_req(1));
node_1.send_hotstuff_rs_msg(address_2, message.clone());

if sending_limit == 0 { break }
sending_limit -= 1;
}
_ = receiving_tick.tick() => {
let node3_received = message_receiver_3.try_recv().is_ok();
if node3_received {
panic!("Wrong recipient");
}
let node3_received = message_receiver_3.try_recv();
if node3_received.is_ok() {
let (msg_orgin, msg) = node3_received.unwrap();
let msg_vec: Vec<u8> = msg.into();
assert_eq!(msg_vec, message.try_to_vec().unwrap());
assert_eq!(msg_orgin, address_1);
return
}
}
}
}
panic!("Node3 did not receive mailbox message!")
}

// - Network: Node1, Node2, Node3
Expand Down Expand Up @@ -448,3 +464,17 @@ pub async fn node(

(peer, rx)
}

fn is_close_peer(public_key1: ed25519::PublicKey, public_key2: ed25519::PublicKey) -> bool {
let peer_1: PublicKey = public_key1.into();
let peer_2: PublicKey = public_key2.into();
let peer_1 = peer_1.to_peer_id();
let peer_2 = peer_2.to_peer_id();
let peer_1_key = KBucketKey::from(peer_1);
let peer_2_key = KBucketKey::from(peer_2);
// returns the distance in base2 logarithm ranging from 0 - 256
let distance = KBucketKey::distance(&peer_1_key, &peer_2_key)
.ilog2()
.unwrap_or(0);
distance < 255
}