Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renamed SendCommand to EngineCommand
  • Loading branch information
ong-jonas committed Oct 9, 2023
commit bba50413e9ff51ae567f26b90669132c81f2ea2b
24 changes: 12 additions & 12 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! In the event loop, it waits for:
//! - [PeerNetworkEvent].
//! - [Commands](SendCommand) from application for sending message.
//! - [Commands](EngineCommand) from application for sending message.
//! - Timeout of a periodic interval to discover peers in the network.
//!
//! ### Events Handling
Expand Down Expand Up @@ -44,7 +44,7 @@ use crate::{
constants, conversions,
message_gate::MessageGateChain,
messages::{Envelope, Topic},
network_handle::SendCommand,
network_handle::EngineCommand,
Peer,
};

Expand Down Expand Up @@ -93,18 +93,18 @@ pub(crate) async fn start(

// 4. Start p2p networking
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<SendCommand>(config.send_command_buffer_size);
tokio::sync::mpsc::channel::<EngineCommand>(config.send_command_buffer_size);
let mut discover_tick =
tokio::time::interval(Duration::from_secs(config.peer_discovery_interval));

let network_thread_handle = tokio::task::spawn(async move {
loop {
// 4.1 Wait for the following events:
let (send_command, event) = tokio::select! {
let (engine_command, event) = tokio::select! {
biased;
// Receive a SendCommand from application
send_command = receiver.recv() => {
(send_command, None)
// Receive a (EngineCommand) from application
engine_command = receiver.recv() => {
(engine_command, None)
},
// Receive a PeerNetworkEvent
event = swarm.select_next_some() => {
Expand All @@ -118,10 +118,10 @@ pub(crate) async fn start(
},
};

// 4.2 Deliver messages when a SendCommand from application is received
if let Some(send_command) = send_command {
match send_command {
SendCommand::SendTo(recipient, raw_message) => {
// 4.2 Deliver messages when a EngineCommand from application is received
if let Some(engine_command) = engine_command {
match engine_command {
EngineCommand::SendTo(recipient, raw_message) => {
log::info!("SendTo: {}", base64url::encode(recipient).as_str());
if recipient == local_public_address {
// send to myself
Expand All @@ -137,7 +137,7 @@ pub(crate) async fn start(
log::error!("{:?}", e);
}
}
SendCommand::Broadcast(topic, msg) => {
EngineCommand::Broadcast(topic, msg) => {
log::info!("Broadcast (Topic: {:?})", topic);
if let Err(e) = swarm.behaviour_mut().broadcast(topic.into(), msg) {
log::debug!("{:?}", e);
Expand Down
16 changes: 8 additions & 8 deletions src/network_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Peer {
/// the p2p network (see[crate::engine])
pub(crate) engine: JoinHandle<()>,
/// mpsc sender for delivering message to p2p network
pub(crate) to_engine: tokio::sync::mpsc::Sender<SendCommand>,
pub(crate) to_engine: tokio::sync::mpsc::Sender<EngineCommand>,
}

impl Peer {
Expand All @@ -40,21 +40,21 @@ impl Peer {
}

pub fn broadcast_mempool_tx_msg(&self, content: &TransactionV1) {
let _ = self.to_engine.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(EngineCommand::Broadcast(
Topic::Mempool,
Message::Mempool(content.clone()),
));
}

pub fn broadcast_dropped_tx_msg(&self, content: DroppedTxMessage) {
let _ = self.to_engine.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(EngineCommand::Broadcast(
Topic::DroppedTx,
Message::DroppedTx(content),
));
}

pub fn broadcast_consensus_msg(&self, content: hotstuff_rs::messages::Message) {
let _ = self.to_engine.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(EngineCommand::Broadcast(
Topic::Consensus,
Message::Consensus(content),
));
Expand All @@ -63,13 +63,13 @@ impl Peer {
pub fn send_to(&self, address: PublicAddress, content: hotstuff_rs::messages::Message) {
let _ = self
.to_engine
.try_send(SendCommand::SendTo(address, Message::Consensus(content)));
.try_send(EngineCommand::SendTo(address, Message::Consensus(content)));
}
}

/// A command to send a message either to a specific peer ([SendTo](SendCommand::SendTo)), or to all subscribers
/// of a network topic ([Broadcast](SendTo::Broadcast)).
pub enum SendCommand {
/// A command to send a message either to a specific peer ([SendTo](EngineCommand::SendTo)), or to all subscribers
/// of a network topic ([Broadcast](EngineCommand::Broadcast)).
pub enum EngineCommand {
/// send to a specific peer
SendTo(PublicAddress, Message),

Expand Down