diff --git a/docker/Dockerfile.synthetic_network b/docker/Dockerfile.synthetic_network new file mode 100644 index 0000000000..7b67c65c6b --- /dev/null +++ b/docker/Dockerfile.synthetic_network @@ -0,0 +1,13 @@ +FROM syntheticnet:latest as synthnet + +FROM aleph-node:latest + +# Linux networking tools and node.js +RUN apt update && \ + apt install nodejs curl iproute2 iputils-ping net-tools netwox tcpdump gdb gdbserver stress -y + +COPY --from=synthnet /opt/lib/ /opt/lib/ +WORKDIR /opt/lib +ENTRYPOINT [] +ENV ENTRY="/node/docker_entrypoint.sh" +CMD ["/opt/lib/setup.sh"] \ No newline at end of file diff --git a/docker/docker-compose.synthetic-network.yml b/docker/docker-compose.synthetic-network.yml new file mode 100644 index 0000000000..a9438eddaa --- /dev/null +++ b/docker/docker-compose.synthetic-network.yml @@ -0,0 +1,96 @@ +services: + Node0: + extends: + file: docker-compose.base.yml + service: Node0 + image: aleph-node:syntheticnet + networks: + - synthetic-network + cap_add: + - NET_ADMIN + - NET_RAW + - SYS_PTRACE + environment: + - SYNTHETIC_NETWORK=10.77.0.0/16 + - PUBLIC_VALIDATOR_ADDRESS=Node0:30343 + ports: + - 3000:80 + + Node1: + extends: + file: docker-compose.base.yml + service: Node1 + image: aleph-node:syntheticnet + networks: + - synthetic-network + cap_add: + - NET_ADMIN + - NET_RAW + - SYS_PTRACE + environment: + - SYNTHETIC_NETWORK=10.77.0.0/16 + - PUBLIC_VALIDATOR_ADDRESS=Node1:30344 + - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID + ports: + - 3001:80 + + Node2: + extends: + file: docker-compose.base.yml + service: Node2 + image: aleph-node:syntheticnet + networks: + - synthetic-network + cap_add: + - NET_ADMIN + - NET_RAW + - SYS_PTRACE + environment: + - SYNTHETIC_NETWORK=10.77.0.0/16 + - PUBLIC_VALIDATOR_ADDRESS=Node2:30345 + - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID + ports: + - 3002:80 + + Node3: + extends: + file: docker-compose.base.yml + service: Node3 + image: aleph-node:syntheticnet + networks: + - synthetic-network + cap_add: + - NET_ADMIN + - NET_RAW + - SYS_PTRACE + environment: + - SYNTHETIC_NETWORK=10.77.0.0/16 + - PUBLIC_VALIDATOR_ADDRESS=Node3:30346 + - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID + ports: + - 3003:80 + + Node4: + extends: + file: docker-compose.base.yml + service: Node4 + image: aleph-node:syntheticnet + networks: + - synthetic-network + cap_add: + - NET_ADMIN + - NET_RAW + - SYS_PTRACE + environment: + - SYNTHETIC_NETWORK=10.77.0.0/16 + - PUBLIC_VALIDATOR_ADDRESS=Node4:30347 + - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID + ports: + - 3004:80 + +networks: + synthetic-network: + ipam: + config: + - subnet: 10.77.0.0/16 + diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 868115214b..8557349581 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -1,42 +1,43 @@ use futures::channel::mpsc; -use crate::network::{ - manager::{DataInSession, VersionedAuthentication}, - ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, +use crate::{ + network::{ + manager::{DataInSession, VersionedAuthentication}, + ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, + }, + validator_network::{Network as ValidatorNetwork, PublicKey}, }; -type AuthenticationNetworkIO = NetworkIO, DataInSession, M>; +type AuthenticationNetworkIO = NetworkIO>; -pub fn setup() -> ( - ConnectionManagerIO, - AuthenticationNetworkIO, +pub fn setup< + D: Data, + M: Multiaddress + 'static, + VN: ValidatorNetwork>, +>( + validator_network: VN, +) -> ( + ConnectionManagerIO, + AuthenticationNetworkIO, SessionManagerIO, -) { +) +where + M::PeerId: PublicKey, +{ // Prepare and start the network - let (commands_for_network, commands_from_io) = mpsc::unbounded(); - let (data_for_network, data_from_user) = mpsc::unbounded(); let (messages_for_network, messages_from_user) = mpsc::unbounded(); let (commands_for_service, commands_from_user) = mpsc::unbounded(); let (messages_for_service, commands_from_manager) = mpsc::unbounded(); - let (data_for_user, data_from_network) = mpsc::unbounded(); let (messages_for_user, messages_from_network) = mpsc::unbounded(); let connection_io = ConnectionManagerIO::new( - commands_for_network, - data_for_network, messages_for_network, commands_from_user, commands_from_manager, - data_from_network, messages_from_network, + validator_network, ); - let channels_for_network = NetworkIO::new( - data_from_user, - messages_from_user, - data_for_user, - messages_for_user, - commands_from_io, - ); + let channels_for_network = NetworkIO::new(messages_from_user, messages_for_user); let channels_for_session_manager = SessionManagerIO::new(commands_for_service, messages_for_service); diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 247d3c3e23..e52d75232a 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -21,6 +21,7 @@ use crate::{ }, AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId, }, + validator_network::{Network as ValidatorNetwork, PublicKey}, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; @@ -595,22 +596,22 @@ impl Service { } } -/// Input/output interface for the connectiona manager service. -pub struct IO { - commands_for_network: mpsc::UnboundedSender>, - data_for_network: mpsc::UnboundedSender, M::PeerId>>, +/// Input/output interface for the connection manager service. +pub struct IO>> +where + M::PeerId: PublicKey, +{ authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - data_from_network: mpsc::UnboundedReceiver>, authentications_from_network: mpsc::UnboundedReceiver>, + validator_network: VN, } /// Errors that can happen during the network service operations. #[derive(Debug, PartialEq, Eq)] pub enum Error { NetworkSend, - CommandSend, /// Should never be fatal. UserSend, /// Should never be fatal. @@ -620,31 +621,28 @@ pub enum Error { NetworkChannel, } -impl IO { +impl>> IO +where + M::PeerId: PublicKey, +{ pub fn new( - commands_for_network: mpsc::UnboundedSender>, - data_for_network: mpsc::UnboundedSender, M::PeerId>>, authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - data_from_network: mpsc::UnboundedReceiver>, authentications_from_network: mpsc::UnboundedReceiver>, - ) -> IO { + validator_network: VN, + ) -> IO { IO { - commands_for_network, - data_for_network, authentications_for_network, commands_from_user, messages_from_user, - data_from_network, authentications_from_network, + validator_network, } } - fn send_data(&self, to_send: AddressedData, M::PeerId>) -> Result<(), Error> { - self.data_for_network - .unbounded_send(to_send) - .map_err(|_| Error::NetworkSend) + fn send_data(&self, to_send: AddressedData, M::PeerId>) { + self.validator_network.send(to_send.0, to_send.1) } fn send_authentication(&self, to_send: DiscoveryMessage) -> Result<(), Error> { @@ -653,21 +651,32 @@ impl IO { .map_err(|_| Error::NetworkSend) } - fn send_command(&self, to_send: ConnectionCommand) -> Result<(), Error> { - self.commands_for_network - .unbounded_send(to_send) - .map_err(|_| Error::CommandSend) + fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { + match connection_command { + ConnectionCommand::AddReserved(addresses) => { + for multi in addresses { + if let Some(peer_id) = multi.get_peer_id() { + self.validator_network.add_connection(peer_id, vec![multi]); + } + } + } + ConnectionCommand::DelReserved(peers) => { + for peer in peers { + self.validator_network.remove_connection(peer); + } + } + }; } - fn send( - &self, + fn handle_service_actions( + &mut self, ServiceActions { maybe_command, maybe_message, }: ServiceActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { - self.send_command(command)?; + self.handle_connection_command(command); } if let Some(message) = maybe_message { self.send_authentication(message)?; @@ -695,7 +704,7 @@ impl IO { trace!(target: "aleph-network", "Manager received a command from user"); match maybe_command { Some(command) => match service.on_command(command).await { - Ok(to_send) => self.send(to_send)?, + Ok(to_send) => self.handle_service_actions(to_send)?, Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e), }, None => return Err(Error::CommandsChannel), @@ -705,12 +714,12 @@ impl IO { trace!(target: "aleph-network", "Manager received a message from user"); match maybe_message { Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) { - self.send_data(message)?; + self.send_data(message); }, None => return Err(Error::MessageChannel), } }, - maybe_data = self.data_from_network.next() => { + maybe_data = self.validator_network.next() => { trace!(target: "aleph-network", "Manager received some data from network"); match maybe_data { Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { @@ -727,7 +736,7 @@ impl IO { trace!(target: "aleph-network", "Manager received an authentication from network"); match maybe_authentication { Some(authentication) => match authentication.try_into() { - Ok(message) => self.send(service.on_discovery_message(message))?, + Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), }, None => return Err(Error::NetworkChannel), @@ -736,7 +745,7 @@ impl IO { _ = maintenance.tick() => { debug!(target: "aleph-network", "Manager starts maintenence"); match service.retry_session_start().await { - Ok(to_send) => self.send(to_send)?, + Ok(to_send) => self.handle_service_actions(to_send)?, Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e), } for to_send in service.discovery() { diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 83db534947..2567b307b4 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -1,9 +1,4 @@ -use std::{ - collections::{HashSet, VecDeque}, - fmt, - sync::Arc, - time::Duration, -}; +use std::{collections::VecDeque, fmt, sync::Arc, time::Duration}; use aleph_primitives::KEY_TYPE; use async_trait::async_trait; @@ -17,10 +12,7 @@ use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, - network::{ - manager::VersionedAuthentication, AddressedData, ConnectionCommand, Event, EventStream, - Multiaddress, Network, NetworkIdentity, NetworkSender, NetworkServiceIO, Protocol, - }, + network::{Event, EventStream, Network, NetworkIdentity, NetworkSender, Protocol}, testing::mocks::validator_network::{random_identity, MockMultiaddress}, validator_network::mock::MockPublicKey, AuthorityId, NodeIndex, @@ -97,51 +89,14 @@ impl Default for Channel { } } -pub type MockEvent = Event; +pub type MockEvent = Event; pub type MockData = Vec; -pub struct MockIO { - pub messages_for_network: mpsc::UnboundedSender>, - pub data_for_network: mpsc::UnboundedSender>, - pub messages_from_network: mpsc::UnboundedReceiver>, - pub data_from_network: mpsc::UnboundedReceiver, - pub commands_for_network: mpsc::UnboundedSender>, -} - -impl MockIO { - pub fn new() -> ( - MockIO, - NetworkServiceIO, MockData, M>, - ) { - let (messages_for_network, messages_from_user) = mpsc::unbounded(); - let (data_for_network, data_from_user) = mpsc::unbounded(); - let (messages_for_user, messages_from_network) = mpsc::unbounded(); - let (data_for_user, data_from_network) = mpsc::unbounded(); - let (commands_for_network, commands_from_manager) = mpsc::unbounded(); - ( - MockIO { - messages_for_network, - data_for_network, - messages_from_network, - data_from_network, - commands_for_network, - }, - NetworkServiceIO::new( - data_from_user, - messages_from_user, - data_for_user, - messages_for_user, - commands_from_manager, - ), - ) - } -} - pub struct MockEventStream(mpsc::UnboundedReceiver); #[async_trait] -impl EventStream for MockEventStream { +impl EventStream for MockEventStream { async fn next_event(&mut self) -> Option { self.0.next().await } @@ -172,8 +127,6 @@ impl NetworkSender for MockNetworkSender { #[derive(Clone)] pub struct MockNetwork { - pub add_reserved: Channel<(HashSet, Protocol)>, - pub remove_reserved: Channel<(HashSet, Protocol)>, pub send_message: Channel<(Vec, MockPublicKey, Protocol)>, pub event_sinks: Arc>>>, event_stream_taken_oneshot: Arc>>>, @@ -182,17 +135,11 @@ pub struct MockNetwork { } #[derive(Debug, Copy, Clone)] -pub enum MockSenderError { - SomeError, -} +pub struct MockSenderError; impl fmt::Display for MockSenderError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - MockSenderError::SomeError => { - write!(f, "Some error message") - } - } + write!(f, "Some error message") } } @@ -202,7 +149,6 @@ impl Network for MockNetwork { type SenderError = MockSenderError; type NetworkSender = MockNetworkSender; type PeerId = MockPublicKey; - type Multiaddress = MockMultiaddress; type EventStream = MockEventStream; fn event_stream(&self) -> Self::EventStream { @@ -232,21 +178,11 @@ impl Network for MockNetwork { error, }) } - - fn add_reserved(&self, addresses: HashSet, protocol: Protocol) { - self.add_reserved.send((addresses, protocol)); - } - - fn remove_reserved(&self, peers: HashSet, protocol: Protocol) { - self.remove_reserved.send((peers, protocol)); - } } impl MockNetwork { pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self { MockNetwork { - add_reserved: Channel::new(), - remove_reserved: Channel::new(), send_message: Channel::new(), event_sinks: Arc::new(Mutex::new(vec![])), event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))), diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index cdffb48152..0870b296f6 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -59,7 +59,7 @@ pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send } /// Represents the address of an arbitrary node. -pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync { +pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static { type PeerId: PeerId; /// Returns the peer id associated with this multiaddress if it exists and is unique. @@ -88,17 +88,15 @@ pub trait NetworkSender: Send + Sync + 'static { } #[derive(Clone)] -pub enum Event { - Connected(M), - Disconnected(P), +pub enum Event

{ StreamOpened(P, Protocol), StreamClosed(P, Protocol), Messages(Vec<(Protocol, Bytes)>), } #[async_trait] -pub trait EventStream { - async fn next_event(&mut self) -> Option>; +pub trait EventStream

{ + async fn next_event(&mut self) -> Option>; } /// Abstraction over a network. @@ -106,8 +104,7 @@ pub trait Network: Clone + Send + Sync + 'static { type SenderError: std::error::Error; type NetworkSender: NetworkSender; type PeerId: Clone + Debug + Eq + Hash + Send; - type Multiaddress: Debug + Eq + Hash; - type EventStream: EventStream; + type EventStream: EventStream; /// Returns a stream of events representing what happens on the network. fn event_stream(&self) -> Self::EventStream; @@ -118,12 +115,6 @@ pub trait Network: Clone + Send + Sync + 'static { peer_id: Self::PeerId, protocol: Protocol, ) -> Result; - - /// Add peers to one of the reserved sets. - fn add_reserved(&self, addresses: HashSet, protocol: Protocol); - - /// Remove peers from one of the reserved sets. - fn remove_reserved(&self, peers: HashSet, protocol: Protocol); } /// Abstraction for requesting own network addresses and PeerId. diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 42c2ef954b..7c231e80bd 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, future::Future, - iter, }; use futures::{channel::mpsc, StreamExt}; @@ -11,11 +10,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use tokio::time; use crate::{ - network::{ - AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network, - NetworkSender, Protocol, - }, - validator_network::{Network as ValidatorNetwork, PublicKey}, + network::{Data, Event, EventStream, Network, NetworkSender, Protocol}, STATUS_REPORT_INTERVAL, }; @@ -24,54 +19,30 @@ use crate::{ /// 1. Incoming network events /// 1. Messages are forwarded to the user. /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. -/// 2. Commands from the network manager, modifying the reserved peer set. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -/// Currently this also handles the validator network for sending in-session data, but this is -/// likely to change in the future. -pub struct Service< - N: Network, - D: Data, - VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, -> where - A::PeerId: PublicKey, -{ +pub struct Service { network: N, - validator_network: VN, - data_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver, - data_for_user: mpsc::UnboundedSender, messages_for_user: mpsc::UnboundedSender, - commands_from_manager: mpsc::UnboundedReceiver>, authentication_connected_peers: HashSet, authentication_peer_senders: HashMap>, spawn_handle: SpawnTaskHandle, } /// Input/output channels for the network service. -pub struct IO { - pub data_from_user: mpsc::UnboundedReceiver>, +pub struct IO { pub messages_from_user: mpsc::UnboundedReceiver, - pub data_for_user: mpsc::UnboundedSender, pub messages_for_user: mpsc::UnboundedSender, - pub commands_from_manager: mpsc::UnboundedReceiver>, } -impl IO { +impl IO { pub fn new( - data_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver, - data_for_user: mpsc::UnboundedSender, messages_for_user: mpsc::UnboundedSender, - commands_from_manager: mpsc::UnboundedReceiver>, - ) -> IO { + ) -> IO { IO { - data_from_user, messages_from_user, - data_for_user, messages_for_user, - commands_from_manager, } } } @@ -82,30 +53,12 @@ enum SendError { SendingFailed, } -impl< - N: Network, - D: Data, - VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, - > Service -where - A::PeerId: PublicKey, -{ - pub fn new( - network: N, - validator_network: VN, - spawn_handle: SpawnTaskHandle, - io: IO, - ) -> Service { +impl Service { + pub fn new(network: N, spawn_handle: SpawnTaskHandle, io: IO) -> Service { Service { network, - validator_network, - data_from_user: io.data_from_user, messages_from_user: io.messages_from_user, - data_for_user: io.data_for_user, messages_for_user: io.messages_for_user, - commands_from_manager: io.commands_from_manager, spawn_handle, authentication_connected_peers: HashSet::new(), authentication_peer_senders: HashMap::new(), @@ -193,20 +146,10 @@ where fn handle_network_event( &mut self, - event: Event, + event: Event, ) -> Result<(), mpsc::TrySendError> { use Event::*; match event { - Connected(multiaddress) => { - trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); - self.network - .add_reserved(iter::once(multiaddress).collect(), Protocol::Authentication); - } - Disconnected(peer) => { - trace!(target: "aleph-network", "Disconnected event for peer {:?}", peer); - self.network - .remove_reserved(iter::once(peer).collect(), Protocol::Authentication); - } StreamOpened(peer, protocol) => { trace!(target: "aleph-network", "StreamOpened event for peer {:?} and the protocol {:?}.", peer, protocol); let rx = match &protocol { @@ -248,28 +191,6 @@ where Ok(()) } - fn handle_validator_network_data(&mut self, data: VD) -> Result<(), mpsc::TrySendError> { - self.data_for_user.unbounded_send(data) - } - - fn on_manager_command(&mut self, command: ConnectionCommand) { - use ConnectionCommand::*; - match command { - AddReserved(addresses) => { - for multi in addresses { - if let Some(peer_id) = multi.get_peer_id() { - self.validator_network.add_connection(peer_id, vec![multi]); - } - } - } - DelReserved(peers) => { - for peer in peers { - self.validator_network.remove_connection(peer); - } - } - } - } - fn status_report(&self) { let mut status = String::from("Network status report: "); @@ -297,22 +218,6 @@ where return; } }, - maybe_data = self.validator_network.next() => match maybe_data { - Some(data) => if let Err(e) = self.handle_validator_network_data(data) { - error!(target: "aleph-network", "Cannot forward messages to user: {:?}", e); - return; - }, - None => { - error!(target: "aleph-network", "Validator network event stream ended."); - } - }, - maybe_data = self.data_from_user.next() => match maybe_data { - Some((data, peer_id)) => self.validator_network.send(data, peer_id), - None => { - error!(target: "aleph-network", "User data stream ended."); - return; - } - }, maybe_message = self.messages_from_user.next() => match maybe_message { Some(message) => self.broadcast(message, Protocol::Authentication), None => { @@ -320,13 +225,6 @@ where return; } }, - maybe_command = self.commands_from_manager.next() => match maybe_command { - Some(command) => self.on_manager_command(command), - None => { - error!(target: "aleph-network", "Manager command stream ended."); - return; - } - }, _ = status_ticker.tick() => { self.status_report(); }, @@ -337,34 +235,31 @@ where #[cfg(test)] mod tests { - use std::{collections::HashSet, iter, iter::FromIterator}; + use std::collections::HashSet; use codec::Encode; - use futures::{channel::oneshot, StreamExt}; + use futures::{ + channel::{mpsc, oneshot}, + StreamExt, + }; use sc_service::TaskManager; - use tokio::{runtime::Handle, task::JoinHandle}; + use tokio::runtime::Handle; - use super::{ConnectionCommand, Service}; + use super::Service; use crate::{ network::{ - manager::{SessionHandler, VersionedAuthentication}, - mock::{crypto_basics, MockData, MockEvent, MockIO, MockNetwork, MockSenderError}, - testing::DiscoveryMessage, - NetworkIdentity, Protocol, - }, - testing::mocks::validator_network::{ - random_multiaddress, random_peer_id, MockMultiaddress, - MockNetwork as MockValidatorNetwork, + mock::{MockData, MockEvent, MockNetwork, MockSenderError}, + NetworkServiceIO, Protocol, }, - SessionId, + testing::mocks::validator_network::random_peer_id, }; + const PROTOCOL: Protocol = Protocol::Authentication; + pub struct TestData { - pub service_handle: JoinHandle<()>, - pub exit_tx: oneshot::Sender<()>, pub network: MockNetwork, - pub validator_network: MockValidatorNetwork, - pub mock_io: MockIO, + pub messages_from_network: mpsc::UnboundedReceiver, + pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, } @@ -374,59 +269,28 @@ mod tests { let task_manager = TaskManager::new(Handle::current(), None).unwrap(); // Prepare communication with service - let (mock_io, io) = MockIO::new(); + // We can drop the sender, as we will call service.broadcast directly + let (_, messages_from_user) = mpsc::unbounded(); + let (messages_for_user, messages_from_network) = mpsc::unbounded(); + let io = NetworkServiceIO::new(messages_from_user, messages_for_user); + // Event stream will never be taken, so we can drop the receiver + let (event_stream_oneshot_tx, _) = oneshot::channel(); + // Prepare service - let (event_stream_oneshot_tx, event_stream_oneshot_rx) = oneshot::channel(); let network = MockNetwork::new(event_stream_oneshot_tx); - let validator_network = MockValidatorNetwork::new("addr").await; - let service = Service::new( - network.clone(), - validator_network.clone(), - task_manager.spawn_handle(), - io, - ); - let (exit_tx, exit_rx) = oneshot::channel(); - let task_handle = async move { - tokio::select! { - _ = service.run() => { }, - _ = exit_rx => { }, - }; - }; - let service_handle = tokio::spawn(task_handle); - // wait till service takes the event_stream - event_stream_oneshot_rx.await.unwrap(); + let service = Service::new(network.clone(), task_manager.spawn_handle(), io); - // `TaskManager` needs to be passed. + // `TaskManager` needs to be passed, so sender threads are running in background. Self { - service_handle, - exit_tx, network, - validator_network, - mock_io, + service, + messages_from_network, _task_manager: task_manager, } } async fn cleanup(self) { - self.exit_tx.send(()).unwrap(); - self.service_handle.await.unwrap(); self.network.close_channels().await; - self.validator_network.close_channels().await; - } - - // We do this only to make sure that NotificationStreamOpened/NotificationStreamClosed events are handled - async fn wait_for_events_handled(&mut self) { - let address = random_multiaddress(); - self.network - .emit_event(MockEvent::Connected(address.clone())); - assert_eq!( - self.network - .add_reserved - .next() - .await - .expect("Should receive message"), - (iter::once(address).collect(), Protocol::Authentication,) - ); } } @@ -434,72 +298,6 @@ mod tests { vec![i, i + 1, i + 2] } - async fn authentication( - multiaddresses: Vec, - ) -> VersionedAuthentication { - let crypto_basics = crypto_basics(1).await; - let handler = SessionHandler::new( - Some(crypto_basics.0[0].clone()), - crypto_basics.1.clone(), - SessionId(43), - multiaddresses, - ) - .await - .unwrap(); - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( - handler.authentication().unwrap(), - )) - } - - #[tokio::test] - async fn test_sync_connected() { - let mut test_data = TestData::prepare().await; - - let address = random_multiaddress(); - test_data - .network - .emit_event(MockEvent::Connected(address.clone())); - - let expected = (iter::once(address).collect(), Protocol::Authentication); - - assert_eq!( - test_data - .network - .add_reserved - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_sync_disconnected() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - test_data - .network - .emit_event(MockEvent::Disconnected(peer_id.clone())); - - let expected = (iter::once(peer_id).collect(), Protocol::Authentication); - - assert_eq!( - test_data - .network - .remove_reserved - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - #[tokio::test] async fn test_notification_stream_opened() { let mut test_data = TestData::prepare().await; @@ -507,21 +305,14 @@ mod tests { let peer_ids: Vec<_> = (0..3).map(|_| random_peer_id()).collect(); peer_ids.iter().for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - let message = authentication(test_data.validator_network.identity().0).await; - test_data - .mock_io - .messages_for_network - .unbounded_send(message.clone()) - .unwrap(); + let message = message(1); + test_data.service.broadcast(message.clone(), PROTOCOL); let broadcasted_messages = HashSet::<_>::from_iter( test_data @@ -535,7 +326,7 @@ mod tests { let expected_messages = HashSet::from_iter( peer_ids .into_iter() - .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), + .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)), ); assert_eq!(broadcasted_messages, expected_messages); @@ -551,31 +342,24 @@ mod tests { let opened_authorities_n = 2; peer_ids.iter().for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); peer_ids .iter() .skip(opened_authorities_n) .for_each(|peer_id| { - test_data.network.emit_event(MockEvent::StreamClosed( - peer_id.clone(), - Protocol::Authentication, - )); + test_data + .service + .handle_network_event(MockEvent::StreamClosed(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); }); - // We do this only to make sure that NotificationStreamClosed events are handled - test_data.wait_for_events_handled().await; - - let message = authentication(test_data.validator_network.identity().0).await; - test_data - .mock_io - .messages_for_network - .unbounded_send(message.clone()) - .unwrap(); + let message = message(1); + test_data.service.broadcast(message.clone(), PROTOCOL); let broadcasted_messages = HashSet::<_>::from_iter( test_data @@ -590,7 +374,7 @@ mod tests { peer_ids .into_iter() .take(opened_authorities_n) - .map(|peer_id| (message.clone().encode(), peer_id, Protocol::Authentication)), + .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)), ); assert_eq!(broadcasted_messages, expected_messages); @@ -598,56 +382,6 @@ mod tests { test_data.cleanup().await } - #[tokio::test] - async fn test_send_validator_data() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - let message = message(1); - - test_data - .mock_io - .data_for_network - .unbounded_send((message.clone(), peer_id.clone())) - .unwrap(); - - let expected = (message, peer_id); - - assert_eq!( - test_data - .validator_network - .send - .next() - .await - .expect("Should receive message"), - expected, - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_receives_validator_data() { - let mut test_data = TestData::prepare().await; - - let message = message(1); - - test_data.validator_network.next.send(message.clone()); - - assert_eq!( - test_data - .mock_io - .data_from_network - .next() - .await - .expect("Should receive message"), - message, - ); - - test_data.cleanup().await - } - #[tokio::test] async fn test_create_sender_error() { let mut test_data = TestData::prepare().await; @@ -656,34 +390,23 @@ mod tests { .network .create_sender_errors .lock() - .push_back(MockSenderError::SomeError); + .push_back(MockSenderError); let peer_id = random_peer_id(); - let message_1 = authentication(vec![(random_peer_id(), String::from("other_1"))]).await; - let message_2 = authentication(vec![(random_peer_id(), String::from("other_2"))]).await; - - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; + let message_1 = message(1); + let message_2 = message(4); test_data - .mock_io - .messages_for_network - .unbounded_send(message_1) - .unwrap(); + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); - test_data - .mock_io - .messages_for_network - .unbounded_send(message_2.clone()) - .unwrap(); + test_data.service.broadcast(message_1, PROTOCOL); + + test_data.service.broadcast(message_2.clone(), PROTOCOL); - let expected = (message_2.encode(), peer_id, Protocol::Authentication); + let expected = (message_2.encode(), peer_id, PROTOCOL); assert_eq!( test_data @@ -706,34 +429,23 @@ mod tests { .network .send_errors .lock() - .push_back(MockSenderError::SomeError); + .push_back(MockSenderError); let peer_id = random_peer_id(); - let message_1 = authentication(vec![(random_peer_id(), String::from("other_1"))]).await; - let message_2 = authentication(vec![(random_peer_id(), String::from("other_2"))]).await; - - test_data.network.emit_event(MockEvent::StreamOpened( - peer_id.clone(), - Protocol::Authentication, - )); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; + let message_1 = message(1); + let message_2 = message(4); test_data - .mock_io - .messages_for_network - .unbounded_send(message_1) - .unwrap(); + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); - test_data - .mock_io - .messages_for_network - .unbounded_send(message_2.clone()) - .unwrap(); + test_data.service.broadcast(message_1, PROTOCOL); + + test_data.service.broadcast(message_2.clone(), PROTOCOL); - let expected = (message_2.encode(), peer_id, Protocol::Authentication); + let expected = (message_2.encode(), peer_id, PROTOCOL); assert_eq!( test_data @@ -752,16 +464,18 @@ mod tests { async fn test_notification_received() { let mut test_data = TestData::prepare().await; - let message = authentication(vec![(random_peer_id(), String::from("other_addr"))]).await; + let message = message(1); - test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - message.clone().encode().into(), - )])); + test_data + .service + .handle_network_event(MockEvent::Messages(vec![( + PROTOCOL, + message.clone().encode().into(), + )])) + .expect("Should handle"); assert_eq!( test_data - .mock_io .messages_from_network .next() .await @@ -771,60 +485,4 @@ mod tests { test_data.cleanup().await } - - #[tokio::test] - async fn test_command_add_reserved() { - let mut test_data = TestData::prepare().await; - - let multiaddress: MockMultiaddress = (random_peer_id(), String::from("other_addr")); - - test_data - .mock_io - .commands_for_network - .unbounded_send(ConnectionCommand::AddReserved( - iter::once(multiaddress.clone()).collect(), - )) - .unwrap(); - - let expected = (multiaddress.0.clone(), vec![multiaddress]); - - assert_eq!( - test_data - .validator_network - .add_connection - .next() - .await - .expect("Should receive message"), - expected - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_command_remove_reserved() { - let mut test_data = TestData::prepare().await; - - let peer_id = random_peer_id(); - - test_data - .mock_io - .commands_for_network - .unbounded_send(ConnectionCommand::DelReserved( - iter::once(peer_id.clone()).collect(), - )) - .unwrap(); - - assert_eq!( - test_data - .validator_network - .remove_connection - .next() - .await - .expect("Should receive message"), - peer_id - ); - - test_data.cleanup().await - } } diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index ec06def4bb..677346b754 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -115,7 +115,7 @@ where session_map: session_authorities.clone(), }); - let (connection_io, network_io, session_io) = setup_io(); + let (connection_io, network_io, session_io) = setup_io(validator_network); let connection_manager = ConnectionManager::new( network_identity, @@ -130,12 +130,7 @@ where }; let session_manager = SessionManager::new(session_io); - let network = NetworkService::new( - network.clone(), - validator_network, - spawn_handle.clone(), - network_io, - ); + let network = NetworkService::new(network.clone(), spawn_handle.clone(), network_io); let network_task = async move { network.run().await }; spawn_handle.spawn("aleph/justification_handler", None, handler_task); diff --git a/finality-aleph/src/substrate_network.rs b/finality-aleph/src/substrate_network.rs index 5ce9e126a3..0f3e0b2bae 100644 --- a/finality-aleph/src/substrate_network.rs +++ b/finality-aleph/src/substrate_network.rs @@ -1,12 +1,12 @@ -use std::{collections::HashSet, fmt, iter, pin::Pin, sync::Arc}; +use std::{fmt, iter, pin::Pin, sync::Arc}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; -use log::error; +use log::{error, trace}; use sc_consensus::JustificationSyncLink; use sc_network::{ - multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr, - NetworkService, NetworkSyncForkRequest, PeerId, + multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, NetworkService, + NetworkSyncForkRequest, PeerId, }; use sc_network_common::{ protocol::ProtocolName, @@ -122,22 +122,40 @@ impl NetworkSender for SubstrateNetworkSender { } } -type NetworkEventStream = Pin + Send>>; +pub struct NetworkEventStream { + stream: Pin + Send>>, + network: Arc>, +} #[async_trait] -impl EventStream for NetworkEventStream { - async fn next_event(&mut self) -> Option> { +impl EventStream for NetworkEventStream { + async fn next_event(&mut self) -> Option> { use Event::*; use SubstrateEvent::*; loop { - match self.next().await { + match self.stream.next().await { Some(event) => match event { SyncConnected { remote } => { - return Some(Connected( - iter::once(MultiaddressProtocol::P2p(remote.into())).collect(), - )) + let multiaddress = + iter::once(MultiaddressProtocol::P2p(remote.into())).collect(); + trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); + if let Err(e) = self.network.add_peers_to_reserved_set( + protocol_name(&Protocol::Authentication), + iter::once(multiaddress).collect(), + ) { + error!(target: "aleph-network", "add_reserved failed: {}", e); + } + continue; + } + SyncDisconnected { remote } => { + trace!(target: "aleph-network", "Disconnected event for peer {:?}", remote); + let addresses = iter::once(remote).collect(); + self.network.remove_peers_from_reserved_set( + protocol_name(&Protocol::Authentication), + addresses, + ); + continue; } - SyncDisconnected { remote } => return Some(Disconnected(remote)), NotificationStreamOpened { remote, protocol, .. } => match to_protocol(protocol.as_ref()) { @@ -177,11 +195,13 @@ impl Network for Arc> { type SenderError = SenderError; type NetworkSender = SubstrateNetworkSender; type PeerId = PeerId; - type Multiaddress = Multiaddr; - type EventStream = NetworkEventStream; + type EventStream = NetworkEventStream; fn event_stream(&self) -> Self::EventStream { - Box::pin(self.as_ref().event_stream("aleph-network")) + NetworkEventStream { + stream: Box::pin(self.as_ref().event_stream("aleph-network")), + network: self.clone(), + } } fn sender( @@ -198,17 +218,4 @@ impl Network for Arc> { peer_id, }) } - - fn add_reserved(&self, addresses: HashSet, protocol: Protocol) { - if let Err(e) = self - .add_peers_to_reserved_set(protocol_name(&protocol), addresses.into_iter().collect()) - { - error!(target: "aleph-network", "add_reserved failed: {}", e); - } - } - - fn remove_reserved(&self, peers: HashSet, protocol: Protocol) { - let addresses = peers.into_iter().collect(); - self.remove_peers_from_reserved_set(protocol_name(&protocol), addresses); - } } diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 13a243001f..fae9bbee0b 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -102,7 +102,7 @@ async fn prepare_one_session_test_data() -> TestData { let validator_network = MockValidatorNetwork::from(authorities[0].addresses(), authorities[0].peer_id()); - let (connection_io, network_io, session_io) = setup_io(); + let (connection_io, network_io, session_io) = setup_io(validator_network.clone()); let connection_manager = ConnectionManager::new( validator_network.clone(), @@ -111,12 +111,8 @@ async fn prepare_one_session_test_data() -> TestData { let session_manager = SessionManager::new(session_io); - let network_service = NetworkService::new( - network.clone(), - validator_network.clone(), - task_manager.spawn_handle(), - network_io, - ); + let network_service = + NetworkService::new(network.clone(), task_manager.spawn_handle(), network_io); let network_manager_task = async move { tokio::select! { diff --git a/scripts/build_synthetic-network.sh b/scripts/build_synthetic-network.sh new file mode 100755 index 0000000000..ddb803829c --- /dev/null +++ b/scripts/build_synthetic-network.sh @@ -0,0 +1,35 @@ +#!/bin/env bash + +set -euo pipefail + +source ./scripts/common.sh + +GIT_COMMIT=${GIT_COMMIT:-72bbb4fde915e4132c19cd7ce3605364abac58a5} + +TMPDIR="$(dirname $0)/vendor" +mkdir -p $TMPDIR +log "created a temporary folder at $TMPDIR" + +pushd . + +log "cloning synthetic-network's git repo" +cd $TMPDIR +if [[ ! -d ./synthetic-network ]]; then + git clone https://github.com/daily-co/synthetic-network.git +fi +cd synthetic-network +git fetch origin +git checkout $GIT_COMMIT + +log "building base docker image for synthetic-network with support for synthetic-network" +log "patching synthetic network" +# aleph-node crashes since it uses newer glibc than this image +sed -i 's/FROM node:12.20.2/FROM node:19.2/' Dockerfile +docker build -t syntheticnet . + +popd + +log "building docker image for aleph-node that supports synthetic-network" +docker build -t aleph-node:syntheticnet -f docker/Dockerfile.synthetic_network . + +exit 0 diff --git a/scripts/common.sh b/scripts/common.sh index 7e575d265b..9df43ce744 100644 --- a/scripts/common.sh +++ b/scripts/common.sh @@ -103,7 +103,7 @@ function get_last_block() { local validator=$1 local rpc_port=$2 - local last_block=0 + local last_block="" while [[ -z "$last_block" ]]; do last_block=$(retrieve_last_block $validator $rpc_port) sleep 1 diff --git a/scripts/run_consensus_synthetic-network.sh b/scripts/run_consensus_synthetic-network.sh new file mode 100755 index 0000000000..3ebd19bf08 --- /dev/null +++ b/scripts/run_consensus_synthetic-network.sh @@ -0,0 +1,63 @@ +#!/bin/env bash + +set -euo pipefail + +source ./scripts/common.sh + +function usage(){ + cat << EOF +Usage: + $0 + This script allows you to run aleph-node within docker and simulate some custom network conditions, e.g. delays, rate limit, + package loss. Additionally, each node is preinstalled with the 'stress' tool, that allows to simulate high occupancy of nodes + cpu and io. It should allow us test more realistic high volume network conditions without the need to spawn hundreds of + aws instances. For more details on networking part of this solution, visit https://github.com/daily-co/synthetic-network . + IMPORTANT: this script requires aleph-node:latest docker image. + --no-build-image + skip docker image build + --commit 72bbb4fde915e4132c19cd7ce3605364abac58a5 + commit hash used to build synthetic-network, default is 72bbb4fde915e4132c19cd7ce3605364abac58a5 +EOF + exit 0 +} + +function build_test_image() { + local commit=$1 + + GIT_COMMIT=$commit ./scripts/build_synthetic-network.sh +} + +while [[ $# -gt 0 ]]; do + case $1 in + --no-build-image) + BUILD_IMAGE=false + shift + ;; + --commit) + GIT_COMMIT="$2" + shift;shift + ;; + --help) + usage + shift + ;; + *) + error "Unrecognized argument $1!" + ;; + esac +done + +BUILD_IMAGE=${BUILD_IMAGE:-true} +GIT_COMMIT=${GIT_COMMIT:-72bbb4fde915e4132c19cd7ce3605364abac58a5} + +if [[ "$BUILD_IMAGE" = true ]]; then + log "building custom docker image for synthetic-network tests" + build_test_image $GIT_COMMIT +fi + +log "running synthetic-network" +DOCKER_COMPOSE=./docker/docker-compose.synthetic-network.yml ./.github/scripts/run_consensus.sh +log "open a web browser at http://localhost:3000 (port 3000 is Node0, 3001 is Node1, ...)" +xdg-open http://localhost:3000 + +exit 0 diff --git a/scripts/run_script_for_synthetic-network.sh b/scripts/run_script_for_synthetic-network.sh new file mode 100755 index 0000000000..12a0e363e4 --- /dev/null +++ b/scripts/run_script_for_synthetic-network.sh @@ -0,0 +1,65 @@ +#!/bin/env bash + +set -euo pipefail + +source ./scripts/common.sh + +function usage(){ + cat << EOF +Usage: + $0 + This script allows you to run a custom .js script using the synthetic-network network simulation tool. + IMPORTANT: first you need to call 'scripts/run_consensus_synthetic-network.sh' and let it run in background. + It spawns docker-compose configured with synthetic-network. + It requires node.js to run. + --commit 72bbb4fde915e4132c19cd7ce3605364abac58a5 + commit hash used to build synthetic-network, default is 72bbb4fde915e4132c19cd7ce3605364abac58a5 + --script-path scripts/vendor/synthetic-network/frontend/udp_rate_sine_demo.js + path to a synthetic-network scrypt. Default is a demo scripts/vendor/synthetic-network/frontend/udp_rate_sine_demo.js + from the synthetic-network repo. Please consult synthetic-network repo for details: https://github.com/daily-co/synthetic-network +EOF + exit 0 +} + +while [[ $# -gt 0 ]]; do + case $1 in + --commit) + GIT_COMMIT="$2" + shift;shift + ;; + --script-path) + SCRIPT_PATH="$2" + shift;shift + ;; + --help) + usage + shift + ;; + *) + error "Unrecognized argument $1!" + ;; + esac +done + +GIT_COMMIT=${GIT_COMMIT:-72bbb4fde915e4132c19cd7ce3605364abac58a5} +SCRIPT_PATH=${SCRIPT_PATH:-scripts/vendor/synthetic-network/frontend/udp_rate_sine_demo.js} +SCRIPT_PATH=$(realpath $SCRIPT_PATH) + +TMPDIR="$(dirname $0)/vendor" +mkdir -p $TMPDIR +log "created a temporary folder at $TMPDIR" + +log "cloning synthetic-network's git repo" +cd $TMPDIR +if [[ ! -d ./synthetic-network ]]; then + git clone https://github.com/daily-co/synthetic-network.git +fi +cd synthetic-network +git fetch origin +git checkout $GIT_COMMIT +cd frontend + +log "running .js script" +node $SCRIPT_PATH + +exit 0