diff --git a/src/behaviour.rs b/src/behaviour.rs index 2717758..2fbfcf2 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -248,7 +248,6 @@ mod test { } } - // TODO jonas: please verify if theres a better way to check if a peer is added #[test] fn test_add_and_remove_peer() { let mut peer1 = create_new_peer(); @@ -258,23 +257,27 @@ mod test { .behaviour .add_address(&peer2.peer_id, peer2.multi_addr); - let peer_num: usize = peer1 + let peer1_added_peer2 = peer1 .behaviour .kad .kbuckets() - .map(|x| x.num_entries()) - .sum(); - assert_eq!(peer_num, 1); - + .find(|entry| + entry.iter().find(|bucket| + *bucket.node.key.preimage() == peer2.peer_id).is_some()); + + assert!(peer1_added_peer2.is_some()); + peer1.behaviour.remove_peer(&peer2.peer_id); - let peer_num: usize = peer1 + let peer1_added_peer2 = peer1 .behaviour .kad .kbuckets() - .map(|x| x.num_entries()) - .sum(); - assert_eq!(peer_num, 0); + .find(|entry| + entry.iter().find(|bucket| + *bucket.node.key.preimage() == peer2.peer_id).is_some()); + + assert!(peer1_added_peer2.is_none()); } #[test] diff --git a/src/config.rs b/src/config.rs index d7b1200..4deaf2b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,10 +20,8 @@ //! }; //! ``` //! -use libp2p::{ - identity::{ed25519::Keypair, PeerId}, - Multiaddr, -}; +use std::net::Ipv4Addr; +use libp2p::identity::ed25519::Keypair; use pchain_types::cryptography::PublicAddress; use crate::messages::Topic; @@ -39,7 +37,7 @@ pub struct Config { pub listening_port: u16, /// Bootstrap nodes for the initial connection - pub boot_nodes: Vec<(PeerId, Multiaddr)>, + pub boot_nodes: Vec<(PublicAddress, Ipv4Addr, u16)>, /// Buffer size of outgoing messages pub outgoing_msgs_buffer_capacity: usize, diff --git a/src/engine.rs b/src/engine.rs index 8a7aa99..78cce98 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -39,6 +39,7 @@ use libp2p::{ core::{muxing::StreamMuxerBox, transport::Boxed}, dns::TokioDnsConfig, gossipsub::{self, TopicHash}, + Multiaddr, identify, identity, noise, swarm::{SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, @@ -89,9 +90,12 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result { // 2. Connection to bootstrap nodes if !config.boot_nodes.is_empty() { config.boot_nodes.iter().for_each(|peer_info| { - swarm + let multiaddr = multi_addr(peer_info.1, peer_info.2); + if let Ok(peer_id) = &conversions::PublicAddress::new(peer_info.0).try_into() { + swarm .behaviour_mut() - .add_address(&peer_info.0, peer_info.1.clone()); + .add_address(peer_id, multiaddr); + } }); } @@ -111,7 +115,7 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result { // 4.1 Wait for the following events: let (engine_command, event) = tokio::select! { biased; - // Receive a EngineCommand from application + // Receive an EngineCommand from application engine_command = receiver.recv() => { (engine_command, None) }, @@ -127,18 +131,18 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result { }, }; - // 4.2 Deliver messages when a EngineCommand from an application is received + // 4.2 Deliver messages when an EngineCommand::Publish from the application is received + // and shutdown engine when an EngineCommand::Shutdown from the application is received if let Some(engine_command) = engine_command { match engine_command { EngineCommand::Publish(topic, message) => { log::info!("Publishing (Topic: {:?})", topic); if topic == Topic::HotStuffRsSend(local_public_address) { // send to myself - let _ = peer - .handlers - .iter() - .map(|handler| handler(local_public_address, message.clone())); - } else if let Err(e) = swarm.behaviour_mut().publish(topic, message) { + peer.handlers.iter() + .for_each(|handler| handler(local_public_address, message.clone())); + } + else if let Err(e) = swarm.behaviour_mut().publish(topic, message) { log::debug!("Failed to pulish the message. {:?}", e); } } @@ -163,13 +167,14 @@ pub(crate) async fn start(peer: PeerBuilder) -> Result { let public_addr: PublicAddress = public_addr.into(); if swarm.behaviour().is_subscribed(&message) { // Send it to ourselves if we subscribed to this topic - if let Some(topic) = identify_topic(message.topic, public_addr) + + if let Some(topic) = identify_topic(message.topic, local_public_address) { if let Ok(message) = deserialize_message(message.data, topic) { - let _ = peer.handlers.iter().map(|handler| { - handler(local_public_address, message.clone()) + peer.handlers.iter().for_each(|handler| { + handler(public_addr, message.clone()) }); } } @@ -221,8 +226,8 @@ async fn build_transport( } /// Identify the [crate::messages::Topic] of the message -fn identify_topic(topic_hash: TopicHash, public_addr: PublicAddress) -> Option { - config::fullnode_topics(public_addr) +fn identify_topic(topic_hash: TopicHash, local_public_address: PublicAddress) -> Option { + config::fullnode_topics(local_public_address) .into_iter() .find(|t| t.clone().hash() == topic_hash) } @@ -241,3 +246,8 @@ fn deserialize_message(data: Vec, topic: Topic) -> Result DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns), } } + +/// Convert ip address [std::net::Ipv4Addr] and port [u16] into MultiAddr [libp2p::Multiaddr] type +fn multi_addr(ip_address: Ipv4Addr, port: u16) -> Multiaddr { + format!("/ip4/{}/tcp/{}", ip_address, port).parse().unwrap() +} diff --git a/src/lib.rs b/src/lib.rs index 6d79cbe..e6de3fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,27 +10,42 @@ //! //! ```no_run //! use crate::Config; -//! use crate::message_gate::MessageGateChain; +//! use crate::peer:PeerBuilder; +//! use crate::messages::Message; //! //! //! // 1. Build a configuration. -//! let config = Config::default(); -//! -//! // 2. Create message gate chain. -//! let gates = MessageGateChain::new() -//! // .append(message_gate1) -//! // .append(message_gate2) -//! // ... -//! +//! let config = Config { +//! keypair, // libp2p_identity::ed25519::Keypair::generate() +//! topics_to_subscribe, // vec![Topic::HotStuffRsBroadcast] +//! listening_port, // 25519 +//! boot_nodes, // vec![] +//! outgoing_msgs_buffer_capacity, // 8 +//! incoming_msgs_buffer_capacity, // 10 +//! peer_discovery_interval, // 10 +//! kademlia_protocol_name // "/pchain_p2p/1.0.0" +//! }; +//! +//! // 2. Create a message handler +//! let (tx, rx) = mpsc::channel(); +//! let message_sender = tx.clone(); +//! let message_handler = move |msg_orgin: [u8;32], msg: Message| { +//! // processing the message... +//! let _ = message_sender.send((msg_origin, msg)); +//! }; +//! //! // 3. Start P2P network. -//! let network = pchain_network::NetworkHandle::start(network_config, subscribe_topics, message_gate_chain).await; -//! +//! let peer = PeerBuilder::new(config) +//! .on_receive_msg(message_handler) +//! .build() +//! .await +//! .unwrap(); +//! //! // 4. Send out messages. -//! network.broadcast_mempool_tx_msg(txn); +//! peer.broadcast_mempool_msg(txn); //! //! -// TODO jonas: update example usage above pub mod behaviour; diff --git a/src/peer.rs b/src/peer.rs index 70f9825..cca829a 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -53,9 +53,9 @@ impl PeerBuilder { } pub fn on_receive_msg( - &mut self, + mut self, handler: impl Fn(PublicAddress, Message) + Send + 'static, - ) -> &mut Self { + ) -> PeerBuilder { self.handlers.push(Box::new(handler)); self } @@ -121,6 +121,8 @@ pub(crate) enum EngineCommand { Publish(Topic, Message), Shutdown, } + +#[derive(Debug)] pub enum EngineError { /// Failed to read from system configuration path SystemConfigError(std::io::Error), diff --git a/tests/network.rs b/tests/network.rs index b5ba36d..8a77092 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -1,16 +1,13 @@ -use std::{net::Ipv4Addr, sync::Arc, time::Duration}; +use std::{net::Ipv4Addr, sync::mpsc, time::Duration}; -use async_trait::async_trait; use borsh::BorshSerialize; -use futures::lock::Mutex; use hotstuff_rs::messages::SyncRequest; -use libp2p::gossipsub::TopicHash; -use libp2p::identity::ed25519::{Keypair, PublicKey}; +use libp2p::identity::ed25519::{Keypair, self}; +use pchain_network::peer::PeerBuilder; use pchain_network::{ - config::{Config, Peer}, - message_gate::{MessageGate, MessageGateChain}, - messages::{Topic, Envelope, Message}, - NetworkHandle, + config::Config, + messages::{Topic, Message}, + peer::Peer, }; use pchain_types::{blockchain::TransactionV1, cryptography::PublicAddress}; @@ -35,20 +32,30 @@ fn create_sync_req(start_height: u64) -> hotstuff_rs::messages::Message { hotstuff_rs::messages::Message::SyncMessage(test_message) } -//TODO jonas update tests // - Network: Node1, Node2 -// - Node1: keep broadcasting Mempool topic message +// - Node1: Set Node2 as bootnode, keep broadcasting Mempool topic message // - Node2: set Node1 as bootnode, listens to subscribed topics #[tokio::test] async fn test_broadcast() { - let (address_1, node_1, _) = node(30001, vec![], None, vec![]).await; - let (_address_2, _node_2, receiver_gate) = node( + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); + + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); + + let (node_1, _message_receiver_1) = node( + keypair_1, + 30001, + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30002)], + vec![] + ).await; + + let (_node_2, message_receiver_2) = node( + keypair_2, 30002, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30001)], - Some(Topic::Mempool), - vec![Topic::Mempool], - ) - .await; + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30001)], + vec![Topic::Mempool] + ).await; let mut sending_limit = 10; let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); @@ -59,16 +66,19 @@ async fn test_broadcast() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.broadcast_mempool_tx_msg(&base_tx(address_1)); + node_1.broadcast_mempool_msg(base_tx(address_1)); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - if receiver_gate.received().await { - assert_eq!(receiver_gate.get_message().await, Vec::from(message)); - assert_eq!(receiver_gate.get_origin().await, address_1); - return; - } + let node2_received = message_receiver_2.try_recv(); + if node2_received.is_ok() { + let (msg_origin, msg) = node2_received.unwrap(); + let msg_vec: Vec = msg.into(); + assert_eq!(msg_vec, Vec::from(message.clone())); + assert_eq!(msg_origin, address_1); + return + } } } } @@ -76,19 +86,29 @@ async fn test_broadcast() { } // - Network: Node1, Node2 -// - Node1: keep sending message to Node2 only +// - Node1: set Node2 as bootnode, keep sending message to Node2 only // - Node2: set Node1 as bootnode, listens to subscribed topics #[tokio::test] async fn test_send_to() { - let (address_1, node_1, _) = node(30003, vec![], None, vec![]).await; + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); + + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); - let (address_2, _node_2, receiver_gate) = node( + let (node_1, _message_receiver_1) = node( + keypair_1, + 30003, + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30004)], + vec![] + ).await; + + let (_node_2, message_receiver_2) = node( + keypair_2, 30004, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30003)], - None, - vec![], - ) - .await; + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30003)], + vec![] + ).await; let mut sending_limit = 10; let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); @@ -99,17 +119,19 @@ async fn test_send_to() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.send_to(address_2, message.clone()); + node_1.send_hotstuff_rs_msg(address_2, message.clone()); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - let node2_received = receiver_gate.received().await; - if node2_received { - assert_eq!(receiver_gate.get_message().await, message.try_to_vec().unwrap()); - assert_eq!(receiver_gate.get_origin().await, address_1); - return; - } + let node2_received = message_receiver_2.try_recv(); + if node2_received.is_ok() { + let (msg_orgin, msg) = node2_received.unwrap(); + let msg_vec: Vec = msg.into(); + assert_eq!(msg_vec, message.try_to_vec().unwrap()); + assert_eq!(msg_orgin, address_1); + return + } } } } @@ -117,26 +139,43 @@ async fn test_send_to() { } // - Network: Node1, Node2, Node3 -// - Node1: keep sending message to Node2 only -// - Node2: set Node1 as bootnode, listens to subscribed topics -// - Node3: set Node1 as bootnode, should not process any message +// - Node1: set Node2 and Node3 as bootnode, keep sending message to Node2 only +// - Node2: set Node1 and Node3 as bootnode, listens to subscribed topics +// - Node3: set Node1 and Node2 as bootnode, should not process any message #[tokio::test] async fn test_send_to_only_specific_receiver() { - let (address_1, node_1, _) = node(30005, vec![], None, vec![]).await; + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); - let (address_2, _node_2, _) = node( + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); + + let keypair_3 = ed25519::Keypair::generate(); + let address_3 = keypair_3.public().to_bytes(); + + let (node_1, _message_receiver_1) = node( + keypair_1, + 30005, + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30006), + (address_3, Ipv4Addr::new(127, 0, 0, 1), 30007)], + vec![] + ).await; + + let (_node_2, _message_receiver_2) = node( + keypair_2, 30006, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005)], - None, - vec![], + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005), + (address_3, Ipv4Addr::new(127, 0, 0, 1), 30007)], + vec![] ) .await; - let (_address_3, _node_3, receiver_gate) = node( + let (_node_3, message_receiver_3) = node( + keypair_3, 30007, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005)], - None, - vec![], + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30005), + (address_2, Ipv4Addr::new(127, 0, 0, 1), 30006)], + vec![] ) .await; @@ -147,13 +186,13 @@ async fn test_send_to_only_specific_receiver() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.send_to(address_2, create_sync_req(1)); + node_1.send_hotstuff_rs_msg(address_2, create_sync_req(1)); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - let node3_received = receiver_gate.received().await; + let node3_received = message_receiver_3.try_recv().is_ok(); if node3_received { panic!("Wrong recipient"); } @@ -169,21 +208,35 @@ async fn test_send_to_only_specific_receiver() { // - Node1 and Node3 should receive message from each other #[tokio::test] async fn test_sparse_messaging() { - let (address_1, node_1, receiver_gate_1) = node(30008, vec![], None, vec![]).await; + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); - let (address_2, _node_2, _) = node( + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); + + let keypair_3 = ed25519::Keypair::generate(); + let address_3 = keypair_3.public().to_bytes(); + + let (node_1, message_receiver_1) = node( + keypair_1, + 30008, + vec![], + vec![] + ).await; + + let (_node_2, _message_receiver_2) = node( + keypair_2, 30009, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30008)], - None, - vec![], + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30008)], + vec![] ) .await; - let (address_3, node_3, receiver_gate_3) = node( + let (node_3, message_receiver_3) = node( + keypair_3, 30010, - vec![Peer::new(address_2, Ipv4Addr::new(127, 0, 0, 1), 30009)], - None, - vec![], + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30009)], + vec![] ) .await; @@ -197,20 +250,24 @@ async fn test_sparse_messaging() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.send_to(address_3, message_to_node3.clone()); - node_3.send_to(address_1, message_to_node1.clone()); + node_1.send_hotstuff_rs_msg(address_3, message_to_node3.clone()); + node_3.send_hotstuff_rs_msg(address_1, message_to_node1.clone()); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - let node1_received = receiver_gate_1.received().await; - let node3_received = receiver_gate_3.received().await; - if node3_received && node1_received { - assert_eq!(receiver_gate_1.get_message().await, message_to_node1.try_to_vec().unwrap()); - assert_eq!(receiver_gate_3.get_message().await, message_to_node3.try_to_vec().unwrap()); - assert_eq!(receiver_gate_1.get_origin().await, address_3); - assert_eq!(receiver_gate_3.get_origin().await, address_1); + let node1_received = message_receiver_1.try_recv(); + let node3_received = message_receiver_3.try_recv(); + if node3_received.is_ok() && node1_received.is_ok() { + let (node1_message_origin, node1_message) = node1_received.unwrap(); + let (node3_message_origin, node3_message) = node3_received.unwrap(); + let node1_message_vec: Vec = node1_message.into(); + let node3_message_vec: Vec = node3_message.into(); + assert_eq!(node1_message_vec, message_to_node1.try_to_vec().unwrap()); + assert_eq!(node3_message_vec, message_to_node3.try_to_vec().unwrap()); + assert_eq!(node1_message_origin, address_3); + assert_eq!(node3_message_origin, address_1); return; } } @@ -220,10 +277,18 @@ async fn test_sparse_messaging() { } // - Network: Node1 -// - Node1: keep sending message itself only +// - Node1: keep sending message to itself only #[tokio::test] async fn test_send_to_self() { - let (address_1, node_1, receiver_gate) = node(30013, vec![], None, vec![]).await; + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); + + let (node_1, message_receiver_1) = node( + keypair_1, + 30013, + vec![], + vec![] + ).await; let mut sending_limit = 10; let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); @@ -235,15 +300,17 @@ async fn test_send_to_self() { tokio::select! { //broadcast does not send to self _ = sending_tick.tick() => { - node_1.send_to(address_1, message.clone()); + node_1.send_hotstuff_rs_msg(address_1, message.clone()); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - let node1_received = receiver_gate.received().await; - if node1_received { - assert_eq!(receiver_gate.get_message().await, message.try_to_vec().unwrap()); - assert_eq!(receiver_gate.get_origin().await, address_1); + let node1_received = message_receiver_1.try_recv(); + if node1_received.is_ok() { + let (msg_orgin, msg) = node1_received.unwrap(); + let msg_vec: Vec = msg.into(); + assert_eq!(msg_vec, message.try_to_vec().unwrap()); + assert_eq!(msg_orgin, address_1); return } } @@ -253,19 +320,29 @@ async fn test_send_to_self() { } // - Network: Node1, Node2 -// - Node1: keep broadcasting messages whose topic is not subscribed by Node2 +// - Node1: set Node2 as bootnode, keep broadcasting message with topic that is not subscribed by Node2 // - Node2: set Node1 as bootnode, should not receive anything from Node1 #[tokio::test] async fn test_broadcast_different_topics() { - let (address_1, node_1, _) = node(30014, vec![], None, vec![]).await; + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); + + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); - let (_address_2, _node_2, receiver_gate) = node( + let (node_1, _message_receiver_1) = node( + keypair_1, + 30014, + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30015)], + vec![Topic::Mempool] + ).await; + + let (_node_2, message_receiver_2) = node( + keypair_2, 30015, - vec![Peer::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30014)], - Some(Topic::Mempool), - vec![Topic::HotStuffRs], - ) - .await; + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30014)], + vec![Topic::HotStuffRsBroadcast], + ).await; let mut sending_limit = 10; let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); @@ -274,12 +351,12 @@ async fn test_broadcast_different_topics() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.broadcast_mempool_tx_msg(&base_tx(address_1)); + node_1.broadcast_mempool_msg(base_tx(address_1)); if sending_limit == 0 { break } sending_limit -= 1; } _ = receiving_tick.tick() => { - if receiver_gate.received().await { + if message_receiver_2.try_recv().is_ok() { panic!("Received messages that are not subscribed"); } } @@ -287,87 +364,88 @@ async fn test_broadcast_different_topics() { } } -pub async fn node( - port: u16, - boot_nodes: Vec, - gate_topic: Option, - subscribe_topics: Vec, -) -> (PublicAddress, NetworkHandle, MessageCounts) { - let keypair: Keypair = Keypair::generate_ed25519(); - let address = public_address(&keypair.public()); - let config = Config::with_keypair( - keypair - .try_into_ed25519() - .unwrap() - .to_bytes() - .try_into() - .unwrap(), - ) - .set_port(port) - .set_boot_nodes(boot_nodes); - - let gate = if !subscribe_topics.is_empty() { - MessageCounts::new(gate_topic.unwrap()) - } else { - MessageCounts::new(Topic::Mailbox(address)) - }; - let message_chain = MessageGateChain::new().append(gate.clone()); +// - Network: Node1, Node2 +// - Node1: set Node2 as bootnode, keep sending messages to Node2 only +// - Node2: set Node1 as bootnode, the handle is being dropped, should not receive message +#[tokio::test] +async fn test_stopped_node() { + let keypair_1 = ed25519::Keypair::generate(); + let address_1 = keypair_1.public().to_bytes(); + + let keypair_2 = ed25519::Keypair::generate(); + let address_2 = keypair_2.public().to_bytes(); + + let (node_1, _message_receiver_1) = node( + keypair_1, + 30016, + vec![(address_2, Ipv4Addr::new(127, 0, 0, 1), 30017)], + vec![] + ).await; + + let (node_2, message_receiver_2) = node( + keypair_2, + 30017, + vec![(address_1, Ipv4Addr::new(127, 0, 0, 1), 30016)], + vec![] + ).await; + + // Stop node by EngineCommand::Shutdown + drop(node_2); - let node = pchain_network::NetworkHandle::start(config, subscribe_topics, message_chain) - .await; + let mut sending_limit = 10; + let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); + let mut receiving_tick = tokio::time::interval(Duration::from_secs(2)); - (address, node, gate) -} + let message = create_sync_req(1); -pub fn public_address(public_key: &PublicKey) -> PublicAddress { - let kp = public_key.clone().try_into_ed25519().unwrap(); - kp.to_bytes() + loop { + tokio::select! { + _ = sending_tick.tick() => { + node_1.send_hotstuff_rs_msg(address_2, message.clone()); + if sending_limit == 0 { break } + sending_limit -= 1; + } + _ = receiving_tick.tick() => { + let node2_received = message_receiver_2.try_recv(); + if node2_received.is_ok() { + panic!("node 2 should not receive messages!") + } + } + } + } } -#[derive(Clone)] -pub struct MessageCounts { - topic: Topic, - /// number of calls to proceed() - count_proceed: Arc>, - /// actual message received - message_received: Arc>>, - - /// source of message - origin: Arc>, -} - -impl MessageCounts { - fn new(topic: Topic) -> Self { - Self { - topic, - count_proceed: Arc::new(Mutex::new(usize::default())), - message_received: Arc::new(Mutex::new(Vec::default())), - origin: Arc::new(Mutex::new(PublicAddress::default())), - } - } +pub async fn node( + keypair: Keypair, + listening_port: u16, + boot_nodes: Vec<([u8;32], Ipv4Addr, u16)>, + topics_to_subscribe: Vec +) -> (Peer, std::sync::mpsc::Receiver<(PublicAddress, Message)>) { + + let config = Config { + keypair, + topics_to_subscribe, + listening_port, + boot_nodes, + outgoing_msgs_buffer_capacity: 8, + incoming_msgs_buffer_capacity: 10, + peer_discovery_interval: 10, + kademlia_protocol_name: String::from("/pchain_p2p/1.0.0") + }; - async fn received(&self) -> bool { - *self.count_proceed.lock().await > 0 - } + let(tx,rx) = mpsc::channel(); - async fn get_message(&self) -> Vec { - self.message_received.lock().await.to_vec() - } + let message_sender = tx.clone(); + let message_handler = move |msg_origin: [u8;32], msg: Message| { + let _ = message_sender.send((msg_origin, msg)); + }; - async fn get_origin(&self) -> PublicAddress { - self.origin.lock().await.to_owned() - } -} + let peer = PeerBuilder::new(config) + .on_receive_msg(message_handler) + .build() + .await + .unwrap(); -#[async_trait] -impl MessageGate for MessageCounts { - fn accepted(&self, topic_hash: &TopicHash) -> bool { - self.topic.clone().is(topic_hash) - } - async fn process(&self, envelope: Envelope) { - *self.count_proceed.lock().await += 1; - *self.message_received.lock().await = envelope.message; - *self.origin.lock().await = envelope.origin; - } + (peer, rx) }