diff --git a/finality-aleph/src/abft/current.rs b/finality-aleph/src/abft/current.rs index 15744d7f63..f647b5c791 100644 --- a/finality-aleph/src/abft/current.rs +++ b/finality-aleph/src/abft/current.rs @@ -12,7 +12,7 @@ use crate::{ }, crypto::Signature, data_io::{AlephData, OrderedDataInterpreter}, - network::DataNetwork, + network::data::Network, oneshot, party::{ backup::ABFTBackup, @@ -27,7 +27,7 @@ pub const VERSION: u32 = 1; pub fn run_member< B: Block, C: HeaderBackend + Send + 'static, - ADN: DataNetwork> + 'static, + ADN: Network> + 'static, >( subtask_common: SubtaskCommon, multikeychain: Keychain, diff --git a/finality-aleph/src/abft/legacy.rs b/finality-aleph/src/abft/legacy.rs index b8613c2710..be749d526b 100644 --- a/finality-aleph/src/abft/legacy.rs +++ b/finality-aleph/src/abft/legacy.rs @@ -11,7 +11,7 @@ use crate::{ NetworkWrapper, SpawnHandleT, }, data_io::{AlephData, OrderedDataInterpreter}, - network::DataNetwork, + network::data::Network, oneshot, party::{ backup::ABFTBackup, @@ -26,7 +26,7 @@ pub const VERSION: u32 = 0; pub fn run_member< B: Block, C: HeaderBackend + Send + 'static, - ADN: DataNetwork> + 'static, + ADN: Network> + 'static, >( subtask_common: SubtaskCommon, multikeychain: Keychain, diff --git a/finality-aleph/src/abft/network.rs b/finality-aleph/src/abft/network.rs index 6cb22b57ab..ee1cac44c6 100644 --- a/finality-aleph/src/abft/network.rs +++ b/finality-aleph/src/abft/network.rs @@ -7,7 +7,7 @@ use crate::{ abft::SignatureSet, crypto::Signature, data_io::{AlephData, AlephNetworkMessage}, - network::{Data, DataNetwork}, + network::{data::Network, Data}, Hasher, Recipient, }; @@ -34,12 +34,12 @@ impl AlephNetworkMessage } /// A wrapper needed only because of type system theoretical constraints. Sadness. -pub struct NetworkWrapper> { +pub struct NetworkWrapper> { inner: DN, _phantom: PhantomData, } -impl> From for NetworkWrapper { +impl> From for NetworkWrapper { fn from(inner: DN) -> Self { NetworkWrapper { inner, @@ -48,7 +48,7 @@ impl> From for NetworkWrapper { } } -impl> NetworkWrapper { +impl> NetworkWrapper { fn send(&self, data: D, recipient: R) where R: Into, @@ -64,7 +64,7 @@ impl> NetworkWrapper { } #[async_trait::async_trait] -impl> current_aleph_bft::Network for NetworkWrapper { +impl> current_aleph_bft::Network for NetworkWrapper { fn send(&self, data: D, recipient: current_aleph_bft::Recipient) { NetworkWrapper::send(self, data, recipient) } @@ -75,7 +75,7 @@ impl> current_aleph_bft::Network for NetworkWrapp } #[async_trait::async_trait] -impl> legacy_aleph_bft::Network for NetworkWrapper { +impl> legacy_aleph_bft::Network for NetworkWrapper { fn send(&self, data: D, recipient: legacy_aleph_bft::Recipient) { NetworkWrapper::send(self, data, recipient) } diff --git a/finality-aleph/src/aggregation/mod.rs b/finality-aleph/src/aggregation/mod.rs index 728da661c9..c8f077a47c 100644 --- a/finality-aleph/src/aggregation/mod.rs +++ b/finality-aleph/src/aggregation/mod.rs @@ -11,7 +11,10 @@ use crate::{ crypto::Signature, metrics::Checkpoint, mpsc, - network::{Data, DataNetwork, SendError}, + network::{ + data::{Network, SendError}, + Data, + }, Keychain, Metrics, }; @@ -50,8 +53,8 @@ pub type CurrentAggregator<'a, B, N> = current_aleph_aggregator::IO< enum EitherAggregator<'a, B, CN, LN> where B: Block, - LN: DataNetwork>, - CN: DataNetwork>, + LN: Network>, + CN: Network>, ::Hash: AsRef<[u8]>, { Current(CurrentAggregator<'a, B, CN>), @@ -63,8 +66,8 @@ where pub struct Aggregator<'a, B, CN, LN> where B: Block, - LN: DataNetwork>, - CN: DataNetwork>, + LN: Network>, + CN: Network>, ::Hash: AsRef<[u8]>, { agg: EitherAggregator<'a, B, CN, LN>, @@ -73,8 +76,8 @@ where impl<'a, B, CN, LN> Aggregator<'a, B, CN, LN> where B: Block, - LN: DataNetwork>, - CN: DataNetwork>, + LN: Network>, + CN: Network>, ::Hash: AsRef<[u8]>, { pub fn new_legacy( @@ -163,9 +166,9 @@ where } } -pub struct NetworkWrapper>(N, PhantomData); +pub struct NetworkWrapper>(N, PhantomData); -impl> NetworkWrapper { +impl> NetworkWrapper { pub fn new(network: N) -> Self { Self(network, PhantomData) } @@ -186,7 +189,7 @@ impl current_aleph_aggregator::Metrics f #[async_trait::async_trait] impl legacy_aleph_aggregator::ProtocolSink for NetworkWrapper where - T: DataNetwork, + T: Network, D: Data, { async fn next(&mut self) -> Option { @@ -207,7 +210,7 @@ where #[async_trait::async_trait] impl current_aleph_aggregator::ProtocolSink for NetworkWrapper where - T: DataNetwork, + T: Network, D: Data, { async fn next(&mut self) -> Option { diff --git a/finality-aleph/src/data_io/data_store.rs b/finality-aleph/src/data_io/data_store.rs index bd9311a51c..55610ddfd9 100644 --- a/finality-aleph/src/data_io/data_store.rs +++ b/finality-aleph/src/data_io/data_store.rs @@ -26,7 +26,13 @@ use crate::{ status_provider::get_proposal_status, AlephNetworkMessage, }, - network::{ComponentNetwork, DataNetwork, ReceiverComponent, RequestBlocks, SimpleNetwork}, + network::{ + data::{ + component::{Network as ComponentNetwork, Receiver, SimpleNetwork}, + Network as DataNetwork, + }, + RequestBlocks, + }, BlockHashNum, SessionBoundaries, }; @@ -174,7 +180,7 @@ where RB: RequestBlocks + 'static, Message: AlephNetworkMessage + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static, - R: ReceiverComponent, + R: Receiver, { next_free_id: MessageId, pending_proposals: HashMap, PendingProposalInfo>, @@ -201,7 +207,7 @@ where RB: RequestBlocks + 'static, Message: AlephNetworkMessage + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static, - R: ReceiverComponent, + R: Receiver, { /// Returns a struct to be run and a network that outputs messages filtered as appropriate pub fn new>( diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index d6772d6aa8..84c083d5b3 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -22,7 +22,7 @@ use tokio::time::Duration; use crate::{ abft::{CurrentNetworkData, LegacyNetworkData}, aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData}, - network::{protocol_name, Split}, + network::{data::split::Split, protocol_name}, session::{ first_block_of_session, last_block_of_session, session_id_from_block_num, SessionBoundaries, SessionId, diff --git a/finality-aleph/src/network/component.rs b/finality-aleph/src/network/data/component.rs similarity index 97% rename from finality-aleph/src/network/component.rs rename to finality-aleph/src/network/data/component.rs index fb69ede92c..75cfd3c323 100644 --- a/finality-aleph/src/network/component.rs +++ b/finality-aleph/src/network/data/component.rs @@ -4,7 +4,10 @@ use futures::{channel::mpsc, StreamExt}; use log::warn; use crate::{ - network::{Data, DataNetwork, SendError}, + network::{ + data::{Network as DataNetwork, SendError}, + Data, + }, Recipient, }; @@ -185,8 +188,11 @@ mod tests { use super::{DataNetwork, NetworkMap, Receiver, Sender}; use crate::{ network::{ - component::{Network, ReceiverMap, SenderMap}, - Data, SendError, + data::{ + component::{Network, ReceiverMap, SenderMap}, + SendError, + }, + Data, }, Recipient, }; diff --git a/finality-aleph/src/network/data/mod.rs b/finality-aleph/src/network/data/mod.rs new file mode 100644 index 0000000000..2ee22d75d7 --- /dev/null +++ b/finality-aleph/src/network/data/mod.rs @@ -0,0 +1,18 @@ +//! Abstraction over an abstract network sending data to a set of nodes. +use crate::{abft::Recipient, network::Data}; + +pub mod component; +pub mod split; + +/// Returned when something went wrong when sending data using a Network. +#[derive(Debug)] +pub enum SendError { + SendFailed, +} + +/// A generic interface for sending and receiving data. +#[async_trait::async_trait] +pub trait Network: Send + Sync { + fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError>; + async fn next(&mut self) -> Option; +} diff --git a/finality-aleph/src/network/split.rs b/finality-aleph/src/network/data/split.rs similarity index 87% rename from finality-aleph/src/network/split.rs rename to finality-aleph/src/network/data/split.rs index 73268ec6d2..c674902d1b 100644 --- a/finality-aleph/src/network/split.rs +++ b/finality-aleph/src/network/data/split.rs @@ -9,8 +9,11 @@ use tokio::sync::Mutex; use crate::{ network::{ - ComponentNetwork, ComponentNetworkExt, Data, ReceiverComponent, SendError, SenderComponent, - SimpleNetwork, + data::{ + component::{Network, NetworkExt, Receiver, Sender, SimpleNetwork}, + SendError, + }, + Data, }, Recipient, Version, Versioned, }; @@ -65,7 +68,7 @@ impl Convert for ToRightSplitConvert { struct SplitSender< LeftData: Data, RightData: Data, - S: SenderComponent>, + S: Sender>, Conv: Convert, > { sender: S, @@ -75,9 +78,9 @@ struct SplitSender< impl< LeftData: Data, RightData: Data, - S: SenderComponent>, + S: Sender>, Conv: Convert> + Clone + Send + Sync, - > SenderComponent for SplitSender + > Sender for SplitSender where ::From: Data, ::To: Data, @@ -96,7 +99,7 @@ type RightSender = struct SplitReceiver< LeftData: Data, RightData: Data, - R: ReceiverComponent>, + R: Receiver>, TranslatedData: Data, > { receiver: Arc>, @@ -110,9 +113,9 @@ struct SplitReceiver< impl< LeftData: Data, RightData: Data, - R: ReceiverComponent>, + R: Receiver>, TranslatedData: Data, - > ReceiverComponent for SplitReceiver + > Receiver for SplitReceiver { async fn next(&mut self) -> Option { loop { @@ -137,7 +140,7 @@ type RightReceiver = SplitReceiver>, + R: Receiver>, >( receiver: &Arc>, left_sender: &mpsc::UnboundedSender, @@ -169,7 +172,7 @@ async fn forward_or_wait< } } -fn split_sender>>( +fn split_sender>>( sender: S, ) -> ( LeftSender, @@ -187,11 +190,7 @@ fn split_sender>, ->( +fn split_receiver>>( receiver: R, left_name: &'static str, right_name: &'static str, @@ -229,14 +228,11 @@ fn split_receiver< /// /// The main example for now is creating an `aleph_bft::Network` and a separate one for accumulating /// signatures for justifications. -pub fn split>>( +pub fn split>>( network: CN, left_name: &'static str, right_name: &'static str, -) -> ( - impl ComponentNetworkExt, - impl ComponentNetworkExt, -) { +) -> (impl NetworkExt, impl NetworkExt) { let (sender, receiver) = network.into(); let (left_sender, right_sender) = split_sender(sender); let (left_receiver, right_receiver) = split_receiver(receiver, left_name, right_name); diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 4f76b90dcb..dda72e4486 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -8,31 +8,22 @@ use codec::Codec; use sp_api::NumberFor; use sp_runtime::traits::Block; -use crate::abft::Recipient; - -mod component; +pub mod data; mod gossip; mod io; mod manager; #[cfg(test)] pub mod mock; mod session; -mod split; mod substrate; -pub use component::{ - Network as ComponentNetwork, NetworkExt as ComponentNetworkExt, - NetworkMap as ComponentNetworkMap, Receiver as ReceiverComponent, Sender as SenderComponent, - SimpleNetwork, -}; pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; pub use io::setup as setup_io; use manager::SessionCommand; pub use manager::{ ConnectionIO as ConnectionManagerIO, ConnectionManager, ConnectionManagerConfig, }; -pub use session::{Manager as SessionManager, ManagerError, Sender, IO as SessionManagerIO}; -pub use split::{split, Split}; +pub use session::{Manager as SessionManager, ManagerError, SessionSender, IO as SessionManagerIO}; pub use substrate::protocol_name; #[cfg(test)] pub mod testing { @@ -135,23 +126,10 @@ pub enum ConnectionCommand { DelReserved(HashSet), } -/// Returned when something went wrong when sending data using a DataNetwork. -#[derive(Debug)] -pub enum SendError { - SendFailed, -} - -/// What the data sent using the network has to satisfy. +/// A basic alias for properties we expect basic data to satisfy. pub trait Data: Clone + Codec + Send + Sync + 'static {} impl Data for D {} // In practice D: Data and P: PeerId, but we cannot require that in type aliases. type AddressedData = (D, P); - -/// A generic interface for sending and receiving data. -#[async_trait::async_trait] -pub trait DataNetwork: Send + Sync { - fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError>; - async fn next(&mut self) -> Option; -} diff --git a/finality-aleph/src/network/session.rs b/finality-aleph/src/network/session.rs index 4b835e1bc4..9b91ef77bd 100644 --- a/finality-aleph/src/network/session.rs +++ b/finality-aleph/src/network/session.rs @@ -1,21 +1,26 @@ use futures::channel::{mpsc, oneshot}; -use super::SimpleNetwork; use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, - network::{Data, SendError, SenderComponent, SessionCommand}, + network::{ + data::{ + component::{Sender, SimpleNetwork}, + SendError, + }, + Data, SessionCommand, + }, NodeIndex, SessionId, }; /// Sends data within a single session. #[derive(Clone)] -pub struct Sender { +pub struct SessionSender { session_id: SessionId, messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, } -impl SenderComponent for Sender { +impl Sender for SessionSender { fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { self.messages_for_network .unbounded_send((data, self.session_id, recipient)) @@ -24,7 +29,7 @@ impl SenderComponent for Sender { } /// Sends and receives data within a single session. -type Network = SimpleNetwork, Sender>; +type Network = SimpleNetwork, SessionSender>; /// Manages sessions for which the network should be active. pub struct Manager { @@ -105,7 +110,7 @@ impl Manager { Ok(Network::new( data_from_network, - Sender { + SessionSender { session_id, messages_for_network, }, diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs index e947b8dcce..852104bed3 100644 --- a/finality-aleph/src/party/manager/aggregator.rs +++ b/finality-aleph/src/party/manager/aggregator.rs @@ -15,7 +15,7 @@ use crate::{ crypto::Signature, justification::{AlephJustification, JustificationNotification}, metrics::Checkpoint, - network::DataNetwork, + network::data::Network, party::{ manager::aggregator::AggregatorVersion::{Current, Legacy}, AuthoritySubtaskCommon, Task, @@ -36,8 +36,8 @@ async fn process_new_block_data( metrics: &Option::Hash>>, ) where B: Block, - CN: DataNetwork>, - LN: DataNetwork>, + CN: Network>, + LN: Network>, ::Hash: AsRef<[u8]>, { trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block); @@ -83,8 +83,8 @@ async fn run_aggregator( where B: Block, C: HeaderBackend + Send + Sync + 'static, - LN: DataNetwork>, - CN: DataNetwork>, + LN: Network>, + CN: Network>, ::Hash: AsRef<[u8]>, { let IO { @@ -169,8 +169,8 @@ pub fn task( where B: Block, C: HeaderBackend + Send + Sync + 'static, - LN: DataNetwork> + 'static, - CN: DataNetwork> + 'static, + LN: Network> + 'static, + CN: Network> + 'static, { let AuthoritySubtaskCommon { spawn_handle, diff --git a/finality-aleph/src/party/manager/data_store.rs b/finality-aleph/src/party/manager/data_store.rs index 6a46e0a892..be3988b675 100644 --- a/finality-aleph/src/party/manager/data_store.rs +++ b/finality-aleph/src/party/manager/data_store.rs @@ -9,7 +9,7 @@ use sp_runtime::traits::Block; use crate::{ abft::SpawnHandleT, data_io::{AlephNetworkMessage, DataStore}, - network::{ReceiverComponent, RequestBlocks}, + network::{data::component::Receiver, RequestBlocks}, party::{AuthoritySubtaskCommon, Task}, }; @@ -23,7 +23,7 @@ where C: HeaderBackend + BlockchainEvents + Send + Sync + 'static, RB: RequestBlocks + 'static, Message: AlephNetworkMessage + Debug + Send + Sync + Codec + 'static, - R: ReceiverComponent + 'static, + R: Receiver + 'static, { let AuthoritySubtaskCommon { spawn_handle, diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index a9321f7f70..199e5c9092 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -21,8 +21,11 @@ use crate::{ data_io::{ChainTracker, DataStore, OrderedDataInterpreter}, mpsc, network::{ - split, ComponentNetworkMap, ManagerError, RequestBlocks, Sender, SessionManager, - SimpleNetwork, + data::{ + component::{Network, NetworkMap, SimpleNetwork}, + split::split, + }, + ManagerError, RequestBlocks, SessionManager, SessionSender, }, party::{ backup::ABFTBackup, manager::aggregator::AggregatorVersion, traits::NodeSessionManager, @@ -44,7 +47,6 @@ pub use task::{Handle, Task}; use crate::{ abft::{CURRENT_VERSION, LEGACY_VERSION}, data_io::DataProvider, - network::ComponentNetwork, }; #[cfg(feature = "only_legacy")] @@ -53,12 +55,12 @@ const ONLY_LEGACY_ENV: &str = "ONLY_LEGACY_PROTOCOL"; type LegacyNetworkType = SimpleNetwork< LegacyRmcNetworkData, mpsc::UnboundedReceiver>, - Sender>, + SessionSender>, >; type CurrentNetworkType = SimpleNetwork< CurrentRmcNetworkData, mpsc::UnboundedReceiver>, - Sender>, + SessionSender>, >; struct SubtasksParams @@ -67,7 +69,7 @@ where C: crate::ClientForAleph + Send + Sync + 'static, BE: Backend + 'static, SC: SelectChain + 'static, - N: ComponentNetwork> + 'static, + N: Network> + 'static, { n_members: usize, node_id: NodeIndex, @@ -143,7 +145,7 @@ where } } - fn legacy_subtasks> + 'static>( + fn legacy_subtasks> + 'static>( &self, params: SubtasksParams, ) -> Subtasks { @@ -201,7 +203,7 @@ where ) } - fn current_subtasks> + 'static>( + fn current_subtasks> + 'static>( &self, params: SubtasksParams, ) -> Subtasks { diff --git a/finality-aleph/src/testing/data_store.rs b/finality-aleph/src/testing/data_store.rs index 1342392751..7005b69942 100644 --- a/finality-aleph/src/testing/data_store.rs +++ b/finality-aleph/src/testing/data_store.rs @@ -18,7 +18,10 @@ use tokio::time::timeout; use crate::{ data_io::{AlephData, AlephNetworkMessage, DataStore, DataStoreConfig, MAX_DATA_BRANCH_LEN}, - network::{ComponentNetwork, Data, DataNetwork, RequestBlocks}, + network::{ + data::{component::Network as ComponentNetwork, Network as DataNetwork}, + Data, RequestBlocks, + }, session::{SessionBoundaries, SessionId, SessionPeriod}, testing::{ client_chain_builder::ClientChainBuilder, diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 2cb7dd7e2f..9c2c1bea0c 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -12,14 +12,15 @@ use tokio::{runtime::Handle, task::JoinHandle, time::timeout}; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ + data::Network, mock::{crypto_basics, MockData}, setup_io, testing::{ authentication, legacy_authentication, DataInSession, LegacyDiscoveryMessage, MockEvent, MockRawNetwork, SessionHandler, VersionedAuthentication, }, - AddressingInformation, ConnectionManager, ConnectionManagerConfig, DataNetwork, - GossipService, NetworkIdentity, Protocol, SessionManager, + AddressingInformation, ConnectionManager, ConnectionManagerConfig, GossipService, + NetworkIdentity, Protocol, SessionManager, }, testing::mocks::validator_network::{ random_address_from, MockAddressingInformation, MockNetwork as MockValidatorNetwork, @@ -159,7 +160,7 @@ impl TestData { &self, node_id: usize, session_id: u32, - ) -> impl DataNetwork { + ) -> impl Network { self.session_manager .start_validator_session( SessionId(session_id), @@ -242,7 +243,7 @@ impl TestData { } } - async fn start_session(&mut self, session_id: u32) -> impl DataNetwork { + async fn start_session(&mut self, session_id: u32) -> impl Network { let data_network = self.start_validator_session(0, session_id).await; self.connect_session_authorities(session_id).await; self.check_add_connection().await;