Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
removed impl From for PeerStartError and mapped errors in Peer::start()
  • Loading branch information
ong-jonas committed Oct 23, 2023
commit 0e2ee299582b30696c42bf3bce249b0ba1ab4167
64 changes: 28 additions & 36 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ use libp2p::{
gossipsub,
identify, identity::{self}, noise,
swarm::{SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
tcp, yamux, PeerId, Transport, TransportError,
};
use libp2p_mplex::MplexConfig;
use pchain_types::cryptography::PublicAddress;
use pchain_types::blockchain::TransactionV1;
use std::net::Ipv4Addr;
use std::{net::Ipv4Addr, io};
use std::time::Duration;

use crate::{
Expand All @@ -58,7 +58,6 @@ use crate::{
};



pub struct Peer {
/// Network handle for the [tokio::task] which is the main thread for the p2p network
pub(crate) handle: JoinHandle<()>,
Expand All @@ -71,13 +70,23 @@ pub struct Peer {
impl Peer {
/// Constructs a [Peer] from the given configuration and handlers and start the thread for the p2p network
/// 1. Load network configuration to set up transport for the P2P network.
/// 2. Establishes connection to the network by adding bootnodes and subscribing to message [Topic].
/// 3. Spawns an asynchronous [tokio] task and enters the main event loop, returning a mpsc Sender used for sending
/// 2. Peer starts listening on the given address
/// 3. Establishes connection to the network by adding bootnodes and subscribing to message [Topic].
/// 4. Spawns an asynchronous [tokio] task and enters the event handling loop, returning a mpsc Sender used for sending
/// [PeerCommand] to the internal thread.
pub async fn start(config: Config, handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) -> Result<Peer, PeerStartError> {
let mut swarm = set_up_transport(&config).await?;
swarm = establish_network_connections(swarm, &config)?;
let (handle, sender) = main_loop(swarm, &config, handlers)?;
let mut swarm = set_up_transport(&config)
.await
.map_err(PeerStartError::SystemConfigError)?;

swarm = start_listening(swarm, &config)
.await
.map_err(PeerStartError::UnsupportedAddressError)?;

swarm = establish_network_connections(swarm, &config)
.map_err(PeerStartError::SubscriptionError)?;

let (handle, sender) = start_event_handling(swarm, &config, handlers);
Ok(
Peer {
handle,
Expand Down Expand Up @@ -133,7 +142,7 @@ pub(crate) enum PeerCommand {
}

/// Loads the network configuration from [Config] and build the transport for the P2P network
async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,PeerStartError> {
async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,std::io::Error> {
// Read network configuration
let local_keypair = &config.keypair;
let local_public_address: PublicAddress = local_keypair.public().to_bytes();
Expand All @@ -150,21 +159,24 @@ async fn set_up_transport(config: &Config) -> Result<libp2p::Swarm<Behaviour>,Pe
&local_keypair,
config.kademlia_protocol_name.clone(),
);
let swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Ok(swarm)
}

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
/// Starts listening on the given address
async fn start_listening(mut swarm: libp2p::Swarm<Behaviour>, config: &Config) -> Result<libp2p::Swarm<Behaviour>, TransportError<io::Error>> {
let multiaddr = conversions::multi_addr(
Ipv4Addr::new(0, 0, 0, 0),
config.listening_port
);
swarm.listen_on(multiaddr)?;

Ok(swarm)
}

/// Peer establishes network specific connections:
/// 1. Adds network bootnodes to local routing table
/// 2. Subscribes to network specific message [Topic]
fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &Config) -> Result<libp2p::Swarm<Behaviour>,PeerStartError> {
fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &Config) -> Result<libp2p::Swarm<Behaviour>,gossipsub::SubscriptionError> {
// Connection to bootstrap nodes
if !config.boot_nodes.is_empty() {
config.boot_nodes.iter().for_each(|peer_info| {
Expand All @@ -184,8 +196,7 @@ fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &
Ok(swarm)
}

/// Spawns the Main Event loop for p2p network.
/// It waits for:
/// Spawns the Main Event Handling loop for p2p network. It waits for:
/// 1. [NetworkEvent]
/// 2. [Commands](PeerCommand) from application for sending messages or termination
/// 3. a periodic interval to discover peers in the network
Expand All @@ -211,8 +222,8 @@ fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &
/// Upon receiving a (Shutdown)[PeerCommand::Shutdown] command, the process will exit the loop and terminate
/// the thread.
///
fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) ->
Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerCommand>), PeerStartError> {
fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) ->
(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerCommand>) {
// 4. Start p2p networking
let local_public_address = config.keypair.public().to_bytes();
let (sender, mut receiver) =
Expand Down Expand Up @@ -306,9 +317,7 @@ fn main_loop(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, message_handl
}
});

Ok(
(network_thread_handle, sender)
)
(network_thread_handle, sender)
}


Expand Down Expand Up @@ -343,20 +352,3 @@ pub enum PeerStartError {
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

impl From<std::io::Error> for PeerStartError {
fn from(error: std::io::Error) -> PeerStartError {
PeerStartError::SystemConfigError(error)
}
}

impl From<libp2p::gossipsub::SubscriptionError> for PeerStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> PeerStartError {
PeerStartError::SubscriptionError(error)
}
}

impl From<libp2p::TransportError<std::io::Error>> for PeerStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> PeerStartError {
PeerStartError::UnsupportedAddressError(error)
}
}