Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added engine JoinHandle for Peer, renamed sender to to_engine
  • Loading branch information
ong-jonas committed Oct 9, 2023
commit 0f6d4ba40c38ddc95cc6ea30d8d0ba26d7b2295b
7 changes: 5 additions & 2 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) async fn start(
let mut discover_tick =
tokio::time::interval(Duration::from_secs(config.peer_discovery_interval));

let _network_thread_handle = tokio::task::spawn(async move {
let network_thread_handle = tokio::task::spawn(async move {
loop {
// 4.1 Wait for the following events:
let (send_command, event) = tokio::select! {
Expand Down Expand Up @@ -189,7 +189,10 @@ pub(crate) async fn start(
}
});

Ok(Peer {sender})
Ok(Peer {
engine: network_thread_handle,
to_engine: sender
})
}

async fn build_transport(
Expand Down
21 changes: 11 additions & 10 deletions src/network_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ use crate::engine;
use crate::message_gate::MessageGateChain;
use crate::messages::{DroppedTxMessage, Message, Topic};
use pchain_types::{blockchain::TransactionV1, cryptography::PublicAddress};
use tokio::task::JoinHandle;

/// [Peer] provides inter-process messaging between application and the p2p
/// network. It started the main thread for the p2p network and handles for the [tokio::task]
/// by calling [crate::engine::start].
#[derive(Clone)]
pub struct Peer {
/// Engine handle for the [tokio::task] which is the main thread for
/// the p2p network (see[crate::engine])
pub(crate) engine: JoinHandle<()>,
/// mpsc sender for delivering message to p2p network
pub(crate) sender: tokio::sync::mpsc::Sender<SendCommand>,
pub(crate) to_engine: tokio::sync::mpsc::Sender<SendCommand>,
}

impl Peer {
Expand All @@ -30,38 +33,36 @@ impl Peer {
subscribe_topics: Vec<Topic>,
message_gates: MessageGateChain,
) -> Self {
let handle = engine::start(config, subscribe_topics, message_gates)
return engine::start(config, subscribe_topics, message_gates)
.await
.unwrap();
Self {
sender: handle.sender,
}

}

pub fn broadcast_mempool_tx_msg(&self, content: &TransactionV1) {
let _ = self.sender.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(SendCommand::Broadcast(
Topic::Mempool,
Message::Mempool(content.clone()),
));
}

pub fn broadcast_dropped_tx_msg(&self, content: DroppedTxMessage) {
let _ = self.sender.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(SendCommand::Broadcast(
Topic::DroppedTx,
Message::DroppedTx(content),
));
}

pub fn broadcast_consensus_msg(&self, content: hotstuff_rs::messages::Message) {
let _ = self.sender.try_send(SendCommand::Broadcast(
let _ = self.to_engine.try_send(SendCommand::Broadcast(
Topic::Consensus,
Message::Consensus(content),
));
}

pub fn send_to(&self, address: PublicAddress, content: hotstuff_rs::messages::Message) {
let _ = self
.sender
.to_engine
.try_send(SendCommand::SendTo(address, Message::Consensus(content)));
}
}
Expand Down