diff --git a/Cargo.toml b/Cargo.toml index 3cf225b..072f96b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pchain_network" -version = "0.5.0" +version = "0.6.0" authors = ["ParallelChain Lab "] edition = "2021" description = "parallelchain-network: Libp2p based Network Layer implementation for ParallelChain Mainnet." diff --git a/src/behaviour.rs b/src/behaviour.rs index d8f46a0..fd847e4 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -77,7 +77,7 @@ impl Behaviour { } fn kad_config(peer_id: PeerId, protocol_name: String) -> kad::Behaviour { - let protocol_name = StreamProtocol::try_from_owned(protocol_name).unwrap(); + let protocol_name: StreamProtocol = StreamProtocol::try_from_owned(protocol_name).unwrap(); let kad_config = kad::Config::default() .set_protocol_names(vec![protocol_name]) .set_record_filtering(kad::StoreInserts::FilterBoth) @@ -96,24 +96,11 @@ impl Behaviour { } fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour { - let build_msg_id = |msg: &gossipsub::Message| { - let mut id_str = msg.topic.to_string(); - let src_peer_id = match msg.source { - Some(src) => base64url::encode(src.to_bytes()), - None => "none".to_string(), - }; - id_str.push_str(&src_peer_id); - id_str.push_str(&msg.sequence_number.unwrap_or_default().to_string()); - MessageId::from(id_str) - }; - let gossip = gossipsub::Behaviour::new( MessageAuthenticity::Signed(keypair.clone().into()), ConfigBuilder::default() .max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2. - .message_id_fn(build_msg_id) .heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL)) - .allow_self_origin(true) .build() .unwrap(), ) @@ -146,6 +133,15 @@ impl Behaviour { Ok(()) } + /// Unsubscribe from [Topic] + pub fn unsubscribe(&mut self, topics: Vec) -> Result<(), gossipsub::PublishError> { + for topic in topics { + self.gossip.unsubscribe(&topic.into())?; + } + + Ok(()) + } + /// Publish a [Message] to peers subscribed to the [Topic] pub fn publish(&mut self, topic: Topic, msg: Message) -> Result { self.gossip.publish(topic.hash(), msg) @@ -227,7 +223,7 @@ mod test { peer_discovery_interval: 10, kademlia_protocol_name: String::from("/test"), }; - + let public_address = config.keypair.verifying_key().to_bytes(); let behaviour = Behaviour::new( @@ -309,7 +305,7 @@ mod test { source: None, data: vec![], sequence_number: None, - topic: Topic::DroppedTxns.hash(), + topic: Topic::Mempool.hash(), }; let subscribed_topics: Vec<&MessageTopicHash> = peer.behaviour.gossip.topics().collect(); diff --git a/src/config.rs b/src/config.rs index 9490b64..bbec4e2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,7 +9,7 @@ //! //! ```no_run //! let config = Config { -//! keypair, //pchain_types::cryptography::Keypair, +//! keypair: pchain_types::cryptography::Keypair, //! topics_to_subscribe: vec![Topic::HotStuffRsBroadcast], //! listening_port: 25519, //! boot_nodes: vec![], @@ -53,6 +53,5 @@ pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec { Topic::HotStuffRsBroadcast, Topic::HotStuffRsSend(public_address), Topic::Mempool, - Topic::DroppedTxns, ] -} +} \ No newline at end of file diff --git a/src/conversions.rs b/src/conversions.rs index b6bfd46..1f801db 100644 --- a/src/conversions.rs +++ b/src/conversions.rs @@ -11,32 +11,29 @@ //! - TryFrom<[PublicAddress]> for [PeerId] //! - filter_gossipsub_messages([libp2p::gossipsub::Message], [pchain_types::cryptography::PublicAddress]) to [Message] -use libp2p::identity::{self, DecodingError, OtherVariantError, PeerId, - ed25519::PublicKey -}; +use libp2p::identity::{self, ed25519, DecodingError, OtherVariantError, PeerId}; use libp2p::Multiaddr; use std::net::Ipv4Addr; use borsh::BorshDeserialize; +use pchain_types::{cryptography, rpc}; use crate::messages::{ - DroppedTxnMessage, Message, - Topic::{HotStuffRsBroadcast,HotStuffRsSend,Mempool,DroppedTxns} + Topic::{HotStuffRsBroadcast,HotStuffRsSend,Mempool} }; use crate::config::fullnode_topics; - -/// PublicAddress(PublicAddress) is wrapper around [PublicAddress](pchain_types::cryptography::PublicAddress). -pub struct PublicAddress(pchain_types::cryptography::PublicAddress); +/// [PublicAddress(PublicAddress)] is wrapper around [PublicAddress](cryptography::PublicAddress). +pub struct PublicAddress(cryptography::PublicAddress); impl PublicAddress { - pub fn new(addr: pchain_types::cryptography::PublicAddress) -> Self { + pub fn new(addr: cryptography::PublicAddress) -> Self { PublicAddress(addr) } } -impl From for pchain_types::cryptography::PublicAddress { - fn from(peer: PublicAddress) -> pchain_types::cryptography::PublicAddress { +impl From for cryptography::PublicAddress { + fn from(peer: PublicAddress) -> cryptography::PublicAddress { peer.0 } } @@ -55,7 +52,7 @@ impl TryFrom for PeerId { type Error = DecodingError; fn try_from(public_addr: PublicAddress) -> Result { - let kp = PublicKey::try_from_bytes(&public_addr.0)?; + let kp = ed25519::PublicKey::try_from_bytes(&public_addr.0)?; let public_key: identity::PublicKey = kp.into(); Ok(public_key.to_peer_id()) } @@ -81,30 +78,25 @@ impl From for PublicAddressTryFromPeerIdError { /// converts [Message](libp2p::gossipsub::Message) to [Message](Message) while /// filtering for message topics that can be forwarded to fullnode -pub fn filter_gossipsub_messages(message: libp2p::gossipsub::Message, local_public_address: pchain_types::cryptography::PublicAddress) --> Result { +pub fn filter_gossipsub_messages(message: libp2p::gossipsub::Message, local_public_address: cryptography::PublicAddress) -> Result { let (topic_hash, data) = (message.topic, message.data); let mut data = data.as_slice(); - - let topic = fullnode_topics(local_public_address) - .into_iter() - .find(|t| t.clone().hash() == topic_hash) - .ok_or(InvalidTopicError)?; - - match topic { - HotStuffRsBroadcast | HotStuffRsSend(_) => { - let message = hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)?; - Ok(message) - }, - Mempool => { - let message = pchain_types::blockchain::TransactionV1::deserialize(&mut data).map(Message::Mempool)?; - Ok(message) - }, - DroppedTxns => { - let message = DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns)?; - Ok(message) + + let topic = fullnode_topics(local_public_address) + .into_iter() + .find(|t| t.clone().hash() == topic_hash) + .ok_or(InvalidTopicError)?; + + match topic { + HotStuffRsBroadcast | HotStuffRsSend(_) => { + let message = hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)?; + Ok(message) + }, + Mempool => { + let message = rpc::TransactionV1OrV2::deserialize(&mut data).map(Message::Mempool)?; + Ok(message) + }, } - } } #[derive(Debug)] diff --git a/src/lib.rs b/src/lib.rs index 1455cb6..724e6f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,8 +25,8 @@ //! kademlia_protocol_name // "/pchain_p2p/1.0.0" //! }; //! -//! // 2. Create message handlers -//! let (tx, rx) = tokio::sync::mpsc::channel(); +//! // 2. Create message handler +//! let (tx, rx) = tokio::sync::mpsc::channel(100); //! let message_sender = tx.clone(); //! let message_handler = move |msg_origin: [u8;32], msg: Message| { //! match msg { @@ -37,7 +37,7 @@ //! _ => {} //! } //! }; -//! +//! //! // 3. Start P2P network. //! let peer = Peer::start(config, Box::new(message_handler)) //! .await diff --git a/src/messages.rs b/src/messages.rs index 3238e64..16271d9 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -15,9 +15,8 @@ use borsh::{BorshSerialize, BorshDeserialize}; use libp2p::gossipsub::IdentTopic; use pchain_types::{ - blockchain::TransactionV1, - cryptography::{PublicAddress, Sha256Hash}, - serialization::Serializable, + cryptography::PublicAddress, + rpc::TransactionV1OrV2, }; /// Hash of the message topic. @@ -29,7 +28,6 @@ pub enum Topic { HotStuffRsBroadcast, HotStuffRsSend(PublicAddress), Mempool, - DroppedTxns, } impl Topic { @@ -41,10 +39,9 @@ impl Topic { impl From for IdentTopic { fn from(topic: Topic) -> Self { let str = match topic { - Topic::HotStuffRsBroadcast => "consensus".to_string(), - Topic::HotStuffRsSend(addr) => base64url::encode(addr), + Topic::HotStuffRsBroadcast => "hotstuff_rs".to_string(), + Topic::HotStuffRsSend(addr) => String::from("hotstuff_rs/") + &base64url::encode(addr), Topic::Mempool => "mempool".to_string(), - Topic::DroppedTxns => "droppedTx".to_string(), }; IdentTopic::new(str) } @@ -55,69 +52,22 @@ impl From for IdentTopic { #[derive(Clone, BorshSerialize, BorshDeserialize)] pub enum Message { HotStuffRs(hotstuff_rs::messages::Message), - Mempool(TransactionV1), - DroppedTxns(DroppedTxnMessage), + Mempool(TransactionV1OrV2), } impl From for Vec { fn from(msg: Message) -> Self { match msg { Message::HotStuffRs(msg) => msg.try_to_vec().unwrap(), - Message::Mempool(txn) => Serializable::serialize(&txn), - Message::DroppedTxns(msg) => msg.try_to_vec().unwrap(), + Message::Mempool(txn) => { + let mut data: Vec = Vec::new(); + TransactionV1OrV2::serialize(&txn, &mut data).unwrap(); + data + } } } } -/// [DroppedTxnMessage] defines message content for [Message::DroppedTxns]. -#[derive(Clone, borsh::BorshSerialize, borsh::BorshDeserialize)] -pub enum DroppedTxnMessage { - MempoolDroppedTx { - txn: TransactionV1, - status_code: DroppedTxnStatusCode, - }, - ExecutorDroppedTx { - tx_hash: Sha256Hash, - status_code: DroppedTxnStatusCode, - }, -} - -#[derive(Clone)] -pub enum DroppedTxnStatusCode { - Invalid, - NonceTooLow, - NonceInaccessible, -} - -impl From<&DroppedTxnStatusCode> for u16 { - fn from(status_code: &DroppedTxnStatusCode) -> Self { - match status_code { - DroppedTxnStatusCode::Invalid => 0x515_u16, - DroppedTxnStatusCode::NonceTooLow => 0x516_u16, - DroppedTxnStatusCode::NonceInaccessible => 0x517_u16, - } - } -} - -impl borsh::BorshSerialize for DroppedTxnStatusCode { - fn serialize(&self, writer: &mut W) -> std::io::Result<()> { - let status_code: u16 = self.into(); - BorshSerialize::serialize(&status_code, writer) - } -} - -impl borsh::BorshDeserialize for DroppedTxnStatusCode { - fn deserialize_reader(reader: &mut R) -> std::io::Result { - let status_code = match u16::deserialize_reader(reader) { - Ok(0x515_u16) => DroppedTxnStatusCode::Invalid, - Ok(0x516_u16) => DroppedTxnStatusCode::NonceTooLow, - Ok(0x517_u16) => DroppedTxnStatusCode::NonceInaccessible, - _ => panic!("Invalid droppedTx status code."), - }; - Ok(status_code) - } -} - #[cfg(test)] mod test { @@ -128,19 +78,15 @@ mod test { #[test] fn test_message_topic() { let hotstuff_broadcast_topic = Topic::HotStuffRsBroadcast; - let ident_topic = IdentTopic::new("consensus".to_string()); + let ident_topic = IdentTopic::new("hotstuff_rs".to_string()); assert_eq!(hotstuff_broadcast_topic.hash(), ident_topic.hash()); let hotstuff_send_topic = Topic::HotStuffRsSend([1u8; 32]); - let ident_topic = IdentTopic::new(base64url::encode([1u8; 32])); + let ident_topic = IdentTopic::new(String::from("hotstuff_rs/") + &base64url::encode([1u8; 32])); assert_eq!(hotstuff_send_topic.hash(), ident_topic.hash()); let mempool_topic = Topic::Mempool; let ident_topic = IdentTopic::new("mempool".to_string()); assert_eq!(mempool_topic.hash(), ident_topic.hash()); - - let droppedtxn_topic = Topic::DroppedTxns; - let ident_topic = IdentTopic::new("droppedTx".to_string()); - assert_eq!(droppedtxn_topic.hash(), ident_topic.hash()); } } diff --git a/src/peer.rs b/src/peer.rs index 895a343..2ba82ba 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -8,55 +8,57 @@ //! It starts a ParallelChain Network peer and keeps the peer alive -- the peer stops working when the //! thread is dropped. //! -//! To start a pchain-network peer, users pass a [Config] instance and message handlers into Peer::start(). +//! To start a pchain-network peer, users pass a [Config] instance and the message handler into Peer::start(). //! [Config] contains the peer's keypair, or other deployment-specific parameters, such as listening ports, bootstrap nodes etc. //! Users need to define the message handler for processing the [Message]. //! Starting Peer will return a Sender for delivering PeerCommand to the thread. //! //! Example: +//! //! 1. Define the configurations //! ``` //! let config = Config {...} //! ``` -//! 2. Define the message handler +//! // 2. Define the message handler //! ``` -//! let(tx,rx) = mpsc::channel(); +//! let(tx,rx) = mpsc::channel(100); //! let message_sender = tx.clone(); //! let message_handler = move |msg_origin: [u8;32], msg: Message| { //! let _ = message_sender.send((msg_origin, msg)); //! }; -//! ``` +//! ``` //! 3. Start the peer -//! ``` -//! let peer = Peer::start(config, Box::new(message_handler)).await.unwrap(); -//! ``` +//! ``` +//! let peer = Peer::start(config, message_handler).await.unwrap(); +//! ``` //! 4. Send PeerCommand -//! ``` +//! ``` //! peer.broadcast_mempool_msg(txn); -//! ``` +//! ``` use futures::StreamExt; use tokio::task::JoinHandle; use libp2p::{ + kad::KBucketKey, gossipsub, - identify, identity::{self, ed25519::Keypair}, noise, + identify, identity::{self, ed25519::Keypair, PeerId}, noise, SwarmBuilder, swarm::SwarmEvent, tcp, }; + use libp2p_mplex::MplexConfig; -use pchain_types::cryptography::PublicAddress; -use pchain_types::blockchain::TransactionV1; -use std::net::Ipv4Addr; -use std::time::Duration; +use pchain_types::{cryptography::PublicAddress, rpc::TransactionV1OrV2}; +use std::{net::Ipv4Addr,time::Duration}; use crate::{ behaviour::{Behaviour, NetworkEvent}, conversions, - messages::{DroppedTxnMessage,Message, Topic}, + messages::{Message, Topic}, config::Config, }; +const MAX_REPLICA_DISTANCE: u32 = 255; pub struct Peer { /// Network handle for the [tokio::task] which is the main thread for the p2p network @@ -68,7 +70,7 @@ pub struct Peer { } impl Peer { -/// Constructs a [Peer] from the given configuration and handlers, starting the thread for the p2p network +/// Constructs a [Peer] from the given configuration and handler, starting the thread for the p2p network /// 1. Load network configuration to set up transport for the P2P network. /// 2. Peer starts listening on the given config address /// 3. Establishes connection to the network by adding bootnodes and subscribing to message [Topic]. @@ -78,7 +80,7 @@ impl Peer { let mut swarm = set_up_transport(&config) .await - .map_err(PeerStartError::SystemConfigError)?; + .map_err(PeerStartError::BuildTransportError)?; swarm.listen_on(conversions::multi_addr( Ipv4Addr::new(0, 0, 0, 0), @@ -98,20 +100,13 @@ impl Peer { ) } - pub fn broadcast_mempool_msg(&self, txn: TransactionV1) { + pub fn broadcast_mempool_msg(&self, txn: TransactionV1OrV2) { let _ = self.sender.try_send(PeerCommand::Publish( Topic::Mempool, Message::Mempool(txn), )); } - pub fn broadcast_dropped_tx_msg(&self, msg: DroppedTxnMessage) { - let _ = self.sender.try_send(PeerCommand::Publish( - Topic::DroppedTxns, - Message::DroppedTxns(msg), - )); - } - pub fn broadcast_hotstuff_rs_msg(&self, msg: hotstuff_rs::messages::Message) { let _ = self.sender.try_send(PeerCommand::Publish( Topic::HotStuffRsBroadcast, @@ -145,7 +140,7 @@ pub(crate) enum PeerCommand { } /// Loads the network configuration from [Config] and build the transport for the P2P network -async fn set_up_transport(config: &Config) -> Result,std::io::Error> { +async fn set_up_transport(config: &Config) -> Result,libp2p::noise::Error> { // Read network configuration let local_keypair = &config.keypair; let local_public_address: PublicAddress = local_keypair.verifying_key().to_bytes(); @@ -168,7 +163,7 @@ async fn set_up_transport(config: &Config) -> Result,st tcp::Config::new().nodelay(true), noise::Config::new, (MplexConfig::default, libp2p::yamux::Config::default) - ).unwrap() + )? .with_behaviour(|_| behaviour).unwrap() .with_swarm_config(|config| config.with_idle_connection_timeout(Duration::from_secs(20))) .build(); @@ -227,7 +222,13 @@ fn establish_network_connections(mut swarm: libp2p::Swarm , config: & fn start_event_handling(mut swarm: libp2p::Swarm, config: &Config, mut message_handler: Box) -> (JoinHandle<()>,tokio::sync::mpsc::Sender) { // 4. Start p2p networking - let local_public_address: PublicAddress = config.keypair.verifying_key().to_bytes(); + let local_keypair = &config.keypair; + let local_public_address: PublicAddress = local_keypair.verifying_key().to_bytes(); + let local_libp2p_keypair = Keypair::try_from_bytes(config.keypair.to_keypair_bytes().as_mut_slice()).unwrap(); + let local_peer_id = identity::Keypair::from(local_libp2p_keypair.clone()) + .public() + .to_peer_id(); + let (sender, mut receiver) = tokio::sync::mpsc::channel::(config.outgoing_msgs_buffer_capacity); let mut discover_tick = @@ -306,9 +307,29 @@ fn start_event_handling(mut swarm: libp2p::Swarm, config: &Config, mu info.listen_addrs.iter().for_each(|a| { swarm.behaviour_mut().add_address(&peer_id, a.clone()); }); + + // subscribe to individual topic of closest replicas when added to Kademlia Kbucket + if let Ok(addr) = conversions::PublicAddress::try_from(peer_id) { + let public_addr: PublicAddress = addr.into(); + let topic = Topic::HotStuffRsSend(public_addr); + if !swarm.behaviour().is_subscribed(&topic.clone().hash()) + && is_close_peer(&local_peer_id, &peer_id) + { + let _ = swarm.behaviour_mut().subscribe(vec![topic]); + } + } } SwarmEvent::ConnectionClosed { peer_id, .. } => { swarm.behaviour_mut().remove_peer(&peer_id); + + // unsubscribe from individual topic of replicas if disconnected + if let Ok(addr) = conversions::PublicAddress::try_from(peer_id) { + let public_addr: PublicAddress = addr.into(); + let topic = Topic::HotStuffRsSend(public_addr); + if swarm.behaviour().is_subscribed(&topic.clone().hash()) { + let _ = swarm.behaviour_mut().unsubscribe(vec![topic]); + } + } } _ => {} } @@ -319,10 +340,22 @@ fn start_event_handling(mut swarm: libp2p::Swarm, config: &Config, mu (network_thread_handle, sender) } +/// Check the distance between 2 peers. Subscribe to new peer's individual topic +/// if the distance is below [MAX_REPLICA_DISTANCE] +fn is_close_peer(peer_1: &PeerId, peer_2: &PeerId) -> bool { + let peer_1_key = KBucketKey::from(*peer_1); + let peer_2_key = KBucketKey::from(*peer_2); + // returns the distance in base2 logarithm ranging from 0 - 256 + let distance = KBucketKey::distance(&peer_1_key, &peer_2_key) + .ilog2() + .unwrap_or(0); + distance < MAX_REPLICA_DISTANCE +} + #[derive(Debug)] pub enum PeerStartError { - /// Failed to read from system configuration path - SystemConfigError(std::io::Error), + /// Error building TCP transport + BuildTransportError(libp2p::noise::Error), /// Failed to subscribe to a topic on gossipsub SubscriptionError(libp2p::gossipsub::SubscriptionError), diff --git a/tests/network.rs b/tests/network.rs index 7fce819..ec516d5 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -1,4 +1,5 @@ use std::{net::Ipv4Addr, time::Duration}; +use tokio::sync::mpsc; use borsh::BorshSerialize; use hotstuff_rs::messages::SyncRequest; @@ -8,9 +9,15 @@ use pchain_network::{ config::Config, messages::{Topic, Message}, }; -use pchain_types::{blockchain::TransactionV1, cryptography::PublicAddress}; -fn base_tx(signer: PublicAddress) -> TransactionV1 { +use pchain_types::{ + blockchain::{TransactionV1, TransactionV2}, + rpc, + cryptography, + cryptography::PublicAddress +}; + +fn base_tx_v1(signer: PublicAddress) -> TransactionV1 { TransactionV1 { signer, nonce: 0, @@ -23,6 +30,19 @@ fn base_tx(signer: PublicAddress) -> TransactionV1 { } } +fn base_tx_v2(signer: PublicAddress) -> TransactionV2 { + TransactionV2 { + signer, + nonce: 0, + commands: vec![], + gas_limit: 200000, + max_base_fee_per_gas: 8, + priority_fee_per_gas: 0, + hash: [1u8; 32], + signature: [1u8; 64], + } +} + fn create_sync_req(start_height: u64) -> hotstuff_rs::messages::Message { let test_message = hotstuff_rs::messages::SyncMessage::SyncRequest(SyncRequest { start_height, @@ -57,12 +77,12 @@ async fn test_broadcast() { let mut sending_tick = tokio::time::interval(Duration::from_secs(1)); let mut receiving_tick = tokio::time::interval(Duration::from_secs(2)); - let message = Message::Mempool(base_tx(address_1)); + let message = Message::Mempool(rpc::TransactionV1OrV2::V1(base_tx_v1(address_1))); loop { tokio::select! { _ = sending_tick.tick() => { - node_1.broadcast_mempool_msg(base_tx(address_1)); + node_1.broadcast_mempool_msg(rpc::TransactionV1OrV2::V1(base_tx_v1(address_1))); if sending_limit == 0 { break } sending_limit -= 1; } @@ -147,7 +167,7 @@ async fn test_send_to() { async fn test_send_to_only_specific_receiver() { let (keypair_1, address_1) = generate_peer(); let (keypair_2, address_2) = generate_peer(); - let (keypair_3, address_3) = generate_peer(); + let (keypair_3, address_3) = generate_peer(); let (node_1, _message_receiver_1) = node( keypair_1, @@ -380,7 +400,7 @@ async fn test_broadcast_different_topics() { loop { tokio::select! { _ = sending_tick.tick() => { - node_1.broadcast_mempool_msg(base_tx(address_1)); + node_1.broadcast_mempool_msg(rpc::TransactionV1OrV2::V1(base_tx_v1(address_1))); if sending_limit == 0 { break } sending_limit -= 1; } @@ -452,11 +472,10 @@ pub async fn node( listening_port: u16, boot_nodes: Vec<([u8;32], Ipv4Addr, u16)>, topics_to_subscribe: Vec -) -> (Peer, tokio::sync::mpsc::Receiver<(PublicAddress, Message)>) { - - let local_keypair = pchain_types::cryptography::Keypair::from_keypair_bytes(&keypair.to_bytes()).unwrap(); +) -> (Peer, mpsc::Receiver<(PublicAddress, Message)>) { + let local_keypair = cryptography::Keypair::from_keypair_bytes(&keypair.to_bytes()).unwrap(); let config = Config { - keypair:local_keypair, + keypair: local_keypair, topics_to_subscribe, listening_port, boot_nodes, @@ -465,7 +484,7 @@ pub async fn node( kademlia_protocol_name: String::from("/pchain_p2p/1.0.0") }; - let(tx,rx) = tokio::sync::mpsc::channel(100); + let(tx,rx) = mpsc::channel(100); let message_sender = tx.clone(); let message_handler = move |msg_origin: [u8;32], msg: Message| { @@ -478,11 +497,10 @@ pub async fn node( // process mempool message let _ = message_sender.try_send((msg_origin, Message::Mempool(mempool_message))); } - _ => {} } }; let peer = Peer::start(config, Box::new(message_handler)).await.unwrap(); (peer, rx) -} +} \ No newline at end of file