Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 5 additions & 29 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::{
collections::{HashSet, VecDeque},
fmt,
sync::Arc,
time::Duration,
};
use std::{collections::VecDeque, fmt, sync::Arc, time::Duration};

use aleph_primitives::KEY_TYPE;
use async_trait::async_trait;
Expand Down Expand Up @@ -97,7 +92,7 @@ impl<T> Default for Channel<T> {
}
}

pub type MockEvent = Event<MockMultiaddress, MockPublicKey>;
pub type MockEvent = Event<MockPublicKey>;

pub type MockData = Vec<u8>;

Expand Down Expand Up @@ -141,7 +136,7 @@ impl<M: Multiaddress + 'static> MockIO<M> {
pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
impl EventStream<MockMultiaddress, MockPublicKey> for MockEventStream {
impl EventStream<MockPublicKey> for MockEventStream {
async fn next_event(&mut self) -> Option<MockEvent> {
self.0.next().await
}
Expand Down Expand Up @@ -172,8 +167,6 @@ impl NetworkSender for MockNetworkSender {

#[derive(Clone)]
pub struct MockNetwork {
pub add_reserved: Channel<(HashSet<MockMultiaddress>, Protocol)>,
pub remove_reserved: Channel<(HashSet<MockPublicKey>, Protocol)>,
pub send_message: Channel<(Vec<u8>, MockPublicKey, Protocol)>,
pub event_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<MockEvent>>>>,
event_stream_taken_oneshot: Arc<Mutex<Option<oneshot::Sender<()>>>>,
Expand All @@ -182,17 +175,11 @@ pub struct MockNetwork {
}

#[derive(Debug, Copy, Clone)]
pub enum MockSenderError {
SomeError,
}
pub struct MockSenderError();

impl fmt::Display for MockSenderError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MockSenderError::SomeError => {
write!(f, "Some error message")
}
}
write!(f, "Some error message")
}
}

Expand All @@ -202,7 +189,6 @@ impl Network for MockNetwork {
type SenderError = MockSenderError;
type NetworkSender = MockNetworkSender;
type PeerId = MockPublicKey;
type Multiaddress = MockMultiaddress;
type EventStream = MockEventStream;

fn event_stream(&self) -> Self::EventStream {
Expand Down Expand Up @@ -232,21 +218,11 @@ impl Network for MockNetwork {
error,
})
}

fn add_reserved(&self, addresses: HashSet<Self::Multiaddress>, protocol: Protocol) {
self.add_reserved.send((addresses, protocol));
}

fn remove_reserved(&self, peers: HashSet<Self::PeerId>, protocol: Protocol) {
self.remove_reserved.send((peers, protocol));
}
}

impl MockNetwork {
pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self {
MockNetwork {
add_reserved: Channel::new(),
remove_reserved: Channel::new(),
send_message: Channel::new(),
event_sinks: Arc::new(Mutex::new(vec![])),
event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))),
Expand Down
17 changes: 4 additions & 13 deletions finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,23 @@ pub trait NetworkSender: Send + Sync + 'static {
}

#[derive(Clone)]
pub enum Event<M, P> {
Connected(M),
Disconnected(P),
pub enum Event<P> {
StreamOpened(P, Protocol),
StreamClosed(P, Protocol),
Messages(Vec<(Protocol, Bytes)>),
}

#[async_trait]
pub trait EventStream<M, P> {
async fn next_event(&mut self) -> Option<Event<M, P>>;
pub trait EventStream<P> {
async fn next_event(&mut self) -> Option<Event<P>>;
}

/// Abstraction over a network.
pub trait Network: Clone + Send + Sync + 'static {
type SenderError: std::error::Error;
type NetworkSender: NetworkSender;
type PeerId: Clone + Debug + Eq + Hash + Send;
type Multiaddress: Debug + Eq + Hash;
type EventStream: EventStream<Self::Multiaddress, Self::PeerId>;
type EventStream: EventStream<Self::PeerId>;

/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Self::EventStream;
Expand All @@ -118,12 +115,6 @@ pub trait Network: Clone + Send + Sync + 'static {
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError>;

/// Add peers to one of the reserved sets.
fn add_reserved(&self, addresses: HashSet<Self::Multiaddress>, protocol: Protocol);

/// Remove peers from one of the reserved sets.
fn remove_reserved(&self, peers: HashSet<Self::PeerId>, protocol: Protocol);
}

/// Abstraction for requesting own network addresses and PeerId.
Expand Down
Loading