Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renamed variables
  • Loading branch information
ong-jonas committed Oct 20, 2023
commit 4ab7543f783670975809b742c0b872d7037fb2c3
20 changes: 10 additions & 10 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl From<PublicAddress> for pchain_types::cryptography::PublicAddress {
}

impl TryFrom<PeerId> for PublicAddress {
type Error = ConversionError;
type Error = DecodingError;

fn try_from(peer_id: PeerId) -> Result<Self, Self::Error> {
let kp = identity::PublicKey::try_decode_protobuf(&peer_id.to_bytes())?;
Expand All @@ -39,7 +39,7 @@ impl TryFrom<PeerId> for PublicAddress {
}

impl TryFrom<PublicAddress> for PeerId {
type Error = ConversionError;
type Error = PeerIdTryFromPublicAddressError;

fn try_from(public_addr: PublicAddress) -> Result<Self, Self::Error> {
let kp = ed25519::PublicKey::try_from_bytes(&public_addr.0)?;
Expand All @@ -49,20 +49,20 @@ impl TryFrom<PublicAddress> for PeerId {
}

#[derive(Debug)]
pub enum ConversionError {
pub enum PeerIdTryFromPublicAddressError {
OtherVariantError(OtherVariantError),
DecodingError(DecodingError),
}

impl From<OtherVariantError> for ConversionError {
fn from(error: OtherVariantError) -> ConversionError {
ConversionError::OtherVariantError(error)
impl From<OtherVariantError> for PeerIdTryFromPublicAddressError {
fn from(error: OtherVariantError) -> PeerIdTryFromPublicAddressError {
PeerIdTryFromPublicAddressError::OtherVariantError(error)
}
}

impl From<DecodingError> for ConversionError {
fn from(error: DecodingError) -> ConversionError {
ConversionError::DecodingError(error)
impl From<DecodingError> for PeerIdTryFromPublicAddressError {
fn from(error: DecodingError) -> PeerIdTryFromPublicAddressError {
PeerIdTryFromPublicAddressError::DecodingError(error)
}
}

Expand All @@ -88,7 +88,7 @@ mod test {
assert!(result.is_ok());

// Convert it back to PeerId
let result: Result<PeerId, ConversionError> = result.unwrap().try_into();
let result: Result<PeerId, PeerIdTryFromPublicAddressError> = result.unwrap().try_into();
assert!(result.is_ok());
assert_eq!(result.unwrap(), test_peerid);
}
Expand Down
58 changes: 29 additions & 29 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! In the event loop, it waits for:
//! 1. [NetworkEvent]
//! 2. [Commands](EngineCommand) from application for sending messages or termination
//! 2. [Commands](PeerAction) from application for sending messages or termination
//! 3. a periodic interval to discover peers in the network
//!
//! ### 1. NetworkEvent Handling
Expand All @@ -23,13 +23,13 @@
//! Upon receiving the ConnectionClosed event, peer information will be removed from the
//! routing table.
//!
//! ### 2. EngineCommand Handling
//! ### 2. PeerAction Handling
//!
//! Upon receiving a (Publish)[EngineCommand::Publish] command, the peer will publish the message to its
//! Upon receiving a (Publish)[PeerAction::Publish] command, the peer will publish the message to its
//! connected peers. The peer will process the message directly if it is a [Topic::HotStuffRsSend] message
//! directed to the peer's public address.
//!
//! Upon receiving a (Shutdown)[EngineCommand::Shutdown] command, the process will exit the loop and terminate
//! Upon receiving a (Shutdown)[PeerAction::Shutdown] command, the process will exit the loop and terminate
//! the thread.
//!

Expand All @@ -52,14 +52,14 @@ use crate::{
behaviour::{Behaviour, NetworkEvent},
conversions,
messages::{Message, Topic},
peer::EngineCommand,
peer::PeerAction,
config::Config,
};


/// [start] p2p networking peer and return the handle of this process.
pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>)
-> Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<EngineCommand>), EngineStartError> {
-> Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerAction>), PeerStartError> {
let local_keypair = config.keypair;
let local_public_address: PublicAddress = local_keypair.public().to_bytes();
let local_peer_id = identity::Keypair::from(local_keypair.clone())
Expand Down Expand Up @@ -102,18 +102,18 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi

// 4. Start p2p networking
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<EngineCommand>(config.outgoing_msgs_buffer_capacity);
tokio::sync::mpsc::channel::<PeerAction>(config.outgoing_msgs_buffer_capacity);
let mut discover_tick =
tokio::time::interval(Duration::from_secs(config.peer_discovery_interval));

let network_thread_handle = tokio::task::spawn(async move {
loop {
// 4.1 Wait for the following events:
let (engine_command, event) = tokio::select! {
let (peer_command, event) = tokio::select! {
biased;
// Receive an EngineCommand from application
engine_command = receiver.recv() => {
(engine_command, None)
// Receive a PeerAction from application
peer_command = receiver.recv() => {
(peer_command, None)
},
// Receive a NetworkEvent
event = swarm.select_next_some() => {
Expand All @@ -127,11 +127,11 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
},
};

// 4.2 Deliver messages when an EngineCommand::Publish from the application is received
// and shutdown engine when an EngineCommand::Shutdown from the application is received
if let Some(engine_command) = engine_command {
match engine_command {
EngineCommand::Publish(topic, message) => {
// 4.2 Deliver messages when a PeerAction::Publish from the application is received
// and shutdown engine when a PeerAction::Shutdown from the application is received
if let Some(peer_command) = peer_command {
match peer_command {
PeerAction::Publish(topic, message) => {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
Expand All @@ -142,7 +142,7 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
log::debug!("Failed to publish the message. {:?}", e);
}
}
EngineCommand::Shutdown => {
PeerAction::Shutdown => {
log::info!("Shutting down the engine...");
break;
}
Expand All @@ -163,11 +163,11 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic
if let Ok(pchain_message) =
if let Ok(message) =
Message::try_from((message, local_public_address))
{
message_handlers.iter().for_each(|handler| {
handler(public_addr, pchain_message.clone())
handler(public_addr, message.clone())
});
}
}
Expand Down Expand Up @@ -217,7 +217,7 @@ async fn build_transport(
}

#[derive(Debug)]
pub enum EngineStartError {
pub enum PeerStartError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),

Expand All @@ -228,20 +228,20 @@ pub enum EngineStartError {
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

impl From<std::io::Error> for EngineStartError {
fn from(error: std::io::Error) -> EngineStartError {
EngineStartError::SystemConfigError(error)
impl From<std::io::Error> for PeerStartError {
fn from(error: std::io::Error) -> PeerStartError {
PeerStartError::SystemConfigError(error)
}
}

impl From<libp2p::gossipsub::SubscriptionError> for EngineStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> EngineStartError {
EngineStartError::SubscriptionError(error)
impl From<libp2p::gossipsub::SubscriptionError> for PeerStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> PeerStartError {
PeerStartError::SubscriptionError(error)
}
}

impl From<libp2p::TransportError<std::io::Error>> for EngineStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> EngineStartError {
EngineStartError::UnsupportedAddressError(error)
impl From<libp2p::TransportError<std::io::Error>> for PeerStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> PeerStartError {
PeerStartError::UnsupportedAddressError(error)
}
}
24 changes: 12 additions & 12 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,38 @@ use crate::messages::{DroppedTxnMessage, Message, Topic};
pub struct Peer {
/// Network handle for the [tokio::task] which is the main thread for the
/// p2p network (see [crate::engine]).
pub(crate) engine: JoinHandle<()>,
pub(crate) handle: JoinHandle<()>,

/// mpsc sender for delivering messages to the p2p network.
pub(crate) to_engine: tokio::sync::mpsc::Sender<EngineCommand>,
pub(crate) sender: tokio::sync::mpsc::Sender<PeerAction>,
}

impl Peer {
/// Constructs a [Peer] from the given configuration and handlers, and start the thread for the p2p network.
pub async fn start(config: Config, handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) -> Peer {
let (engine, to_engine) = crate::engine::start(config, handlers).await.unwrap();
let (handle, sender) = crate::engine::start(config, handlers).await.unwrap();
Peer {
engine,
to_engine,
handle,
sender,
}
}

pub fn broadcast_mempool_msg(&self, txn: TransactionV1) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::Mempool,
Message::Mempool(txn),
));
}

pub fn broadcast_dropped_tx_msg(&self, msg: DroppedTxnMessage) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::DroppedTxns,
Message::DroppedTxns(msg),
));
}

pub fn broadcast_hotstuff_rs_msg(&self, msg: hotstuff_rs::messages::Message) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::HotStuffRsBroadcast,
Message::HotStuffRs(msg),
));
Expand All @@ -81,7 +81,7 @@ impl Peer {
address: PublicAddress,
msg: hotstuff_rs::messages::Message,
) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::HotStuffRsSend(address),
Message::HotStuffRs(msg),
));
Expand All @@ -90,13 +90,13 @@ impl Peer {

impl Drop for Peer {
fn drop(&mut self) {
let _ = self.to_engine.try_send(EngineCommand::Shutdown);
let _ = self.sender.try_send(PeerAction::Shutdown);
}
}

/// [EngineCommand] defines commands to other pchain-network peers which includes publishing messages
/// [PeerAction] defines commands to other pchain-network peers which includes publishing messages
/// and shutting down the network when the peer is dropped.
pub(crate) enum EngineCommand {
pub(crate) enum PeerAction {
Publish(Topic, Message),
Shutdown,
}
Expand Down
2 changes: 1 addition & 1 deletion tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn test_stopped_node() {
vec![]
).await;

// Stop node by EngineCommand::Shutdown
// Stop node by PeerAction::Shutdown
drop(node_2);

let mut sending_limit = 10;
Expand Down