Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
895b3a2
new pchain_network structure according to pull request comments
Oct 10, 2023
47046ea
network subscribes to fullnode topics declared in config
ong-jonas Oct 10, 2023
8e13e7b
Added handlers field in PeerBuilder, change visibility of constants
Oct 10, 2023
8359198
filter types of messages to pass into handlers
ong-jonas Oct 10, 2023
929e2c2
Replace libp2p::identity::Keypair with libp2p::identity::ed25519::Key…
Oct 10, 2023
89af337
removed constant inputs that were redundantly passed into functions
ong-jonas Oct 10, 2023
632e0cc
Use libp2p::identity::PeerId, instead of libp2p::PeerId
Oct 11, 2023
7632917
Implemented Drop trait for Peer, fixed errors in engine.rs
Oct 11, 2023
06d7fd1
changed method of identifying message topics, updated protocol name a…
ong-jonas Oct 11, 2023
61caf60
Updated comments and docs
Oct 11, 2023
34dd150
improve deserialization of messages
ong-jonas Oct 12, 2023
309a0d6
Merge branch 'refactor' into refactor
ong-jonas Oct 12, 2023
e4e4b1b
Merge pull request #15 from ong-jonas/refactor
manngayin612 Oct 12, 2023
b6c155e
Resolved conflicts from pull request, added helper function to for me…
Oct 12, 2023
8bf6953
update comments, removed Envelope from message.rs
Oct 12, 2023
8916714
DOC: updated comments in message.rs
Oct 12, 2023
1777c61
fixed network tests
ong-jonas Oct 13, 2023
b0a5f6e
Moved configuration of PeerBuilder as argument of PeerBuilder::new(),…
Oct 13, 2023
2a699f2
Merge branch 'refactor' into tests/refactor
ong-jonas Oct 16, 2023
9e3082b
updated lib.rs example usage
ong-jonas Oct 16, 2023
52de07b
updated behaviour.rs test to check if the specific peer has been added
ong-jonas Oct 16, 2023
71e8b0a
updated config boot_nodes type to Vec<(PublicAddress, Ipv4Addr, u16)>
ong-jonas Oct 16, 2023
51e58a9
updated variable names and comments, removed explicit declaration of …
ong-jonas Oct 16, 2023
cdabd42
Update comments
Oct 16, 2023
f29a849
Updated comments in lib.rs
Oct 16, 2023
24c83d9
Merge pull request #16 from parallelchain-io/tests/refactor
manngayin612 Oct 16, 2023
cea24db
updated documentation and comments
ong-jonas Oct 17, 2023
cf2ab28
implement tryfrom gossip_message to pchain_message and moved definiti…
ong-jonas Oct 19, 2023
862a280
EngineStartError moved to mod engine
ong-jonas Oct 19, 2023
d4403a1
remove PeerBuilder and create Peer through Peer::start, engine::start…
ong-jonas Oct 19, 2023
aca23b8
convert gossipsub message to message using identify topics
ong-jonas Oct 20, 2023
4ab7543
renamed variables
ong-jonas Oct 20, 2023
20ecbbf
remove engine and replace functionality with peer
ong-jonas Oct 20, 2023
311ae56
decompose engine start into 3 separate functions
ong-jonas Oct 23, 2023
e69a8df
renamed MessageError to MessageConversionError
ong-jonas Oct 23, 2023
c9cbc37
renamed InvalidTopic to InvalidTopicError
ong-jonas Oct 23, 2023
d2f1d72
updated conversion errors
ong-jonas Oct 23, 2023
fed8fe6
removed identity topics
ong-jonas Oct 23, 2023
0e2ee29
removed impl From for PeerStartError and mapped errors in Peer::start()
ong-jonas Oct 23, 2023
6c7d3f1
removed start_listening function
ong-jonas Oct 24, 2023
a739d94
peers publish messages to themselves if they are subscribed
ong-jonas Oct 24, 2023
e51f6db
documentation and tests update
ong-jonas Oct 26, 2023
c228d2f
moved conversion of gossipsub::message to conversion.rs
ong-jonas Oct 27, 2023
702dfe2
Merge pull request #18 from parallelchain-io/message/refactor
manngayin612 Oct 27, 2023
a802936
Removed redundant incoming _msgs_buffer_capacity
Oct 27, 2023
7e37505
Renamed variable. Updated docs
Oct 27, 2023
434be9a
separated broadcast and send to self test cases
ong-jonas Oct 27, 2023
d0d1c41
Merge branch 'refactor' of https://github.com/parallelchain-io/pchain…
ong-jonas Oct 27, 2023
2448e97
updated network tests and hotstuff dependency to 0.3
ong-jonas Oct 30, 2023
0c21c1b
added conversion between pchain types Keypair and libp2p Keypair
ong-jonas Oct 31, 2023
18362f4
removed keypair conversions
ong-jonas Oct 31, 2023
2db4d17
updated documentation
ong-jonas Oct 31, 2023
5abd30b
added helper function in network tests
ong-jonas Oct 31, 2023
4904909
Updated lib.rs
Oct 31, 2023
c0f3fe0
Updated lib.rs
Nov 1, 2023
fa28fcd
Update libp2p::Kademlia deprecated packages, set allow_self_origin to…
Nov 6, 2023
710ecff
Update message handlers to be type of FnMut
Nov 9, 2023
156bb47
Fix: disambiguate serialization used for DroppedTxsStatusCode
Nov 10, 2023
03f1c78
FIX: removed deprecated dependencies
ong-jonas Nov 16, 2023
02de232
REFACTOR: removed with_dns, added nodelay(true)
ong-jonas Nov 16, 2023
74551df
REFACTOR: try_from to filter_gossipsub_messages, Vec<msg_handlers> to…
ong-jonas Nov 21, 2023
5900f43
deps: update dependency to hotstuff_rs
ong-jonas Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renamed variables
  • Loading branch information
ong-jonas committed Oct 20, 2023
commit 4ab7543f783670975809b742c0b872d7037fb2c3
20 changes: 10 additions & 10 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl From<PublicAddress> for pchain_types::cryptography::PublicAddress {
}

impl TryFrom<PeerId> for PublicAddress {
type Error = ConversionError;
type Error = DecodingError;

fn try_from(peer_id: PeerId) -> Result<Self, Self::Error> {
let kp = identity::PublicKey::try_decode_protobuf(&peer_id.to_bytes())?;
Expand All @@ -39,7 +39,7 @@ impl TryFrom<PeerId> for PublicAddress {
}

impl TryFrom<PublicAddress> for PeerId {
type Error = ConversionError;
type Error = PeerIdTryFromPublicAddressError;

fn try_from(public_addr: PublicAddress) -> Result<Self, Self::Error> {
let kp = ed25519::PublicKey::try_from_bytes(&public_addr.0)?;
Expand All @@ -49,20 +49,20 @@ impl TryFrom<PublicAddress> for PeerId {
}

#[derive(Debug)]
pub enum ConversionError {
pub enum PeerIdTryFromPublicAddressError {
OtherVariantError(OtherVariantError),
DecodingError(DecodingError),
}

impl From<OtherVariantError> for ConversionError {
fn from(error: OtherVariantError) -> ConversionError {
ConversionError::OtherVariantError(error)
impl From<OtherVariantError> for PeerIdTryFromPublicAddressError {
fn from(error: OtherVariantError) -> PeerIdTryFromPublicAddressError {
PeerIdTryFromPublicAddressError::OtherVariantError(error)
}
}

impl From<DecodingError> for ConversionError {
fn from(error: DecodingError) -> ConversionError {
ConversionError::DecodingError(error)
impl From<DecodingError> for PeerIdTryFromPublicAddressError {
fn from(error: DecodingError) -> PeerIdTryFromPublicAddressError {
PeerIdTryFromPublicAddressError::DecodingError(error)
}
}

Expand All @@ -88,7 +88,7 @@ mod test {
assert!(result.is_ok());

// Convert it back to PeerId
let result: Result<PeerId, ConversionError> = result.unwrap().try_into();
let result: Result<PeerId, PeerIdTryFromPublicAddressError> = result.unwrap().try_into();
assert!(result.is_ok());
assert_eq!(result.unwrap(), test_peerid);
}
Expand Down
58 changes: 29 additions & 29 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! In the event loop, it waits for:
//! 1. [NetworkEvent]
//! 2. [Commands](EngineCommand) from application for sending messages or termination
//! 2. [Commands](PeerAction) from application for sending messages or termination
//! 3. a periodic interval to discover peers in the network
//!
//! ### 1. NetworkEvent Handling
Expand All @@ -23,13 +23,13 @@
//! Upon receiving the ConnectionClosed event, peer information will be removed from the
//! routing table.
//!
//! ### 2. EngineCommand Handling
//! ### 2. PeerAction Handling
//!
//! Upon receiving a (Publish)[EngineCommand::Publish] command, the peer will publish the message to its
//! Upon receiving a (Publish)[PeerAction::Publish] command, the peer will publish the message to its
//! connected peers. The peer will process the message directly if it is a [Topic::HotStuffRsSend] message
//! directed to the peer's public address.
//!
//! Upon receiving a (Shutdown)[EngineCommand::Shutdown] command, the process will exit the loop and terminate
//! Upon receiving a (Shutdown)[PeerAction::Shutdown] command, the process will exit the loop and terminate
//! the thread.
//!

Expand All @@ -52,14 +52,14 @@ use crate::{
behaviour::{Behaviour, NetworkEvent},
conversions,
messages::{Message, Topic},
peer::EngineCommand,
peer::PeerAction,
config::Config,
};


/// [start] p2p networking peer and return the handle of this process.
pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>)
-> Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<EngineCommand>), EngineStartError> {
-> Result<(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerAction>), PeerStartError> {
let local_keypair = config.keypair;
let local_public_address: PublicAddress = local_keypair.public().to_bytes();
let local_peer_id = identity::Keypair::from(local_keypair.clone())
Expand Down Expand Up @@ -102,18 +102,18 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi

// 4. Start p2p networking
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<EngineCommand>(config.outgoing_msgs_buffer_capacity);
tokio::sync::mpsc::channel::<PeerAction>(config.outgoing_msgs_buffer_capacity);
let mut discover_tick =
tokio::time::interval(Duration::from_secs(config.peer_discovery_interval));

let network_thread_handle = tokio::task::spawn(async move {
loop {
// 4.1 Wait for the following events:
let (engine_command, event) = tokio::select! {
let (peer_command, event) = tokio::select! {
biased;
// Receive an EngineCommand from application
engine_command = receiver.recv() => {
(engine_command, None)
// Receive a PeerAction from application
peer_command = receiver.recv() => {
(peer_command, None)
},
// Receive a NetworkEvent
event = swarm.select_next_some() => {
Expand All @@ -127,11 +127,11 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
},
};

// 4.2 Deliver messages when an EngineCommand::Publish from the application is received
// and shutdown engine when an EngineCommand::Shutdown from the application is received
if let Some(engine_command) = engine_command {
match engine_command {
EngineCommand::Publish(topic, message) => {
// 4.2 Deliver messages when a PeerAction::Publish from the application is received
// and shutdown engine when a PeerAction::Shutdown from the application is received
if let Some(peer_command) = peer_command {
match peer_command {
PeerAction::Publish(topic, message) => {
log::info!("Publishing (Topic: {:?})", topic);
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
Expand All @@ -142,7 +142,7 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
log::debug!("Failed to publish the message. {:?}", e);
}
}
EngineCommand::Shutdown => {
PeerAction::Shutdown => {
log::info!("Shutting down the engine...");
break;
}
Expand All @@ -163,11 +163,11 @@ pub(crate) async fn start(config: Config, message_handlers: Vec<Box<dyn Fn(Publi
let public_addr: PublicAddress = public_addr.into();
if swarm.behaviour().is_subscribed(&message) {
// Send it to ourselves if we subscribed to this topic
if let Ok(pchain_message) =
if let Ok(message) =
Message::try_from((message, local_public_address))
{
message_handlers.iter().for_each(|handler| {
handler(public_addr, pchain_message.clone())
handler(public_addr, message.clone())
});
}
}
Expand Down Expand Up @@ -217,7 +217,7 @@ async fn build_transport(
}

#[derive(Debug)]
pub enum EngineStartError {
pub enum PeerStartError {
/// Failed to read from system configuration path
SystemConfigError(std::io::Error),

Expand All @@ -228,20 +228,20 @@ pub enum EngineStartError {
UnsupportedAddressError(libp2p::TransportError<std::io::Error>),
}

impl From<std::io::Error> for EngineStartError {
fn from(error: std::io::Error) -> EngineStartError {
EngineStartError::SystemConfigError(error)
impl From<std::io::Error> for PeerStartError {
fn from(error: std::io::Error) -> PeerStartError {
PeerStartError::SystemConfigError(error)
}
}

impl From<libp2p::gossipsub::SubscriptionError> for EngineStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> EngineStartError {
EngineStartError::SubscriptionError(error)
impl From<libp2p::gossipsub::SubscriptionError> for PeerStartError {
fn from(error: libp2p::gossipsub::SubscriptionError) -> PeerStartError {
PeerStartError::SubscriptionError(error)
}
}

impl From<libp2p::TransportError<std::io::Error>> for EngineStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> EngineStartError {
EngineStartError::UnsupportedAddressError(error)
impl From<libp2p::TransportError<std::io::Error>> for PeerStartError {
fn from(error: libp2p::TransportError<std::io::Error>) -> PeerStartError {
PeerStartError::UnsupportedAddressError(error)
}
}
24 changes: 12 additions & 12 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,38 @@ use crate::messages::{DroppedTxnMessage, Message, Topic};
pub struct Peer {
/// Network handle for the [tokio::task] which is the main thread for the
/// p2p network (see [crate::engine]).
pub(crate) engine: JoinHandle<()>,
pub(crate) handle: JoinHandle<()>,

/// mpsc sender for delivering messages to the p2p network.
pub(crate) to_engine: tokio::sync::mpsc::Sender<EngineCommand>,
pub(crate) sender: tokio::sync::mpsc::Sender<PeerAction>,
}

impl Peer {
/// Constructs a [Peer] from the given configuration and handlers, and start the thread for the p2p network.
pub async fn start(config: Config, handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>>) -> Peer {
let (engine, to_engine) = crate::engine::start(config, handlers).await.unwrap();
let (handle, sender) = crate::engine::start(config, handlers).await.unwrap();
Peer {
engine,
to_engine,
handle,
sender,
}
}

pub fn broadcast_mempool_msg(&self, txn: TransactionV1) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::Mempool,
Message::Mempool(txn),
));
}

pub fn broadcast_dropped_tx_msg(&self, msg: DroppedTxnMessage) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::DroppedTxns,
Message::DroppedTxns(msg),
));
}

pub fn broadcast_hotstuff_rs_msg(&self, msg: hotstuff_rs::messages::Message) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::HotStuffRsBroadcast,
Message::HotStuffRs(msg),
));
Expand All @@ -81,7 +81,7 @@ impl Peer {
address: PublicAddress,
msg: hotstuff_rs::messages::Message,
) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
let _ = self.sender.try_send(PeerAction::Publish(
Topic::HotStuffRsSend(address),
Message::HotStuffRs(msg),
));
Expand All @@ -90,13 +90,13 @@ impl Peer {

impl Drop for Peer {
fn drop(&mut self) {
let _ = self.to_engine.try_send(EngineCommand::Shutdown);
let _ = self.sender.try_send(PeerAction::Shutdown);
}
}

/// [EngineCommand] defines commands to other pchain-network peers which includes publishing messages
/// [PeerAction] defines commands to other pchain-network peers which includes publishing messages
/// and shutting down the network when the peer is dropped.
pub(crate) enum EngineCommand {
pub(crate) enum PeerAction {
Publish(Topic, Message),
Shutdown,
}
Expand Down
2 changes: 1 addition & 1 deletion tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn test_stopped_node() {
vec![]
).await;

// Stop node by EngineCommand::Shutdown
// Stop node by PeerAction::Shutdown
drop(node_2);

let mut sending_limit = 10;
Expand Down