Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
990251c
mailbox feature
ong-jonas Oct 19, 2023
2396d8d
merge refactor into feature
ong-jonas Oct 27, 2023
8e73502
message ID no longer overridden
ong-jonas Oct 27, 2023
28d9103
removed engine.rs
ong-jonas Oct 27, 2023
d55849b
renamed SMR topics to hotstuff_rs
ong-jonas Oct 27, 2023
7da376a
added mailbox subscription feature
ong-jonas Oct 27, 2023
043800a
update network tests
ong-jonas Oct 27, 2023
dd322db
removed droppedtx message type
ong-jonas Oct 30, 2023
cde45ef
added different message handlers and updated hotstuff dependency to 0.3
ong-jonas Oct 30, 2023
0f945a7
updated config keypair to pchain type
ong-jonas Oct 31, 2023
0879474
added helper function in network tests
ong-jonas Oct 31, 2023
17673c0
mempool messages accepts V1 or V2
ong-jonas Nov 1, 2023
4a7c7aa
Update lib.rs
Nov 1, 2023
ef789f2
Fix: serialize TransactionV1 and TransactionV2 separately in messages.rs
Nov 1, 2023
725d99b
Update libp2p deprecated package
Nov 1, 2023
30d2dd4
update message serialization and handler channels
ong-jonas Nov 8, 2023
994834c
update message handlers to FnMut
Nov 9, 2023
f01c511
FIX: removed deprecated dependencies
ong-jonas Nov 16, 2023
54ffb64
REFACTOR: removed with_dns, added nodelay(true)
ong-jonas Nov 16, 2023
29ea7c7
REFACTOR: try_from to filter_gossipsub_messages, Vec<msg_handlers> to…
ong-jonas Nov 21, 2023
6b3543d
deps: update dependency to hotstuff_rs
ong-jonas Nov 30, 2023
5a7c61e
refactor: update pchain-types dependency version
ong-jonas Dec 1, 2023
8556c0f
Merge branch 'v0.5' into mailbox/feature
ong-jonas Dec 5, 2023
5b379a9
REFACTOR: documentation update to reflect single message handler
ong-jonas Dec 5, 2023
f66e4c2
refactor: moved is_close_peer function to peer.rs and updated PeerSta…
ong-jonas Dec 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pchain_network"
version = "0.5.0"
version = "0.6.0"
authors = ["ParallelChain Lab <[email protected]>"]
edition = "2021"
description = "parallelchain-network: Libp2p based Network Layer implementation for ParallelChain Mainnet."
Expand Down
28 changes: 12 additions & 16 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Behaviour {
}

fn kad_config(peer_id: PeerId, protocol_name: String) -> kad::Behaviour<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name).unwrap();
let protocol_name: StreamProtocol = StreamProtocol::try_from_owned(protocol_name).unwrap();
let kad_config = kad::Config::default()
.set_protocol_names(vec![protocol_name])
.set_record_filtering(kad::StoreInserts::FilterBoth)
Expand All @@ -96,24 +96,11 @@ impl Behaviour {
}

fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour {
let build_msg_id = |msg: &gossipsub::Message| {
let mut id_str = msg.topic.to_string();
let src_peer_id = match msg.source {
Some(src) => base64url::encode(src.to_bytes()),
None => "none".to_string(),
};
id_str.push_str(&src_peer_id);
id_str.push_str(&msg.sequence_number.unwrap_or_default().to_string());
MessageId::from(id_str)
};

let gossip = gossipsub::Behaviour::new(
MessageAuthenticity::Signed(keypair.clone().into()),
ConfigBuilder::default()
.max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2.
.message_id_fn(build_msg_id)
.heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL))
.allow_self_origin(true)
.build()
.unwrap(),
)
Expand Down Expand Up @@ -146,6 +133,15 @@ impl Behaviour {
Ok(())
}

/// Unsubscribe from [Topic]
pub fn unsubscribe(&mut self, topics: Vec<Topic>) -> Result<(), gossipsub::PublishError> {
for topic in topics {
self.gossip.unsubscribe(&topic.into())?;
}

Ok(())
}

/// Publish a [Message] to peers subscribed to the [Topic]
pub fn publish(&mut self, topic: Topic, msg: Message) -> Result<MessageId, PublishError> {
self.gossip.publish(topic.hash(), msg)
Expand Down Expand Up @@ -227,7 +223,7 @@ mod test {
peer_discovery_interval: 10,
kademlia_protocol_name: String::from("/test"),
};

let public_address = config.keypair.verifying_key().to_bytes();

let behaviour = Behaviour::new(
Expand Down Expand Up @@ -309,7 +305,7 @@ mod test {
source: None,
data: vec![],
sequence_number: None,
topic: Topic::DroppedTxns.hash(),
topic: Topic::Mempool.hash(),
};

let subscribed_topics: Vec<&MessageTopicHash> = peer.behaviour.gossip.topics().collect();
Expand Down
5 changes: 2 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//!
//! ```no_run
//! let config = Config {
//! keypair, //pchain_types::cryptography::Keypair,
//! keypair: pchain_types::cryptography::Keypair,
//! topics_to_subscribe: vec![Topic::HotStuffRsBroadcast],
//! listening_port: 25519,
//! boot_nodes: vec![],
Expand Down Expand Up @@ -53,6 +53,5 @@ pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
Topic::HotStuffRsBroadcast,
Topic::HotStuffRsSend(public_address),
Topic::Mempool,
Topic::DroppedTxns,
]
}
}
58 changes: 25 additions & 33 deletions src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,29 @@
//! - TryFrom<[PublicAddress]> for [PeerId]
//! - filter_gossipsub_messages([libp2p::gossipsub::Message], [pchain_types::cryptography::PublicAddress]) to [Message]

use libp2p::identity::{self, DecodingError, OtherVariantError, PeerId,
ed25519::PublicKey
};
use libp2p::identity::{self, ed25519, DecodingError, OtherVariantError, PeerId};
use libp2p::Multiaddr;
use std::net::Ipv4Addr;
use borsh::BorshDeserialize;
use pchain_types::{cryptography, rpc};

use crate::messages::{
DroppedTxnMessage,
Message,
Topic::{HotStuffRsBroadcast,HotStuffRsSend,Mempool,DroppedTxns}
Topic::{HotStuffRsBroadcast,HotStuffRsSend,Mempool}
};
use crate::config::fullnode_topics;


/// PublicAddress(PublicAddress) is wrapper around [PublicAddress](pchain_types::cryptography::PublicAddress).
pub struct PublicAddress(pchain_types::cryptography::PublicAddress);
/// [PublicAddress(PublicAddress)] is wrapper around [PublicAddress](cryptography::PublicAddress).
pub struct PublicAddress(cryptography::PublicAddress);

impl PublicAddress {
pub fn new(addr: pchain_types::cryptography::PublicAddress) -> Self {
pub fn new(addr: cryptography::PublicAddress) -> Self {
PublicAddress(addr)
}
}

impl From<PublicAddress> for pchain_types::cryptography::PublicAddress {
fn from(peer: PublicAddress) -> pchain_types::cryptography::PublicAddress {
impl From<PublicAddress> for cryptography::PublicAddress {
fn from(peer: PublicAddress) -> cryptography::PublicAddress {
peer.0
}
}
Expand All @@ -55,7 +52,7 @@ impl TryFrom<PublicAddress> for PeerId {
type Error = DecodingError;

fn try_from(public_addr: PublicAddress) -> Result<Self, Self::Error> {
let kp = PublicKey::try_from_bytes(&public_addr.0)?;
let kp = ed25519::PublicKey::try_from_bytes(&public_addr.0)?;
let public_key: identity::PublicKey = kp.into();
Ok(public_key.to_peer_id())
}
Expand All @@ -81,30 +78,25 @@ impl From<DecodingError> for PublicAddressTryFromPeerIdError {

/// converts [Message](libp2p::gossipsub::Message) to [Message](Message) while
/// filtering for message topics that can be forwarded to fullnode
pub fn filter_gossipsub_messages(message: libp2p::gossipsub::Message, local_public_address: pchain_types::cryptography::PublicAddress)
-> Result<Message, MessageConversionError> {
pub fn filter_gossipsub_messages(message: libp2p::gossipsub::Message, local_public_address: cryptography::PublicAddress) -> Result<Message, MessageConversionError> {
let (topic_hash, data) = (message.topic, message.data);
let mut data = data.as_slice();

let topic = fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
.ok_or(InvalidTopicError)?;

match topic {
HotStuffRsBroadcast | HotStuffRsSend(_) => {
let message = hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)?;
Ok(message)
},
Mempool => {
let message = pchain_types::blockchain::TransactionV1::deserialize(&mut data).map(Message::Mempool)?;
Ok(message)
},
DroppedTxns => {
let message = DroppedTxnMessage::deserialize(&mut data).map(Message::DroppedTxns)?;
Ok(message)

let topic = fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == topic_hash)
.ok_or(InvalidTopicError)?;

match topic {
HotStuffRsBroadcast | HotStuffRsSend(_) => {
let message = hotstuff_rs::messages::Message::deserialize(&mut data).map(Message::HotStuffRs)?;
Ok(message)
},
Mempool => {
let message = rpc::TransactionV1OrV2::deserialize(&mut data).map(Message::Mempool)?;
Ok(message)
},
}
}
}

#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
//! kademlia_protocol_name // "/pchain_p2p/1.0.0"
//! };
//!
//! // 2. Create message handlers
//! let (tx, rx) = tokio::sync::mpsc::channel();
//! // 2. Create message handler
//! let (tx, rx) = tokio::sync::mpsc::channel(100);
//! let message_sender = tx.clone();
//! let message_handler = move |msg_origin: [u8;32], msg: Message| {
//! match msg {
Expand All @@ -37,7 +37,7 @@
//! _ => {}
//! }
//! };
//!
//!
//! // 3. Start P2P network.
//! let peer = Peer::start(config, Box::new(message_handler))
//! .await
Expand Down
78 changes: 12 additions & 66 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
use borsh::{BorshSerialize, BorshDeserialize};
use libp2p::gossipsub::IdentTopic;
use pchain_types::{
blockchain::TransactionV1,
cryptography::{PublicAddress, Sha256Hash},
serialization::Serializable,
cryptography::PublicAddress,
rpc::TransactionV1OrV2,
};

/// Hash of the message topic.
Expand All @@ -29,7 +28,6 @@ pub enum Topic {
HotStuffRsBroadcast,
HotStuffRsSend(PublicAddress),
Mempool,
DroppedTxns,
}

impl Topic {
Expand All @@ -41,10 +39,9 @@ impl Topic {
impl From<Topic> for IdentTopic {
fn from(topic: Topic) -> Self {
let str = match topic {
Topic::HotStuffRsBroadcast => "consensus".to_string(),
Topic::HotStuffRsSend(addr) => base64url::encode(addr),
Topic::HotStuffRsBroadcast => "hotstuff_rs".to_string(),
Topic::HotStuffRsSend(addr) => String::from("hotstuff_rs/") + &base64url::encode(addr),
Topic::Mempool => "mempool".to_string(),
Topic::DroppedTxns => "droppedTx".to_string(),
};
IdentTopic::new(str)
}
Expand All @@ -55,69 +52,22 @@ impl From<Topic> for IdentTopic {
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum Message {
HotStuffRs(hotstuff_rs::messages::Message),
Mempool(TransactionV1),
DroppedTxns(DroppedTxnMessage),
Mempool(TransactionV1OrV2),
}

impl From<Message> for Vec<u8> {
fn from(msg: Message) -> Self {
match msg {
Message::HotStuffRs(msg) => msg.try_to_vec().unwrap(),
Message::Mempool(txn) => Serializable::serialize(&txn),
Message::DroppedTxns(msg) => msg.try_to_vec().unwrap(),
Message::Mempool(txn) => {
let mut data: Vec<u8> = Vec::new();
TransactionV1OrV2::serialize(&txn, &mut data).unwrap();
data
}
}
}
}

/// [DroppedTxnMessage] defines message content for [Message::DroppedTxns].
#[derive(Clone, borsh::BorshSerialize, borsh::BorshDeserialize)]
pub enum DroppedTxnMessage {
MempoolDroppedTx {
txn: TransactionV1,
status_code: DroppedTxnStatusCode,
},
ExecutorDroppedTx {
tx_hash: Sha256Hash,
status_code: DroppedTxnStatusCode,
},
}

#[derive(Clone)]
pub enum DroppedTxnStatusCode {
Invalid,
NonceTooLow,
NonceInaccessible,
}

impl From<&DroppedTxnStatusCode> for u16 {
fn from(status_code: &DroppedTxnStatusCode) -> Self {
match status_code {
DroppedTxnStatusCode::Invalid => 0x515_u16,
DroppedTxnStatusCode::NonceTooLow => 0x516_u16,
DroppedTxnStatusCode::NonceInaccessible => 0x517_u16,
}
}
}

impl borsh::BorshSerialize for DroppedTxnStatusCode {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
let status_code: u16 = self.into();
BorshSerialize::serialize(&status_code, writer)
}
}

impl borsh::BorshDeserialize for DroppedTxnStatusCode {
fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let status_code = match u16::deserialize_reader(reader) {
Ok(0x515_u16) => DroppedTxnStatusCode::Invalid,
Ok(0x516_u16) => DroppedTxnStatusCode::NonceTooLow,
Ok(0x517_u16) => DroppedTxnStatusCode::NonceInaccessible,
_ => panic!("Invalid droppedTx status code."),
};
Ok(status_code)
}
}

#[cfg(test)]

mod test {
Expand All @@ -128,19 +78,15 @@ mod test {
#[test]
fn test_message_topic() {
let hotstuff_broadcast_topic = Topic::HotStuffRsBroadcast;
let ident_topic = IdentTopic::new("consensus".to_string());
let ident_topic = IdentTopic::new("hotstuff_rs".to_string());
assert_eq!(hotstuff_broadcast_topic.hash(), ident_topic.hash());

let hotstuff_send_topic = Topic::HotStuffRsSend([1u8; 32]);
let ident_topic = IdentTopic::new(base64url::encode([1u8; 32]));
let ident_topic = IdentTopic::new(String::from("hotstuff_rs/") + &base64url::encode([1u8; 32]));
assert_eq!(hotstuff_send_topic.hash(), ident_topic.hash());

let mempool_topic = Topic::Mempool;
let ident_topic = IdentTopic::new("mempool".to_string());
assert_eq!(mempool_topic.hash(), ident_topic.hash());

let droppedtxn_topic = Topic::DroppedTxns;
let ident_topic = IdentTopic::new("droppedTx".to_string());
assert_eq!(droppedtxn_topic.hash(), ident_topic.hash());
}
}
Loading