Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renamed Peer functions and message topics
  • Loading branch information
ong-jonas committed Oct 9, 2023
commit f41b8b732dbb6c68a886863aef916c58f6a846c0
12 changes: 6 additions & 6 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl PeerBehaviour {

// Configure Gossipsub - subscribe to the topic of its own the base64-encoded public address
let mut gossip = Self::gossipsub_config(local_key, heartbeat_secs);
gossip.subscribe(&Topic::Mailbox(id).into()).unwrap();
gossip.subscribe(&Topic::HotStuffRsSend(id).into()).unwrap();

// Configure Ping
let ping = ping::Behaviour::default();
Expand Down Expand Up @@ -142,7 +142,7 @@ impl PeerBehaviour {
address: PublicAddress,
msg: Message,
) -> Result<MessageId, PublishError> {
let topic = Topic::Mailbox(address).hash();
let topic = Topic::HotStuffRsSend(address).hash();
let content: Vec<u8> = msg.into();
self.gossip.publish(topic, content)
}
Expand Down Expand Up @@ -274,7 +274,7 @@ mod test {
fn test_subscribe_topics() {
let mut peer1 = create_new_peer();

let self_topic_hash = Topic::Mailbox(peer1.public_address).hash();
let self_topic_hash = Topic::HotStuffRsSend(peer1.public_address).hash();

let self_topic_message = gossipsub::Message {
source: None,
Expand All @@ -289,16 +289,16 @@ mod test {
source: None,
data: vec![],
sequence_number: None,
topic: Topic::Consensus.hash(),
topic: Topic::HotStuffRsBroadcast.hash(),
};
let _ = peer1.behaviour.subscribe(vec![Topic::Consensus]);
let _ = peer1.behaviour.subscribe(vec![Topic::HotStuffRsBroadcast]);

// create Message with unsubscribed topic
let unsubscribed_msg = gossipsub::Message {
source: None,
data: vec![],
sequence_number: None,
topic: Topic::DroppedTx.hash(),
topic: Topic::DroppedTxns.hash(),
};

let subscribed_topics: Vec<&MessageTopicHash> = peer1.behaviour.gossip.topics().collect();
Expand Down
4 changes: 2 additions & 2 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ pub(crate) async fn start(
match engine_command {
EngineCommand::Publish(topic, message) => {
log::info!("Publish (Topic: {:?})", topic);
if topic == Topic::Mailbox(local_public_address) {
if topic == Topic::HotStuffRsSend(local_public_address) {
// send to myself
let envelope = Envelope {
origin: local_public_address,
message: message.into(),
};
message_gates
.message_in(&Topic::Mailbox(local_public_address).hash(), envelope)
.message_in(&Topic::HotStuffRsSend(local_public_address).hash(), envelope)
.await;
} else if let Err(e) = swarm.behaviour_mut().broadcast(topic.into(), message) {
log::debug!("{:?}", e);
Expand Down
33 changes: 17 additions & 16 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ pub type MessageTopicHash = libp2p::gossipsub::TopicHash;
/// [Topic] defines the topics available for subscribing.
#[derive(Debug, Clone, PartialEq)]
pub enum Topic {
Consensus,
HotStuffRsBroadcast,
HotStuffRsSend(PublicAddress),
Mempool,
DroppedTx,
Mailbox(PublicAddress),
DroppedTxns,

}

impl Topic {
Expand All @@ -39,7 +40,7 @@ impl Topic {
}

pub fn is_mailbox(topic_hash: &MessageTopicHash, node_address: PublicAddress) -> bool {
topic_hash == &Topic::Mailbox(node_address).hash()
topic_hash == &Topic::HotStuffRsSend(node_address).hash()
}

pub fn hash(self) -> MessageTopicHash {
Expand All @@ -50,10 +51,10 @@ impl Topic {
impl From<Topic> for IdentTopic {
fn from(topic: Topic) -> Self {
let str = match topic {
Topic::Consensus => "consensus".to_string(),
Topic::HotStuffRsBroadcast => "consensus".to_string(),
Topic::HotStuffRsSend(addr) => base64url::encode(addr),
Topic::Mempool => "mempool".to_string(),
Topic::DroppedTx => "droppedTx".to_string(),
Topic::Mailbox(addr) => base64url::encode(addr),
Topic::DroppedTxns => "droppedTx".to_string(),
};
IdentTopic::new(str)
}
Expand Down Expand Up @@ -153,20 +154,20 @@ mod test {
assert_eq!(mempool_topic.hash(), ident_topic.hash());

// Consensus topic
let consensus_topic = Topic::Consensus;
let consensus_topic = Topic::HotStuffRsBroadcast;

let ident_topic = IdentTopic::new("consensus".to_string());
assert_eq!(consensus_topic.hash(), ident_topic.hash());

// Dropped Tx Topic
let droppedtx_topic = Topic::DroppedTx;
let droppedtx_topic = Topic::DroppedTxns;

let ident_topic = IdentTopic::new("droppedTx".to_string());
assert_eq!(droppedtx_topic.hash(), ident_topic.hash());

// Mailbox Topic
let addr = [1u8;32];
let mailbox_topic = Topic::Mailbox(addr);
let mailbox_topic = Topic::HotStuffRsSend(addr);

let ident_topic = IdentTopic::new(base64url::encode(addr));
assert_eq!(mailbox_topic.hash(), ident_topic.hash())
Expand All @@ -179,7 +180,7 @@ mod test {
conversions::PublicAddress::try_from(Keypair::generate_ed25519().public())
.unwrap()
.into();
let test_topic = Topic::Mailbox(test_public_address);
let test_topic = Topic::HotStuffRsSend(test_public_address);

let expected_topic = IdentTopic::new(String::from(base64url::encode(test_public_address)));
assert_eq!(test_topic.hash(), expected_topic.hash());
Expand All @@ -195,15 +196,15 @@ mod test {
assert!(Topic::Mempool.is(&mempool_msg_hash));
assert!(!Topic::is_mailbox(&mempool_msg_hash, test_public_address));

let consensus_msg_hash = Topic::Consensus.hash();
assert!(Topic::Consensus.is(&consensus_msg_hash));
let consensus_msg_hash = Topic::HotStuffRsBroadcast.hash();
assert!(Topic::HotStuffRsBroadcast.is(&consensus_msg_hash));
assert!(!Topic::is_mailbox(&consensus_msg_hash, test_public_address));

let droppedtx_msg_hash = Topic::DroppedTx.hash();
assert!(Topic::DroppedTx.is(&droppedtx_msg_hash));
let droppedtx_msg_hash = Topic::DroppedTxns.hash();
assert!(Topic::DroppedTxns.is(&droppedtx_msg_hash));
assert!(!Topic::is_mailbox(&droppedtx_msg_hash, test_public_address));

let mailbox_topic_hash = Topic::Mailbox(test_public_address).hash();
let mailbox_topic_hash = Topic::HotStuffRsSend(test_public_address).hash();
assert!(Topic::is_mailbox(&mailbox_topic_hash, test_public_address));
}
}
14 changes: 7 additions & 7 deletions src/network_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,30 @@ impl Peer {

}

pub fn broadcast_mempool_tx_msg(&self, content: &TransactionV1) {
pub fn broadcast_mempool_msg(&self, content: &TransactionV1) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
Topic::Mempool,
Message::Mempool(content.clone()),
));
}

pub fn broadcast_dropped_tx_msg(&self, content: DroppedTxMessage) {
pub fn broadcast_dropped_txn_msg(&self, content: DroppedTxMessage) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
Topic::DroppedTx,
Topic::DroppedTxns,
Message::DroppedTx(content),
));
}

pub fn broadcast_consensus_msg(&self, content: hotstuff_rs::messages::Message) {
pub fn broadcast_hotstuff_rs_msg(&self, content: hotstuff_rs::messages::Message) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
Topic::Consensus,
Topic::HotStuffRsBroadcast,
Message::Consensus(content),
));
}

pub fn send_to(&self, address: PublicAddress, content: hotstuff_rs::messages::Message) {
pub fn send_hotstuff_rs_msg(&self, address: PublicAddress, content: hotstuff_rs::messages::Message) {
let _ = self.to_engine.try_send(EngineCommand::Publish(
Topic::Mailbox(address),
Topic::HotStuffRsSend(address),
Message::Consensus(content)
));
}
Expand Down
18 changes: 9 additions & 9 deletions tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn test_broadcast() {
loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.broadcast_mempool_tx_msg(&base_tx(address_1));
node_1.broadcast_mempool_msg(&base_tx(address_1));
if sending_limit == 0 { break }
sending_limit -= 1;
}
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn test_send_to() {
loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.send_to(address_2, message.clone());
node_1.send_hotstuff_rs_msg(address_2, message.clone());
if sending_limit == 0 { break }
sending_limit -= 1;
}
Expand Down Expand Up @@ -145,7 +145,7 @@ async fn test_send_to_only_specific_receiver() {
loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.send_to(address_2, create_sync_req(1));
node_1.send_hotstuff_rs_msg(address_2, create_sync_req(1));

if sending_limit == 0 { break }
sending_limit -= 1;
Expand Down Expand Up @@ -195,8 +195,8 @@ async fn test_sparse_messaging() {
loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.send_to(address_3, message_to_node3.clone());
node_3.send_to(address_1, message_to_node1.clone());
node_1.send_hotstuff_rs_msg(address_3, message_to_node3.clone());
node_3.send_hotstuff_rs_msg(address_1, message_to_node1.clone());

if sending_limit == 0 { break }
sending_limit -= 1;
Expand Down Expand Up @@ -233,7 +233,7 @@ async fn test_send_to_self() {
tokio::select! {
//broadcast does not send to self
_ = sending_tick.tick() => {
node_1.send_to(address_1, message.clone());
node_1.send_hotstuff_rs_msg(address_1, message.clone());
if sending_limit == 0 { break }
sending_limit -= 1;
}
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_broadcast_different_topics() {
30015,
vec![PeerInfo::new(address_1, Ipv4Addr::new(127, 0, 0, 1), 30014)],
Some(Topic::Mempool),
vec![Topic::Consensus],
vec![Topic::HotStuffRsBroadcast],
)
.await;

Expand All @@ -272,7 +272,7 @@ async fn test_broadcast_different_topics() {
loop {
tokio::select! {
_ = sending_tick.tick() => {
node_1.broadcast_mempool_tx_msg(&base_tx(address_1));
node_1.broadcast_mempool_msg(&base_tx(address_1));
if sending_limit == 0 { break }
sending_limit -= 1;
}
Expand Down Expand Up @@ -307,7 +307,7 @@ pub async fn node(
let gate = if !subscribe_topics.is_empty() {
MessageCounts::new(gate_topic.unwrap())
} else {
MessageCounts::new(Topic::Mailbox(address))
MessageCounts::new(Topic::HotStuffRsSend(address))
};
let message_chain = MessageGateChain::new().append(gate.clone());

Expand Down