Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{time::Duration, vec};

pub(crate) const MAX_TRANSMIT_SIZE: usize = 4;
pub(crate) const MEGABYTES: usize = 1048576;
pub(crate) const PROTOCOL_NAME: &str = "/pchain_p2p/1.0.0";
Copy link
Contributor

@manngayin612 manngayin612 Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protocol name is no longer a constant, it is a field in Config. That's on me, I forgot to remove it from constant.rs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both identity config and kademlia config uses protocol name, however, while kademlia accepts multiple names, identity accepts only 1. So in the event we have multiple protocol names, which one do we use for identity?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in our Config, we can change the protocol name into a single String instead of a Vec, just like what we used to do. Only put it into a vector in Kad config.


/// Defines behaviour of a node on pchain_network
/// 1. Add or Remove a peer from DHT (Kademlia)
Expand All @@ -42,19 +43,19 @@ pub(crate) struct Behaviour {
}

impl Behaviour {
pub fn new(id: PublicAddress, local_key: &Keypair, heartbeat_secs: u64, protocol_name: &str) -> Self {
pub fn new(id: PublicAddress, local_key: &Keypair) -> Self {
let local_peer_id: PeerId = conversions::PublicAddress::new(id)
.try_into()
.expect("Invalid PublicAddress.");

// Configure Kademlia
let kad = Self::kad_config(local_peer_id, protocol_name);
let kad = Self::kad_config(local_peer_id);

// Configure Identify
let identify = Self::identify_config(local_key.public(), protocol_name);
let identify = Self::identify_config(local_key.public());

// Configure Gossipsub - subscribe to the topic of its own the base64-encoded public address
let mut gossip = Self::gossipsub_config(local_key, heartbeat_secs);
let mut gossip = Self::gossipsub_config(local_key);
gossip.subscribe(&Topic::HotStuffRsSend(id).into()).unwrap();

// Configure Ping
Expand All @@ -68,8 +69,8 @@ impl Behaviour {
}
}

fn kad_config(peer_id: PeerId, protocol_name: &str) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name.to_string()).unwrap();
fn kad_config(peer_id: PeerId) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(PROTOCOL_NAME.to_string()).unwrap();
let kad_config = KademliaConfig::default()
.set_protocol_names(vec![protocol_name])
.set_record_filtering(KademliaStoreInserts::FilterBoth)
Expand All @@ -82,12 +83,12 @@ impl Behaviour {
kad
}

fn identify_config(public_key: PublicKey, protocol_ver: &str) -> identify::Behaviour {
let config = identify::Config::new(protocol_ver.to_string(), public_key);
fn identify_config(public_key: PublicKey) -> identify::Behaviour {
let config = identify::Config::new(PROTOCOL_NAME.to_string(), public_key);
identify::Behaviour::new(config)
}

fn gossipsub_config(keypair: &Keypair, heartbeat_secs: u64) -> gossipsub::Behaviour {
fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour {
let build_msg_id = |msg: &gossipsub::Message| {
let mut id_str = msg.topic.to_string();
let src_peer_id = match msg.source {
Expand All @@ -104,7 +105,7 @@ impl Behaviour {
ConfigBuilder::default()
.max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2.
.message_id_fn(build_msg_id)
.heartbeat_interval(Duration::from_secs(heartbeat_secs))
.heartbeat_interval(Duration::from_secs(10))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can take this magic number out and put them with the constant up there. (Not a must, just think it might be cleaner)

.build()
.unwrap(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ pub(crate) struct Config {
pub kademlia_protocol_names: Vec<String>,
}

fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
vec![Topic::HotStuffRsBroadcast, Topic::HotStuffRsSend(public_address).into(), Topic::Mempool, Topic::DroppedTxns]
}
47 changes: 38 additions & 9 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! Upon receiving commands from application, gossipsub message will be delivered to a
//! Gossipsub topic.

use borsh::BorshDeserialize;
use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
Expand All @@ -40,12 +41,12 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, PeerNetworkEvent},
messages::{Envelope, Topic},
peer::{EngineCommand, PeerBuilder, Peer}, conversions,
config,
conversions,
messages::{DroppedTxMessage, Envelope, Topic},
peer::{EngineCommand, PeerBuilder, Peer},
};

const KADEMLIA_PROTOCOL_NAME: &str = "/pchain_p2p/1.0.0";

/// [start] p2p networking peer and return the handle [NetworkHandle] of this process.
pub(crate) async fn start(
peer: PeerBuilder,
Expand All @@ -69,9 +70,7 @@ pub(crate) async fn start(
let transport = build_transport(local_keypair.clone()).await?;
let behaviour = Behaviour::new(
local_public_address,
&local_keypair,
10,
&config.kademlia_protocol_names, //TODO jonas
&local_keypair
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned previously, protocol name is no longer a constant, its a configurable field in Config, so we might have to pass in the Config instance.

);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Expand All @@ -88,8 +87,7 @@ pub(crate) async fn start(
}

// 3. Subscribe to Topic
//TODO jonas
swarm.behaviour_mut().subscribe()?;
swarm.behaviour_mut().subscribe(config::fullnode_topics(local_public_address))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think all applications that uses pchain_network will always subscribe to all topics. For example, fullnode doesn't subscribe to droppedTx. So the topics to be subscribed should be based on the Config we passed in.


// 4. Start p2p networking
let (sender, mut receiver) =
Expand Down Expand Up @@ -170,6 +168,37 @@ pub(crate) async fn start(
// TryFrom trait for Vec<u8> to Message, implement a function that takes in the Message Topic to help
// converting Vec<u8> to Message. You can refer to fullnode/mempool messagegate to see how to
// deserialise each Message type.
match message.topic.as_str() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand you couldn't do branches like Topic::Mempool.hash() in match statement. But comparing the topic using string doesn't seem a good idea.

let topic = fullnode_topics(public_addr).into_iter().find(|t| t.hash() == message.topic);

Will this way of identifying the topic work? I haven't verified, you can try.

"consensus" => {
let hotstuff_message =
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
},
"mempool" => {
let mempool_message =
pchain_types::blockchain::TransactionV1::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
},
"droppedTx" => {
let droppedtx_message =
DroppedTxMessage::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
},
// mailbox topics will be different for every address
_ => {
let hotstuff_message =
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
}
}
} else {
log::debug!("Receive unknown gossip message");
}
Expand Down