Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 5 additions & 29 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::{
collections::{HashSet, VecDeque},
fmt,
sync::Arc,
time::Duration,
};
use std::{collections::VecDeque, fmt, sync::Arc, time::Duration};

use aleph_primitives::KEY_TYPE;
use async_trait::async_trait;
Expand Down Expand Up @@ -94,14 +89,14 @@ impl<T> Default for Channel<T> {
}
}

pub type MockEvent = Event<MockMultiaddress, MockPublicKey>;
pub type MockEvent = Event<MockPublicKey>;

pub type MockData = Vec<u8>;

pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
impl EventStream<MockMultiaddress, MockPublicKey> for MockEventStream {
impl EventStream<MockPublicKey> for MockEventStream {
async fn next_event(&mut self) -> Option<MockEvent> {
self.0.next().await
}
Expand Down Expand Up @@ -132,8 +127,6 @@ impl NetworkSender for MockNetworkSender {

#[derive(Clone)]
pub struct MockNetwork {
pub add_reserved: Channel<(HashSet<MockMultiaddress>, Protocol)>,
pub remove_reserved: Channel<(HashSet<MockPublicKey>, Protocol)>,
pub send_message: Channel<(Vec<u8>, MockPublicKey, Protocol)>,
pub event_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<MockEvent>>>>,
event_stream_taken_oneshot: Arc<Mutex<Option<oneshot::Sender<()>>>>,
Expand All @@ -142,17 +135,11 @@ pub struct MockNetwork {
}

#[derive(Debug, Copy, Clone)]
pub enum MockSenderError {
SomeError,
}
pub struct MockSenderError;

impl fmt::Display for MockSenderError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MockSenderError::SomeError => {
write!(f, "Some error message")
}
}
write!(f, "Some error message")
}
}

Expand All @@ -162,7 +149,6 @@ impl Network for MockNetwork {
type SenderError = MockSenderError;
type NetworkSender = MockNetworkSender;
type PeerId = MockPublicKey;
type Multiaddress = MockMultiaddress;
type EventStream = MockEventStream;

fn event_stream(&self) -> Self::EventStream {
Expand Down Expand Up @@ -192,21 +178,11 @@ impl Network for MockNetwork {
error,
})
}

fn add_reserved(&self, addresses: HashSet<Self::Multiaddress>, protocol: Protocol) {
self.add_reserved.send((addresses, protocol));
}

fn remove_reserved(&self, peers: HashSet<Self::PeerId>, protocol: Protocol) {
self.remove_reserved.send((peers, protocol));
}
}

impl MockNetwork {
pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self {
MockNetwork {
add_reserved: Channel::new(),
remove_reserved: Channel::new(),
send_message: Channel::new(),
event_sinks: Arc::new(Mutex::new(vec![])),
event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))),
Expand Down
17 changes: 4 additions & 13 deletions finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,23 @@ pub trait NetworkSender: Send + Sync + 'static {
}

#[derive(Clone)]
pub enum Event<M, P> {
Connected(M),
Disconnected(P),
pub enum Event<P> {
StreamOpened(P, Protocol),
StreamClosed(P, Protocol),
Messages(Vec<(Protocol, Bytes)>),
}

#[async_trait]
pub trait EventStream<M, P> {
async fn next_event(&mut self) -> Option<Event<M, P>>;
pub trait EventStream<P> {
async fn next_event(&mut self) -> Option<Event<P>>;
}

/// Abstraction over a network.
pub trait Network: Clone + Send + Sync + 'static {
type SenderError: std::error::Error;
type NetworkSender: NetworkSender;
type PeerId: Clone + Debug + Eq + Hash + Send;
type Multiaddress: Debug + Eq + Hash;
type EventStream: EventStream<Self::Multiaddress, Self::PeerId>;
type EventStream: EventStream<Self::PeerId>;

/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Self::EventStream;
Expand All @@ -118,12 +115,6 @@ pub trait Network: Clone + Send + Sync + 'static {
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError>;

/// Add peers to one of the reserved sets.
fn add_reserved(&self, addresses: HashSet<Self::Multiaddress>, protocol: Protocol);

/// Remove peers from one of the reserved sets.
fn remove_reserved(&self, peers: HashSet<Self::PeerId>, protocol: Protocol);
}

/// Abstraction for requesting own network addresses and PeerId.
Expand Down
72 changes: 5 additions & 67 deletions finality-aleph/src/network/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet},
future::Future,
iter,
};

use futures::{channel::mpsc, StreamExt};
Expand Down Expand Up @@ -147,20 +146,10 @@ impl<N: Network, D: Data> Service<N, D> {

fn handle_network_event(
&mut self,
event: Event<N::Multiaddress, N::PeerId>,
event: Event<N::PeerId>,
) -> Result<(), mpsc::TrySendError<D>> {
use Event::*;
match event {
Connected(multiaddress) => {
trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress);
self.network
.add_reserved(iter::once(multiaddress).collect(), Protocol::Authentication);
}
Disconnected(peer) => {
trace!(target: "aleph-network", "Disconnected event for peer {:?}", peer);
self.network
.remove_reserved(iter::once(peer).collect(), Protocol::Authentication);
}
StreamOpened(peer, protocol) => {
trace!(target: "aleph-network", "StreamOpened event for peer {:?} and the protocol {:?}.", peer, protocol);
let rx = match &protocol {
Expand Down Expand Up @@ -246,7 +235,7 @@ impl<N: Network, D: Data> Service<N, D> {

#[cfg(test)]
mod tests {
use std::{collections::HashSet, iter, iter::FromIterator};
use std::collections::HashSet;

use codec::Encode;
use futures::{
Expand All @@ -262,7 +251,7 @@ mod tests {
mock::{MockData, MockEvent, MockNetwork, MockSenderError},
NetworkServiceIO, Protocol,
},
testing::mocks::validator_network::{random_multiaddress, random_peer_id},
testing::mocks::validator_network::random_peer_id,
};

const PROTOCOL: Protocol = Protocol::Authentication;
Expand Down Expand Up @@ -309,57 +298,6 @@ mod tests {
vec![i, i + 1, i + 2]
}

#[tokio::test]
async fn test_sync_connected() {
let mut test_data = TestData::prepare().await;

let address = random_multiaddress();
test_data
.service
.handle_network_event(MockEvent::Connected(address.clone()))
.expect("Should handle");

let expected = (iter::once(address).collect(), PROTOCOL);

assert_eq!(
test_data
.network
.add_reserved
.next()
.await
.expect("Should receive message"),
expected
);

test_data.cleanup().await
}

#[tokio::test]
async fn test_sync_disconnected() {
let mut test_data = TestData::prepare().await;

let peer_id = random_peer_id();

test_data
.service
.handle_network_event(MockEvent::Disconnected(peer_id.clone()))
.expect("Should handle");

let expected = (iter::once(peer_id).collect(), PROTOCOL);

assert_eq!(
test_data
.network
.remove_reserved
.next()
.await
.expect("Should receive message"),
expected
);

test_data.cleanup().await
}

#[tokio::test]
async fn test_notification_stream_opened() {
let mut test_data = TestData::prepare().await;
Expand Down Expand Up @@ -452,7 +390,7 @@ mod tests {
.network
.create_sender_errors
.lock()
.push_back(MockSenderError::SomeError);
.push_back(MockSenderError);

let peer_id = random_peer_id();

Expand Down Expand Up @@ -491,7 +429,7 @@ mod tests {
.network
.send_errors
.lock()
.push_back(MockSenderError::SomeError);
.push_back(MockSenderError);

let peer_id = random_peer_id();

Expand Down
63 changes: 35 additions & 28 deletions finality-aleph/src/substrate_network.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{collections::HashSet, fmt, iter, pin::Pin, sync::Arc};
use std::{fmt, iter, pin::Pin, sync::Arc};

use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::error;
use log::{error, trace};
use sc_consensus::JustificationSyncLink;
use sc_network::{
multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr,
NetworkService, NetworkSyncForkRequest, PeerId,
multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, NetworkService,
NetworkSyncForkRequest, PeerId,
};
use sc_network_common::{
protocol::ProtocolName,
Expand Down Expand Up @@ -122,22 +122,40 @@ impl NetworkSender for SubstrateNetworkSender {
}
}

type NetworkEventStream = Pin<Box<dyn Stream<Item = SubstrateEvent> + Send>>;
pub struct NetworkEventStream<B: Block, H: ExHashT> {
stream: Pin<Box<dyn Stream<Item = SubstrateEvent> + Send>>,
network: Arc<NetworkService<B, H>>,
}

#[async_trait]
impl EventStream<Multiaddr, PeerId> for NetworkEventStream {
async fn next_event(&mut self) -> Option<Event<Multiaddr, PeerId>> {
impl<B: Block, H: ExHashT> EventStream<PeerId> for NetworkEventStream<B, H> {
async fn next_event(&mut self) -> Option<Event<PeerId>> {
use Event::*;
use SubstrateEvent::*;
loop {
match self.next().await {
match self.stream.next().await {
Some(event) => match event {
SyncConnected { remote } => {
return Some(Connected(
iter::once(MultiaddressProtocol::P2p(remote.into())).collect(),
))
let multiaddress =
iter::once(MultiaddressProtocol::P2p(remote.into())).collect();
trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress);
if let Err(e) = self.network.add_peers_to_reserved_set(
protocol_name(&Protocol::Authentication),
iter::once(multiaddress).collect(),
) {
error!(target: "aleph-network", "add_reserved failed: {}", e);
}
continue;
}
SyncDisconnected { remote } => {
trace!(target: "aleph-network", "Disconnected event for peer {:?}", remote);
let addresses = iter::once(remote).collect();
self.network.remove_peers_from_reserved_set(
protocol_name(&Protocol::Authentication),
addresses,
);
continue;
}
SyncDisconnected { remote } => return Some(Disconnected(remote)),
NotificationStreamOpened {
remote, protocol, ..
} => match to_protocol(protocol.as_ref()) {
Expand Down Expand Up @@ -177,11 +195,13 @@ impl<B: Block, H: ExHashT> Network for Arc<NetworkService<B, H>> {
type SenderError = SenderError;
type NetworkSender = SubstrateNetworkSender;
type PeerId = PeerId;
type Multiaddress = Multiaddr;
type EventStream = NetworkEventStream;
type EventStream = NetworkEventStream<B, H>;

fn event_stream(&self) -> Self::EventStream {
Box::pin(self.as_ref().event_stream("aleph-network"))
NetworkEventStream {
stream: Box::pin(self.as_ref().event_stream("aleph-network")),
network: self.clone(),
}
}

fn sender(
Expand All @@ -198,17 +218,4 @@ impl<B: Block, H: ExHashT> Network for Arc<NetworkService<B, H>> {
peer_id,
})
}

fn add_reserved(&self, addresses: HashSet<Self::Multiaddress>, protocol: Protocol) {
if let Err(e) = self
.add_peers_to_reserved_set(protocol_name(&protocol), addresses.into_iter().collect())
{
error!(target: "aleph-network", "add_reserved failed: {}", e);
}
}

fn remove_reserved(&self, peers: HashSet<Self::PeerId>, protocol: Protocol) {
let addresses = peers.into_iter().collect();
self.remove_peers_from_reserved_set(protocol_name(&protocol), addresses);
}
}