Merge branch 'refactor' into tests/refactor
ong-jonas committed Oct 16, 2023
commit 2a699f254ff2cf6d1df098fdd09b3e87f232b3c4
28 changes: 14 additions & 14 deletions src/engine.rs
@@ -135,10 +135,9 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
peer.handlers.iter()
.for_each(|handler| handler(local_public_address,message.clone()));

} else if let Err(e) = swarm.behaviour_mut().publish(topic, message)
{
.for_each(|handler| handler(local_public_address, message.clone()));
}
else if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to publish the message. {:?}", e);
}
}
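
The hunk above only reshapes the braces; the dispatch rule is unchanged: a message whose topic addresses the local node is handed straight to the registered handlers, and everything else goes out through gossipsub. A minimal sketch of that rule with stand-in types (the real Topic, PublicAddress, and handler signature live in the crate):

// Stand-in types; the crate's handlers take (PublicAddress, Message).
type PublicAddress = [u8; 32];
type Message = Vec<u8>;

fn dispatch(
    is_self_send: bool,
    local: PublicAddress,
    message: Message,
    handlers: &[Box<dyn Fn(PublicAddress, Message)>],
    mut publish: impl FnMut(Message) -> Result<(), String>,
) {
    if is_self_send {
        // send to myself: deliver locally, never touch the network
        handlers.iter().for_each(|h| h(local, message.clone()));
    } else if let Err(e) = publish(message) {
        // mirror of the log::debug! call in the diff above
        eprintln!("Failed to publish the message. {:?}", e);
    }
}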
@@ -163,14 +162,15 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic
match identify_topic(message.topic, local_public_address) {
Some(topic) => {
if let Ok(message) =
deserialize_message(message.data, topic)
{
peer.handlers.iter()
.for_each(|handler| handler(public_addr,message.clone()));
}

if let Some(topic) = identify_topic(message.topic, local_public_address)
{
if let Ok(message) =
deserialize_message(message.data, topic)
{
let _ = peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
}
}
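
The receive path chains two fallible steps, topic identification and deserialization, before fanning out to the same handlers; the refactor only swaps a one-armed match for if let. A compilable sketch of that shape with stand-in types and toy implementations (the real functions are identify_topic and deserialize_message in this file):

type Addr = [u8; 32];

// Toy stand-ins: the real identify_topic maps a libp2p TopicHash to
// crate::messages::Topic, and deserialize_message parses the payload.
fn identify_topic(topic_hash: u8, _local: Addr) -> Option<u8> {
    (topic_hash < 2).then_some(topic_hash)
}

fn deserialize_message(data: Vec<u8>, _topic: u8) -> Result<Vec<u8>, ()> {
    if data.is_empty() { Err(()) } else { Ok(data) }
}

fn on_gossip(topic_hash: u8, data: Vec<u8>, source: Addr, local: Addr,
             handlers: &[Box<dyn Fn(Addr, Vec<u8>)>]) {
    if let Some(topic) = identify_topic(topic_hash, local) {
        if let Ok(message) = deserialize_message(data, topic) {
            handlers.iter().for_each(|h| h(source, message.clone()));
        }
    }
}

One nit on the new code: the let _ = binding on for_each is redundant, since for_each already returns ().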
@@ -221,9 +221,9 @@ async fn build_transport(
}

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, public_addr: PublicAddress) -> Option<Topic> {
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
// address for fullnode_topics should be the local peer address
config::fullnode_topics(public_addr)
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
}
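
identify_topic recovers a Topic from the only thing a gossipsub event carries, the topic's hash, by rebuilding this node's candidate topics and comparing hashes. A self-contained sketch of that lookup idea (stand-in enum and hash function; the real types are crate::messages::Topic and libp2p's TopicHash):

// Stand-ins for crate::messages::Topic and libp2p's TopicHash.
#[derive(Clone, Debug, PartialEq)]
enum Topic {
    Broadcast,
    SendTo(u8),
}

fn hash(topic: &Topic) -> u8 {
    // Toy hash; libp2p derives TopicHash from the topic string.
    match topic {
        Topic::Broadcast => 0,
        Topic::SendTo(addr) => *addr,
    }
}

// Linear scan over the node's own topics, matching by hash.
fn identify(topic_hash: u8, candidates: Vec<Topic>) -> Option<Topic> {
    candidates.into_iter().find(|t| hash(t) == topic_hash)
}

fn main() {
    let topics = vec![Topic::Broadcast, Topic::SendTo(7)];
    assert_eq!(identify(7, topics), Some(Topic::SendTo(7)));
}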
11 changes: 4 additions & 7 deletions src/peer.rs
@@ -52,16 +52,11 @@ impl PeerBuilder {
}
}

pub fn configuration(mut self, config: Config) -> PeerBuilder {
self.config = Some(config);
self
}

pub fn on_receive_msg(
mut self,
handlers: impl Fn(PublicAddress, Message) + Send + 'static,
handler: impl Fn(PublicAddress, Message) + Send + 'static,
) -> PeerBuilder {
self.handlers = Some(Box::new(handlers));
self.handlers.push(Box::new(handler));
self
}

@@ -126,6 +121,8 @@ pub(crate) enum EngineCommand {
Publish(Topic, Message),
Shutdown,
}

#[derive(Debug)]
pub enum EngineError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),
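Taken together, the peer.rs changes fold the Config into PeerBuilder::new and turn on_receive_msg into an accumulator, so several handlers can be registered and all of them run for each delivered message. A hedged usage sketch, written as if inside the crate, assuming build() returns Result<Peer, EngineError> as the test code below suggests:

// Assumed to live inside the crate; import paths and Config
// construction are outside this diff.
async fn start_peer(config: Config) -> Result<Peer, EngineError> {
    PeerBuilder::new(config)
        // Handlers now accumulate: both closures run for every message.
        .on_receive_msg(|_origin, _msg| { /* e.g. update metrics */ })
        .on_receive_msg(|_origin, _msg| { /* e.g. forward to a channel */ })
        .build()
        .await
}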
91 changes: 16 additions & 75 deletions tests/network.rs
@@ -1,8 +1,6 @@
use std::{net::Ipv4Addr, sync::{Arc, mpsc}, time::Duration};
use std::{net::Ipv4Addr, sync::mpsc, time::Duration};

use async_trait::async_trait;
use borsh::BorshSerialize;
use futures::{lock::Mutex, channel::mpsc::Receiver};
use hotstuff_rs::messages::SyncRequest;
use libp2p::{Multiaddr, PeerId, identity};
use libp2p::identity::ed25519::{Keypair, self};
@@ -35,7 +33,6 @@ fn create_sync_req(start_height: u64) -> hotstuff_rs::messages::Message {
hotstuff_rs::messages::Message::SyncMessage(test_message)
}

//TODO jonas update tests
// - Network: Node1, Node2
// - Node1: keep broadcasting Mempool topic message
// - Node2: set Node1 as bootnode, listens to subscribed topics
@@ -51,7 +48,7 @@ async fn test_broadcast() {
keypair_1,
30001,
vec![],
vec![Topic::Mempool]
vec![]
).await;

let (_node_2, message_receiver_2) = node(
@@ -82,9 +79,7 @@ async fn test_broadcast() {
assert_eq!(msg_vec, Vec::from(message.clone()));
assert_eq!(msg_origin, address_1);
return
} else {
continue
}
}
}
}
}
@@ -107,16 +102,15 @@ async fn test_send_to() {
keypair_1,
30003,
vec![],
vec![Topic::HotStuffRsSend(address_1)])
.await;
vec![]
).await;

let (_node_2, message_receiver_2) = node(
keypair_2,
30004,
vec![(peerid_1, multiaddr(Ipv4Addr::new(127, 0, 0, 1), 30003))],
vec![Topic::HotStuffRsSend(address_2), Topic::HotStuffRsBroadcast]
)
.await;
vec![Topic::HotStuffRsSend(address_2)]
).await;

let mut sending_limit = 10;
let mut sending_tick = tokio::time::interval(Duration::from_secs(1));
@@ -139,9 +133,7 @@ async fn test_send_to() {
assert_eq!(msg_vec, message.try_to_vec().unwrap());
assert_eq!(msg_orgin, address_1);
return
} else {
continue
}
}
}
}
}
@@ -275,10 +267,8 @@ async fn test_sparse_messaging() {
let (node3_message_origin, node3_message) = node3_received.unwrap();
let node1_message_vec: Vec<u8> = node1_message.into();
let node3_message_vec: Vec<u8> = node3_message.into();
let message_to_node1 = message_to_node1.try_to_vec().unwrap();
let message_to_node3 = message_to_node3.try_to_vec().unwrap();
assert_eq!(node1_message_vec, message_to_node1);
assert_eq!(node3_message_vec, message_to_node3);
assert_eq!(node1_message_vec, message_to_node1.try_to_vec().unwrap());
assert_eq!(node3_message_vec, message_to_node3.try_to_vec().unwrap());
assert_eq!(node1_message_origin, address_3);
assert_eq!(node3_message_origin, address_1);
return;
@@ -404,10 +394,11 @@ async fn test_stopped_node() {
keypair_2,
30017,
vec![(peerid_1, multiaddr(Ipv4Addr::new(127, 0, 0, 1), 30016))],
vec![Topic::HotStuffRsSend(address_2), Topic::HotStuffRsBroadcast]
vec![Topic::HotStuffRsSend(address_2)]
)
.await;

// Stop the node: dropping it triggers EngineCommand::Shutdown
drop(node_2);

let mut sending_limit = 10;
@@ -427,9 +418,7 @@ async fn test_stopped_node() {
let node2_received = message_receiver_2.try_recv();
if node2_received.is_ok() {
panic!("node 2 should not receive messages!")
} else {
continue
}
}
}
}
}
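
The drop(node_2) above relies on the Peer's Drop impl to shut its engine down. The internals are not part of this diff, but the usual pattern, sketched here under that assumption, is a best-effort Shutdown pushed into the engine's command channel:

use tokio::sync::mpsc;

enum EngineCommand {
    Shutdown,
}

struct Peer {
    commands: mpsc::UnboundedSender<EngineCommand>,
}

impl Drop for Peer {
    fn drop(&mut self) {
        // Best-effort: the engine task may already have exited.
        let _ = self.commands.send(EngineCommand::Shutdown);
    }
}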
@@ -461,11 +450,11 @@ pub async fn node(
message_sender.send((msg_origin, msg)).unwrap();
};

let peer = PeerBuilder::new()
.configuration(config)
let peer = PeerBuilder::new(config)
.on_receive_msg(message_handler)
.build()
.await;
.await
.unwrap();

(peer, rx)
}
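
The node helper bridges the async peer to the synchronous assertions by moving a std::sync::mpsc::Sender into the message handler; the tests then poll the paired Receiver with try_recv on a timer. A condensed synchronous sketch of that polling loop (the tests actually use tokio::time::interval inside select!, and the tuple type here stands in for (PublicAddress, Message)):

use std::{sync::mpsc, thread, time::Duration};

// Tick once per second, give up after `limit` ticks, succeed as soon
// as the handler has pushed a message into the channel.
fn wait_for_message(
    rx: &mpsc::Receiver<([u8; 32], Vec<u8>)>,
    mut limit: u32,
) -> Option<([u8; 32], Vec<u8>)> {
    while limit > 0 {
        limit -= 1;
        if let Ok(received) = rx.try_recv() {
            return Some(received);
        }
        thread::sleep(Duration::from_secs(1));
    }
    None
}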
@@ -474,51 +463,3 @@ pub async fn node(
fn multiaddr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}

// #[async_trait]
// pub trait Handler: Send + 'static {
// async fn process(&self, address: PublicAddress, message: Vec<u8>);
// }

// #[derive(Clone)]
// pub struct MessageReceiver {
// /// number of calls to proceed()
// count_proceed: Arc<Mutex<usize>>,

// /// actual message received
// message_received: Arc<Mutex<Vec<u8>>>,

// /// source of message
// origin: Arc<Mutex<PublicAddress>>,
// }

// impl MessageReceiver {
// fn new() -> Self {
// Self {
// count_proceed: Arc::new(Mutex::new(usize::default())),
// message_received: Arc::new(Mutex::new(Vec::default())),
// origin: Arc::new(Mutex::new(PublicAddress::default())),
// }
// }

// async fn received(&self) -> bool {
// *self.count_proceed.lock().await > 0
// }

// async fn get_message(&self) -> Vec<u8> {
// self.message_received.lock().await.to_vec()
// }

// async fn get_origin(&self) -> PublicAddress {
// self.origin.lock().await.to_owned()
// }
// }

// #[async_trait]
// impl Handler for MessageReceiver {
// async fn process(&self, address: PublicAddress, message: Vec<u8>) {
// *self.count_proceed.lock().await += 1;
// *self.message_received.lock().await = message;
// *self.origin.lock().await = address;
// }
// }