Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Make the gossip network into a module
  • Loading branch information
timorl committed Dec 12, 2022
commit cd55d731e3ba24a443eeeb5213b4138703c7fda2
4 changes: 1 addition & 3 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::Split,
network::{protocol_name, Split},
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
},
substrate_network::protocol_name,
VersionedTryFromError::{ExpectedNewGotOld, ExpectedOldGotNew},
};

Expand All @@ -45,7 +44,6 @@ mod nodes;
mod party;
mod session;
mod session_map;
mod substrate_network;
mod tcp_network;
#[cfg(test)]
pub mod testing;
Expand Down
132 changes: 132 additions & 0 deletions finality-aleph/src/network/gossip/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::{collections::VecDeque, fmt, sync::Arc};

use async_trait::async_trait;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use parking_lot::Mutex;

use crate::{
network::{
gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork},
mock::Channel,
},
validator_network::mock::MockPublicKey,
};

pub type MockEvent = Event<MockPublicKey>;

pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
impl EventStream<MockPublicKey> for MockEventStream {
async fn next_event(&mut self) -> Option<MockEvent> {
self.0.next().await
}
}

pub struct MockNetworkSender {
sender: mpsc::UnboundedSender<(Vec<u8>, MockPublicKey, Protocol)>,
peer_id: MockPublicKey,
protocol: Protocol,
error: Result<(), MockSenderError>,
}

#[async_trait]
impl NetworkSender for MockNetworkSender {
type SenderError = MockSenderError;

async fn send<'a>(
&'a self,
data: impl Into<Vec<u8>> + Send + Sync + 'static,
) -> Result<(), MockSenderError> {
self.error?;
self.sender
.unbounded_send((data.into(), self.peer_id.clone(), self.protocol))
.unwrap();
Ok(())
}
}

#[derive(Clone)]
pub struct MockRawNetwork {
pub send_message: Channel<(Vec<u8>, MockPublicKey, Protocol)>,
pub event_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<MockEvent>>>>,
event_stream_taken_oneshot: Arc<Mutex<Option<oneshot::Sender<()>>>>,
pub create_sender_errors: Arc<Mutex<VecDeque<MockSenderError>>>,
pub send_errors: Arc<Mutex<VecDeque<MockSenderError>>>,
}

#[derive(Debug, Copy, Clone)]
pub struct MockSenderError;

impl fmt::Display for MockSenderError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Some error message")
}
}

impl std::error::Error for MockSenderError {}

impl RawNetwork for MockRawNetwork {
type SenderError = MockSenderError;
type NetworkSender = MockNetworkSender;
type PeerId = MockPublicKey;
type EventStream = MockEventStream;

fn event_stream(&self) -> Self::EventStream {
let (tx, rx) = mpsc::unbounded();
self.event_sinks.lock().push(tx);
// Necessary for tests to detect when service takes event_stream
if let Some(tx) = self.event_stream_taken_oneshot.lock().take() {
tx.send(()).unwrap();
}
MockEventStream(rx)
}

fn sender(
&self,
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError> {
self.create_sender_errors
.lock()
.pop_front()
.map_or(Ok(()), Err)?;
let error = self.send_errors.lock().pop_front().map_or(Ok(()), Err);
Ok(MockNetworkSender {
sender: self.send_message.0.clone(),
peer_id,
protocol,
error,
})
}
}

impl MockRawNetwork {
pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self {
MockRawNetwork {
send_message: Channel::new(),
event_sinks: Arc::new(Mutex::new(vec![])),
event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))),
create_sender_errors: Arc::new(Mutex::new(VecDeque::new())),
send_errors: Arc::new(Mutex::new(VecDeque::new())),
}
}

pub fn emit_event(&mut self, event: MockEvent) {
for sink in &*self.event_sinks.lock() {
sink.unbounded_send(event.clone()).unwrap();
}
}

// Consumes the network asserting there are no unreceived messages in the channels.
pub async fn close_channels(self) {
self.event_sinks.lock().clear();
// We disable it until tests regarding new substrate network protocol are created.
// assert!(self.add_reserved.close().await.is_none());
// assert!(self.remove_reserved.close().await.is_none());
assert!(self.send_message.close().await.is_none());
}
}
78 changes: 78 additions & 0 deletions finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! A P2P-based gossip network, for now only for sending broadcasts.
use std::{
fmt::{Debug, Display},
hash::Hash,
};

use bytes::Bytes;

use crate::network::Data;

#[cfg(test)]
pub mod mock;
mod service;

pub use service::Service;

#[async_trait::async_trait]
/// Interface for the gossip network, currently only supports broadcasting and receiving data.
pub trait Network<D: Data>: Send + 'static {
type Error: Display + Send;

/// Broadcast data to all directly connected peers. Network-wide broadcasts have to be
/// implemented on top of this abstraction. Note that there might be no currently connected
/// peers, so there are no guarantees any single call sends anything even if no errors are
/// returned, retry appropriately.
fn broadcast(&mut self, data: D) -> Result<(), Self::Error>;

/// Receive some data from the network.
async fn next(&mut self) -> Result<D, Self::Error>;
}

/// The Authentication protocol is used for validator discovery.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
Authentication,
}

/// Abstraction over a sender to the raw network.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync + 'static {
type SenderError: std::error::Error;

/// A method for sending data. Returns Error if not connected to the peer.
async fn send<'a>(
&'a self,
data: impl Into<Vec<u8>> + Send + Sync + 'static,
) -> Result<(), Self::SenderError>;
}

#[derive(Clone)]
pub enum Event<P> {
StreamOpened(P, Protocol),
StreamClosed(P, Protocol),
Messages(Vec<(Protocol, Bytes)>),
}

#[async_trait::async_trait]
pub trait EventStream<P> {
async fn next_event(&mut self) -> Option<Event<P>>;
}

/// Abstraction over a raw p2p network.
pub trait RawNetwork: Clone + Send + Sync + 'static {
type SenderError: std::error::Error;
type NetworkSender: NetworkSender;
type PeerId: Clone + Debug + Eq + Hash + Send;
type EventStream: EventStream<Self::PeerId>;

/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Self::EventStream;

/// Returns a sender to the given peer using a given protocol. Returns Error if not connected to the peer.
fn sender(
&self,
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError>;
}
Loading