Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use libp2p::{
use pchain_types::cryptography::PublicAddress;
use std::{time::Duration, vec};

pub(crate) const MAX_TRANSMIT_SIZE: usize = 4;
pub(crate) const MEGABYTES: usize = 1048576;
const MAX_TRANSMIT_SIZE: usize = 4;
const MEGABYTES: usize = 1048576;
const HEARTBEAT_INTERVAL: u64 = 10;

/// Defines behaviour of a node on pchain_network
/// 1. Add or Remove a peer from DHT (Kademlia)
Expand All @@ -42,7 +43,7 @@ pub(crate) struct Behaviour {
}

impl Behaviour {
pub fn new(id: PublicAddress, local_key: &Keypair, heartbeat_secs: u64, protocol_name: &str) -> Self {
pub fn new(id: PublicAddress, local_key: &Keypair, protocol_name: String) -> Self {
let local_peer_id: PeerId = conversions::PublicAddress::new(id)
.try_into()
.expect("Invalid PublicAddress.");
Expand All @@ -54,7 +55,7 @@ impl Behaviour {
let identify = Self::identify_config(local_key.public(), protocol_name);

// Configure Gossipsub - subscribe to the topic of its own the base64-encoded public address
let mut gossip = Self::gossipsub_config(local_key, heartbeat_secs);
let mut gossip = Self::gossipsub_config(local_key);
gossip.subscribe(&Topic::HotStuffRsSend(id).into()).unwrap();

// Configure Ping
Expand All @@ -68,8 +69,8 @@ impl Behaviour {
}
}

fn kad_config(peer_id: PeerId, protocol_name: &str) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name.to_string()).unwrap();
fn kad_config(peer_id: PeerId, protocol_name: String) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name).unwrap();
let kad_config = KademliaConfig::default()
.set_protocol_names(vec![protocol_name])
.set_record_filtering(KademliaStoreInserts::FilterBoth)
Expand All @@ -82,12 +83,12 @@ impl Behaviour {
kad
}

fn identify_config(public_key: PublicKey, protocol_ver: &str) -> identify::Behaviour {
let config = identify::Config::new(protocol_ver.to_string(), public_key);
fn identify_config(public_key: PublicKey, protocol_name: String) -> identify::Behaviour {
let config = identify::Config::new(protocol_name, public_key);
identify::Behaviour::new(config)
}

fn gossipsub_config(keypair: &Keypair, heartbeat_secs: u64) -> gossipsub::Behaviour {
fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour {
let build_msg_id = |msg: &gossipsub::Message| {
let mut id_str = msg.topic.to_string();
let src_peer_id = match msg.source {
Expand All @@ -104,7 +105,7 @@ impl Behaviour {
ConfigBuilder::default()
.max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2.
.message_id_fn(build_msg_id)
.heartbeat_interval(Duration::from_secs(heartbeat_secs))
.heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL))
.build()
.unwrap(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ pub(crate) struct Config {
pub outgoing_msgs_buffer_capacity: usize,
pub incoming_msgs_buffer_capacity: usize,
pub peer_discovery_interval: u64,
pub kademlia_protocol_names: Vec<String>,
pub protocol_name: String,
}

fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
pub(crate) fn fullnode_topics(public_address: PublicAddress) -> Vec<Topic> {
vec![Topic::HotStuffRsBroadcast, Topic::HotStuffRsSend(public_address).into(), Topic::Mempool, Topic::DroppedTxns]
}
46 changes: 36 additions & 10 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! Upon receiving commands from application, gossipsub message will be delivered to a
//! Gossipsub topic.

use borsh::BorshDeserialize;
use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
Expand All @@ -40,12 +41,13 @@ use std::time::Duration;

use crate::{
behaviour::{Behaviour, PeerNetworkEvent},
messages::{Envelope, Topic},
peer::{EngineCommand, PeerBuilder, Peer}, conversions,
config,
conversions,
messages::{DroppedTxMessage, Envelope, Topic, Message},
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
peer::{EngineCommand, PeerBuilder, Peer},
};

const KADEMLIA_PROTOCOL_NAME: &str = "/pchain_p2p/1.0.0";

/// [start] p2p networking peer and return the handle [NetworkHandle] of this process.
pub(crate) async fn start(
peer: PeerBuilder,
Expand All @@ -70,8 +72,7 @@ pub(crate) async fn start(
let behaviour = Behaviour::new(
local_public_address,
&local_keypair,
10,
&config.kademlia_protocol_names, //TODO jonas
config.protocol_name
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Expand All @@ -88,8 +89,7 @@ pub(crate) async fn start(
}

// 3. Subscribe to Topic
//TODO jonas
swarm.behaviour_mut().subscribe()?;
swarm.behaviour_mut().subscribe(config.topics_to_subscribe.clone())?;

// 4. Start p2p networking
let (sender, mut receiver) =
Expand Down Expand Up @@ -169,12 +169,38 @@ pub(crate) async fn start(
// So we need to convert Vec<u8> to pchain_network::Message. Instead of implementing
// TryFrom trait for Vec<u8> to Message, implement a function that takes in the Message Topic to help
// converting Vec<u8> to Message. You can refer to fullnode/mempool messagegate to see how to
// deserialise each Message type.
// deserialise each Message type.
let topic = config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == message.topic);


if let Some(topic) = topic {
let pchain_message = match topic {
HotStuffRsBroadcast => {
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map(|hotstuff_message| Message::Consensus(hotstuff_message))
},
Mempool => {
pchain_types::blockchain::TransactionV1::deserialize(&mut message.data.as_slice())
.map(|mempool_message| Message::Mempool(mempool_message))
},
DroppedTxns => {
DroppedTxMessage::deserialize(&mut message.data.as_slice())
.map(|droppedtx_message| Message::DroppedTx(droppedtx_message))
},
HotStuffRsSend(address) => {
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map(|hotstuff_message| Message::Consensus(hotstuff_message))
}
};
}

} else {
log::debug!("Receive unknown gossip message");
}
} else {
log::debug!("Received message from invalid PeerId.")
log::debug!("Received message from invalid PeerId.");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pchain_types::{blockchain::TransactionV1, cryptography::{Sha256Hash, PublicA
/// Hash of the message topic.
pub type MessageTopicHash = libp2p::gossipsub::TopicHash;

#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum Topic {
HotStuffRsBroadcast,
HotStuffRsSend(PublicAddress),
Expand Down