Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updated variable names and comments, removed explicit declaration of …
…mailbox topics from tests
  • Loading branch information
ong-jonas committed Oct 16, 2023
commit 51e58a94ebf6f63f3e2810b3cd4709cd1bb4b352
8 changes: 4 additions & 4 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,27 +257,27 @@ mod test {
.behaviour
.add_address(&peer2.peer_id, peer2.multi_addr);

let peer1_kbuckets = peer1
let peer1_added_peer2 = peer1
.behaviour
.kad
.kbuckets()
.find(|entry|
entry.iter().find(|bucket|
*bucket.node.key.preimage() == peer2.peer_id).is_some());

assert!(peer1_kbuckets.is_some());
assert!(peer1_added_peer2.is_some());

peer1.behaviour.remove_peer(&peer2.peer_id);

let peer1_kbuckets = peer1
let peer1_added_peer2 = peer1
.behaviour
.kad
.kbuckets()
.find(|entry|
entry.iter().find(|bucket|
*bucket.node.key.preimage() == peer2.peer_id).is_some());

assert!(peer1_kbuckets.is_none());
assert!(peer1_added_peer2.is_none());
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
// 4.1 Wait for the following events:
let (engine_command, event) = tokio::select! {
biased;
// Receive a EngineCommand from application
// Receive an EngineCommand from application
engine_command = receiver.recv() => {
(engine_command, None)
},
Expand All @@ -131,7 +131,8 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
},
};

// 4.2 Deliver messages when a EngineCommand from an application is received
// 4.2 Deliver messages when an EngineCommand::Publish from the application is received
// and shutdown engine when an EngineCommand::Shutdown from the application is received
if let Some(engine_command) = engine_command {
match engine_command {
EngineCommand::Publish(topic, message) => {
Expand Down Expand Up @@ -172,7 +173,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result<Peer, EngineError> {
if let Ok(message) =
deserialize_message(message.data, topic)
{
let _ = peer.handlers.iter().for_each(|handler| {
peer.handlers.iter().for_each(|handler| {
handler(public_addr, message.clone())
});
}
Expand Down Expand Up @@ -226,7 +227,6 @@ async fn build_transport(

/// Identify the [crate::messages::Topic] of the message
fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option<Topic> {
//address for fullnode_topics should be the local peer address
config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
Expand All @@ -247,7 +247,7 @@ fn deserialize_message(data: Vec<u8>, topic: Topic) -> Result<Message, std::io::
}
}

/// Convert [std::net::Ipv4Addr] and port [u16] into MultiAddr[libp2p::Multiaddr] type
/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type
fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr {
format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap()
}
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
//!
//! // 1. Build a configuration.
//! let config = Config {
//! keypair,
//! topics_to_subscribe,
//! listening_port,
//! boot_nodes,
//! keypair, // libp2p_identity::ed25519::Keypair::generate()
//! topics_to_subscribe, // vec![Topic::HotStuffRsBroadcast]
//! listening_port, // 25519
//! boot_nodes, // vec![]
//! outgoing_msgs_buffer_capacity, // 8
//! incoming_msgs_buffer_capacity, // 10
//! peer_discovery_interval, // 10
Expand Down
70 changes: 35 additions & 35 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@ fn create_sync_req(start_height: u64) -> hotstuff_rs::messages::Message {
}

// - Network: Node1, Node2
// - Node1: keep broadcasting Mempool topic message
// - Node1: Set Node2 as bootnode, keep broadcasting Mempool topic message
// - Node2: set Node1 as bootnode, listens to subscribed topics
#[tokio::test]
async fn test_broadcast() {
let keypair_1 = ed25519::Keypair::generate();
let address_1 = keypair_1.public().to_bytes();

let keypair_2 = ed25519::Keypair::generate();
let address_2 = keypair_2.public().to_bytes();

let (node_1, _message_receiver_1) = node(
keypair_1,
30001,
vec![],
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30002)],
vec![]
).await;

Expand Down Expand Up @@ -85,7 +86,7 @@ async fn test_broadcast() {
}

// - Network: Node1, Node2
// - Node1: keep sending message to Node2 only
// - Node1: set Node2 as bootnode, keep sending message to Node2 only
// - Node2: set Node1 as bootnode, listens to subscribed topics
#[tokio::test]
async fn test_send_to() {
Expand All @@ -98,15 +99,15 @@ async fn test_send_to() {
let (node_1, _message_receiver_1) = node(
keypair_1,
30003,
vec![],
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30004)],
vec![]
).await;

let (_node_2, message_receiver_2) = node(
keypair_2,
30004,
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30003)],
vec![Topic::HotStuffRsSend(address_2)]
vec![]
).await;

let mut sending_limit = 10;
Expand Down Expand Up @@ -138,9 +139,9 @@ async fn test_send_to() {
}

// - Network: Node1, Node2, Node3
// - Node1: keep sending message to Node2 only
// - Node2: set Node1 as bootnode, listens to subscribed topics
// - Node3: set Node1 as bootnode, should not process any message
// - Node1: set Node2 and Node3 as bootnode, keep sending message to Node2 only
// - Node2: set Node1 and Node3 as bootnode, listens to subscribed topics
// - Node3: set Node1 and Node2 as bootnode, should not process any message
#[tokio::test]
async fn test_send_to_only_specific_receiver() {
let keypair_1 = ed25519::Keypair::generate();
Expand All @@ -155,23 +156,26 @@ async fn test_send_to_only_specific_receiver() {
let (node_1, _message_receiver_1) = node(
keypair_1,
30005,
vec![],
vec![Topic::HotStuffRsSend(address_1)]
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30006),
(address_3, Ipv4Addr::new(127, 0, 0, 1), 30007)],
vec![]
).await;

let (_node_2, _message_receiver_2) = node(
keypair_2,
30006,
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005)],
vec![Topic::HotStuffRsSend(address_2)]
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005),
(address_3, Ipv4Addr::new(127, 0, 0, 1), 30007)],
vec![]
)
.await;

let (_node_3, message_receiver_3) = node(
keypair_3,
30007,
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30005)],
vec![Topic::HotStuffRsSend(address_3)]
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005),
(address_2, Ipv4Addr::new(127, 0, 0, 1), 30006)],
vec![]
)
.await;

Expand Down Expand Up @@ -217,22 +221,22 @@ async fn test_sparse_messaging() {
keypair_1,
30008,
vec![],
vec![Topic::HotStuffRsSend(address_1)]
vec![]
).await;

let (_node_2, _message_receiver_2) = node(
keypair_2,
30009,
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30008)],
vec![Topic::HotStuffRsSend(address_2)]
vec![]
)
.await;

let (node_3, message_receiver_3) = node(
keypair_3,
30010,
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30009)],
vec![Topic::HotStuffRsSend(address_3)]
vec![]
)
.await;

Expand Down Expand Up @@ -283,9 +287,8 @@ async fn test_send_to_self() {
keypair_1,
30013,
vec![],
vec![Topic::HotStuffRsSend(address_1)]
)
.await;
vec![]
).await;

let mut sending_limit = 10;
let mut sending_tick = tokio::time::interval(Duration::from_secs(1));
Expand Down Expand Up @@ -317,7 +320,7 @@ async fn test_send_to_self() {
}

// - Network: Node1, Node2
// - Node1: keep broadcasting messages whose topic is not subscribed by Node2
// - Node1: set Node2 as bootnode, keep broadcasting messages whose topic is not subscribed by Node2
// - Node2: set Node1 as bootnode, should not receive anything from Node1
#[tokio::test]
async fn test_broadcast_different_topics() {
Expand All @@ -330,18 +333,16 @@ async fn test_broadcast_different_topics() {
let (node_1, _message_receiver_1) = node(
keypair_1,
30014,
vec![],
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30015)],
vec![Topic::Mempool]
)
.await;
).await;

let (_node_2, message_receiver_2) = node(
keypair_2,
30015,
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30014)],
vec![Topic::HotStuffRsBroadcast, Topic::HotStuffRsSend(address_2)],
)
.await;
vec![Topic::HotStuffRsBroadcast],
).await;

let mut sending_limit = 10;
let mut sending_tick = tokio::time::interval(Duration::from_secs(1));
Expand All @@ -364,8 +365,8 @@ async fn test_broadcast_different_topics() {
}

// - Network: Node1, Node2
// - Node1: keep sending message to Node2 only
// - Node2: Engine::Command(Shutdown), should not receive message
// - Node1: set Node2 as bootnode, keep sending message to Node2 only
// - Node2: set Node1 as bootnode, send an EngineCommand::Shutdown to application, should not receive message
#[tokio::test]
async fn test_stopped_node() {
let keypair_1 = ed25519::Keypair::generate();
Expand All @@ -377,17 +378,16 @@ async fn test_stopped_node() {
let (node_1, _message_receiver_1) = node(
keypair_1,
30016,
vec![],
vec![Topic::HotStuffRsSend(address_1)])
.await;
vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30017)],
vec![]
).await;

let (node_2, message_receiver_2) = node(
keypair_2,
30017,
vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30016)],
vec![Topic::HotStuffRsSend(address_2)]
)
.await;
vec![]
).await;

// Stop node by EngineCommand::Shutdown
drop(node_2);
Expand Down