Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use libp2p::{
use pchain_types::cryptography::PublicAddress;
use std::{time::Duration, vec};

pub(crate) const MAX_TRANSMIT_SIZE: usize = 4;
pub(crate) const MEGABYTES: usize = 1048576;
const MAX_TRANSMIT_SIZE: usize = 4;
const MEGABYTES: usize = 1048576;
const HEARTBEAT_INTERVAL: u64 = 10;

/// Defines behaviour of a node on pchain_network
/// 1. Add or Remove a peer from DHT (Kademlia)
Expand All @@ -42,7 +43,7 @@ pub(crate) struct Behaviour {
}

impl Behaviour {
pub fn new(id: PublicAddress, local_key: &Keypair, heartbeat_secs: u64, protocol_name: &str) -> Self {
pub fn new(id: PublicAddress, local_key: &Keypair, protocol_name: String) -> Self {
let local_peer_id: PeerId = conversions::PublicAddress::new(id)
.try_into()
.expect("Invalid PublicAddress.");
Expand All @@ -54,7 +55,7 @@ impl Behaviour {
let identify = Self::identify_config(local_key.public(), protocol_name);

// Configure Gossipsub - subscribe to the topic of its own the base64-encoded public address
let mut gossip = Self::gossipsub_config(local_key, heartbeat_secs);
let mut gossip = Self::gossipsub_config(local_key);
gossip.subscribe(&Topic::HotStuffRsSend(id).into()).unwrap();

// Configure Ping
Expand All @@ -68,8 +69,8 @@ impl Behaviour {
}
}

fn kad_config(peer_id: PeerId, protocol_name: &str) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name.to_string()).unwrap();
fn kad_config(peer_id: PeerId, protocol_name: String) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name).unwrap();
let kad_config = KademliaConfig::default()
.set_protocol_names(vec![protocol_name])
.set_record_filtering(KademliaStoreInserts::FilterBoth)
Expand All @@ -82,12 +83,12 @@ impl Behaviour {
kad
}

fn identify_config(public_key: PublicKey, protocol_ver: &str) -> identify::Behaviour {
let config = identify::Config::new(protocol_ver.to_string(), public_key);
fn identify_config(public_key: PublicKey, protocol_name: String) -> identify::Behaviour {
let config = identify::Config::new(protocol_name, public_key);
identify::Behaviour::new(config)
}

fn gossipsub_config(keypair: &Keypair, heartbeat_secs: u64) -> gossipsub::Behaviour {
fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour {
let build_msg_id = |msg: &gossipsub::Message| {
let mut id_str = msg.topic.to_string();
let src_peer_id = match msg.source {
Expand All @@ -104,7 +105,7 @@ impl Behaviour {
ConfigBuilder::default()
.max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2.
.message_id_fn(build_msg_id)
.heartbeat_interval(Duration::from_secs(heartbeat_secs))
.heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL))
.build()
.unwrap(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ pub(crate) struct Config {
pub outgoing_msgs_buffer_capacity: usize,
pub incoming_msgs_buffer_capacity: usize,
pub peer_discovery_interval: u64,
pub kademlia_protocol_names: Vec<String>,
pub protocol_name: String,
}

fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
vec![Topic::HotStuffRsBroadcast, Topic::HotStuffRsSend(public_address).into(), Topic::Mempool, Topic::DroppedTxns]
}
60 changes: 50 additions & 10 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! Upon receiving commands from application, gossipsub message will be delivered to a
//! Gossipsub topic.

use borsh::BorshDeserialize;
use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
Expand All @@ -40,12 +41,13 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, PeerNetworkEvent},
messages::{Envelope, Topic},
peer::{EngineCommand, PeerBuilder, Peer}, conversions,
config,
conversions,
messages::{DroppedTxMessage, Envelope, Topic},
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
peer::{EngineCommand, PeerBuilder, Peer},
};

const KADEMLIA_PROTOCOL_NAME: &str = "/pchain_p2p/1.0.0";

/// [start] p2p networking peer and return the handle [NetworkHandle] of this process.
pub(crate) async fn start(
peer: PeerBuilder,
Expand All @@ -70,8 +72,7 @@ pub(crate) async fn start(
let behaviour = Behaviour::new(
local_public_address,
&local_keypair,
10,
&config.kademlia_protocol_names, //TODO jonas
config.protocol_name
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Expand All @@ -88,8 +89,7 @@ pub(crate) async fn start(
}

// 3. Subscribe to Topic
//TODO jonas
swarm.behaviour_mut().subscribe()?;
swarm.behaviour_mut().subscribe(config::fullnode_topics(local_public_address))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think all applications that uses pchain_network will always subscribe to all topics. For example, fullnode doesn't subscribe to droppedTx. So the topics to be subscribed should be based on the Config we passed in.


// 4. Start p2p networking
let (sender, mut receiver) =
Expand Down Expand Up @@ -169,12 +169,52 @@ pub(crate) async fn start(
// So we need to convert Vec<u8> to pchain_network::Message. Instead of implementing
// TryFrom trait for Vec<u8> to Message, implement a function that takes in the Message Topic to help
// converting Vec<u8> to Message. You can refer to fullnode/mempool messagegate to see how to
// deserialise each Message type.
// deserialise each Message type.

let topic = config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == message.topic);

if let Some(topic) = topic {
match topic {
HotStuffRsBroadcast => {
let hotstuff_message =
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason of getting the de-serialized result of type Result<> and wrap them with Optional and check if it is Some() or None later? Can't we do something directly when the deserialisation gives out Error?

Also, you gotta return the deserialised message out after this match statement. You should be about to return pchain_network::message::Message.

You might consider getting this part out into a separate helper function to make it look cleaner.

.map_or(None, |msg| {
Some(msg)
});
},
Mempool => {
let mempool_message =
pchain_types::blockchain::TransactionV1::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
},
DroppedTxns => {
let droppedtx_message =
DroppedTxMessage::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
},
HotStuffRsSend(local_public_address) => {
let hotstuff_message =
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map_or(None, |msg| {
Some(msg)
});
}
}

} else {
log::debug!("Invalid message topic");
}
} else {
log::debug!("Receive unknown gossip message");
}
} else {
log::debug!("Received message from invalid PeerId.")
log::debug!("Received message from invalid PeerId.");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pchain_types::{blockchain::TransactionV1, cryptography::{Sha256Hash, PublicA
/// Hash of the message topic.
pub type MessageTopicHash = libp2p::gossipsub::TopicHash;

#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum Topic {
HotStuffRsBroadcast,
HotStuffRsSend(PublicAddress),
Expand Down