diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 868115214b..8557349581 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -1,42 +1,43 @@ use futures::channel::mpsc; -use crate::network::{ - manager::{DataInSession, VersionedAuthentication}, - ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, +use crate::{ + network::{ + manager::{DataInSession, VersionedAuthentication}, + ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, + }, + validator_network::{Network as ValidatorNetwork, PublicKey}, }; -type AuthenticationNetworkIO = NetworkIO, DataInSession, M>; +type AuthenticationNetworkIO = NetworkIO>; -pub fn setup() -> ( - ConnectionManagerIO, - AuthenticationNetworkIO, +pub fn setup< + D: Data, + M: Multiaddress + 'static, + VN: ValidatorNetwork>, +>( + validator_network: VN, +) -> ( + ConnectionManagerIO, + AuthenticationNetworkIO, SessionManagerIO, -) { +) +where + M::PeerId: PublicKey, +{ // Prepare and start the network - let (commands_for_network, commands_from_io) = mpsc::unbounded(); - let (data_for_network, data_from_user) = mpsc::unbounded(); let (messages_for_network, messages_from_user) = mpsc::unbounded(); let (commands_for_service, commands_from_user) = mpsc::unbounded(); let (messages_for_service, commands_from_manager) = mpsc::unbounded(); - let (data_for_user, data_from_network) = mpsc::unbounded(); let (messages_for_user, messages_from_network) = mpsc::unbounded(); let connection_io = ConnectionManagerIO::new( - commands_for_network, - data_for_network, messages_for_network, commands_from_user, commands_from_manager, - data_from_network, messages_from_network, + validator_network, ); - let channels_for_network = NetworkIO::new( - data_from_user, - messages_from_user, - data_for_user, - messages_for_user, - commands_from_io, - ); + let channels_for_network = NetworkIO::new(messages_from_user, messages_for_user); let channels_for_session_manager = SessionManagerIO::new(commands_for_service, messages_for_service); diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 247d3c3e23..e52d75232a 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -21,6 +21,7 @@ use crate::{ }, AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId, }, + validator_network::{Network as ValidatorNetwork, PublicKey}, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; @@ -595,22 +596,22 @@ impl Service { } } -/// Input/output interface for the connectiona manager service. -pub struct IO { - commands_for_network: mpsc::UnboundedSender>, - data_for_network: mpsc::UnboundedSender, M::PeerId>>, +/// Input/output interface for the connection manager service. +pub struct IO>> +where + M::PeerId: PublicKey, +{ authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - data_from_network: mpsc::UnboundedReceiver>, authentications_from_network: mpsc::UnboundedReceiver>, + validator_network: VN, } /// Errors that can happen during the network service operations. #[derive(Debug, PartialEq, Eq)] pub enum Error { NetworkSend, - CommandSend, /// Should never be fatal. UserSend, /// Should never be fatal. @@ -620,31 +621,28 @@ pub enum Error { NetworkChannel, } -impl IO { +impl>> IO +where + M::PeerId: PublicKey, +{ pub fn new( - commands_for_network: mpsc::UnboundedSender>, - data_for_network: mpsc::UnboundedSender, M::PeerId>>, authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - data_from_network: mpsc::UnboundedReceiver>, authentications_from_network: mpsc::UnboundedReceiver>, - ) -> IO { + validator_network: VN, + ) -> IO { IO { - commands_for_network, - data_for_network, authentications_for_network, commands_from_user, messages_from_user, - data_from_network, authentications_from_network, + validator_network, } } - fn send_data(&self, to_send: AddressedData, M::PeerId>) -> Result<(), Error> { - self.data_for_network - .unbounded_send(to_send) - .map_err(|_| Error::NetworkSend) + fn send_data(&self, to_send: AddressedData, M::PeerId>) { + self.validator_network.send(to_send.0, to_send.1) } fn send_authentication(&self, to_send: DiscoveryMessage) -> Result<(), Error> { @@ -653,21 +651,32 @@ impl IO { .map_err(|_| Error::NetworkSend) } - fn send_command(&self, to_send: ConnectionCommand) -> Result<(), Error> { - self.commands_for_network - .unbounded_send(to_send) - .map_err(|_| Error::CommandSend) + fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { + match connection_command { + ConnectionCommand::AddReserved(addresses) => { + for multi in addresses { + if let Some(peer_id) = multi.get_peer_id() { + self.validator_network.add_connection(peer_id, vec![multi]); + } + } + } + ConnectionCommand::DelReserved(peers) => { + for peer in peers { + self.validator_network.remove_connection(peer); + } + } + }; } - fn send( - &self, + fn handle_service_actions( + &mut self, ServiceActions { maybe_command, maybe_message, }: ServiceActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { - self.send_command(command)?; + self.handle_connection_command(command); } if let Some(message) = maybe_message { self.send_authentication(message)?; @@ -695,7 +704,7 @@ impl IO { trace!(target: "aleph-network", "Manager received a command from user"); match maybe_command { Some(command) => match service.on_command(command).await { - Ok(to_send) => self.send(to_send)?, + Ok(to_send) => self.handle_service_actions(to_send)?, Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e), }, None => return Err(Error::CommandsChannel), @@ -705,12 +714,12 @@ impl IO { trace!(target: "aleph-network", "Manager received a message from user"); match maybe_message { Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) { - self.send_data(message)?; + self.send_data(message); }, None => return Err(Error::MessageChannel), } }, - maybe_data = self.data_from_network.next() => { + maybe_data = self.validator_network.next() => { trace!(target: "aleph-network", "Manager received some data from network"); match maybe_data { Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { @@ -727,7 +736,7 @@ impl IO { trace!(target: "aleph-network", "Manager received an authentication from network"); match maybe_authentication { Some(authentication) => match authentication.try_into() { - Ok(message) => self.send(service.on_discovery_message(message))?, + Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), }, None => return Err(Error::NetworkChannel), @@ -736,7 +745,7 @@ impl IO { _ = maintenance.tick() => { debug!(target: "aleph-network", "Manager starts maintenence"); match service.retry_session_start().await { - Ok(to_send) => self.send(to_send)?, + Ok(to_send) => self.handle_service_actions(to_send)?, Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e), } for to_send in service.discovery() { diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 83db534947..6de9642bb1 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -17,10 +17,7 @@ use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, - network::{ - manager::VersionedAuthentication, AddressedData, ConnectionCommand, Event, EventStream, - Multiaddress, Network, NetworkIdentity, NetworkSender, NetworkServiceIO, Protocol, - }, + network::{Event, EventStream, Network, NetworkIdentity, NetworkSender, Protocol}, testing::mocks::validator_network::{random_identity, MockMultiaddress}, validator_network::mock::MockPublicKey, AuthorityId, NodeIndex, @@ -101,43 +98,6 @@ pub type MockEvent = Event; pub type MockData = Vec; -pub struct MockIO { - pub messages_for_network: mpsc::UnboundedSender>, - pub data_for_network: mpsc::UnboundedSender>, - pub messages_from_network: mpsc::UnboundedReceiver>, - pub data_from_network: mpsc::UnboundedReceiver, - pub commands_for_network: mpsc::UnboundedSender>, -} - -impl MockIO { - pub fn new() -> ( - MockIO, - NetworkServiceIO, MockData, M>, - ) { - let (messages_for_network, messages_from_user) = mpsc::unbounded(); - let (data_for_network, data_from_user) = mpsc::unbounded(); - let (messages_for_user, messages_from_network) = mpsc::unbounded(); - let (data_for_user, data_from_network) = mpsc::unbounded(); - let (commands_for_network, commands_from_manager) = mpsc::unbounded(); - ( - MockIO { - messages_for_network, - data_for_network, - messages_from_network, - data_from_network, - commands_for_network, - }, - NetworkServiceIO::new( - data_from_user, - messages_from_user, - data_for_user, - messages_for_user, - commands_from_manager, - ), - ) - } -} - pub struct MockEventStream(mpsc::UnboundedReceiver); #[async_trait] diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index cdffb48152..99c2cb75cb 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -59,7 +59,7 @@ pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send } /// Represents the address of an arbitrary node. -pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync { +pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static { type PeerId: PeerId; /// Returns the peer id associated with this multiaddress if it exists and is unique. diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 42c2ef954b..2c2a05efae 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -11,11 +11,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use tokio::time; use crate::{ - network::{ - AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network, - NetworkSender, Protocol, - }, - validator_network::{Network as ValidatorNetwork, PublicKey}, + network::{Data, Event, EventStream, Network, NetworkSender, Protocol}, STATUS_REPORT_INTERVAL, }; @@ -24,54 +20,30 @@ use crate::{ /// 1. Incoming network events /// 1. Messages are forwarded to the user. /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. -/// 2. Commands from the network manager, modifying the reserved peer set. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -/// Currently this also handles the validator network for sending in-session data, but this is -/// likely to change in the future. -pub struct Service< - N: Network, - D: Data, - VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, -> where - A::PeerId: PublicKey, -{ +pub struct Service { network: N, - validator_network: VN, - data_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver, - data_for_user: mpsc::UnboundedSender, messages_for_user: mpsc::UnboundedSender, - commands_from_manager: mpsc::UnboundedReceiver>, authentication_connected_peers: HashSet, authentication_peer_senders: HashMap>, spawn_handle: SpawnTaskHandle, } /// Input/output channels for the network service. -pub struct IO { - pub data_from_user: mpsc::UnboundedReceiver>, +pub struct IO { pub messages_from_user: mpsc::UnboundedReceiver, - pub data_for_user: mpsc::UnboundedSender, pub messages_for_user: mpsc::UnboundedSender, - pub commands_from_manager: mpsc::UnboundedReceiver>, } -impl IO { +impl IO { pub fn new( - data_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver, - data_for_user: mpsc::UnboundedSender, messages_for_user: mpsc::UnboundedSender, - commands_from_manager: mpsc::UnboundedReceiver>, - ) -> IO { + ) -> IO { IO { - data_from_user, messages_from_user, - data_for_user, messages_for_user, - commands_from_manager, } } } @@ -82,30 +54,12 @@ enum SendError { SendingFailed, } -impl< - N: Network, - D: Data, - VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, - > Service -where - A::PeerId: PublicKey, -{ - pub fn new( - network: N, - validator_network: VN, - spawn_handle: SpawnTaskHandle, - io: IO, - ) -> Service { +impl Service { + pub fn new(network: N, spawn_handle: SpawnTaskHandle, io: IO) -> Service { Service { network, - validator_network, - data_from_user: io.data_from_user, messages_from_user: io.messages_from_user, - data_for_user: io.data_for_user, messages_for_user: io.messages_for_user, - commands_from_manager: io.commands_from_manager, spawn_handle, authentication_connected_peers: HashSet::new(), authentication_peer_senders: HashMap::new(), @@ -248,28 +202,6 @@ where Ok(()) } - fn handle_validator_network_data(&mut self, data: VD) -> Result<(), mpsc::TrySendError> { - self.data_for_user.unbounded_send(data) - } - - fn on_manager_command(&mut self, command: ConnectionCommand) { - use ConnectionCommand::*; - match command { - AddReserved(addresses) => { - for multi in addresses { - if let Some(peer_id) = multi.get_peer_id() { - self.validator_network.add_connection(peer_id, vec![multi]); - } - } - } - DelReserved(peers) => { - for peer in peers { - self.validator_network.remove_connection(peer); - } - } - } - } - fn status_report(&self) { let mut status = String::from("Network status report: "); @@ -297,22 +229,6 @@ where return; } }, - maybe_data = self.validator_network.next() => match maybe_data { - Some(data) => if let Err(e) = self.handle_validator_network_data(data) { - error!(target: "aleph-network", "Cannot forward messages to user: {:?}", e); - return; - }, - None => { - error!(target: "aleph-network", "Validator network event stream ended."); - } - }, - maybe_data = self.data_from_user.next() => match maybe_data { - Some((data, peer_id)) => self.validator_network.send(data, peer_id), - None => { - error!(target: "aleph-network", "User data stream ended."); - return; - } - }, maybe_message = self.messages_from_user.next() => match maybe_message { Some(message) => self.broadcast(message, Protocol::Authentication), None => { @@ -320,13 +236,6 @@ where return; } }, - maybe_command = self.commands_from_manager.next() => match maybe_command { - Some(command) => self.on_manager_command(command), - None => { - error!(target: "aleph-network", "Manager command stream ended."); - return; - } - }, _ = status_ticker.tick() => { self.status_report(); }, @@ -340,31 +249,28 @@ mod tests { use std::{collections::HashSet, iter, iter::FromIterator}; use codec::Encode; - use futures::{channel::oneshot, StreamExt}; + use futures::{ + channel::{mpsc, oneshot}, + StreamExt, + }; use sc_service::TaskManager; - use tokio::{runtime::Handle, task::JoinHandle}; + use tokio::runtime::Handle; - use super::{ConnectionCommand, Service}; + use super::Service; use crate::{ network::{ - manager::{SessionHandler, VersionedAuthentication}, - mock::{crypto_basics, MockData, MockEvent, MockIO, MockNetwork, MockSenderError}, - testing::DiscoveryMessage, - NetworkIdentity, Protocol, - }, - testing::mocks::validator_network::{ - random_multiaddress, random_peer_id, MockMultiaddress, - MockNetwork as MockValidatorNetwork, + mock::{MockData, MockEvent, MockNetwork, MockSenderError}, + NetworkServiceIO, Protocol, }, - SessionId, + testing::mocks::validator_network::{random_multiaddress, random_peer_id}, }; + const PROTOCOL: Protocol = Protocol::Authentication; + pub struct TestData { - pub service_handle: JoinHandle<()>, - pub exit_tx: oneshot::Sender<()>, pub network: MockNetwork, - pub validator_network: MockValidatorNetwork, - pub mock_io: MockIO, + pub messages_from_network: mpsc::UnboundedReceiver, + pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, } @@ -374,59 +280,28 @@ mod tests { let task_manager = TaskManager::new(Handle::current(), None).unwrap(); // Prepare communication with service - let (mock_io, io) = MockIO::new(); + // We can drop the sender, as we will call service.broadcast directly + let (_, messages_from_user) = mpsc::unbounded(); + let (messages_for_user, messages_from_network) = mpsc::unbounded(); + let io = NetworkServiceIO::new(messages_from_user, messages_for_user); + // Event stream will never be taken, so we can drop the receiver + let (event_stream_oneshot_tx, _) = oneshot::channel(); + // Prepare service - let (event_stream_oneshot_tx, event_stream_oneshot_rx) = oneshot::channel(); let network = MockNetwork::new(event_stream_oneshot_tx); - let validator_network = MockValidatorNetwork::new("addr").await; - let service = Service::new( - network.clone(), - validator_network.clone(), - task_manager.spawn_handle(), - io, - ); - let (exit_tx, exit_rx) = oneshot::channel(); - let task_handle = async move { - tokio::select! { - _ = service.run() => { }, - _ = exit_rx => { }, - }; - }; - let service_handle = tokio::spawn(task_handle); - // wait till service takes the event_stream - event_stream_oneshot_rx.await.unwrap(); + let service = Service::new(network.clone(), task_manager.spawn_handle(), io); - // `TaskManager` needs to be passed. + // `TaskManager` needs to be passed, so sender threads are running in background. Self { - service_handle, - exit_tx, network, - validator_network, - mock_io, + service, + messages_from_network, _task_manager: task_manager, } } async fn cleanup(self) { - self.exit_tx.send(()).unwrap(); - self.service_handle.await.unwrap(); self.network.close_channels().await; - self.validator_network.close_channels().await; - } - - // We do this only to make sure that NotificationStreamOpened/NotificationStreamClosed events are handled - async fn wait_for_events_handled(&mut self) { - let address = random_multiaddress(); - self.network - .emit_event(MockEvent::Connected(address.clone())); - assert_eq!( - self.network - .add_reserved - .next() - .await - .expect("Should receive message"), - (iter::once(address).collect(), Protocol::Authentication,) - ); } } @@ -434,33 +309,17 @@ mod tests { vec![i, i + 1, i + 2] } - async fn authentication( - multiaddresses: Vec, - ) -> VersionedAuthentication { - let crypto_basics = crypto_basics(1).await; - let handler = SessionHandler::new( - Some(crypto_basics.0[0].clone()), - crypto_basics.1.clone(), - SessionId(43), - multiaddresses, - ) - .await - .unwrap(); - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( - handler.authentication().unwrap(), - )) - } - #[tokio::test] async fn test_sync_connected() { let mut test_data = TestData::prepare().await; let address = random_multiaddress(); test_data - .network - .emit_event(MockEvent::Connected(address.clone())); + .service + .handle_network_event(MockEvent::Connected(address.clone())) + .expect("Should handle"); - let expected = (iter::once(address).collect(), Protocol::Authentication); + let expected = (iter::once(address).collect(), PROTOCOL); assert_eq!( test_data @@ -482,10 +341,11 @@ mod tests { let peer_id = random_peer_id(); test_data - .network - .emit_event(MockEvent::Disconnected(peer_id.clone())); + .service + .handle_network_event(MockEvent::Disconnected(peer_id.clone())) + .expect("Should handle"); - let expected = (iter::once(peer_id).collect(), Protocol::Authentication); + let expected = (iter::once(peer_id).collect(), PROTOCOL); assert_eq!( test_data @@ -507,21 +367,14 @@ mod tests { let peer_ids: Vec<_> = (0..3).map(|_| random_peer_id()).collect(); peer_ids.iter().for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - let message = authentication(test_data.validator_network.identity().0).await; - test_data - .mock_io - .messages_for_network - .unbounded_send(message.clone()) - .unwrap(); + let message = message(1); + test_data.service.broadcast(message.clone(), PROTOCOL); let broadcasted_messages = HashSet::<_>::from_iter( test_data @@ -535,7 +388,7 @@ mod tests { let expected_messages = HashSet::from_iter( peer_ids .into_iter() - .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), + .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)), ); assert_eq!(broadcasted_messages, expected_messages); @@ -551,31 +404,24 @@ mod tests { let opened_authorities_n = 2; peer_ids.iter().for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); peer_ids .iter() .skip(opened_authorities_n) .for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamClosed( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamClosed(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); - // We do this only to make sure that NotificationStreamClosed events are handled - test_data.wait_for_events_handled().await; - - let message = authentication(test_data.validator_network.identity().0).await; - test_data - .mock_io - .messages_for_network - .unbounded_send(message.clone()) - .unwrap(); + let message = message(1); + test_data.service.broadcast(message.clone(), PROTOCOL); let broadcasted_messages = HashSet::<_>::from_iter( test_data @@ -590,7 +436,7 @@ mod tests { peer_ids .into_iter() .take(opened_authorities_n) - .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), + .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)), ); assert_eq!(broadcasted_messages, expected_messages); @@ -598,56 +444,6 @@ mod tests { test_data.cleanup().await } - #[tokio::test] - async fn test_send_validator_data() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - let message = message(1); - - test_data - .mock_io - .data_for_network - .unbounded_send((message.clone(), peer_id.clone())) - .unwrap(); - - let expected = (message, peer_id); - - assert_eq!( - test_data - .validator_network - .send - .next() - .await - .expect("Should receive message"), - expected, - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_receives_validator_data() { - let mut test_data = TestData::prepare().await; - - let message = message(1); - - test_data.validator_network.next.send(message.clone()); - - assert_eq!( - test_data - .mock_io - .data_from_network - .next() - .await - .expect("Should receive message"), - message, - ); - - test_data.cleanup().await - } - #[tokio::test] async fn test_create_sender_error() { let mut test_data = TestData::prepare().await; @@ -660,30 +456,19 @@ mod tests { let peer_id = random_peer_id(); - let message_1 = authentication(vec![(random_peer_id(), String::from("other_1"))]).await; - let message_2 = authentication(vec![(random_peer_id(), String::from("other_2"))]).await; - - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; + let message_1 = message(1); + let message_2 = message(4); test_data - .mock_io - .messages_for_network - .unbounded_send(message_1) - .unwrap(); + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); - test_data - .mock_io - .messages_for_network - .unbounded_send(message_2.clone()) - .unwrap(); + test_data.service.broadcast(message_1, PROTOCOL); + + test_data.service.broadcast(message_2.clone(), PROTOCOL); - let expected = (message_2.encode(), peer_id, Protocol::Authentication); + let expected = (message_2.encode(), peer_id, PROTOCOL); assert_eq!( test_data @@ -710,30 +495,19 @@ mod tests { let peer_id = random_peer_id(); - let message_1 = authentication(vec![(random_peer_id(), String::from("other_1"))]).await; - let message_2 = authentication(vec![(random_peer_id(), String::from("other_2"))]).await; - - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; + let message_1 = message(1); + let message_2 = message(4); test_data - .mock_io - .messages_for_network - .unbounded_send(message_1) - .unwrap(); + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); - test_data - .mock_io - .messages_for_network - .unbounded_send(message_2.clone()) - .unwrap(); + test_data.service.broadcast(message_1, PROTOCOL); - let expected = (message_2.encode(), peer_id, Protocol::Authentication); + test_data.service.broadcast(message_2.clone(), PROTOCOL); + + let expected = (message_2.encode(), peer_id, PROTOCOL); assert_eq!( test_data @@ -752,16 +526,18 @@ mod tests { async fn test_notification_received() { let mut test_data = TestData::prepare().await; - let message = authentication(vec![(random_peer_id(), String::from("other_addr"))]).await; + let message = message(1); - test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - message.clone().encode().into(), - )])); + test_data + .service + .handle_network_event(MockEvent::Messages(vec![( + PROTOCOL, + message.clone().encode().into(), + )])) + .expect("Should handle"); assert_eq!( test_data - .mock_io .messages_from_network .next() .await @@ -771,60 +547,4 @@ mod tests { test_data.cleanup().await } - - #[tokio::test] - async fn test_command_add_reserved() { - let mut test_data = TestData::prepare().await; - - let multiaddress: MockMultiaddress = (random_peer_id(), String::from("other_addr")); - - test_data - .mock_io - .commands_for_network - .unbounded_send(ConnectionCommand::AddReserved( - iter::once(multiaddress.clone()).collect(), - )) - .unwrap(); - - let expected = (multiaddress.0.clone(), vec![multiaddress]); - - assert_eq!( - test_data - .validator_network - .add_connection - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_command_remove_reserved() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - test_data - .mock_io - .commands_for_network - .unbounded_send(ConnectionCommand::DelReserved( - iter::once(peer_id.clone()).collect(), - )) - .unwrap(); - - assert_eq!( - test_data - .validator_network - .remove_connection - .next() - .await - .expect("Should receive message"), - peer_id - ); - - test_data.cleanup().await - } } diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index ec06def4bb..677346b754 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -115,7 +115,7 @@ where session_map: session_authorities.clone(), }); - let (connection_io, network_io, session_io) = setup_io(); + let (connection_io, network_io, session_io) = setup_io(validator_network); let connection_manager = ConnectionManager::new( network_identity, @@ -130,12 +130,7 @@ where }; let session_manager = SessionManager::new(session_io); - let network = NetworkService::new( - network.clone(), - validator_network, - spawn_handle.clone(), - network_io, - ); + let network = NetworkService::new(network.clone(), spawn_handle.clone(), network_io); let network_task = async move { network.run().await }; spawn_handle.spawn("aleph/justification_handler", None, handler_task); diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 13a243001f..fae9bbee0b 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -102,7 +102,7 @@ async fn prepare_one_session_test_data() -> TestData { let validator_network = MockValidatorNetwork::from(authorities[0].addresses(), authorities[0].peer_id()); - let (connection_io, network_io, session_io) = setup_io(); + let (connection_io, network_io, session_io) = setup_io(validator_network.clone()); let connection_manager = ConnectionManager::new( validator_network.clone(), @@ -111,12 +111,8 @@ async fn prepare_one_session_test_data() -> TestData { let session_manager = SessionManager::new(session_io); - let network_service = NetworkService::new( - network.clone(), - validator_network.clone(), - task_manager.spawn_handle(), - network_io, - ); + let network_service = + NetworkService::new(network.clone(), task_manager.spawn_handle(), network_io); let network_manager_task = async move { tokio::select! {