Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{time::Duration, vec};

const MAX_TRANSMIT_SIZE: usize = 4;
const MEGABYTES: usize = 1048576;
const HEARTBEAT_INTERVAL: u64 = 10;

/// Defines behaviour of a node on pchain_network
/// 1. Add or Remove a peer from DHT (Kademlia)
Expand All @@ -43,7 +44,7 @@ pub(crate) struct Behaviour {
}

impl Behaviour {
pub fn new(id: PublicAddress, local_key: &Keypair, heartbeat_secs: u64, protocol_name: &str) -> Self {
pub fn new(id: PublicAddress, local_key: &Keypair, protocol_name: String) -> Self {
let local_peer_id: PeerId = conversions::PublicAddress::new(id)
.try_into()
.expect("Invalid PublicAddress.");
Expand All @@ -55,7 +56,7 @@ impl Behaviour {
let identify = Self::identify_config(local_key.public(), protocol_name);

// Configure Gossipsub - subscribe to the topic of its own the base64-encoded public address
let mut gossip = Self::gossipsub_config(local_key, heartbeat_secs);
let mut gossip = Self::gossipsub_config(local_key);
gossip.subscribe(&Topic::HotStuffRsSend(id).into()).unwrap();

// Configure Ping
Expand All @@ -69,8 +70,8 @@ impl Behaviour {
}
}

fn kad_config(peer_id: PeerId, protocol_name: &str) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name.to_string()).unwrap();
fn kad_config(peer_id: PeerId, protocol_name: String) -> Kademlia<MemoryStore> {
let protocol_name = StreamProtocol::try_from_owned(protocol_name).unwrap();
let kad_config = KademliaConfig::default()
.set_protocol_names(vec![protocol_name])
.set_record_filtering(KademliaStoreInserts::FilterBoth)
Expand All @@ -83,12 +84,12 @@ impl Behaviour {
kad
}

fn identify_config(public_key: PublicKey, protocol_ver: &str) -> identify::Behaviour {
let config = identify::Config::new(protocol_ver.to_string(), public_key.into());
fn identify_config(public_key: PublicKey, protocol_name: String) -> identify::Behaviour {
let config = identify::Config::new(protocol_name, public_key);
identify::Behaviour::new(config)
}

fn gossipsub_config(keypair: &Keypair, heartbeat_secs: u64) -> gossipsub::Behaviour {
fn gossipsub_config(keypair: &Keypair) -> gossipsub::Behaviour {
let build_msg_id = |msg: &gossipsub::Message| {
let mut id_str = msg.topic.to_string();
let src_peer_id = match msg.source {
Expand All @@ -105,7 +106,7 @@ impl Behaviour {
ConfigBuilder::default()
.max_transmit_size(MAX_TRANSMIT_SIZE * MEGABYTES) // block size is limitted to 2 MB. Multiply by factor of safety = 2.
.message_id_fn(build_msg_id)
.heartbeat_interval(Duration::from_secs(heartbeat_secs))
.heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL))
.build()
.unwrap(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ pub struct Config {
/// Interval in seconds for querying the network to discover peers
pub peer_discovery_interval: u64,

/// Protocol name to communicate with other Kademlia node
pub kademlia_protocol_name: String,
/// Protocol name to communicate with other Kademlia nodes
pub protocol_name: String,
}

// Returns a complete list of accepted topics in pchain-network
Expand Down
43 changes: 35 additions & 8 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! Upon receiving commands from application, gossipsub message will be delivered to a
//! Gossipsub topic.

use borsh::BorshDeserialize;
use futures::StreamExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::Boxed},
Expand All @@ -39,8 +40,11 @@ use std::net::Ipv4Addr;
use std::time::Duration;

use crate::{
config,
conversions,
behaviour::{Behaviour, NetworkEvent},
messages::{Envelope, Topic},
messages::{DroppedTxMessage, Envelope, Topic, Message},
messages::Topic::{DroppedTxns, HotStuffRsBroadcast, HotStuffRsSend, Mempool},
peer::{EngineCommand, PeerBuilder, Peer}, conversions, config,
};

Expand All @@ -66,8 +70,7 @@ pub(crate) async fn start(
let behaviour = Behaviour::new(
local_public_address,
&local_keypair,
10,
&config.kademlia_protocol_name, //TODO jonas
config.protocol_name
);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
Expand All @@ -84,8 +87,7 @@ pub(crate) async fn start(
}

// 3. Subscribe to Topic
//TODO jonas
swarm.behaviour_mut().subscribe(config::fullnode_topics(local_public_address))?;
swarm.behaviour_mut().subscribe(config.topics_to_subscribe.clone())?;

// 4. Start p2p networking
let (sender, mut receiver) =
Expand Down Expand Up @@ -162,13 +164,38 @@ pub(crate) async fn start(
// So we need to convert Vec<u8> to pchain_network::Message. Instead of implementing
// TryFrom trait for Vec<u8> to Message, implement a function that takes in the Message Topic to help
// converting Vec<u8> to Message. You can refer to fullnode/mempool messagegate to see how to
// deserialise each Message type.


// deserialise each Message type.
let topic = config::fullnode_topics(local_public_address)
.into_iter()
.find(|t| t.clone().hash() == message.topic);

if let Some(topic) = topic {
let pchain_message = match topic {
HotStuffRsBroadcast => {
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map(|hotstuff_message| Message::Consensus(hotstuff_message))
},
Mempool => {
pchain_types::blockchain::TransactionV1::deserialize(&mut message.data.as_slice())
.map(|mempool_message| Message::Mempool(mempool_message))
},
DroppedTxns => {
DroppedTxMessage::deserialize(&mut message.data.as_slice())
.map(|droppedtx_message| Message::DroppedTx(droppedtx_message))
},
HotStuffRsSend(address) => {
hotstuff_rs::messages::Message::deserialize(&mut message.data.as_slice())
.map(|hotstuff_message| Message::Consensus(hotstuff_message))
}
};
}

} else {
log::debug!("Receive unknown gossip message");
}
} else {
log::debug!("Received message from invalid PeerId.")
log::debug!("Received message from invalid PeerId.");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct Envelope {
}

/// [Topic] defines the topics of the messages in pchain-network.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum Topic {
HotStuffRsBroadcast,
HotStuffRsSend(PublicAddress),
Expand Down