From 3adf62b52b7466465f087b66d692ede7db702eaa Mon Sep 17 00:00:00 2001 From: timorl Date: Wed, 7 Dec 2022 16:36:29 +0100 Subject: [PATCH 1/3] Addressing information refactor --- finality-aleph/src/network/io.rs | 18 +- .../src/network/manager/compatibility.rs | 296 ++++++++-- .../src/network/manager/discovery.rs | 291 ++++----- finality-aleph/src/network/manager/mod.rs | 78 ++- finality-aleph/src/network/manager/service.rs | 244 ++++---- finality-aleph/src/network/manager/session.rs | 552 +++++++----------- finality-aleph/src/network/mock.rs | 24 +- finality-aleph/src/network/mod.rs | 50 +- finality-aleph/src/tcp_network.rs | 167 ++++-- .../src/testing/mocks/validator_network.rs | 127 ++-- finality-aleph/src/testing/network.rs | 162 +++-- .../validator_network/manager/direction.rs | 52 +- .../src/validator_network/manager/legacy.rs | 44 +- .../src/validator_network/manager/mod.rs | 42 +- finality-aleph/src/validator_network/mod.rs | 7 +- .../src/validator_network/outgoing.rs | 13 +- .../src/validator_network/service.rs | 40 +- 17 files changed, 1235 insertions(+), 972 deletions(-) diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 8557349581..497d16cab6 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -1,28 +1,32 @@ +use std::fmt::Debug; + use futures::channel::mpsc; use crate::{ network::{ manager::{DataInSession, VersionedAuthentication}, - ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO, + AddressingInformation, ConnectionManagerIO, Data, NetworkServiceIO as NetworkIO, + SessionManagerIO, }, validator_network::{Network as ValidatorNetwork, PublicKey}, }; -type AuthenticationNetworkIO = NetworkIO>; +type AuthenticationNetworkIO = NetworkIO>; pub fn setup< D: Data, - M: Multiaddress + 'static, - VN: ValidatorNetwork>, + M: Data + Debug, + A: AddressingInformation + TryFrom> + Into>, + VN: ValidatorNetwork>, >( validator_network: VN, ) -> ( - ConnectionManagerIO, - AuthenticationNetworkIO, + ConnectionManagerIO, + AuthenticationNetworkIO, SessionManagerIO, ) where - M::PeerId: PublicKey, + A::PeerId: PublicKey, { // Prepare and start the network let (messages_for_network, messages_from_user) = mpsc::unbounded(); diff --git a/finality-aleph/src/network/manager/compatibility.rs b/finality-aleph/src/network/manager/compatibility.rs index 5c68521dea..21afeb79bb 100644 --- a/finality-aleph/src/network/manager/compatibility.rs +++ b/finality-aleph/src/network/manager/compatibility.rs @@ -7,8 +7,11 @@ use codec::{Decode, Encode, Error as CodecError, Input as CodecInput}; use log::warn; use crate::{ - network::{manager::DiscoveryMessage, Multiaddress}, - Version, + network::{ + manager::{AuthData, Authentication, LegacyAuthentication}, + AddressingInformation, Data, + }, + SessionId, Version, }; type ByteCount = u16; @@ -16,28 +19,131 @@ type ByteCount = u16; // We allow sending authentications of size up to 16KiB, that should be enough. const MAX_AUTHENTICATION_SIZE: u16 = 16 * 1024; +/// The possible forms of peer authentications we can have, either only new, only legacy or both. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum PeerAuthentications> + Into>> { + NewOnly(Authentication), + LegacyOnly(LegacyAuthentication), + Both(Authentication, LegacyAuthentication), +} + +impl> + Into>> PeerAuthentications { + /// The session with which these authentications are associated. + pub fn session_id(&self) -> SessionId { + use PeerAuthentications::*; + match self { + NewOnly((auth_data, _)) | Both((auth_data, _), _) => auth_data.session(), + LegacyOnly((auth_data, _)) => auth_data.session(), + } + } + + /// Add an authentication, overriding one that might have been here previously. + pub fn add_authentication(&mut self, authentication: Authentication) { + use PeerAuthentications::*; + match self { + NewOnly(_) => *self = NewOnly(authentication), + LegacyOnly(legacy_authentication) | Both(_, legacy_authentication) => { + *self = Both(authentication, legacy_authentication.clone()) + } + } + } + + /// Add a legacy authentication, overriding one that might have been here previously. + pub fn add_legacy_authentication(&mut self, legacy_authentication: LegacyAuthentication) { + use PeerAuthentications::*; + match self { + LegacyOnly(_) => *self = LegacyOnly(legacy_authentication), + NewOnly(authentication) | Both(authentication, _) => { + *self = Both(authentication.clone(), legacy_authentication) + } + } + } + + /// The associated address, if any. Can be `None` only for legacy authentications. + pub fn maybe_address(&self) -> Option { + use PeerAuthentications::*; + match self { + NewOnly((auth_data, _)) | Both((auth_data, _), _) => Some(auth_data.address()), + LegacyOnly((legacy_auth_data, _)) => legacy_auth_data + .clone() + .try_into() + .map(|auth_data: AuthData| auth_data.address()) + .ok(), + } + } +} + +/// Legacy messages used for discovery and authentication. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] +pub enum LegacyDiscoveryMessage { + AuthenticationBroadcast(LegacyAuthentication), + Authentication(LegacyAuthentication), +} + #[derive(Clone, Debug, PartialEq, Eq)] -pub enum VersionedAuthentication { +pub enum VersionedAuthentication { // Most likely from the future. Other(Version, Vec), - V1(DiscoveryMessage), + V1(LegacyDiscoveryMessage), + V2(Authentication), } -impl TryInto> for VersionedAuthentication { - type Error = Error; - - fn try_into(self) -> Result, Self::Error> { +impl> + Into>> + From> for Vec> +{ + fn from(authentications: PeerAuthentications) -> Self { + use LegacyDiscoveryMessage::*; + use PeerAuthentications::*; use VersionedAuthentication::*; + match authentications { + NewOnly(authentication) => vec![V2(authentication)], + LegacyOnly(legacy_authentication) => { + vec![V1(AuthenticationBroadcast(legacy_authentication))] + } + Both(authentication, legacy_authentication) => vec![ + V2(authentication), + V1(AuthenticationBroadcast(legacy_authentication)), + ], + } + } +} + +/// One of the possible messages we could have gotten as part of discovery. +/// Ignores whether the old authentication was a broadcast or not, we don't send the +/// non-broadcasts anymore anyway, and treating them as broadcasts doesn't break anything. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum DiscoveryMessage> + Into>> { + Authentication(Authentication), + LegacyAuthentication(LegacyAuthentication), +} + +impl> + Into>> DiscoveryMessage { + /// Session ID associated with this message. + pub fn session_id(&self) -> SessionId { + use DiscoveryMessage::*; match self { - V1(message) => Ok(message), - Other(v, _) => Err(Error::UnknownVersion(v)), + Authentication((auth_data, _)) => auth_data.session(), + LegacyAuthentication((auth_data, _)) => auth_data.session(), } } } -impl From> for VersionedAuthentication { - fn from(message: DiscoveryMessage) -> VersionedAuthentication { - VersionedAuthentication::V1(message) +impl> + Into>> + TryInto> for VersionedAuthentication +{ + type Error = Error; + + fn try_into(self) -> Result, Self::Error> { + use LegacyDiscoveryMessage::*; + use VersionedAuthentication::*; + match self { + V1(AuthenticationBroadcast(legacy_authentication)) + | V1(Authentication(legacy_authentication)) => Ok( + DiscoveryMessage::LegacyAuthentication(legacy_authentication), + ), + V2(authentication) => Ok(DiscoveryMessage::Authentication(authentication)), + Other(v, _) => Err(Error::UnknownVersion(v)), + } } } @@ -71,7 +177,9 @@ fn encode_with_version(version: Version, payload: &[u8]) -> Vec { result } -impl Encode for VersionedAuthentication { +impl> + Into>> Encode + for VersionedAuthentication +{ fn size_hint(&self) -> usize { use VersionedAuthentication::*; let version_size = size_of::(); @@ -81,6 +189,7 @@ impl Encode for VersionedAuthentication { + match self { Other(_, payload) => payload.len(), V1(data) => data.size_hint(), + V2(data) => data.size_hint(), } } @@ -89,17 +198,21 @@ impl Encode for VersionedAuthentication { match self { Other(version, payload) => encode_with_version(*version, payload), V1(data) => encode_with_version(Version(1), &data.encode()), + V2(data) => encode_with_version(Version(2), &data.encode()), } } } -impl Decode for VersionedAuthentication { +impl> + Into>> Decode + for VersionedAuthentication +{ fn decode(input: &mut I) -> Result { use VersionedAuthentication::*; let version = Version::decode(input)?; let num_bytes = ByteCount::decode(input)?; match version { - Version(1) => Ok(V1(DiscoveryMessage::decode(input)?)), + Version(1) => Ok(V1(LegacyDiscoveryMessage::decode(input)?)), + Version(2) => Ok(V2(Authentication::decode(input)?)), _ => { if num_bytes > MAX_AUTHENTICATION_SIZE { Err("Authentication has unknown version and is encoded as more than 16KiB.")?; @@ -135,21 +248,24 @@ mod test { use codec::{Decode, Encode}; use sp_keystore::testing::KeyStore; - use super::{DiscoveryMessage, VersionedAuthentication}; + use super::VersionedAuthentication; use crate::{ crypto::AuthorityVerifier, network::{ - manager::{compatibility::MAX_AUTHENTICATION_SIZE, SessionHandler}, + manager::{ + compatibility::{PeerAuthentications, MAX_AUTHENTICATION_SIZE}, + LegacyDiscoveryMessage, SessionHandler, + }, NetworkIdentity, }, nodes::testing::new_pen, - tcp_network::{testing::new_identity, TcpMultiaddress}, - testing::mocks::validator_network::MockMultiaddress, + tcp_network::{testing::new_identity, LegacyTcpMultiaddress, TcpAddressingInformation}, + testing::mocks::validator_network::MockAddressingInformation, NodeIndex, SessionId, Version, }; /// Session Handler used for generating versioned authentication in `raw_authentication_v1` - async fn handler() -> SessionHandler { + async fn handler() -> SessionHandler { let mnemonic = "ring cool spatial rookie need wing opinion pond fork garbage more april"; let external_addresses = vec![ String::from("addr1"), @@ -159,19 +275,43 @@ mod test { let keystore = Arc::new(KeyStore::new()); let pen = new_pen(mnemonic, keystore).await; - let identity = new_identity( - external_addresses.into_iter().map(String::from).collect(), - pen.authority_id(), - ); + let identity = new_identity(external_addresses, pen.authority_id()); SessionHandler::new( Some((NodeIndex(21), pen)), AuthorityVerifier::new(vec![]), SessionId(37), - identity.identity().0, + identity.identity(), ) .await - .unwrap() + } + + fn authentication_v1( + handler: SessionHandler, + ) -> VersionedAuthentication { + match handler + .authentication() + .expect("should have authentication") + { + PeerAuthentications::Both(_, authentication) => { + VersionedAuthentication::V1(LegacyDiscoveryMessage::Authentication(authentication)) + } + _ => panic!("handler doesn't have both authentications"), + } + } + + fn authentication_v2( + handler: SessionHandler, + ) -> VersionedAuthentication { + match handler + .authentication() + .expect("should have authentication") + { + PeerAuthentications::Both(authentication, _) => { + VersionedAuthentication::V2(authentication) + } + _ => panic!("handler doesn't have both authentications"), + } } /// Versioned authentication for authority with: @@ -194,14 +334,29 @@ mod test { ] } + /// Versioned authentication for authority with: + /// external_addresses: [String::from("addr1"), String::from("addr2"), String::from("addr3")] + /// derived from mnemonic "ring cool spatial rookie need wing opinion pond fork garbage more april" + /// for node index 21 and session id 37 + /// encoded at version of Aleph Node after 8.0 + fn raw_authentication_v2() -> Vec { + //TODO: this will fail, check what it should be + vec![ + 2, 0, 127, 0, 50, 40, 192, 239, 72, 72, 119, 156, 76, 37, 212, 220, 76, 165, 39, 73, + 20, 89, 77, 66, 171, 174, 61, 31, 254, 137, 186, 1, 7, 141, 187, 219, 20, 97, 100, 100, + 114, 49, 8, 20, 97, 100, 100, 114, 50, 20, 97, 100, 100, 114, 51, 21, 0, 0, 0, 0, 0, 0, + 0, 37, 0, 0, 0, 62, 4, 215, 148, 82, 197, 128, 124, 68, 183, 132, 114, 101, 15, 49, + 220, 175, 29, 128, 15, 163, 6, 147, 56, 103, 140, 125, 92, 92, 243, 194, 168, 63, 65, + 101, 78, 165, 63, 169, 132, 73, 212, 6, 10, 231, 78, 48, 219, 70, 23, 180, 227, 95, + 141, 111, 60, 245, 119, 27, 84, 187, 33, 77, 2, + ] + } + #[tokio::test] async fn correcly_encodes_v1_to_bytes() { let handler = handler().await; let raw = raw_authentication_v1(); - - let authentication_v1 = VersionedAuthentication::V1(DiscoveryMessage::Authentication( - handler.authentication().unwrap(), - )); + let authentication_v1 = authentication_v1(handler); assert_eq!(authentication_v1.encode(), raw); } @@ -210,10 +365,7 @@ mod test { async fn correcly_decodes_v1_from_bytes() { let handler = handler().await; let raw = raw_authentication_v1(); - - let authentication_v1 = VersionedAuthentication::V1(DiscoveryMessage::Authentication( - handler.authentication().unwrap(), - )); + let authentication_v1 = authentication_v1(handler); let decoded = VersionedAuthentication::decode(&mut raw.as_slice()); @@ -223,10 +375,7 @@ mod test { #[tokio::test] async fn correctly_decodes_v1_roundtrip() { let handler = handler().await; - - let authentication_v1 = VersionedAuthentication::V1(DiscoveryMessage::Authentication( - handler.authentication().unwrap(), - )); + let authentication_v1 = authentication_v1(handler); let encoded = authentication_v1.encode(); let decoded = VersionedAuthentication::decode(&mut encoded.as_slice()); @@ -234,9 +383,44 @@ mod test { assert_eq!(decoded, Ok(authentication_v1)) } + #[tokio::test] + async fn correcly_encodes_v2_to_bytes() { + let handler = handler().await; + let raw = raw_authentication_v2(); + let authentication_v2 = authentication_v2(handler); + + assert_eq!(authentication_v2.encode(), raw); + } + + #[tokio::test] + async fn correcly_decodes_v2_from_bytes() { + let handler = handler().await; + let raw = raw_authentication_v2(); + let authentication_v2 = authentication_v2(handler); + + let decoded = VersionedAuthentication::decode(&mut raw.as_slice()); + + assert_eq!(decoded, Ok(authentication_v2)); + } + + #[tokio::test] + async fn correctly_decodes_v2_roundtrip() { + let handler = handler().await; + let authentication_v2 = authentication_v2(handler); + + let encoded = authentication_v2.encode(); + let decoded = VersionedAuthentication::decode(&mut encoded.as_slice()); + + assert_eq!(decoded, Ok(authentication_v2)) + } + #[tokio::test] async fn correctly_decodes_other() { - let other = VersionedAuthentication::::Other(Version(42), vec![21, 37]); + let other = + VersionedAuthentication::::Other( + Version(42), + vec![21, 37], + ); let encoded = other.encode(); let decoded = VersionedAuthentication::decode(&mut encoded.as_slice()); assert_eq!(decoded, Ok(other)); @@ -245,13 +429,15 @@ mod test { other_big.append(&mut (MAX_AUTHENTICATION_SIZE).encode()); other_big.append(&mut vec![0u8; (MAX_AUTHENTICATION_SIZE).into()]); let decoded = - VersionedAuthentication::::decode(&mut other_big.as_slice()); + VersionedAuthentication::::decode( + &mut other_big.as_slice(), + ); assert_eq!( decoded, - Ok(VersionedAuthentication::::Other( - Version(42), - other_big[4..].to_vec() - )) + Ok(VersionedAuthentication::< + MockAddressingInformation, + MockAddressingInformation, + >::Other(Version(42), other_big[4..].to_vec())) ); } @@ -261,13 +447,22 @@ mod test { let size = MAX_AUTHENTICATION_SIZE + 1; other.append(&mut size.encode()); other.append(&mut vec![0u8; size.into()]); - let decoded = VersionedAuthentication::::decode(&mut other.as_slice()); + let decoded = + VersionedAuthentication::::decode( + &mut other.as_slice(), + ); assert!(decoded.is_err()); let other = - VersionedAuthentication::::Other(Version(42), vec![0u8; size.into()]); + VersionedAuthentication::::Other( + Version(42), + vec![0u8; size.into()], + ); let encoded = other.encode(); - let decoded = VersionedAuthentication::::decode(&mut encoded.as_slice()); + let decoded = + VersionedAuthentication::::decode( + &mut encoded.as_slice(), + ); assert!(decoded.is_err()); } @@ -276,7 +471,10 @@ mod test { let mut other = 42u16.encode(); other.append(&mut MAX_AUTHENTICATION_SIZE.encode()); other.append(&mut vec![21, 37]); - let decoded = VersionedAuthentication::::decode(&mut other.as_slice()); + let decoded = + VersionedAuthentication::::decode( + &mut other.as_slice(), + ); assert!(decoded.is_err()); } } diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index 112fceadf3..f1b3d2bf3b 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -1,51 +1,38 @@ use std::{ collections::HashMap, + fmt::Debug, marker::PhantomData, time::{Duration, Instant}, }; -use codec::{Decode, Encode}; use log::{debug, info, trace}; use crate::{ network::{ - manager::{Authentication, SessionHandler}, - Multiaddress, + manager::{ + compatibility::PeerAuthentications, Authentication, LegacyAuthentication, + SessionHandler, + }, + AddressingInformation, Data, }, - NodeIndex, SessionId, + NodeIndex, }; -/// Messages used for discovery and authentication. -#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] -pub enum DiscoveryMessage { - AuthenticationBroadcast(Authentication), - Authentication(Authentication), -} - -impl DiscoveryMessage { - pub fn session_id(&self) -> SessionId { - use DiscoveryMessage::*; - match self { - AuthenticationBroadcast((auth_data, _)) | Authentication((auth_data, _)) => { - auth_data.session() - } - } - } -} - /// Handles creating and rebroadcasting discovery messages. -pub struct Discovery { +pub struct Discovery> + Into>> { cooldown: Duration, last_broadcast: HashMap, - _phantom: PhantomData, + last_legacy_broadcast: HashMap, + _phantom: PhantomData<(M, A)>, } -impl Discovery { +impl> + Into>> Discovery { /// Create a new discovery handler with the given response/broadcast cooldown. pub fn new(cooldown: Duration) -> Self { Discovery { cooldown, last_broadcast: HashMap::new(), + last_legacy_broadcast: HashMap::new(), _phantom: PhantomData, } } @@ -53,8 +40,8 @@ impl Discovery { /// Returns a message that should be sent as part of authority discovery at this moment. pub fn discover_authorities( &mut self, - handler: &SessionHandler, - ) -> Option> { + handler: &SessionHandler, + ) -> Option> { let authentication = match handler.authentication() { Some(authentication) => authentication, None => return None, @@ -63,68 +50,68 @@ impl Discovery { let missing_authorities = handler.missing_nodes(); let node_count = handler.node_count(); info!(target: "aleph-network", "{}/{} authorities known for session {}.", node_count.0-missing_authorities.len(), node_count.0, handler.session_id().0); - Some(DiscoveryMessage::AuthenticationBroadcast(authentication)) + Some(authentication) } - /// Checks the authentication using the handler and returns the addresses we should be - /// connected to if the authentication is correct. - fn handle_authentication( - &mut self, - authentication: Authentication, - handler: &mut SessionHandler, - ) -> Vec { - if !handler.handle_authentication(authentication.clone()) { - return Vec::new(); + fn should_rebroadcast(&self, node_id: &NodeIndex) -> bool { + match self.last_broadcast.get(node_id) { + Some(instant) => Instant::now() > *instant + self.cooldown, + None => true, } - authentication.0.addresses() } - fn should_rebroadcast(&self, node_id: &NodeIndex) -> bool { - match self.last_broadcast.get(node_id) { + fn should_legacy_rebroadcast(&self, node_id: &NodeIndex) -> bool { + match self.last_legacy_broadcast.get(node_id) { Some(instant) => Instant::now() > *instant + self.cooldown, None => true, } } - fn handle_broadcast( + /// Processes the provided authentication and returns any new address we should + /// be connected to if we want to stay connected to the committee and an optional + /// message that we should send as a result of it. + pub fn handle_authentication( &mut self, - authentication: Authentication, - handler: &mut SessionHandler, - ) -> (Vec, Option>) { + authentication: Authentication, + handler: &mut SessionHandler, + ) -> (Option, Option>) { debug!(target: "aleph-network", "Handling broadcast with authentication {:?}.", authentication); - let addresses = self.handle_authentication(authentication.clone(), handler); - if addresses.is_empty() { - return (Vec::new(), None); - } + let address = match handler.handle_authentication(authentication.clone()) { + Some(address) => Some(address), + None => return (None, None), + }; let node_id = authentication.0.creator(); if !self.should_rebroadcast(&node_id) { - return (addresses, None); + return (address, None); } trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication); self.last_broadcast.insert(node_id, Instant::now()); - ( - addresses, - Some(DiscoveryMessage::AuthenticationBroadcast(authentication)), - ) + (address, Some(PeerAuthentications::NewOnly(authentication))) } - /// Analyzes the provided message and returns all the new multiaddresses we should + /// Processes the legacy authentication and returns any new address we should /// be connected to if we want to stay connected to the committee and an optional /// message that we should send as a result of it. - pub fn handle_message( + pub fn handle_legacy_authentication( &mut self, - message: DiscoveryMessage, - handler: &mut SessionHandler, - ) -> (Vec, Option>) { - use DiscoveryMessage::*; - match message { - AuthenticationBroadcast(authentication) => { - self.handle_broadcast(authentication, handler) - } - Authentication(authentication) => { - (self.handle_authentication(authentication, handler), None) - } + legacy_authentication: LegacyAuthentication, + handler: &mut SessionHandler, + ) -> (Option, Option>) { + debug!(target: "aleph-network", "Handling broadcast with legacy authentication {:?}.", legacy_authentication); + let address = match handler.handle_legacy_authentication(legacy_authentication.clone()) { + Some(address) => Some(address), + None => return (None, None), + }; + let node_id = legacy_authentication.0.creator(); + if !self.should_legacy_rebroadcast(&node_id) { + return (address, None); } + trace!(target: "aleph-network", "Rebroadcasting {:?}.", legacy_authentication); + self.last_legacy_broadcast.insert(node_id, Instant::now()); + ( + address, + Some(PeerAuthentications::LegacyOnly(legacy_authentication)), + ) } } @@ -132,30 +119,30 @@ impl Discovery { mod tests { use std::{thread::sleep, time::Duration}; - use codec::Encode; - - use super::{Discovery, DiscoveryMessage}; + use super::Discovery; use crate::{ - network::{manager::SessionHandler, mock::crypto_basics}, - testing::mocks::validator_network::{ - random_identity, random_multiaddress, MockMultiaddress, + network::{ + manager::{compatibility::PeerAuthentications, SessionHandler}, + mock::crypto_basics, + testing::{authentication, legacy_authentication}, }, + testing::mocks::validator_network::{random_address, MockAddressingInformation}, SessionId, }; const NUM_NODES: u8 = 7; const MS_COOLDOWN: u64 = 200; - fn addresses() -> Vec { - (0..NUM_NODES).map(|_| random_multiaddress()).collect() + fn addresses() -> Vec { + (0..NUM_NODES).map(|_| random_address()).collect() } async fn build_number( num_nodes: u8, ) -> ( - Discovery, - Vec>, - SessionHandler, + Discovery, + Vec>, + SessionHandler, ) { let crypto_basics = crypto_basics(num_nodes.into()).await; let mut handlers = Vec::new(); @@ -165,20 +152,18 @@ mod tests { Some(authority_index_and_pen), crypto_basics.1.clone(), SessionId(43), - vec![address], + address, ) - .await - .unwrap(), + .await, ); } let non_validator = SessionHandler::new( None, crypto_basics.1.clone(), SessionId(43), - random_identity().0, + random_address(), ) - .await - .unwrap(); + .await; ( Discovery::new(Duration::from_millis(MS_COOLDOWN)), handlers, @@ -187,9 +172,9 @@ mod tests { } async fn build() -> ( - Discovery, - Vec>, - SessionHandler, + Discovery, + Vec>, + SessionHandler, ) { build_number(NUM_NODES).await } @@ -199,10 +184,12 @@ mod tests { for num_nodes in 2..NUM_NODES { let (mut discovery, mut handlers, _) = build_number(num_nodes).await; let handler = &mut handlers[0]; - let message = discovery.discover_authorities(handler); + let maybe_authentication = discovery.discover_authorities(handler); assert_eq!( - message.expect("there is a discovery message"), - DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()), + maybe_authentication.expect("there is an authentication"), + handler + .authentication() + .expect("the handler has an authentication"), ); } } @@ -210,100 +197,118 @@ mod tests { #[tokio::test] async fn non_validator_discover_authorities_returns_empty_vector() { let (mut discovery, _, non_validator) = build().await; - let message = discovery.discover_authorities(&non_validator); - assert!(message.is_none()); + let maybe_authentication = discovery.discover_authorities(&non_validator); + assert!(maybe_authentication.is_none()); } #[tokio::test] async fn rebroadcasts_and_accepts_addresses() { let (mut discovery, mut handlers, _) = build().await; - let authentication = handlers[1].authentication().unwrap(); + let authentication = authentication(&handlers[1]); + let handler = &mut handlers[0]; + let (address, command) = discovery.handle_authentication(authentication.clone(), handler); + assert_eq!(address, Some(authentication.0.address())); + assert!(matches!(command, Some( + PeerAuthentications::NewOnly(rebroadcast_authentication), + ) if rebroadcast_authentication == authentication)); + } + + #[tokio::test] + async fn legacy_rebroadcasts_and_accepts_addresses() { + let (mut discovery, mut handlers, _) = build().await; + let authentication = legacy_authentication(&handlers[1]); let handler = &mut handlers[0]; - let (addresses, command) = discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - handler, - ); - assert_eq!(addresses, authentication.0.addresses()); + let (_, command) = discovery.handle_legacy_authentication(authentication.clone(), handler); assert!(matches!(command, Some( - DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), + PeerAuthentications::LegacyOnly(rebroadcast_authentication), ) if rebroadcast_authentication == authentication)); } #[tokio::test] - async fn non_validators_rebroadcasts() { + async fn non_validator_rebroadcasts() { let (mut discovery, handlers, mut non_validator) = build().await; - let authentication = handlers[1].authentication().unwrap(); - let (addresses, command) = discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - &mut non_validator, - ); - assert_eq!(addresses, authentication.0.addresses()); + let authentication = authentication(&handlers[1]); + let (address, command) = + discovery.handle_authentication(authentication.clone(), &mut non_validator); + assert_eq!(address, Some(authentication.0.address())); assert!(matches!(command, Some( - DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), + PeerAuthentications::NewOnly(rebroadcast_authentication), + ) if rebroadcast_authentication == authentication)); + } + + #[tokio::test] + async fn legacy_non_validator_rebroadcasts() { + let (mut discovery, handlers, mut non_validator) = build().await; + let authentication = legacy_authentication(&handlers[1]); + let (_, command) = + discovery.handle_legacy_authentication(authentication.clone(), &mut non_validator); + assert!(matches!(command, Some( + PeerAuthentications::LegacyOnly(rebroadcast_authentication), ) if rebroadcast_authentication == authentication)); } #[tokio::test] async fn does_not_rebroadcast_wrong_authentications() { let (mut discovery, mut handlers, _) = build().await; - let (auth_data, _) = handlers[1].authentication().unwrap(); - let (_, signature) = handlers[2].authentication().unwrap(); + let (auth_data, _) = authentication(&handlers[1]); + let (_, signature) = authentication(&handlers[2]); let authentication = (auth_data, signature); let handler = &mut handlers[0]; - let (addresses, command) = discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication), - handler, - ); - assert!(addresses.is_empty()); + let (address, command) = discovery.handle_authentication(authentication, handler); + assert!(address.is_none()); + assert!(command.is_none()); + } + + #[tokio::test] + async fn legacy_does_not_rebroadcast_wrong_authentications() { + let (mut discovery, mut handlers, _) = build().await; + let (auth_data, _) = legacy_authentication(&handlers[1]); + let (_, signature) = legacy_authentication(&handlers[2]); + let authentication = (auth_data, signature); + let handler = &mut handlers[0]; + let (address, command) = discovery.handle_legacy_authentication(authentication, handler); + assert!(address.is_none()); assert!(command.is_none()); } #[tokio::test] async fn rebroadcasts_after_cooldown() { let (mut discovery, mut handlers, _) = build().await; - let authentication = handlers[1].authentication().unwrap(); + let authentication = authentication(&handlers[1]); let handler = &mut handlers[0]; - discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - handler, - ); + discovery.handle_authentication(authentication.clone(), handler); sleep(Duration::from_millis(MS_COOLDOWN + 5)); - let (addresses, command) = discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - handler, - ); - assert_eq!(addresses, authentication.0.addresses()); + let (address, command) = discovery.handle_authentication(authentication.clone(), handler); + assert_eq!(address, Some(authentication.0.address())); assert!(matches!(command, Some( - DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), + PeerAuthentications::NewOnly(rebroadcast_authentication), ) if rebroadcast_authentication == authentication)); } #[tokio::test] - async fn accepts_correct_authentications() { + async fn legacy_rebroadcasts_after_cooldown() { let (mut discovery, mut handlers, _) = build().await; - let expected_address = handlers[1].authentication().unwrap().0.addresses()[0].encode(); - let authentication = handlers[1].authentication().unwrap(); + let authentication = legacy_authentication(&handlers[1]); let handler = &mut handlers[0]; - let (addresses, command) = - discovery.handle_message(DiscoveryMessage::Authentication(authentication), handler); - assert_eq!(addresses.len(), 1); - let address = addresses[0].encode(); - assert_eq!(address, expected_address); - assert!(command.is_none()); + discovery.handle_legacy_authentication(authentication.clone(), handler); + sleep(Duration::from_millis(MS_COOLDOWN + 5)); + let (_, command) = discovery.handle_legacy_authentication(authentication.clone(), handler); + assert!(matches!(command, Some( + PeerAuthentications::LegacyOnly(rebroadcast_authentication), + ) if rebroadcast_authentication == authentication)); } #[tokio::test] - async fn does_not_accept_incorrect_authentications() { + async fn rebroadcasts_legacy_immediately() { let (mut discovery, mut handlers, _) = build().await; - let (auth_data, _) = handlers[1].authentication().unwrap(); - let (_, signature) = handlers[2].authentication().unwrap(); - let incorrect_authentication = (auth_data, signature); + let authentication = authentication(&handlers[1]); + let legacy_authentication = legacy_authentication(&handlers[1]); let handler = &mut handlers[0]; - let (addresses, command) = discovery.handle_message( - DiscoveryMessage::Authentication(incorrect_authentication), - handler, - ); - assert!(addresses.is_empty()); - assert!(command.is_none()); + discovery.handle_authentication(authentication, handler); + let (_, command) = + discovery.handle_legacy_authentication(legacy_authentication.clone(), handler); + assert!(matches!(command, Some( + PeerAuthentications::LegacyOnly(rebroadcast_authentication), + ) if rebroadcast_authentication == legacy_authentication)); } } diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/manager/mod.rs index 35ab274ec3..ad79ae7bc8 100644 --- a/finality-aleph/src/network/manager/mod.rs +++ b/finality-aleph/src/network/manager/mod.rs @@ -2,7 +2,7 @@ use codec::{Decode, Encode, Error, Input, Output}; use crate::{ crypto::Signature, - network::{Data, Multiaddress}, + network::{AddressingInformation, Data}, NodeIndex, SessionId, }; @@ -12,25 +12,46 @@ mod discovery; mod service; mod session; -pub use compatibility::VersionedAuthentication; +pub use compatibility::{ + DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, VersionedAuthentication, +}; use connections::Connections; -pub use discovery::{Discovery, DiscoveryMessage}; +pub use discovery::Discovery; pub use service::{ Config as ConnectionManagerConfig, Service as ConnectionManager, SessionCommand, IO as ConnectionIO, }; pub use session::{Handler as SessionHandler, HandlerError as SessionHandlerError}; -/// Data validators use to authenticate themselves for a single session +/// Data validators used to use to authenticate themselves for a single session /// and disseminate their addresses. #[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] -pub struct AuthData { +pub struct LegacyAuthData { addresses: Vec, node_id: NodeIndex, session_id: SessionId, } -impl AuthData { +impl LegacyAuthData { + pub fn session(&self) -> SessionId { + self.session_id + } + + pub fn creator(&self) -> NodeIndex { + self.node_id + } +} + +/// Data validators use to authenticate themselves for a single session +/// and disseminate their addresses. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] +pub struct AuthData { + address: A, + node_id: NodeIndex, + session_id: SessionId, +} + +impl AuthData { pub fn session(&self) -> SessionId { self.session_id } @@ -39,13 +60,52 @@ impl AuthData { self.node_id } - pub fn addresses(&self) -> Vec { - self.addresses.clone() + pub fn address(&self) -> A { + self.address.clone() } } +impl>> From> for LegacyAuthData { + fn from(auth_data: AuthData) -> Self { + let AuthData { + address, + node_id, + session_id, + } = auth_data; + let addresses = address.into(); + LegacyAuthData { + addresses, + node_id, + session_id, + } + } +} + +impl>> TryFrom> + for AuthData +{ + type Error = (); + + fn try_from(legacy_auth_data: LegacyAuthData) -> Result { + let LegacyAuthData { + addresses, + node_id, + session_id, + } = legacy_auth_data; + let address = addresses.try_into().map_err(|_| ())?; + Ok(AuthData { + address, + node_id, + session_id, + }) + } +} + +/// A full legacy authentication, consisting of a signed LegacyAuthData. +pub type LegacyAuthentication = (LegacyAuthData, Signature); + /// A full authentication, consisting of a signed AuthData. -pub type Authentication = (AuthData, Signature); +pub type Authentication = (AuthData, Signature); /// Data inside session, sent to validator network. /// Wrapper for data send over network. We need it to ensure compatibility. diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index e52d75232a..4c9f79d508 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -1,6 +1,7 @@ use std::{ cmp, collections::{HashMap, HashSet}, + fmt::Debug, time::Duration, }; @@ -16,10 +17,11 @@ use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ manager::{ + compatibility::{LegacyDiscoveryMessage, PeerAuthentications}, Connections, DataInSession, Discovery, DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, }, - AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId, + AddressedData, AddressingInformation, ConnectionCommand, Data, NetworkIdentity, PeerId, }, validator_network::{Network as ValidatorNetwork, PublicKey}, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, @@ -39,9 +41,9 @@ pub enum SessionCommand { Stop(SessionId), } -struct Session { - handler: SessionHandler, - discovery: Discovery, +struct Session> + Into>> { + handler: SessionHandler, + discovery: Discovery, data_for_user: Option>, } @@ -114,12 +116,12 @@ impl Config { /// Actions that the service wants to take as the result of some information. Might contain a /// command for connecting to or disconnecting from some peers or a message to broadcast for /// discovery purposes. -pub struct ServiceActions { - maybe_command: Option>, - maybe_message: Option>, +pub struct ServiceActions> + Into>> { + maybe_command: Option>, + maybe_message: Option>, } -impl ServiceActions { +impl> + Into>> ServiceActions { fn noop() -> Self { ServiceActions { maybe_command: None, @@ -137,10 +139,13 @@ impl ServiceActions { /// 1. In-session messages are forwarded to the user. /// 2. Authentication messages forwarded to session handlers. /// 4. Running periodic maintenance, mostly related to node discovery. -pub struct Service { +pub struct Service +where + NI::AddressingInformation: TryFrom> + Into>, +{ network_identity: NI, - connections: Connections<::PeerId>, - sessions: HashMap>, + connections: Connections, + sessions: HashMap>, to_retry: Vec<( PreSession, Option>>, @@ -150,7 +155,10 @@ pub struct Service { initial_delay: Duration, } -impl Service { +impl Service +where + NI::AddressingInformation: TryFrom> + Into>, +{ /// Create a new connection manager service. pub fn new(network_identity: NI, config: Config) -> Self { let Config { @@ -171,7 +179,7 @@ impl Service { fn delete_reserved( to_remove: HashSet, - ) -> Option> { + ) -> Option> { match to_remove.is_empty() { true => None, false => Some(ConnectionCommand::DelReserved(to_remove)), @@ -181,7 +189,7 @@ impl Service { fn finish_session( &mut self, session_id: SessionId, - ) -> Option> { + ) -> Option> { self.sessions.remove(&session_id); self.to_retry .retain(|(pre_session, _)| pre_session.session_id() != session_id); @@ -191,7 +199,7 @@ impl Service { fn discover_authorities( &mut self, session_id: &SessionId, - ) -> Option> { + ) -> Option> { self.sessions.get_mut(session_id).and_then( |Session { handler, discovery, .. @@ -200,7 +208,7 @@ impl Service { } /// Returns all the network messages that should be sent as part of discovery at this moment. - pub fn discovery(&mut self) -> Vec> { + pub fn discovery(&mut self) -> Vec> { let sessions: Vec<_> = self.sessions.keys().cloned().collect(); sessions .iter() @@ -208,26 +216,14 @@ impl Service { .collect() } - fn addresses(&self) -> Vec { - let (addresses, peer_id) = self.network_identity.identity(); - debug!(target: "aleph-network", "Got addresses:\n{:?}\n and peer_id:{:?}", addresses, peer_id); - addresses - .into_iter() - .filter_map(|address| address.add_matching_peer_id(peer_id.clone())) - .collect() - } - async fn start_validator_session( &mut self, pre_session: PreValidatorSession, - addresses: Vec, - ) -> Result< - ( - Option>, - mpsc::UnboundedReceiver, - ), - SessionHandlerError, - > { + address: NI::AddressingInformation, + ) -> ( + Option>, + mpsc::UnboundedReceiver, + ) { let PreValidatorSession { session_id, verifier, @@ -235,7 +231,7 @@ impl Service { pen, } = pre_session; let handler = - SessionHandler::new(Some((node_id, pen)), verifier, session_id, addresses).await?; + SessionHandler::new(Some((node_id, pen)), verifier, session_id, address).await; let discovery = Discovery::new(self.discovery_cooldown); let (data_for_user, data_from_network) = mpsc::unbounded(); let data_for_user = Some(data_for_user); @@ -247,20 +243,25 @@ impl Service { data_for_user, }, ); - Ok((self.discover_authorities(&session_id), data_from_network)) + (self.discover_authorities(&session_id), data_from_network) } async fn update_validator_session( &mut self, pre_session: PreValidatorSession, - ) -> Result<(ServiceActions, mpsc::UnboundedReceiver), SessionHandlerError> - { - let addresses = self.addresses(); + ) -> Result< + ( + ServiceActions, + mpsc::UnboundedReceiver, + ), + SessionHandlerError, + > { + let address = self.network_identity.identity(); let session = match self.sessions.get_mut(&pre_session.session_id) { Some(session) => session, None => { let (maybe_message, data_from_network) = - self.start_validator_session(pre_session, addresses).await?; + self.start_validator_session(pre_session, address).await; return Ok(( ServiceActions { maybe_command: None, @@ -278,10 +279,10 @@ impl Service { } = pre_session; let peers_to_stay = session .handler - .update(Some((node_id, pen)), verifier, addresses) + .update(Some((node_id, pen)), verifier, address) .await? .iter() - .flat_map(|address| address.get_peer_id()) + .map(|address| address.peer_id()) .collect(); let maybe_command = Self::delete_reserved( self.connections @@ -306,7 +307,7 @@ impl Service { &mut self, pre_session: PreValidatorSession, result_for_user: Option>>, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { match self.update_validator_session(pre_session.clone()).await { Ok((actions, data_from_network)) => { if let Some(result_for_user) = result_for_user { @@ -327,13 +328,13 @@ impl Service { async fn start_nonvalidator_session( &mut self, pre_session: PreNonvalidatorSession, - addresses: Vec, - ) -> Result<(), SessionHandlerError> { + address: NI::AddressingInformation, + ) { let PreNonvalidatorSession { session_id, verifier, } = pre_session; - let handler = SessionHandler::new(None, verifier, session_id, addresses).await?; + let handler = SessionHandler::new(None, verifier, session_id, address).await; let discovery = Discovery::new(self.discovery_cooldown); self.sessions.insert( session_id, @@ -343,25 +344,22 @@ impl Service { data_for_user: None, }, ); - Ok(()) } async fn update_nonvalidator_session( &mut self, pre_session: PreNonvalidatorSession, ) -> Result<(), SessionHandlerError> { - let addresses = self.addresses(); + let address = self.network_identity.identity(); let session = match self.sessions.get_mut(&pre_session.session_id) { Some(session) => session, None => { - return self - .start_nonvalidator_session(pre_session, addresses) - .await; + return Ok(self.start_nonvalidator_session(pre_session, address).await); } }; session .handler - .update(None, pre_session.verifier, addresses) + .update(None, pre_session.verifier, address) .await?; Ok(()) } @@ -384,7 +382,7 @@ impl Service { pub async fn on_command( &mut self, command: SessionCommand, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { use SessionCommand::*; match command { StartValidator(session_id, verifier, node_id, pen, result_for_user) => { @@ -419,7 +417,7 @@ impl Service { data: D, session_id: SessionId, recipient: Recipient, - ) -> Vec, ::PeerId>> { + ) -> Vec, NI::PeerId>> { if let Some(handler) = self .sessions .get(&session_id) @@ -447,26 +445,29 @@ impl Service { /// Returns actions the service wants to take. pub fn on_discovery_message( &mut self, - message: DiscoveryMessage, - ) -> ServiceActions { + message: DiscoveryMessage, + ) -> ServiceActions { + use DiscoveryMessage::*; let session_id = message.session_id(); match self.sessions.get_mut(&session_id) { Some(Session { handler, discovery, .. }) => { - let (addresses, maybe_message) = discovery.handle_message(message, handler); - let maybe_command = match !addresses.is_empty() && handler.is_validator() { - true => { - debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, addresses); - self.connections.add_peers( - session_id, - addresses.iter().flat_map(|address| address.get_peer_id()), - ); - Some(ConnectionCommand::AddReserved( - addresses.into_iter().collect(), - )) + let (maybe_address, maybe_message) = match message { + Authentication(authentication) => { + discovery.handle_authentication(authentication, handler) + } + LegacyAuthentication(legacy_authentication) => { + discovery.handle_legacy_authentication(legacy_authentication, handler) + } + }; + let maybe_command = match (maybe_address, handler.is_validator()) { + (Some(address), true) => { + debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address); + self.connections.add_peers(session_id, [address.peer_id()]); + Some(ConnectionCommand::AddReserved([address].into())) } - false => None, + _ => None, }; ServiceActions { maybe_command, @@ -499,7 +500,7 @@ impl Service { /// the request. pub async fn retry_session_start( &mut self, - ) -> Result, SessionHandlerError> { + ) -> Result, SessionHandlerError> { let (pre_session, result_for_user) = match self.to_retry.pop() { Some(to_retry) => to_retry, None => return Ok(ServiceActions::noop()), @@ -596,15 +597,19 @@ impl Service { } } -/// Input/output interface for the connection manager service. -pub struct IO>> -where - M::PeerId: PublicKey, +/// Input/output interface for the connectiona manager service. +pub struct IO< + D: Data, + M: Data, + A: AddressingInformation + TryFrom> + Into>, + VN: ValidatorNetwork>, +> where + A::PeerId: PublicKey, { - authentications_for_network: mpsc::UnboundedSender>, + authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - authentications_from_network: mpsc::UnboundedReceiver>, + authentications_from_network: mpsc::UnboundedReceiver>, validator_network: VN, } @@ -621,17 +626,22 @@ pub enum Error { NetworkChannel, } -impl>> IO +impl< + D: Data, + M: Data + Debug, + A: AddressingInformation + TryFrom> + Into>, + VN: ValidatorNetwork>, + > IO where - M::PeerId: PublicKey, + A::PeerId: PublicKey, { pub fn new( - authentications_for_network: mpsc::UnboundedSender>, + authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - authentications_from_network: mpsc::UnboundedReceiver>, + authentications_from_network: mpsc::UnboundedReceiver>, validator_network: VN, - ) -> IO { + ) -> IO { IO { authentications_for_network, commands_from_user, @@ -641,23 +651,42 @@ where } } - fn send_data(&self, to_send: AddressedData, M::PeerId>) { + fn send_data(&self, to_send: AddressedData, A::PeerId>) { self.validator_network.send(to_send.0, to_send.1) } - fn send_authentication(&self, to_send: DiscoveryMessage) -> Result<(), Error> { - self.authentications_for_network - .unbounded_send(VersionedAuthentication::V1(to_send)) - .map_err(|_| Error::NetworkSend) + fn send_authentication(&self, to_send: PeerAuthentications) -> Result<(), Error> { + use PeerAuthentications::*; + match to_send { + NewOnly(authentication) => self + .authentications_for_network + .unbounded_send(VersionedAuthentication::V2(authentication)) + .map_err(|_| Error::NetworkSend), + LegacyOnly(legacy_authentication) => self + .authentications_for_network + .unbounded_send(VersionedAuthentication::V1( + LegacyDiscoveryMessage::AuthenticationBroadcast(legacy_authentication), + )) + .map_err(|_| Error::NetworkSend), + Both(authentication, legacy_authentication) => { + self.authentications_for_network + .unbounded_send(VersionedAuthentication::V2(authentication)) + .map_err(|_| Error::NetworkSend)?; + self.authentications_for_network + .unbounded_send(VersionedAuthentication::V1( + LegacyDiscoveryMessage::AuthenticationBroadcast(legacy_authentication), + )) + .map_err(|_| Error::NetworkSend) + } + } } - fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { + fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { match connection_command { ConnectionCommand::AddReserved(addresses) => { - for multi in addresses { - if let Some(peer_id) = multi.get_peer_id() { - self.validator_network.add_connection(peer_id, vec![multi]); - } + for address in addresses { + self.validator_network + .add_connection(address.peer_id(), address); } } ConnectionCommand::DelReserved(peers) => { @@ -673,7 +702,7 @@ where ServiceActions { maybe_command, maybe_message, - }: ServiceActions, + }: ServiceActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { self.handle_connection_command(command); @@ -685,9 +714,9 @@ where } /// Run the connection manager service with this IO. - pub async fn run>( + pub async fn run>( mut self, - mut service: Service, + mut service: Service, ) -> Result<(), Error> { // Initial delay is needed so that Network is fully set up and we received some first discovery broadcasts from other nodes. // Otherwise this might cause first maintenance never working, as it happens before first broadcasts. @@ -749,7 +778,7 @@ where Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e), } for to_send in service.discovery() { - self.send_authentication(to_send)?; + self.send_authentication(to_send.into())?; } }, _ = status_ticker.tick() => { @@ -762,17 +791,18 @@ where #[cfg(test)] mod tests { - use std::time::Duration; + use std::{iter, time::Duration}; use futures::{channel::oneshot, StreamExt}; use super::{Config, Error, Service, ServiceActions, SessionCommand}; use crate::{ network::{ - manager::{DataInSession, DiscoveryMessage}, - mock::{crypto_basics, MockNetworkIdentity}, + manager::{compatibility::PeerAuthentications, DataInSession, DiscoveryMessage}, + mock::crypto_basics, ConnectionCommand, }, + testing::mocks::validator_network::{random_address, MockAddressingInformation}, Recipient, SessionId, }; @@ -781,9 +811,9 @@ mod tests { const DISCOVERY_PERIOD: Duration = Duration::from_secs(60); const INITIAL_DELAY: Duration = Duration::from_secs(5); - fn build() -> Service { + fn build() -> Service { Service::new( - MockNetworkIdentity::new(), + random_address(), Config::new(MAINTENANCE_PERIOD, DISCOVERY_PERIOD, INITIAL_DELAY), ) } @@ -900,9 +930,12 @@ mod tests { .await .unwrap(); let message = maybe_message.expect("there should be a discovery message"); - let addresses = match &message { - DiscoveryMessage::AuthenticationBroadcast((auth_data, _)) => auth_data.addresses(), - _ => panic!("Expected an authentication broadcast, got {:?}", message), + let (address, message) = match message { + PeerAuthentications::Both(authentication, _) => ( + authentication.0.address(), + DiscoveryMessage::Authentication(authentication), + ), + message => panic!("Expected both authentications, got {:?}", message), }; let ServiceActions { maybe_command, @@ -911,7 +944,7 @@ mod tests { assert_eq!( maybe_command, Some(ConnectionCommand::AddReserved( - addresses.into_iter().collect() + iter::once(address).collect() )) ); assert!(maybe_message.is_some()); @@ -941,7 +974,12 @@ mod tests { )) .await .unwrap(); - let message = maybe_message.expect("there should be a discovery message"); + let message = match maybe_message.expect("there should be a discovery message") { + PeerAuthentications::Both(authentication, _) => { + DiscoveryMessage::Authentication(authentication) + } + message => panic!("Expected both authentications, got {:?}", message), + }; service.on_discovery_message(message); let messages = service.on_user_message(2137, session_id, Recipient::Everyone); assert_eq!(messages.len(), 1); diff --git a/finality-aleph/src/network/manager/session.rs b/finality-aleph/src/network/manager/session.rs index 1c8f84906d..ca03869aff 100644 --- a/finality-aleph/src/network/manager/session.rs +++ b/finality-aleph/src/network/manager/session.rs @@ -6,36 +6,39 @@ use crate::{ abft::NodeCount, crypto::{AuthorityPen, AuthorityVerifier}, network::{ - manager::{AuthData, Authentication}, - Multiaddress, PeerId, + manager::{ + compatibility::PeerAuthentications, AuthData, Authentication, LegacyAuthData, + LegacyAuthentication, + }, + AddressingInformation, Data, }, NodeIndex, SessionId, }; #[derive(Debug)] -pub enum SessionInfo { +pub enum SessionInfo> + Into>> { SessionId(SessionId), - OwnAuthentication(Authentication), + OwnAuthentication(PeerAuthentications), } -impl SessionInfo { +impl> + Into>> SessionInfo { fn session_id(&self) -> SessionId { match self { SessionInfo::SessionId(session_id) => *session_id, - SessionInfo::OwnAuthentication((auth_data, _)) => auth_data.session_id, + SessionInfo::OwnAuthentication(peer_authentications) => { + peer_authentications.session_id() + } } } } -type PeerAuthentications = (Authentication, Option>); - /// A struct for handling authentications for a given session and maintaining /// mappings between PeerIds and NodeIndexes within that session. -pub struct Handler { - peers_by_node: HashMap, - authentications: HashMap>, - session_info: SessionInfo, - own_peer_id: M::PeerId, +pub struct Handler> + Into>> { + peers_by_node: HashMap, + authentications: HashMap>, + session_info: SessionInfo, + own_peer_id: A::PeerId, authority_index_and_pen: Option<(NodeIndex, AuthorityPen)>, authority_verifier: AuthorityVerifier, } @@ -45,105 +48,56 @@ pub enum HandlerError { /// Returned when handler is change from validator to nonvalidator /// or vice versa TypeChange, - /// Returned when a set of addresses is not usable for creating authentications. - /// Either because none of the addresses are externally reachable libp2p addresses, - /// or the addresses contain multiple libp2p PeerIds. - NoP2pAddresses, - MultiplePeerIds, -} - -enum CommonPeerId { - Unknown, - Unique(PID), - NotUnique, -} - -impl From> for Option { - fn from(cpi: CommonPeerId) -> Self { - use CommonPeerId::*; - match cpi { - Unique(peer_id) => Some(peer_id), - Unknown | NotUnique => None, - } - } -} - -impl CommonPeerId { - fn aggregate(self, peer_id: PID) -> Self { - use CommonPeerId::*; - match self { - Unknown => Unique(peer_id), - Unique(current_peer_id) => match peer_id == current_peer_id { - true => Unique(current_peer_id), - false => NotUnique, - }, - NotUnique => NotUnique, - } - } } -fn get_common_peer_id(addresses: &[M]) -> Option { - addresses - .iter() - .fold( - CommonPeerId::Unknown, - |common_peer_id, address| match address.get_peer_id() { - Some(peer_id) => common_peer_id.aggregate(peer_id), - None => CommonPeerId::NotUnique, - }, - ) - .into() -} - -fn retrieve_peer_id(addresses: &[M]) -> Result { - if addresses.is_empty() { - return Err(HandlerError::NoP2pAddresses); - } - get_common_peer_id(addresses).ok_or(HandlerError::MultiplePeerIds) -} - -async fn construct_session_info( +async fn construct_session_info< + M: Data, + A: AddressingInformation + TryFrom> + Into>, +>( authority_index_and_pen: &Option<(NodeIndex, AuthorityPen)>, session_id: SessionId, - addresses: Vec, -) -> Result<(SessionInfo, M::PeerId), HandlerError> { - let addresses: Vec<_> = addresses - .into_iter() - .filter(|address| address.get_peer_id().is_some()) - .collect(); - let peer = retrieve_peer_id(&addresses)?; - - if let Some((node_index, authority_pen)) = authority_index_and_pen { - let auth_data = AuthData { - addresses, - node_id: *node_index, - session_id, - }; - let signature = authority_pen.sign(&auth_data.encode()).await; - return Ok((SessionInfo::OwnAuthentication((auth_data, signature)), peer)); + address: A, +) -> (SessionInfo, A::PeerId) { + let peer_id = address.peer_id(); + match authority_index_and_pen { + Some((node_index, authority_pen)) => { + let auth_data = AuthData { + address, + node_id: *node_index, + session_id, + }; + let legacy_auth_data: LegacyAuthData = auth_data.clone().into(); + let signature = authority_pen.sign(&auth_data.encode()).await; + let legacy_signature = authority_pen.sign(&legacy_auth_data.encode()).await; + let authentications = PeerAuthentications::Both( + (auth_data, signature), + (legacy_auth_data, legacy_signature), + ); + (SessionInfo::OwnAuthentication(authentications), peer_id) + } + None => (SessionInfo::SessionId(session_id), peer_id), } - Ok((SessionInfo::SessionId(session_id), peer)) } -impl Handler { - /// Returns an error if the set of addresses contains no external libp2p addresses, or contains - /// at least two such addresses with differing PeerIds. +impl> + Into>> Handler { + /// Creates a new session handler. It will be a validator session handler if the authority + /// index and pen are provided. pub async fn new( authority_index_and_pen: Option<(NodeIndex, AuthorityPen)>, authority_verifier: AuthorityVerifier, session_id: SessionId, - addresses: Vec, - ) -> Result, HandlerError> { + address: A, + ) -> Handler { let (session_info, own_peer_id) = - construct_session_info(&authority_index_and_pen, session_id, addresses).await?; - Ok(Handler { + construct_session_info(&authority_index_and_pen, session_id, address).await; + Handler { peers_by_node: HashMap::new(), authentications: HashMap::new(), session_info, authority_index_and_pen, authority_verifier, own_peer_id, - }) + } } fn index(&self) -> Option { @@ -166,10 +120,12 @@ impl Handler { } /// Returns the authentication for the node and session this handler is responsible for. - pub fn authentication(&self) -> Option> { + pub fn authentication(&self) -> Option> { match &self.session_info { SessionInfo::SessionId(_) => None, - SessionInfo::OwnAuthentication(own_authentication) => Some(own_authentication.clone()), + SessionInfo::OwnAuthentication(own_authentications) => { + Some(own_authentications.clone()) + } } } @@ -187,61 +143,95 @@ impl Handler { .collect() } - /// Verifies the authentication, uses it to update mappings, and returns whether we should - /// remain connected to the multiaddresses. - pub fn handle_authentication(&mut self, authentication: Authentication) -> bool { - if authentication.0.session_id != self.session_id() { - return false; + /// Verifies the authentication, uses it to update mappings, and returns the address we + /// should stay connected to if any. + pub fn handle_authentication(&mut self, authentication: Authentication) -> Option { + if authentication.0.session() != self.session_id() { + return None; } let (auth_data, signature) = &authentication; - // The auth is completely useless if it doesn't have a consistent PeerId. - let peer_id = match get_common_peer_id(&auth_data.addresses) { - Some(peer_id) => peer_id, - None => return false, - }; + let address = auth_data.address(); + let peer_id = address.peer_id(); if peer_id == self.own_peer_id { - return false; + return None; } if !self .authority_verifier - .verify(&auth_data.encode(), signature, auth_data.node_id) + .verify(&auth_data.encode(), signature, auth_data.creator()) { - // This might be an authentication for a key that has been changed, but we are not yet - // aware of the change. - if let Some(auth_pair) = self.authentications.get_mut(&peer_id) { - auth_pair.1 = Some(authentication.clone()); - } - return false; + return None; + } + self.peers_by_node + .insert(auth_data.creator(), peer_id.clone()); + self.authentications + .entry(peer_id) + .and_modify(|authentications| { + authentications.add_authentication(authentication.clone()) + }) + .or_insert(PeerAuthentications::NewOnly(authentication)); + Some(address) + } + + /// Verifies the legacy authentication, uses it to update mappings, and returns the address we should stay connected to if any. + pub fn handle_legacy_authentication( + &mut self, + legacy_authentication: LegacyAuthentication, + ) -> Option { + if legacy_authentication.0.session() != self.session_id() { + return None; + } + let (legacy_auth_data, signature) = &legacy_authentication; + + if !self.authority_verifier.verify( + &legacy_auth_data.encode(), + signature, + legacy_auth_data.creator(), + ) { + return None; + } + + let maybe_auth_data: Option> = legacy_auth_data.clone().try_into().ok(); + let address = match maybe_auth_data { + Some(auth_data) => auth_data.address(), + None => return None, + }; + let peer_id = address.peer_id(); + if peer_id == self.own_peer_id { + return None; } self.peers_by_node - .insert(auth_data.node_id, peer_id.clone()); - self.authentications.insert(peer_id, (authentication, None)); - true + .insert(legacy_auth_data.creator(), peer_id.clone()); + self.authentications + .entry(peer_id) + .and_modify(|authentications| { + authentications.add_legacy_authentication(legacy_authentication.clone()) + }) + .or_insert(PeerAuthentications::LegacyOnly(legacy_authentication)); + Some(address) } /// Returns the PeerId of the node with the given NodeIndex, if known. - pub fn peer_id(&self, node_id: &NodeIndex) -> Option { + pub fn peer_id(&self, node_id: &NodeIndex) -> Option { self.peers_by_node.get(node_id).cloned() } /// Returns maping from NodeIndex to PeerId - pub fn peers(&self) -> HashMap { + pub fn peers(&self) -> HashMap { self.peers_by_node.clone() } /// Updates the handler with the given keychain and set of own addresses. /// Returns an error if the set of addresses is not valid. - /// All authentications will be rechecked, invalid ones purged and cached ones that turn out to - /// now be valid canonalized. + /// All authentications will be rechecked, invalid ones purged. /// Own authentication will be regenerated. /// If successful returns a set of addresses that we should be connected to. pub async fn update( &mut self, authority_index_and_pen: Option<(NodeIndex, AuthorityPen)>, authority_verifier: AuthorityVerifier, - addresses: Vec, - ) -> Result, HandlerError> { + address: A, + ) -> Result, HandlerError> { if authority_index_and_pen.is_none() != self.authority_index_and_pen.is_none() { return Err(HandlerError::TypeChange); } @@ -252,77 +242,44 @@ impl Handler { authority_index_and_pen, authority_verifier, self.session_id(), - addresses, + address, ) - .await?; - - for (_, (auth, maybe_auth)) in authentications { - self.handle_authentication(auth); - if let Some(auth) = maybe_auth { - self.handle_authentication(auth); - } + .await; + + use PeerAuthentications::*; + for (_, authentication) in authentications { + match authentication { + NewOnly(auth) => self.handle_authentication(auth), + LegacyOnly(legacy_auth) => self.handle_legacy_authentication(legacy_auth), + Both(auth, legacy_auth) => { + self.handle_legacy_authentication(legacy_auth); + self.handle_authentication(auth) + } + }; } Ok(self .authentications .values() - .flat_map(|((auth_data, _), _)| auth_data.addresses.iter().cloned()) + .flat_map(|authentication| authentication.maybe_address()) .collect()) } } #[cfg(test)] mod tests { - use super::{get_common_peer_id, Handler, HandlerError}; + use super::{Handler, HandlerError}; use crate::{ network::{ - mock::{crypto_basics, MockNetworkIdentity}, - NetworkIdentity, + mock::crypto_basics, + testing::{authentication, legacy_authentication}, + AddressingInformation, }, - testing::mocks::validator_network::{random_multiaddress, MockMultiaddress}, + testing::mocks::validator_network::random_address, NodeIndex, SessionId, }; const NUM_NODES: usize = 7; - #[tokio::test] - async fn creates_with_correct_data() { - let mut crypto_basics = crypto_basics(NUM_NODES).await; - assert!(Handler::new( - Some(crypto_basics.0.pop().unwrap()), - crypto_basics.1, - SessionId(43), - MockNetworkIdentity::new().identity().0, - ) - .await - .is_ok()); - } - - #[tokio::test] - async fn creates_with_local_address() { - let mut crypto_basics = crypto_basics(NUM_NODES).await; - assert!(Handler::new( - Some(crypto_basics.0.pop().unwrap()), - crypto_basics.1, - SessionId(43), - MockNetworkIdentity::new().identity().0, - ) - .await - .is_ok()); - } - - #[tokio::test] - async fn creates_without_node_index_nor_authority_pen() { - let crypto_basics = crypto_basics(NUM_NODES).await; - assert!(Handler::new( - None, - crypto_basics.1, - SessionId(43), - MockNetworkIdentity::new().identity().0, - ) - .await - .is_ok()); - } - #[tokio::test] async fn identifies_whether_node_is_authority_in_current_session() { let mut crypto_basics = crypto_basics(NUM_NODES).await; @@ -330,18 +287,16 @@ mod tests { None, crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; let authority_handler = Handler::new( Some(crypto_basics.0.pop().unwrap()), crypto_basics.1, SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; assert!(!no_authority_handler.is_validator()); assert!(authority_handler.is_validator()); } @@ -349,64 +304,28 @@ mod tests { #[tokio::test] async fn non_validator_handler_returns_none_for_authentication() { let crypto_basics = crypto_basics(NUM_NODES).await; - assert!(Handler::new( - None, - crypto_basics.1, - SessionId(43), - MockNetworkIdentity::new().identity().0 - ) - .await - .unwrap() - .authentication() - .is_none()); - } - - #[tokio::test] - async fn fails_to_create_with_no_addresses() { - let mut crypto_basics = crypto_basics(NUM_NODES).await; - assert!(matches!( - Handler::new( - Some(crypto_basics.0.pop().unwrap()), - crypto_basics.1, - SessionId(43), - Vec::::new() - ) - .await, - Err(HandlerError::NoP2pAddresses) - )); - } - - #[tokio::test] - async fn fails_to_create_with_non_unique_peer_id() { - let mut crypto_basics = crypto_basics(NUM_NODES).await; - let addresses = vec![random_multiaddress(), random_multiaddress()]; - assert!(matches!( - Handler::new( - Some(crypto_basics.0.pop().unwrap()), - crypto_basics.1, - SessionId(43), - addresses - ) - .await, - Err(HandlerError::MultiplePeerIds) - )); + assert!( + Handler::new(None, crypto_basics.1, SessionId(43), random_address(),) + .await + .authentication() + .is_none() + ); } #[tokio::test] async fn fails_to_update_from_validator_to_non_validator() { let mut crypto_basics = crypto_basics(NUM_NODES).await; - let addresses = MockNetworkIdentity::new().identity().0; + let address = random_address(); let mut handler0 = Handler::new( Some(crypto_basics.0.pop().unwrap()), crypto_basics.1.clone(), SessionId(43), - addresses.clone(), + address.clone(), ) - .await - .unwrap(); + .await; assert!(matches!( handler0 - .update(None, crypto_basics.1.clone(), addresses) + .update(None, crypto_basics.1.clone(), address) .await, Err(HandlerError::TypeChange) )); @@ -415,21 +334,20 @@ mod tests { #[tokio::test] async fn fails_to_update_from_non_validator_to_validator() { let mut crypto_basics = crypto_basics(NUM_NODES).await; - let addresses = MockNetworkIdentity::new().identity().0; + let address = random_address(); let mut handler0 = Handler::new( None, crypto_basics.1.clone(), SessionId(43), - addresses.clone(), + address.clone(), ) - .await - .unwrap(); + .await; assert!(matches!( handler0 .update( Some(crypto_basics.0.pop().unwrap()), crypto_basics.1.clone(), - addresses, + address, ) .await, Err(HandlerError::TypeChange) @@ -443,10 +361,9 @@ mod tests { Some(crypto_basics.0.pop().unwrap()), crypto_basics.1, SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; assert!(handler0.peer_id(&NodeIndex(0)).is_none()); } @@ -457,10 +374,9 @@ mod tests { Some(crypto_basics.0.pop().unwrap()), crypto_basics.1, SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; let missing_nodes = handler0.missing_nodes(); let expected_missing: Vec<_> = (0..NUM_NODES - 1).map(NodeIndex).collect(); assert_eq!(missing_nodes, expected_missing); @@ -474,25 +390,28 @@ mod tests { Some(crypto_basics.0[0].clone()), crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - let addresses = MockNetworkIdentity::new().identity().0; + .await; + let address = random_address(); let handler1 = Handler::new( Some(crypto_basics.0[1].clone()), crypto_basics.1.clone(), SessionId(43), - addresses.clone(), + address.clone(), ) - .await - .unwrap(); - assert!(handler0.handle_authentication(handler1.authentication().unwrap())); + .await; + assert!(handler0 + .handle_authentication(authentication(&handler1)) + .is_some()); + assert!(handler0 + .handle_legacy_authentication(legacy_authentication(&handler1)) + .is_some()); let missing_nodes = handler0.missing_nodes(); let expected_missing: Vec<_> = (2..NUM_NODES).map(NodeIndex).collect(); assert_eq!(missing_nodes, expected_missing); - let peer_id1 = get_common_peer_id(&addresses); - assert_eq!(handler0.peer_id(&NodeIndex(1)), peer_id1); + let peer_id1 = address.peer_id(); + assert_eq!(handler0.peer_id(&NodeIndex(1)), Some(peer_id1)); } #[tokio::test] @@ -502,26 +421,29 @@ mod tests { None, crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - let addresses = MockNetworkIdentity::new().identity().0; + .await; + let address = random_address(); let handler1 = Handler::new( Some(crypto_basics.0[1].clone()), crypto_basics.1.clone(), SessionId(43), - addresses.clone(), + address.clone(), ) - .await - .unwrap(); - assert!(handler0.handle_authentication(handler1.authentication().unwrap())); + .await; + assert!(handler0 + .handle_authentication(authentication(&handler1)) + .is_some()); + assert!(handler0 + .handle_legacy_authentication(legacy_authentication(&handler1)) + .is_some()); let missing_nodes = handler0.missing_nodes(); let mut expected_missing: Vec<_> = (0..NUM_NODES).map(NodeIndex).collect(); expected_missing.remove(1); assert_eq!(missing_nodes, expected_missing); - let peer_id1 = get_common_peer_id(&addresses); - assert_eq!(handler0.peer_id(&NodeIndex(1)), peer_id1); + let peer_id1 = address.peer_id(); + assert_eq!(handler0.peer_id(&NodeIndex(1)), Some(peer_id1)); } #[tokio::test] @@ -531,21 +453,19 @@ mod tests { Some(crypto_basics.0[0].clone()), crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; let handler1 = Handler::new( Some(crypto_basics.0[1].clone()), crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - let mut authentication = handler1.authentication().unwrap(); - authentication.1 = handler0.authentication().unwrap().1; - assert!(!handler0.handle_authentication(authentication)); + .await; + let mut bad_authentication = authentication(&handler1); + bad_authentication.1 = authentication(&handler0).1; + assert!(handler0.handle_authentication(bad_authentication).is_none()); let missing_nodes = handler0.missing_nodes(); let expected_missing: Vec<_> = (1..NUM_NODES).map(NodeIndex).collect(); assert_eq!(missing_nodes, expected_missing); @@ -558,19 +478,22 @@ mod tests { Some(crypto_basics.0[0].clone()), crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; let handler1 = Handler::new( Some(crypto_basics.0[1].clone()), crypto_basics.1.clone(), SessionId(44), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - assert!(!handler0.handle_authentication(handler1.authentication().unwrap())); + .await; + assert!(handler0 + .handle_authentication(authentication(&handler1)) + .is_none()); + assert!(handler0 + .handle_legacy_authentication(legacy_authentication(&handler1)) + .is_none()); let missing_nodes = handler0.missing_nodes(); let expected_missing: Vec<_> = (1..NUM_NODES).map(NodeIndex).collect(); assert_eq!(missing_nodes, expected_missing); @@ -583,11 +506,15 @@ mod tests { Some(awaited_crypto_basics.0[0].clone()), awaited_crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - assert!(!handler0.handle_authentication(handler0.authentication().unwrap())); + .await; + assert!(handler0 + .handle_authentication(authentication(&handler0)) + .is_none()); + assert!(handler0 + .handle_legacy_authentication(legacy_authentication(&handler0)) + .is_none()); let missing_nodes = handler0.missing_nodes(); let expected_missing: Vec<_> = (1..NUM_NODES).map(NodeIndex).collect(); assert_eq!(missing_nodes, expected_missing); @@ -600,25 +527,28 @@ mod tests { Some(awaited_crypto_basics.0[0].clone()), awaited_crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); + .await; let handler1 = Handler::new( Some(awaited_crypto_basics.0[1].clone()), awaited_crypto_basics.1.clone(), SessionId(43), - MockNetworkIdentity::new().identity().0, + random_address(), ) - .await - .unwrap(); - assert!(handler0.handle_authentication(handler1.authentication().unwrap())); + .await; + assert!(handler0 + .handle_authentication(authentication(&handler1)) + .is_some()); + assert!(handler0 + .handle_legacy_authentication(legacy_authentication(&handler1)) + .is_some()); let new_crypto_basics = crypto_basics(NUM_NODES).await; handler0 .update( Some(new_crypto_basics.0[0].clone()), new_crypto_basics.1.clone(), - MockNetworkIdentity::new().identity().0, + random_address(), ) .await .unwrap(); @@ -627,54 +557,4 @@ mod tests { assert_eq!(missing_nodes, expected_missing); assert!(handler0.peer_id(&NodeIndex(1)).is_none()); } - - #[tokio::test] - async fn uses_cached_authentication() { - let awaited_crypto_basics = crypto_basics(NUM_NODES).await; - let addresses0 = MockNetworkIdentity::new().identity().0; - let mut handler0 = Handler::new( - Some(awaited_crypto_basics.0[0].clone()), - awaited_crypto_basics.1.clone(), - SessionId(43), - addresses0.clone(), - ) - .await - .unwrap(); - let addresses1 = MockNetworkIdentity::new().identity().0; - let mut handler1 = Handler::new( - Some(awaited_crypto_basics.0[1].clone()), - awaited_crypto_basics.1.clone(), - SessionId(43), - addresses1.clone(), - ) - .await - .unwrap(); - assert!(handler0.handle_authentication(handler1.authentication().unwrap())); - let new_crypto_basics = crypto_basics(NUM_NODES).await; - assert!(handler1 - .update( - Some(new_crypto_basics.0[1].clone()), - new_crypto_basics.1.clone(), - addresses1.clone(), - ) - .await - .unwrap() - .is_empty()); - assert!(!handler0.handle_authentication(handler1.authentication().unwrap())); - handler0 - .update( - Some(new_crypto_basics.0[0].clone()), - new_crypto_basics.1.clone(), - addresses0, - ) - .await - .unwrap(); - let missing_nodes = handler0.missing_nodes(); - let expected_missing: Vec<_> = (2..NUM_NODES).map(NodeIndex).collect(); - assert_eq!(missing_nodes, expected_missing); - assert_eq!( - handler0.peer_id(&NodeIndex(1)), - get_common_peer_id(&addresses1) - ); - } } diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 2567b307b4..69886efb98 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -12,33 +12,11 @@ use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, - network::{Event, EventStream, Network, NetworkIdentity, NetworkSender, Protocol}, - testing::mocks::validator_network::{random_identity, MockMultiaddress}, + network::{Event, EventStream, Network, NetworkSender, Protocol}, validator_network::mock::MockPublicKey, AuthorityId, NodeIndex, }; -pub struct MockNetworkIdentity { - addresses: Vec, - peer_id: MockPublicKey, -} - -impl MockNetworkIdentity { - pub fn new() -> Self { - let (addresses, peer_id) = random_identity(); - MockNetworkIdentity { addresses, peer_id } - } -} - -impl NetworkIdentity for MockNetworkIdentity { - type PeerId = MockPublicKey; - type Multiaddress = MockMultiaddress; - - fn identity(&self) -> (Vec, Self::PeerId) { - (self.addresses.clone(), self.peer_id.clone()) - } -} - #[derive(Clone)] pub struct Channel( pub mpsc::UnboundedSender, diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 0870b296f6..1dad911b0f 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -36,9 +36,36 @@ pub use session::{Manager as SessionManager, ManagerError, Sender, IO as Session pub use split::{split, Split}; #[cfg(test)] pub mod testing { + use super::manager::LegacyAuthentication; pub use super::manager::{ - Authentication, DataInSession, DiscoveryMessage, SessionHandler, VersionedAuthentication, + Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, + PeerAuthentications, SessionHandler, VersionedAuthentication, }; + use crate::testing::mocks::validator_network::MockAddressingInformation; + + pub fn legacy_authentication( + handler: &SessionHandler, + ) -> LegacyAuthentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(_, authentication) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } + + pub fn authentication( + handler: &SessionHandler, + ) -> Authentication { + match handler + .authentication() + .expect("this is a validator handler") + { + PeerAuthentications::Both(authentication, _) => authentication, + _ => panic!("handler doesn't have both authentications"), + } + } } /// Represents the id of an arbitrary node. @@ -59,14 +86,11 @@ pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send } /// Represents the address of an arbitrary node. -pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static { +pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static { type PeerId: PeerId; /// Returns the peer id associated with this multiaddress if it exists and is unique. - fn get_peer_id(&self) -> Option; - - /// Returns the address extended by the peer id, unless it already contained another peer id. - fn add_matching_peer_id(self, peer_id: Self::PeerId) -> Option; + fn peer_id(&self) -> Self::PeerId; } /// The Authentication protocol is used for validator discovery. @@ -117,13 +141,13 @@ pub trait Network: Clone + Send + Sync + 'static { ) -> Result; } -/// Abstraction for requesting own network addresses and PeerId. +/// Abstraction for requesting own network addressing information. pub trait NetworkIdentity { type PeerId: PeerId; - type Multiaddress: Multiaddress; + type AddressingInformation: AddressingInformation; - /// The external identity of this node, consisting of addresses and the PeerId. - fn identity(&self) -> (Vec, Self::PeerId); + /// The external identity of this node. + fn identity(&self) -> Self::AddressingInformation; } /// Abstraction for requesting justifications for finalized blocks and stale blocks. @@ -148,9 +172,9 @@ pub trait RequestBlocks: Clone + Send + Sync + 'static { /// Commands for manipulating the reserved peers set. #[derive(Debug, PartialEq, Eq)] -pub enum ConnectionCommand { - AddReserved(HashSet), - DelReserved(HashSet), +pub enum ConnectionCommand { + AddReserved(HashSet), + DelReserved(HashSet), } /// Returned when something went wrong when sending data using a DataNetwork. diff --git a/finality-aleph/src/tcp_network.rs b/finality-aleph/src/tcp_network.rs index 9c0dd4bf4a..7c94120aa9 100644 --- a/finality-aleph/src/tcp_network.rs +++ b/finality-aleph/src/tcp_network.rs @@ -1,4 +1,4 @@ -use std::{io::Result as IoResult, net::ToSocketAddrs as _}; +use std::{io::Error as IoError, iter, net::ToSocketAddrs as _}; use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; @@ -11,7 +11,7 @@ use tokio::net::{ use crate::{ crypto::{verify, AuthorityPen, Signature}, - network::{Multiaddress, NetworkIdentity, PeerId}, + network::{AddressingInformation, NetworkIdentity, PeerId}, validator_network::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable}, }; @@ -94,23 +94,97 @@ impl SecretKey for AuthorityPen { /// A representation of a single TCP address with an associated peer ID. #[derive(Debug, Hash, Encode, Decode, Clone, PartialEq, Eq)] -pub struct TcpMultiaddress { +pub struct LegacyTcpMultiaddress { peer_id: AuthorityId, address: String, } -impl Multiaddress for TcpMultiaddress { +/// What can go wrong when handling addressing information. +#[derive(Debug, Hash, Clone, PartialEq, Eq)] +pub enum AddressingInformationError { + /// Construction of an addressing information object requires at least one address. + NoAddress, +} + +/// A representation of TCP addressing information with an associated peer ID. +#[derive(Debug, Hash, Encode, Decode, Clone, PartialEq, Eq)] +pub struct TcpAddressingInformation { + peer_id: AuthorityId, + // Easiest way to ensure that the Vec below is nonempty... + primary_address: String, + other_addresses: Vec, +} + +impl TryFrom> for TcpAddressingInformation { + type Error = AddressingInformationError; + + fn try_from(legacy: Vec) -> Result { + let mut legacy = legacy.into_iter(); + let (peer_id, primary_address) = match legacy.next() { + Some(LegacyTcpMultiaddress { peer_id, address }) => (peer_id, address), + None => return Err(AddressingInformationError::NoAddress), + }; + let other_addresses = legacy + .filter(|la| la.peer_id == peer_id) + .map(|la| la.address) + .collect(); + Ok(TcpAddressingInformation { + peer_id, + primary_address, + other_addresses, + }) + } +} + +impl From for Vec { + fn from(address: TcpAddressingInformation) -> Self { + let TcpAddressingInformation { + peer_id, + primary_address, + other_addresses, + } = address; + iter::once(primary_address) + .chain(other_addresses) + .map(|address| LegacyTcpMultiaddress { + peer_id: peer_id.clone(), + address, + }) + .collect() + } +} + +impl AddressingInformation for TcpAddressingInformation { type PeerId = AuthorityId; - fn get_peer_id(&self) -> Option { - Some(self.peer_id.clone()) + fn peer_id(&self) -> Self::PeerId { + self.peer_id.clone() } +} - fn add_matching_peer_id(self, peer_id: Self::PeerId) -> Option { - match self.peer_id == peer_id { - true => Some(self), - false => None, - } +impl NetworkIdentity for TcpAddressingInformation { + type PeerId = AuthorityId; + type AddressingInformation = TcpAddressingInformation; + + fn identity(&self) -> Self::AddressingInformation { + self.clone() + } +} + +impl TcpAddressingInformation { + fn new( + addresses: Vec, + peer_id: AuthorityId, + ) -> Result { + let mut addresses = addresses.into_iter(); + let primary_address = match addresses.next() { + Some(address) => address, + None => return Err(AddressingInformationError::NoAddress), + }; + Ok(TcpAddressingInformation { + primary_address, + other_addresses: addresses.collect(), + peer_id, + }) } } @@ -118,17 +192,22 @@ impl Multiaddress for TcpMultiaddress { struct TcpDialer; #[async_trait::async_trait] -impl Dialer for TcpDialer { +impl Dialer for TcpDialer { type Connection = TcpStream; type Error = std::io::Error; async fn connect( &mut self, - addresses: Vec, + address: TcpAddressingInformation, ) -> Result { - let parsed_addresses: Vec<_> = addresses - .into_iter() - .filter_map(|address| address.address.to_socket_addrs().ok()) + let TcpAddressingInformation { + primary_address, + other_addresses, + .. + } = address; + let parsed_addresses: Vec<_> = iter::once(primary_address) + .chain(other_addresses) + .filter_map(|address| address.to_socket_addrs().ok()) .flatten() .collect(); let stream = TcpStream::connect(&parsed_addresses[..]).await?; @@ -139,32 +218,22 @@ impl Dialer for TcpDialer { } } -struct TcpNetworkIdentity { - peer_id: AuthorityId, - addresses: Vec, +/// Possible errors when creating a TCP network. +#[derive(Debug)] +pub enum Error { + Io(IoError), + AddressingInformation(AddressingInformationError), } -impl NetworkIdentity for TcpNetworkIdentity { - type PeerId = AuthorityId; - type Multiaddress = TcpMultiaddress; - - fn identity(&self) -> (Vec, Self::PeerId) { - (self.addresses.clone(), self.peer_id.clone()) +impl From for Error { + fn from(e: IoError) -> Self { + Error::Io(e) } } -impl TcpNetworkIdentity { - fn new(external_addresses: Vec, peer_id: AuthorityId) -> TcpNetworkIdentity { - TcpNetworkIdentity { - addresses: external_addresses - .into_iter() - .map(|address| TcpMultiaddress { - peer_id: peer_id.clone(), - address, - }) - .collect(), - peer_id, - } +impl From for Error { + fn from(e: AddressingInformationError) -> Self { + Error::AddressingInformation(e) } } @@ -174,13 +243,16 @@ pub async fn new_tcp_network( listening_addresses: A, external_addresses: Vec, peer_id: AuthorityId, -) -> IoResult<( - impl Dialer, - impl Listener, - impl NetworkIdentity, -)> { +) -> Result< + ( + impl Dialer, + impl Listener, + impl NetworkIdentity, + ), + Error, +> { let listener = TcpListener::bind(listening_addresses).await?; - let identity = TcpNetworkIdentity::new(external_addresses, peer_id); + let identity = TcpAddressingInformation::new(external_addresses, peer_id)?; Ok((TcpDialer {}, listener, identity)) } @@ -188,13 +260,16 @@ pub async fn new_tcp_network( pub mod testing { use aleph_primitives::AuthorityId; - use super::{TcpMultiaddress, TcpNetworkIdentity}; + use super::TcpAddressingInformation; use crate::network::NetworkIdentity; + /// Creates a realistic identity. pub fn new_identity( external_addresses: Vec, peer_id: AuthorityId, - ) -> impl NetworkIdentity { - TcpNetworkIdentity::new(external_addresses, peer_id) + ) -> impl NetworkIdentity + { + TcpAddressingInformation::new(external_addresses, peer_id) + .expect("the provided addresses are fine") } } diff --git a/finality-aleph/src/testing/mocks/validator_network.rs b/finality-aleph/src/testing/mocks/validator_network.rs index e0fcb0ecf5..a9cb23ec6c 100644 --- a/finality-aleph/src/testing/mocks/validator_network.rs +++ b/finality-aleph/src/testing/mocks/validator_network.rs @@ -20,7 +20,7 @@ use tokio::{ }; use crate::{ - network::{mock::Channel, Data, Multiaddress, NetworkIdentity}, + network::{mock::Channel, AddressingInformation, Data, NetworkIdentity}, validator_network::{ mock::{key, random_keys, MockPublicKey, MockSecretKey}, ConnectionInfo, Dialer as DialerT, Listener as ListenerT, Network, PeerAddressInfo, @@ -28,58 +28,43 @@ use crate::{ }, }; -pub type MockMultiaddress = (MockPublicKey, String); +#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] +pub struct MockAddressingInformation { + peer_id: MockPublicKey, + address: String, +} -impl Multiaddress for MockMultiaddress { +impl AddressingInformation for MockAddressingInformation { type PeerId = MockPublicKey; - fn get_peer_id(&self) -> Option { - Some(self.0.clone()) - } - - fn add_matching_peer_id(self, peer_id: Self::PeerId) -> Option { - match self.0 == peer_id { - true => Some(self), - false => None, - } + fn peer_id(&self) -> Self::PeerId { + self.peer_id.clone() } } -#[derive(Clone)] -pub struct MockNetwork { - pub add_connection: Channel<(MockPublicKey, Vec)>, - pub remove_connection: Channel, - pub send: Channel<(D, MockPublicKey)>, - pub next: Channel, - id: MockPublicKey, - addresses: Vec, -} - -#[async_trait::async_trait] -impl Network for MockNetwork { - fn add_connection(&mut self, peer: MockPublicKey, addresses: Vec) { - self.add_connection.send((peer, addresses)); - } - - fn remove_connection(&mut self, peer: MockPublicKey) { - self.remove_connection.send(peer); - } +impl NetworkIdentity for MockAddressingInformation { + type PeerId = MockPublicKey; + type AddressingInformation = MockAddressingInformation; - fn send(&self, data: D, recipient: MockPublicKey) { - self.send.send((data, recipient)); + fn identity(&self) -> Self::AddressingInformation { + self.clone() } +} - async fn next(&mut self) -> Option { - self.next.next().await +impl From for Vec { + fn from(address: MockAddressingInformation) -> Self { + vec![address] } } -impl NetworkIdentity for MockNetwork { - type PeerId = MockPublicKey; - type Multiaddress = MockMultiaddress; +impl TryFrom> for MockAddressingInformation { + type Error = (); - fn identity(&self) -> (Vec, Self::PeerId) { - (self.addresses.clone(), self.id.clone()) + fn try_from(mut addresses: Vec) -> Result { + match addresses.pop() { + Some(address) => Ok(address), + None => Err(()), + } } } @@ -87,13 +72,13 @@ pub fn random_peer_id() -> MockPublicKey { key().0 } -pub fn random_identity_with_address(address: String) -> (Vec, MockPublicKey) { - let id = random_peer_id(); - (vec![(id.clone(), address)], id) +pub fn random_address_from(address: String) -> MockAddressingInformation { + let peer_id = random_peer_id(); + MockAddressingInformation { peer_id, address } } -pub fn random_identity() -> (Vec, MockPublicKey) { - random_identity_with_address( +pub fn random_address() -> MockAddressingInformation { + random_address_from( rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) .map(char::from) @@ -102,32 +87,40 @@ pub fn random_identity() -> (Vec, MockPublicKey) { ) } -pub fn random_multiaddress() -> MockMultiaddress { - random_identity().0.pop().expect("we created an address") +#[derive(Clone)] +pub struct MockNetwork { + pub add_connection: Channel<(MockPublicKey, MockAddressingInformation)>, + pub remove_connection: Channel, + pub send: Channel<(D, MockPublicKey)>, + pub next: Channel, } -impl MockNetwork { - pub async fn new(address: &str) -> Self { - let id = random_peer_id(); - let addresses = vec![(id.clone(), String::from(address))]; - MockNetwork { - add_connection: Channel::new(), - remove_connection: Channel::new(), - send: Channel::new(), - next: Channel::new(), - addresses, - id, - } +#[async_trait::async_trait] +impl Network for MockNetwork { + fn add_connection(&mut self, peer: MockPublicKey, address: MockAddressingInformation) { + self.add_connection.send((peer, address)); + } + + fn remove_connection(&mut self, peer: MockPublicKey) { + self.remove_connection.send(peer); + } + + fn send(&self, data: D, recipient: MockPublicKey) { + self.send.send((data, recipient)); } - pub fn from(addresses: Vec, id: MockPublicKey) -> Self { + async fn next(&mut self) -> Option { + self.next.next().await + } +} + +impl MockNetwork { + pub fn new() -> Self { MockNetwork { add_connection: Channel::new(), remove_connection: Channel::new(), send: Channel::new(), next: Channel::new(), - addresses, - id, } } @@ -278,7 +271,7 @@ impl Splittable for UnreliableSplittable { } type Address = u32; -type Addresses = HashMap>; +type Addresses = HashMap; type Callers = HashMap; type Connection = UnreliableSplittable; @@ -296,10 +289,10 @@ impl DialerT
for MockDialer { type Connection = Connection; type Error = std::io::Error; - async fn connect(&mut self, addresses: Vec
) -> Result { + async fn connect(&mut self, address: Address) -> Result { let (tx, rx) = oneshot::channel(); self.channel_connect - .unbounded_send((self.own_address, addresses[0], tx)) + .unbounded_send((self.own_address, address, tx)) .expect("should send"); Ok(rx.await.expect("should receive")) } @@ -336,13 +329,13 @@ impl UnreliableConnectionMaker { .clone() .into_iter() .zip(0..ids.len()) - .map(|(id, u)| (id, vec![u as u32])) + .map(|(id, u)| (id, u as u32)) .collect(); // create callers for every peer, keep channels for communicating with them for id in ids.into_iter() { let (tx_listener, rx_listener) = mpsc::unbounded(); let dialer = MockDialer { - own_address: addr.get(&id).expect("should be there")[0], + own_address: *addr.get(&id).expect("should be there"), channel_connect: tx_dialer.clone(), }; let listener = MockListener { diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index fae9bbee0b..10b4508ef0 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -14,12 +14,15 @@ use crate::{ network::{ mock::{crypto_basics, MockData, MockEvent, MockNetwork}, setup_io, - testing::{DataInSession, DiscoveryMessage, SessionHandler, VersionedAuthentication}, - ConnectionManager, ConnectionManagerConfig, DataNetwork, NetworkIdentity, Protocol, - Service as NetworkService, SessionManager, + testing::{ + authentication, legacy_authentication, DataInSession, LegacyDiscoveryMessage, + SessionHandler, VersionedAuthentication, + }, + AddressingInformation, ConnectionManager, ConnectionManagerConfig, DataNetwork, + NetworkIdentity, Protocol, Service as NetworkService, SessionManager, }, testing::mocks::validator_network::{ - random_identity_with_address, MockMultiaddress, MockNetwork as MockValidatorNetwork, + random_address_from, MockAddressingInformation, MockNetwork as MockValidatorNetwork, }, validator_network::mock::{key, MockPublicKey}, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, @@ -33,7 +36,7 @@ const NODES_N: usize = 3; #[derive(Clone)] struct Authority { pen: AuthorityPen, - addresses: Vec, + address: MockAddressingInformation, peer_id: MockPublicKey, auth_peer_id: MockPublicKey, } @@ -43,8 +46,8 @@ impl Authority { self.pen.clone() } - fn addresses(&self) -> Vec { - self.addresses.clone() + fn address(&self) -> MockAddressingInformation { + self.address.clone() } fn peer_id(&self) -> MockPublicKey { @@ -58,10 +61,10 @@ impl Authority { impl NetworkIdentity for Authority { type PeerId = MockPublicKey; - type Multiaddress = MockMultiaddress; + type AddressingInformation = MockAddressingInformation; - fn identity(&self) -> (Vec, Self::PeerId) { - (self.addresses.clone(), self.peer_id.clone()) + fn identity(&self) -> Self::AddressingInformation { + self.address.clone() } } @@ -84,12 +87,12 @@ async fn prepare_one_session_test_data() -> TestData { let (authority_pens, authority_verifier) = crypto_basics(NODES_N).await; let mut authorities = Vec::new(); for (index, p) in authority_pens { - let identity = random_identity_with_address(index.0.to_string()); + let address = random_address_from(index.0.to_string()); let auth_peer_id = key().0; authorities.push(Authority { pen: p, - addresses: identity.0, - peer_id: identity.1, + peer_id: address.peer_id(), + address, auth_peer_id, }); } @@ -99,13 +102,12 @@ async fn prepare_one_session_test_data() -> TestData { let (network_manager_exit_tx, network_manager_exit_rx) = oneshot::channel(); let (network_service_exit_tx, network_service_exit_rx) = oneshot::channel(); let network = MockNetwork::new(event_stream_tx); - let validator_network = - MockValidatorNetwork::from(authorities[0].addresses(), authorities[0].peer_id()); + let validator_network = MockValidatorNetwork::new(); let (connection_io, network_io, session_io) = setup_io(validator_network.clone()); let connection_manager = ConnectionManager::new( - validator_network.clone(), + authorities[0].clone(), ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), ); @@ -184,32 +186,40 @@ impl TestData { &self, node_id: usize, session_id: u32, - ) -> SessionHandler { + ) -> SessionHandler { SessionHandler::new( Some((NodeIndex(node_id), self.authorities[node_id].pen())), self.authority_verifier.clone(), SessionId(session_id), - self.authorities[node_id].addresses().to_vec(), + self.authorities[node_id].address(), ) .await - .unwrap() } async fn check_add_connection(&mut self) { let mut reserved_addresses = HashSet::new(); for _ in self.authorities.iter().skip(1) { - let (_, addresses) = self + let (_, address) = self .validator_network .add_connection .next() .await .expect("Should add reserved nodes"); - reserved_addresses.extend(addresses.into_iter()); + reserved_addresses.insert(address); + // Gotta repeat this, because we are adding every address twice, due to legacy + // authentications. + let (_, address) = self + .validator_network + .add_connection + .next() + .await + .expect("Should add reserved nodes"); + reserved_addresses.insert(address); } let mut expected_addresses = HashSet::new(); for authority in self.authorities.iter().skip(1) { - expected_addresses.extend(authority.addresses()); + expected_addresses.insert(authority.address()); } assert_eq!(reserved_addresses, expected_addresses); @@ -221,14 +231,14 @@ impl TestData { self.connect_identity_to_network(authority.auth_peer_id(), Protocol::Authentication); - self.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( - handler.authentication().unwrap(), - )) - .encode() - .into(), - )])); + for versioned_authentication in + Vec::>::from(handler.authentication().unwrap()) + { + self.network.emit_event(MockEvent::Messages(vec![( + Protocol::Authentication, + versioned_authentication.encode().into(), + )])); + } } } @@ -243,7 +253,7 @@ impl TestData { async fn next_sent_auth( &mut self, ) -> Option<( - VersionedAuthentication, + VersionedAuthentication, MockPublicKey, Protocol, )> { @@ -252,9 +262,10 @@ impl TestData { Some((data, peer_id, protocol)) => { if protocol == Protocol::Authentication { return Some(( - VersionedAuthentication::::decode( - &mut data.as_slice(), - ) + VersionedAuthentication::< + MockAddressingInformation, + MockAddressingInformation, + >::decode(&mut data.as_slice()) .expect("should decode"), peer_id, protocol, @@ -289,12 +300,18 @@ async fn test_sends_discovery_message() { for _ in 0..4 { match test_data.next_sent_auth().await { Some(( - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast(auth_data)), + VersionedAuthentication::V1(LegacyDiscoveryMessage::AuthenticationBroadcast( + authentication, + )), peer_id, _, )) => { assert_eq!(peer_id, connected_peer_id); - assert_eq!(auth_data, handler.authentication().unwrap()); + assert_eq!(authentication, legacy_authentication(&handler)); + } + Some((VersionedAuthentication::V2(new_authentication), peer_id, _)) => { + assert_eq!(peer_id, connected_peer_id); + assert_eq!(new_authentication, authentication(&handler)); } None => panic!("Not sending authentications"), _ => panic!("Should broadcast own authentication, nothing else"), @@ -321,48 +338,67 @@ async fn test_forwards_authentication_broadcast() { test_data.connect_identity_to_network(authority.auth_peer_id(), Protocol::Authentication); } - test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( - sending_peer_handler.authentication().unwrap(), - )) - .encode() - .into(), - )])); + for versioned_authentication in + Vec::>::from(sending_peer_handler.authentication().unwrap()) + { + test_data.network.emit_event(MockEvent::Messages(vec![( + Protocol::Authentication, + versioned_authentication.encode().into(), + )])); + } - assert_eq!( - test_data - .validator_network - .add_connection - .next() - .await - .expect("Should add reserved nodes"), - (sending_peer.peer_id(), sending_peer.addresses()), - ); + for _ in 0..2 { + // Since we send the legacy auth and both are correct this should happen twice. + assert_eq!( + test_data + .validator_network + .add_connection + .next() + .await + .expect("Should add reserved nodes"), + (sending_peer.peer_id(), sending_peer.address()), + ); + } let mut expected_authentication = HashMap::new(); + let mut expected_legacy_authentication = HashMap::new(); for authority in test_data.authorities.iter().skip(1) { expected_authentication.insert( authority.auth_peer_id(), - sending_peer_handler.authentication().unwrap(), + authentication(&sending_peer_handler), + ); + expected_legacy_authentication.insert( + authority.auth_peer_id(), + legacy_authentication(&sending_peer_handler), ); } let mut sent_authentication = HashMap::new(); - while sent_authentication.len() < NODES_N - 1 { - if let Some(( - VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast(auth_data)), - peer_id, - _, - )) = test_data.next_sent_auth().await - { - if auth_data != handler.authentication().unwrap() { - sent_authentication.insert(peer_id, auth_data); + let mut sent_legacy_authentication = HashMap::new(); + while sent_authentication.len() < NODES_N - 1 || sent_legacy_authentication.len() < NODES_N - 1 + { + match test_data.next_sent_auth().await { + Some(( + VersionedAuthentication::V1(LegacyDiscoveryMessage::AuthenticationBroadcast(auth)), + peer_id, + _, + )) => { + if auth != legacy_authentication(&handler) { + sent_legacy_authentication.insert(peer_id, auth); + } + } + Some((VersionedAuthentication::V2(auth), peer_id, _)) => { + if auth != authentication(&handler) { + sent_authentication.insert(peer_id, auth); + } } + None => panic!("not enough authentications sent"), + _ => (), } } assert_eq!(sent_authentication, expected_authentication); + assert_eq!(sent_legacy_authentication, expected_legacy_authentication); test_data.cleanup().await; assert_eq!( diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs index 3197b1a12d..de69416ba6 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -9,7 +9,7 @@ use crate::validator_network::{Data, PublicKey}; /// case also keeps the peers' addresses. pub struct DirectedPeers { own_id: PK, - outgoing: HashMap>, + outgoing: HashMap, incoming: HashSet, } @@ -38,24 +38,24 @@ impl DirectedPeers { } /// Add a peer to the list of peers we want to stay connected to, or - /// update the list of addresses if the peer was already added. + /// update the address if the peer was already added. /// Returns whether we should start attempts at connecting with the peer, which is the case /// exactly when the peer is one with which we should attempt connections AND it was added for /// the first time. - pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool { match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) { - true => self.outgoing.insert(peer_id, addresses).is_none(), + true => self.outgoing.insert(peer_id, address).is_none(), false => { - // We discard the addresses here, as we will never want to call this peer anyway, - // so we don't need them. + // We discard the address here, as we will never want to call this peer anyway, + // so we don't need it. self.incoming.insert(peer_id); false } } } - /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &PK) -> Option> { + /// Return the address of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_address(&self, peer_id: &PK) -> Option { self.outgoing.get(peer_id).cloned() } @@ -95,31 +95,27 @@ mod tests { (container, id) } - fn some_addresses() -> Vec
{ - vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ] + fn some_address() -> Address { + String::from("43.43.43.43:43000") } #[test] fn exactly_one_direction_attempts_connections() { let (mut container0, id0) = container_with_id(); let (mut container1, id1) = container_with_id(); - let addresses = some_addresses(); - assert!(container0.add_peer(id1, addresses.clone()) != container1.add_peer(id0, addresses)); + let address = some_address(); + assert!(container0.add_peer(id1, address.clone()) != container1.add_peer(id0, address)); } fn container_with_added_connecting_peer( ) -> (DirectedPeers, MockPublicKey) { let (mut container0, id0) = container_with_id(); let (mut container1, id1) = container_with_id(); - let addresses = some_addresses(); - match container0.add_peer(id1.clone(), addresses.clone()) { + let address = some_address(); + match container0.add_peer(id1.clone(), address.clone()) { true => (container0, id1), false => { - container1.add_peer(id0.clone(), addresses); + container1.add_peer(id0.clone(), address); (container1, id0) } } @@ -129,11 +125,11 @@ mod tests { ) -> (DirectedPeers, MockPublicKey) { let (mut container0, id0) = container_with_id(); let (mut container1, id1) = container_with_id(); - let addresses = some_addresses(); - match container0.add_peer(id1.clone(), addresses.clone()) { + let address = some_address(); + match container0.add_peer(id1.clone(), address.clone()) { false => (container0, id1), true => { - container1.add_peer(id0.clone(), addresses); + container1.add_peer(id0.clone(), address); (container1, id0) } } @@ -142,20 +138,20 @@ mod tests { #[test] fn no_connecting_on_subsequent_add() { let (mut container0, id1) = container_with_added_connecting_peer(); - let addresses = some_addresses(); - assert!(!container0.add_peer(id1, addresses)); + let address = some_address(); + assert!(!container0.add_peer(id1, address)); } #[test] - fn peer_addresses_when_connecting() { + fn peer_address_when_connecting() { let (container0, id1) = container_with_added_connecting_peer(); - assert!(container0.peer_addresses(&id1).is_some()); + assert!(container0.peer_address(&id1).is_some()); } #[test] - fn no_peer_addresses_when_nonconnecting() { + fn no_peer_address_when_nonconnecting() { let (container0, id1) = container_with_added_nonconnecting_peer(); - assert!(container0.peer_addresses(&id1).is_none()); + assert!(container0.peer_address(&id1).is_none()); } #[test] diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index 3162161983..cac850d189 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -16,7 +16,7 @@ use crate::{ /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. pub struct Manager { - addresses: HashMap>, + addresses: HashMap, outgoing: HashMap>, incoming: HashMap>, } @@ -146,15 +146,15 @@ impl Manager { } /// Add a peer to the list of peers we want to stay connected to, or - /// update the list of addresses if the peer was already added. + /// update the address if the peer was already added. /// Returns whether this peer is a new peer. - pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { - self.addresses.insert(peer_id, addresses).is_none() + pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool { + self.addresses.insert(peer_id, address).is_none() } - /// Return Option containing addresses of the given peer, or None if + /// Return Option containing the address of the given peer, or None if /// the peer is unknown. - pub fn peer_addresses(&self, peer_id: &PK) -> Option> { + pub fn peer_address(&self, peer_id: &PK) -> Option { self.addresses.get(peer_id).cloned() } @@ -228,23 +228,19 @@ mod tests { let mut manager = Manager::::new(); let (peer_id, _) = key(); let (peer_id_b, _) = key(); - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + let address = String::from("43.43.43.43:43000"); // add new peer - returns true - assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + assert!(manager.add_peer(peer_id.clone(), address.clone())); // add known peer - returns false - assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); + assert!(!manager.add_peer(peer_id.clone(), address.clone())); // get address - assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)); + assert_eq!(manager.peer_address(&peer_id), Some(address)); // try to get address of an unknown peer - assert_eq!(manager.peer_addresses(&peer_id_b), None); + assert_eq!(manager.peer_address(&peer_id_b), None); // remove peer manager.remove_peer(&peer_id); // try to get address of removed peer - assert_eq!(manager.peer_addresses(&peer_id), None); + assert_eq!(manager.peer_address(&peer_id), None); } #[tokio::test] @@ -252,11 +248,7 @@ mod tests { let mut manager = Manager::::new(); let data = String::from("DATA"); let (peer_id, _) = key(); - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + let address = String::from("43.43.43.43:43000"); let (tx, _rx) = mpsc::unbounded(); // try add unknown peer manager.add_outgoing(peer_id.clone(), tx); @@ -266,7 +258,7 @@ mod tests { Err(SendError::PeerNotFound) ); // add peer, this time for real - assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + assert!(manager.add_peer(peer_id.clone(), address.clone())); let (tx, mut rx) = mpsc::unbounded(); assert_eq!(manager.add_outgoing(peer_id.clone(), tx), Added); // send and receive @@ -282,18 +274,14 @@ mod tests { fn incoming() { let mut manager = Manager::::new(); let (peer_id, _) = key(); - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + let address = String::from("43.43.43.43:43000"); let (tx, mut rx) = mpsc::unbounded(); // try add unknown peer assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); // rx should fail assert!(rx.try_next().expect("channel should be closed").is_none()); // add peer, this time for real - assert!(manager.add_peer(peer_id.clone(), addresses)); + assert!(manager.add_peer(peer_id.clone(), address)); let (tx, mut rx) = mpsc::unbounded(); // should just add assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index 4949f4687b..2a5d094bd3 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -184,17 +184,17 @@ impl Manager { } /// Add a peer to the list of peers we want to stay connected to, or - /// update the list of addresses if the peer was already added. + /// update the address if the peer was already added. /// Returns whether we should start attempts at connecting with the peer, which depends on the /// coorddinated pseudorandom decision on the direction of the connection and whether this was /// added for the first time. - pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { - self.wanted.add_peer(peer_id, addresses) + pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool { + self.wanted.add_peer(peer_id, address) } - /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &PK) -> Option> { - self.wanted.peer_addresses(peer_id) + /// Return the address of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_address(&self, peer_id: &PK) -> Option { + self.wanted.peer_address(peer_id) } /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to. @@ -253,26 +253,22 @@ mod tests { let mut manager = Manager::::new(own_id); let (peer_id, _) = key(); let (peer_id_b, _) = key(); - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + let address = String::from("43.43.43.43:43000"); // add new peer - might return either true or false, depending on the ids - let attempting_connections = manager.add_peer(peer_id.clone(), addresses.clone()); + let attempting_connections = manager.add_peer(peer_id.clone(), address.clone()); // add known peer - always returns false - assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); + assert!(!manager.add_peer(peer_id.clone(), address.clone())); // get address match attempting_connections { - true => assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)), - false => assert_eq!(manager.peer_addresses(&peer_id), None), + true => assert_eq!(manager.peer_address(&peer_id), Some(address)), + false => assert_eq!(manager.peer_address(&peer_id), None), } // try to get address of an unknown peer - assert_eq!(manager.peer_addresses(&peer_id_b), None); + assert_eq!(manager.peer_address(&peer_id_b), None); // remove peer manager.remove_peer(&peer_id); // try to get address of removed peer - assert_eq!(manager.peer_addresses(&peer_id), None); + assert_eq!(manager.peer_address(&peer_id), None); // remove again manager.remove_peer(&peer_id); // remove unknown peer @@ -288,11 +284,7 @@ mod tests { let mut listening_manager = Manager::::new(listening_id.clone()); let data = String::from("DATA"); - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + let address = String::from("43.43.43.43:43000"); let (tx, _rx) = mpsc::unbounded(); // try add unknown peer assert_eq!( @@ -305,14 +297,14 @@ mod tests { Err(SendError::PeerNotFound) ); // add peer, this time for real - if connecting_manager.add_peer(listening_id.clone(), addresses.clone()) { - assert!(!listening_manager.add_peer(connecting_id.clone(), addresses.clone())) + if connecting_manager.add_peer(listening_id.clone(), address.clone()) { + assert!(!listening_manager.add_peer(connecting_id.clone(), address.clone())) } else { // We need to switch the names around, because the connection was randomly the // other way around. std::mem::swap(&mut connecting_id, &mut listening_id); std::mem::swap(&mut connecting_manager, &mut listening_manager); - assert!(connecting_manager.add_peer(listening_id.clone(), addresses.clone())); + assert!(connecting_manager.add_peer(listening_id.clone(), address.clone())); } // add outgoing to connecting let (tx, mut rx) = mpsc::unbounded(); diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index 5a35646ec0..9880754996 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -31,7 +31,7 @@ impl Data for D {} #[async_trait::async_trait] pub trait Network: Send + 'static { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: PK, addresses: Vec); + fn add_connection(&mut self, peer: PK, address: A); /// Remove the peer from the set of connected peers and close the connection. fn remove_connection(&mut self, peer: PK); @@ -67,9 +67,8 @@ pub trait Dialer: Clone + Send + 'static { type Connection: Splittable; type Error: Display + Send; - /// Attempt to connect to a peer using the provided addresses. Should work if at least one of - /// the addresses is correct. - async fn connect(&mut self, addresses: Vec) -> Result; + /// Attempt to connect to a peer using the provided addressing information. + async fn connect(&mut self, address: A) -> Result; } /// Accepts new connections. Usually will be created listening on a specific interface and this is diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 679b9d0b9b..7091e15a7c 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -40,15 +40,12 @@ async fn manage_outgoing>( secret_key: SK, public_key: SK::PublicKey, mut dialer: ND, - addresses: Vec, + address: A, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", public_key); - let stream = dialer - .connect(addresses) - .await - .map_err(OutgoingError::Dial)?; + let stream = dialer.connect(address).await.map_err(OutgoingError::Dial)?; let peer_address_info = stream.peer_address_info(); debug!(target: "validator-network", "Performing outgoing protocol negotiation."); let (stream, protocol) = protocol(stream) @@ -76,7 +73,7 @@ pub async fn outgoing>( secret_key: SK, public_key: SK::PublicKey, dialer: ND, - addresses: Vec, + address: A, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { @@ -84,13 +81,13 @@ pub async fn outgoing>( secret_key, public_key.clone(), dialer, - addresses.clone(), + address.clone(), result_for_parent.clone(), data_for_user, ) .await { - info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", public_key, addresses, e, RETRY_DELAY.as_secs()); + info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", public_key, address, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; // we send the "new" connection type, because we always assume it's new until proven // otherwise, and here we did not even get the chance to attempt negotiating a protocol diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index c1c2f4a762..320e78c62d 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -20,7 +20,7 @@ use crate::{ }; enum ServiceCommand { - AddConnection(PK, Vec), + AddConnection(PK, A), DelConnection(PK), SendData(D, PK), } @@ -33,10 +33,10 @@ struct ServiceInterface { #[async_trait::async_trait] impl Network for ServiceInterface { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: PK, addresses: Vec) { + fn add_connection(&mut self, peer: PK, address: A) { if self .commands_for_service - .unbounded_send(ServiceCommand::AddConnection(peer, addresses)) + .unbounded_send(ServiceCommand::AddConnection(peer, address)) .is_err() { info!(target: "validator-network", "Service is dead."); @@ -126,7 +126,7 @@ where fn spawn_new_outgoing( &self, public_key: SK::PublicKey, - addresses: Vec, + address: A, result_for_parent: mpsc::UnboundedSender>, ) { let secret_key = self.secret_key.clone(); @@ -138,7 +138,7 @@ where secret_key, public_key, dialer, - addresses, + address, result_for_parent, next_to_interface, ) @@ -159,10 +159,10 @@ where }); } - fn peer_addresses(&self, public_key: &SK::PublicKey) -> Option> { + fn peer_address(&self, public_key: &SK::PublicKey) -> Option { match self.legacy_connected.contains(public_key) { - true => self.legacy_manager.peer_addresses(public_key), - false => self.manager.peer_addresses(public_key), + true => self.legacy_manager.peer_address(public_key), + false => self.manager.peer_address(public_key), } } @@ -200,8 +200,8 @@ where fn unmark_legacy(&mut self, public_key: &SK::PublicKey) { self.legacy_connected.remove(public_key); // Put it back if we still want to be connected. - if let Some(addresses) = self.legacy_manager.peer_addresses(public_key) { - self.manager.add_peer(public_key.clone(), addresses); + if let Some(address) = self.legacy_manager.peer_address(public_key) { + self.manager.add_peer(public_key.clone(), address); } } @@ -241,17 +241,17 @@ where }, // got a new command from the interface Some(command) = self.commands_from_interface.next() => match command { - // register new peer in manager or update its list of addresses if already there + // register new peer in manager or update its address if already there // spawn a worker managing outgoing connection if the peer was not known - AddConnection(public_key, addresses) => { + AddConnection(public_key, address) => { // we add all the peers to the legacy manager so we don't lose the - // addresses, but only care about its opinion when it turns out we have to + // address, but only care about its opinion when it turns out we have to // in particular the first time we add a peer we never know whether it // requires legacy connecting, so we only attempt to connect to it if the // new criterion is satisfied, otherwise we wait for it to connect to us - self.legacy_manager.add_peer(public_key.clone(), addresses.clone()); - if self.manager.add_peer(public_key.clone(), addresses.clone()) { - self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); + self.legacy_manager.add_peer(public_key.clone(), address.clone()); + if self.manager.add_peer(public_key.clone(), address.clone()) { + self.spawn_new_outgoing(public_key, address, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels @@ -278,8 +278,8 @@ where // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection Some((public_key, maybe_data_for_network, connection_type)) = worker_results.next() => { if self.check_for_legacy(&public_key, connection_type) { - match self.legacy_manager.peer_addresses(&public_key) { - Some(addresses) => self.spawn_new_outgoing(public_key.clone(), addresses, result_for_parent.clone()), + match self.legacy_manager.peer_address(&public_key) { + Some(address) => self.spawn_new_outgoing(public_key.clone(), address, result_for_parent.clone()), None => { // We received a result from a worker we are no longer interested // in. @@ -294,8 +294,8 @@ where Added => info!(target: "validator-network", "New connection with peer {}.", public_key), Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", public_key), }, - None => if let Some(addresses) = self.peer_addresses(&public_key) { - self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); + None => if let Some(address) = self.peer_address(&public_key) { + self.spawn_new_outgoing(public_key, address, result_for_parent.clone()); } } }, From db73a35c6dd98d17fddeb4ccc15a2db5c1625f63 Mon Sep 17 00:00:00 2001 From: timorl Date: Wed, 7 Dec 2022 17:06:55 +0100 Subject: [PATCH 2/3] Clippy being actually helpful --- finality-aleph/src/network/io.rs | 12 ++--- finality-aleph/src/network/manager/service.rs | 44 +++++++------------ 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 497d16cab6..32c890fb23 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -13,6 +13,12 @@ use crate::{ type AuthenticationNetworkIO = NetworkIO>; +type FullIO = ( + ConnectionManagerIO, + AuthenticationNetworkIO, + SessionManagerIO, +); + pub fn setup< D: Data, M: Data + Debug, @@ -20,11 +26,7 @@ pub fn setup< VN: ValidatorNetwork>, >( validator_network: VN, -) -> ( - ConnectionManagerIO, - AuthenticationNetworkIO, - SessionManagerIO, -) +) -> FullIO where A::PeerId: PublicKey, { diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 4c9f79d508..c641abfd85 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -17,9 +17,8 @@ use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ manager::{ - compatibility::{LegacyDiscoveryMessage, PeerAuthentications}, - Connections, DataInSession, Discovery, DiscoveryMessage, SessionHandler, - SessionHandlerError, VersionedAuthentication, + compatibility::PeerAuthentications, Connections, DataInSession, Discovery, + DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, }, AddressedData, AddressingInformation, ConnectionCommand, Data, NetworkIdentity, PeerId, }, @@ -354,7 +353,8 @@ where let session = match self.sessions.get_mut(&pre_session.session_id) { Some(session) => session, None => { - return Ok(self.start_nonvalidator_session(pre_session, address).await); + self.start_nonvalidator_session(pre_session, address).await; + return Ok(()); } }; session @@ -655,30 +655,16 @@ where self.validator_network.send(to_send.0, to_send.1) } - fn send_authentication(&self, to_send: PeerAuthentications) -> Result<(), Error> { - use PeerAuthentications::*; - match to_send { - NewOnly(authentication) => self - .authentications_for_network - .unbounded_send(VersionedAuthentication::V2(authentication)) - .map_err(|_| Error::NetworkSend), - LegacyOnly(legacy_authentication) => self - .authentications_for_network - .unbounded_send(VersionedAuthentication::V1( - LegacyDiscoveryMessage::AuthenticationBroadcast(legacy_authentication), - )) - .map_err(|_| Error::NetworkSend), - Both(authentication, legacy_authentication) => { - self.authentications_for_network - .unbounded_send(VersionedAuthentication::V2(authentication)) - .map_err(|_| Error::NetworkSend)?; - self.authentications_for_network - .unbounded_send(VersionedAuthentication::V1( - LegacyDiscoveryMessage::AuthenticationBroadcast(legacy_authentication), - )) - .map_err(|_| Error::NetworkSend) - } + fn send_authentications( + &self, + to_send: Vec>, + ) -> Result<(), Error> { + for auth in to_send { + self.authentications_for_network + .unbounded_send(auth) + .map_err(|_| Error::NetworkSend)?; } + Ok(()) } fn handle_connection_command(&mut self, connection_command: ConnectionCommand) { @@ -708,7 +694,7 @@ where self.handle_connection_command(command); } if let Some(message) = maybe_message { - self.send_authentication(message)?; + self.send_authentications(message.into())?; } Ok(()) } @@ -778,7 +764,7 @@ where Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e), } for to_send in service.discovery() { - self.send_authentication(to_send.into())?; + self.send_authentications(to_send.into())?; } }, _ = status_ticker.tick() => { From b365f6fc1bc5fdea1f8e13d278f2b77366093928 Mon Sep 17 00:00:00 2001 From: timorl Date: Thu, 8 Dec 2022 11:52:02 +0100 Subject: [PATCH 3/3] Demariofy --- finality-aleph/src/network/manager/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index c641abfd85..14393aed5b 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -597,7 +597,7 @@ where } } -/// Input/output interface for the connectiona manager service. +/// Input/output interface for the connection manager service. pub struct IO< D: Data, M: Data,