Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
990251c
mailbox feature
ong-jonas Oct 19, 2023
2396d8d
merge refactor into feature
ong-jonas Oct 27, 2023
8e73502
message ID no longer overridden
ong-jonas Oct 27, 2023
28d9103
removed engine.rs
ong-jonas Oct 27, 2023
d55849b
renamed SMR topics to hotstuff_rs
ong-jonas Oct 27, 2023
7da376a
added mailbox subscription feature
ong-jonas Oct 27, 2023
043800a
update network tests
ong-jonas Oct 27, 2023
dd322db
removed droppedtx message type
ong-jonas Oct 30, 2023
cde45ef
added different message handlers and updated hotstuff dependency to 0.3
ong-jonas Oct 30, 2023
0f945a7
updated config keypair to pchain type
ong-jonas Oct 31, 2023
0879474
added helper function in network tests
ong-jonas Oct 31, 2023
17673c0
mempool messages accepts V1 or V2
ong-jonas Nov 1, 2023
4a7c7aa
Update lib.rs
Nov 1, 2023
ef789f2
Fix: serialize TransactionV1 and TransactionV2 separately in messages.rs
Nov 1, 2023
725d99b
Update libp2p deprecated package
Nov 1, 2023
30d2dd4
update message serialization and handler channels
ong-jonas Nov 8, 2023
994834c
update message handlers to FnMut
Nov 9, 2023
f01c511
FIX: removed deprecated dependencies
ong-jonas Nov 16, 2023
54ffb64
REFACTOR: removed with_dns, added nodelay(true)
ong-jonas Nov 16, 2023
29ea7c7
REFACTOR: try_from to filter_gossipsub_messages, Vec<msg_handlers> to…
ong-jonas Nov 21, 2023
6b3543d
deps: update dependency to hotstuff_rs
ong-jonas Nov 30, 2023
5a7c61e
refactor: update pchain-types dependency version
ong-jonas Dec 1, 2023
8556c0f
Merge branch 'v0.5' into mailbox/feature
ong-jonas Dec 5, 2023
5b379a9
REFACTOR: documentation update to reflect single message handler
ong-jonas Dec 5, 2023
f66e4c2
refactor: moved is_close_peer function to peer.rs and updated PeerSta…
ong-jonas Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
REFACTOR: try_from to filter_gossipsub_messages, Vec<msg_handlers> to…
… msg_handler
  • Loading branch information
ong-jonas committed Nov 21, 2023
commit 29ea7c73f83cf796e447868f59d74b0e3ef9a4cd
15 changes: 6 additions & 9 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! - From<[PublicAddress]> for [ParallelChain PublicAddress](pchain_types::cryptography::PublicAddress)
//! - TryFrom<PeerId> for [PublicAddress]
//! - TryFrom<[PublicAddress]> for [PeerId]
//! - TryFrom<([libp2p::gossipsub::Message], [pchain_types::cryptography::PublicAddress])> for [Message]
//! - filter_gossipsub_messages([libp2p::gossipsub::Message], [pchain_types::cryptography::PublicAddress]) to [Message]

use libp2p::identity::{self, ed25519, DecodingError, OtherVariantError, PeerId};
use libp2p::{Multiaddr, kad::KBucketKey};
Expand Down Expand Up @@ -77,13 +77,11 @@ impl From<DecodingError> for PublicAddressTryFromPeerIdError {
}
}

impl TryFrom<(libp2p::gossipsub::Message, cryptography::PublicAddress)> for Message {
type Error = MessageConversionError;

fn try_from((message , local_public_address): (libp2p::gossipsub::Message, cryptography::PublicAddress))
-> Result<Self, Self::Error> {
let (topic_hash, data) = (message.topic, message.data);
let mut data = data.as_slice();
/// converts [Message](libp2p::gossipsub::Message) to [Message](Message) while
/// filtering for message topics that can be forwarded to fullnode
pub fn filter_gossipsub_messages(message: libp2p::gossipsub::Message, local_public_address: cryptography::PublicAddress) -> Result<Message, MessageConversionError> {
let (topic_hash, data) = (message.topic, message.data);
let mut data = data.as_slice();

let topic = fullnode_topics(local_public_address)
.into_iter()
Expand All @@ -100,7 +98,6 @@ impl TryFrom<(libp2p::gossipsub::Message, cryptography::PublicAddress)> for Mess
Ok(message)
},
}
}
}

#[derive(Debug)]
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@
//! };
//!
//! // 2. Create message handlers
//! let (tx, rx) = mpsc::channel();
//! let (tx, rx) = tokio::sync::mpsc::channel(100);
//! let hotstuff_sender = tx.clone();
//! let hotstuff_handler = move |msg_orgin: [u8;32], msg: Message| {
//! match msg {
//! Message::HotStuffRs(hotstuff_message) => {
//! //process hotstuff message
//! let _ = hotstuff_sender.send((msg_origin, Message::HotStuffRs(hotstuff_message)));
//! let _ = hotstuff_sender.try_send((msg_origin, Message::HotStuffRs(hotstuff_message)));
//! }
//! _ => {}
//! }
//! };
//! let mut message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>> = vec![];
//! let mut message_handlers: Vec<Box<dyn FnMut(PublicAddress, Message) + Send>> = vec![];
//! message_handlers.push(Box::new(hotstuff_handler));
//!
//! // 3. Start P2P network.
Expand All @@ -47,7 +47,7 @@
//!
//! // 4. Send out messages.
//! peer.broadcast_mempool_msg(txn);
//!
//! ```
//!


Expand Down
37 changes: 19 additions & 18 deletions src/peer.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at PeerStartError::SystemConfigError. Its itemdoc says that the variant corresponds to the case where we "Failed to read from system configuration path". However, I don't see anywhere in the code where we read a file at a path.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it with the correct error variant.

Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@
//! Starting Peer will return a Sender for delivering PeerCommand to the thread.
//!
//! Example:
//!
//! // 1. Define the configurations
//!
//! 1. Define the configurations
//! ```
//! let config = Config {...}
//!
//! // 2. Define the message handlers
//! ```
//! 2. Define the message handlers
//! ```
//! let(tx,rx) = mpsc::channel();
//! let message_sender = tx.clone();
//! let message_handler = move |msg_origin: [u8;32], msg: Message| {
//! let _ = message_sender.send((msg_origin, msg));
//! };
//! let mut message_handlers: Vec<Box<dyn Fn(PublicAddress, Message) + Send>> = vec![];
//! message_handlers.push(Box::new(message_handler));
//!
//! // 3. Start the peer
//! ```
//! 3. Start the peer
//! ```
//! let peer = Peer::start(config, message_handlers).await.unwrap();
//!
//! // 4. Send PeerCommand
//! ```
//! 4. Send PeerCommand
//! ```
//! peer.broadcast_mempool_msg(txn);
//!
//! ```

use futures::StreamExt;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -72,7 +76,7 @@ impl Peer {
/// 3. Establishes connection to the network by adding bootnodes and subscribing to message [Topic].
/// 4. Spawns an asynchronous [tokio] task and enters the event handling loop, returning a Sender used for sending
/// [PeerCommand] to the internal thread.
pub async fn start(config: Config, handlers: Vec<Box<dyn FnMut(PublicAddress, Message) + Send>>) -> Result<Peer, PeerStartError> {
pub async fn start(config: Config, handler: Box<dyn FnMut(PublicAddress, Message) + Send>) -> Result<Peer, PeerStartError> {

let mut swarm = set_up_transport(&config)
.await
Expand All @@ -87,7 +91,7 @@ impl Peer {
swarm = establish_network_connections(swarm, &config)
.map_err(PeerStartError::SubscriptionError)?;

let (handle, sender) = start_event_handling(swarm, &config, handlers);
let (handle, sender) = start_event_handling(swarm, &config, handler);
Ok(
Peer {
handle,
Expand Down Expand Up @@ -215,7 +219,7 @@ fn establish_network_connections(mut swarm: libp2p::Swarm<Behaviour> , config: &
/// Upon receiving a (Shutdown)[PeerCommand::Shutdown] command, the process will exit the loop and terminate
/// the thread.
///
fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, mut message_handlers: Vec<Box<dyn FnMut(PublicAddress, Message) + Send>>) ->
fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, mut message_handler: Box<dyn FnMut(PublicAddress, Message) + Send>) ->
(JoinHandle<()>,tokio::sync::mpsc::Sender<PeerCommand>) {
// 4. Start p2p networking
let local_keypair = &config.keypair;
Expand Down Expand Up @@ -259,8 +263,7 @@ fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, mu
log::info!("Publishing (Topic: {:?})", topic);
if swarm.behaviour().is_subscribed(&topic.clone().hash()) {
// Send it to ourselves if we subscribed to this topic
message_handlers.iter_mut()
.for_each(|handler| handler(local_public_address, message.clone()));
message_handler(local_public_address, message.clone());
}
if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
log::debug!("Failed to publish the message. {:?}", e);
Expand Down Expand Up @@ -288,11 +291,9 @@ fn start_event_handling(mut swarm: libp2p::Swarm<Behaviour>, config: &Config, mu
if swarm.behaviour().is_subscribed(&message.topic) {
// Send it to ourselves if we subscribed to this topic
if let Ok(message) =
Message::try_from((message, local_public_address))
conversions::filter_gossipsub_messages(message, local_public_address)
{
message_handlers.iter_mut().for_each(|handler| {
handler(public_addr, message.clone())
});
message_handler(public_addr, message.clone())
}
}
}
Expand Down
20 changes: 4 additions & 16 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,33 +486,21 @@ pub async fn node(

let(tx,rx) = mpsc::channel(100);

let hotstuff_sender = tx.clone();
let hotstuff_handler = move |msg_origin: [u8;32], msg: Message| {
let message_sender = tx.clone();
let message_handler = move |msg_origin: [u8;32], msg: Message| {
match msg {
Message::HotStuffRs(hotstuff_message) => {
// process hotstuff message
let _ = hotstuff_sender.try_send((msg_origin, Message::HotStuffRs(hotstuff_message)));
let _ = message_sender.try_send((msg_origin, Message::HotStuffRs(hotstuff_message)));
}
_ => {}
}
};

let message_sender = tx.clone();
let mempool_handler = move |msg_origin: [u8;32], msg: Message| {
match msg {
Message::Mempool(mempool_message) => {
// process mempool message
let _ = message_sender.try_send((msg_origin, Message::Mempool(mempool_message)));
}
_ => {}
}
};

let mut message_handlers: Vec<Box<dyn FnMut(PublicAddress, Message) + Send>> = vec![];
message_handlers.push(Box::new(hotstuff_handler));
message_handlers.push(Box::new(mempool_handler));

let peer = Peer::start(config, message_handlers).await.unwrap();
let peer = Peer::start(config, Box::new(message_handler)).await.unwrap();

(peer, rx)
}