diff --git a/Cargo.lock b/Cargo.lock index 5d624605b6e9..38b6cc561915 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6299,6 +6299,7 @@ dependencies = [ "assert_matches", "async-trait", "bytes", + "fatality", "futures", "futures-timer", "parity-scale-codec", @@ -6314,6 +6315,7 @@ dependencies = [ "sp-consensus", "sp-core", "sp-keyring", + "thiserror", "tracing-gum", ] diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 900fd5339dcb..b839a120bef9 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -32,7 +32,7 @@ use polkadot_node_primitives::approval::{ use polkadot_node_subsystem::{ messages::{ ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, - AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeMessage, + AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage, }, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; @@ -938,7 +938,7 @@ impl State { "Sending an assignment to peers", ); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( peers, Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Assignments(assignments), @@ -1200,7 +1200,7 @@ impl State { "Sending an approval to peers", ); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( peers, Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals), @@ -1328,7 +1328,7 @@ impl State { ); sender - .send_message(NetworkBridgeMessage::SendValidationMessage( + .send_message(NetworkBridgeTxMessage::SendValidationMessage( vec![peer_id.clone()], Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( 
protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send), @@ -1346,7 +1346,7 @@ impl State { ); sender - .send_message(NetworkBridgeMessage::SendValidationMessage( + .send_message(NetworkBridgeTxMessage::SendValidationMessage( vec![peer_id], Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send), @@ -1549,7 +1549,7 @@ async fn adjust_required_routing_and_propagate( ); let full_req = Requests::PoVFetchingV1(req); - ctx.send_message(NetworkBridgeMessage::SendRequests( + ctx.send_message(NetworkBridgeTxMessage::SendRequests( vec![full_req], IfDisconnected::ImmediateError, )) @@ -195,7 +195,10 @@ mod tests { )) => { tx.send(Ok(Some(make_session_info()))).unwrap(); }, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(mut reqs, _)) => { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests( + mut reqs, + _, + )) => { let req = assert_matches!( reqs.pop(), Some(Requests::PoVFetchingV1(outgoing)) => {outgoing} diff --git a/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/node/network/availability-distribution/src/requester/fetch_task/mod.rs index ff1e5e6a73f3..89b634a5ce7e 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -30,7 +30,7 @@ use polkadot_node_network_protocol::request_response::{ use polkadot_node_primitives::ErasureChunk; use polkadot_node_subsystem::{ jaeger, - messages::{AvailabilityStoreMessage, IfDisconnected, NetworkBridgeMessage}, + messages::{AvailabilityStoreMessage, IfDisconnected, NetworkBridgeTxMessage}, overseer, }; use polkadot_primitives::v2::{ @@ -332,8 +332,11 @@ impl RunningTask { self.sender .send(FromFetchTask::Message( - NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::ImmediateError) - .into(), + NetworkBridgeTxMessage::SendRequests( + vec![requests], + 
IfDisconnected::ImmediateError, + ) + .into(), )) .await .map_err(|_| TaskError::ShuttingDown)?; diff --git a/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/node/network/availability-distribution/src/requester/fetch_task/tests.rs index 3030cb877ec7..7bc91c0a4d12 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -233,7 +233,7 @@ impl TestRun { ) -> bool { let msg = AllMessages::from(msg); match msg { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests( reqs, IfDisconnected::ImmediateError, )) => { diff --git a/node/network/availability-distribution/src/requester/tests.rs b/node/network/availability-distribution/src/requester/tests.rs index 9fd738a04388..bd39e7748ca8 100644 --- a/node/network/availability-distribution/src/requester/tests.rs +++ b/node/network/availability-distribution/src/requester/tests.rs @@ -30,7 +30,7 @@ use sp_core::traits::SpawnNamed; use polkadot_node_subsystem::{ messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, - NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, + NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest, }, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SpawnGlue, }; @@ -85,7 +85,7 @@ fn spawn_virtual_overseer( break } match msg.unwrap() { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(..)) => {}, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(..)) => {}, AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk( .., tx, diff --git a/node/network/availability-distribution/src/tests/state.rs b/node/network/availability-distribution/src/tests/state.rs index 140713b9c4aa..c021f1bfb81b 100644 --- a/node/network/availability-distribution/src/tests/state.rs +++ 
b/node/network/availability-distribution/src/tests/state.rs @@ -42,7 +42,7 @@ use polkadot_node_primitives::ErasureChunk; use polkadot_node_subsystem::{ messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage, - NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, + NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest, }, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal, }; @@ -214,7 +214,7 @@ impl TestState { gum::trace!(target: LOG_TARGET, remaining_stores, "Stores left to go"); let msg = overseer_recv(&mut rx).await; match msg { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests( reqs, IfDisconnected::ImmediateError, )) => { diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 1d50dcfef5cb..92eefeaaeb79 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -50,7 +50,7 @@ use polkadot_node_primitives::{AvailableData, ErasureChunk}; use polkadot_node_subsystem::{ errors::RecoveryError, jaeger, - messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeMessage}, + messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage}, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, }; @@ -198,7 +198,7 @@ impl RequestFromBackers { ); sender - .send_message(NetworkBridgeMessage::SendRequests( + .send_message(NetworkBridgeTxMessage::SendRequests( vec![Requests::AvailableDataFetchingV1(req)], IfDisconnected::ImmediateError, )) @@ -356,7 +356,7 @@ impl RequestChunksFromValidators { } sender - .send_message(NetworkBridgeMessage::SendRequests( + .send_message(NetworkBridgeTxMessage::SendRequests( requests, IfDisconnected::ImmediateError, )) diff --git 
a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index 3a19c07e1082..3c16157f4882 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -282,8 +282,8 @@ impl TestState { // Receive a request for a chunk. assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( requests, IfDisconnected::ImmediateError, ) @@ -331,8 +331,8 @@ impl TestState { // Receive a request for a chunk. assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut requests, IfDisconnected::ImmediateError, ) diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index 06bad64911b1..a0f82dc5ed1d 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -281,7 +281,7 @@ async fn modify_reputation( ) { gum::trace!(target: LOG_TARGET, ?relay_parent, ?rep, %peer, "reputation change"); - sender.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await + sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await } /// Distribute a given valid and signature checked bitfield message. 
/// @@ -427,7 +427,7 @@ async fn relay_message( ); } else { let _span = span.child("gossip"); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( interested_peers, message.into_validation_protocol(), )) @@ -722,7 +722,7 @@ async fn send_tracked_gossip_message( .or_default() .insert(validator.clone()); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( vec![dest], message.into_validation_protocol(), )) diff --git a/node/network/bitfield-distribution/src/tests.rs b/node/network/bitfield-distribution/src/tests.rs index 9bd17c542f8f..6509db3ba660 100644 --- a/node/network/bitfield-distribution/src/tests.rs +++ b/node/network/bitfield-distribution/src/tests.rs @@ -227,8 +227,8 @@ fn receive_invalid_signature() { // reputation change due to invalid signature assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_SIGNATURE_INVALID) @@ -288,8 +288,8 @@ fn receive_invalid_validator_index() { // reputation change due to invalid validator index assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_VALIDATOR_INDEX_INVALID) @@ -364,8 +364,8 @@ fn receive_duplicate_messages() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) @@ -383,8 +383,8 @@ fn receive_duplicate_messages() { assert_matches!( handle.recv().await, - 
AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, BENEFIT_VALID_MESSAGE) @@ -402,8 +402,8 @@ fn receive_duplicate_messages() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE) @@ -484,8 +484,8 @@ fn do_not_relay_message_twice() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage(peers, send_msg), + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage(peers, send_msg), ) => { assert_eq!(2, peers.len()); assert!(peers.contains(&peer_a)); @@ -607,8 +607,8 @@ fn changing_view() { // reputation change for peer B assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) @@ -639,8 +639,8 @@ fn changing_view() { // reputation change for peer B assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE) @@ -671,8 +671,8 @@ fn changing_view() { // reputation change for peer B assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_NOT_IN_VIEW) @@ -745,8 +745,8 @@ fn do_not_send_message_back_to_origin() { 
assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage(peers, send_msg), + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage(peers, send_msg), ) => { assert_eq!(1, peers.len()); assert!(peers.contains(&peer_a)); @@ -756,8 +756,8 @@ fn do_not_send_message_back_to_origin() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) @@ -851,8 +851,8 @@ fn topology_test() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage(peers, send_msg), + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage(peers, send_msg), ) => { let topology = state.topologies.get_current_topology(); // It should send message to all peers in y direction and to 4 random peers in x direction @@ -867,8 +867,8 @@ fn topology_test() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peers_x[0]); assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index b92b152e8b04..40ae9cac7fe2 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -19,6 +19,8 @@ polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem-util = { path = "../../subsystem-util"} parking_lot = "0.12.0" bytes = "1" +fatality = "0.0.6" +thiserror = "1" [dev-dependencies] assert_matches = "1.4.0" diff --git a/node/network/bridge/src/errors.rs b/node/network/bridge/src/errors.rs new file mode 100644 index 000000000000..43ba61a58902 --- /dev/null +++ 
b/node/network/bridge/src/errors.rs @@ -0,0 +1,20 @@ +use polkadot_node_subsystem::SubsystemError; +pub(crate) use polkadot_overseer::OverseerError; + +#[fatality::fatality(splitable)] +pub(crate) enum Error { + /// Received error from overseer: + #[fatal] + #[error(transparent)] + SubsystemError(#[from] SubsystemError), + /// The stream of incoming events concluded. + #[fatal] + #[error("Event stream closed unexpectedly")] + EventStreamConcluded, +} + +impl From for Error { + fn from(e: OverseerError) -> Self { + Error::SubsystemError(SubsystemError::from(e)) + } +} diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index f10f7fa5b67a..d599ae25d9c1 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -15,48 +15,28 @@ // along with Polkadot. If not, see . //! The Network Bridge Subsystem - protocol multiplexer for Polkadot. +//! +//! Split into incoming (`..In`) and outgoing (`..Out`) subsystems. #![deny(unused_crate_dependencies)] #![warn(missing_docs)] -use always_assert::never; -use bytes::Bytes; -use futures::{prelude::*, stream::BoxStream}; -use parity_scale_codec::{Decode, DecodeAll, Encode}; +use futures::prelude::*; +use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; -use sc_network::Event as NetworkEvent; + use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ - self as net_protocol, - peer_set::{PeerSet, PerPeerSet}, - v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion, - UnifiedReputationChange as Rep, Versioned, View, -}; - -use polkadot_node_subsystem::{ - errors::{SubsystemError, SubsystemResult}, - messages::{ - network_bridge_event::{NewGossipTopology, TopologyPeerInfo}, - ApprovalDistributionMessage, BitfieldDistributionMessage, CollatorProtocolMessage, - GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, - StatementDistributionMessage, - }, - overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, 
+ peer_set::PeerSet, PeerId, ProtocolVersion, UnifiedReputationChange as Rep, View, }; -use polkadot_overseer::gen::OrchestraError as OverseerError; -use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex}; /// Peer set info for network initialization. /// /// To be added to [`NetworkConfiguration::extra_sets`]. pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; -use std::{ - collections::{hash_map, HashMap}, - iter::ExactSizeIterator, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; mod validator_discovery; @@ -64,34 +44,36 @@ mod validator_discovery; /// /// Defines the `Network` trait with an implementation for an `Arc`. mod network; -use network::{send_message, Network}; - -use crate::network::get_peer_id_by_authority_id; +use self::network::Network; mod metrics; -use self::metrics::Metrics; +pub use self::metrics::Metrics; -#[cfg(test)] -mod tests; +mod errors; +pub(crate) use self::errors::Error; + +mod tx; +pub use self::tx::*; + +mod rx; +pub use self::rx::*; /// The maximum amount of heads a peer is allowed to have in their view at any time. /// /// We use the same limit to compute the view sent to peers locally. 
-const MAX_VIEW_HEADS: usize = 5; +pub(crate) const MAX_VIEW_HEADS: usize = 5; -const MALFORMED_MESSAGE_COST: Rep = Rep::CostMajor("Malformed Network-bridge message"); -const UNCONNECTED_PEERSET_COST: Rep = Rep::CostMinor("Message sent to un-connected peer-set"); -const MALFORMED_VIEW_COST: Rep = Rep::CostMajor("Malformed view"); -const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view"); - -// network bridge log target -const LOG_TARGET: &'static str = "parachain::network-bridge"; +pub(crate) const MALFORMED_MESSAGE_COST: Rep = Rep::CostMajor("Malformed Network-bridge message"); +pub(crate) const UNCONNECTED_PEERSET_COST: Rep = + Rep::CostMinor("Message sent to un-connected peer-set"); +pub(crate) const MALFORMED_VIEW_COST: Rep = Rep::CostMajor("Malformed view"); +pub(crate) const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view"); /// Messages from and to the network. /// /// As transmitted to and received from subsystems. #[derive(Debug, Encode, Decode, Clone)] -pub enum WireMessage { +pub(crate) enum WireMessage { /// A message from a peer on a specific protocol. #[codec(index = 1)] ProtocolMessage(M), @@ -100,78 +82,16 @@ pub enum WireMessage { ViewUpdate(View), } -/// The network bridge subsystem. -pub struct NetworkBridge { - /// `Network` trait implementing type. - network_service: N, - authority_discovery_service: AD, - sync_oracle: Box, - metrics: Metrics, -} - -impl NetworkBridge { - /// Create a new network bridge subsystem with underlying network service and authority discovery service. - /// - /// This assumes that the network service has had the notifications protocol for the network - /// bridge already registered. See [`peers_sets_info`](peers_sets_info). 
- pub fn new( - network_service: N, - authority_discovery_service: AD, - sync_oracle: Box, - metrics: Metrics, - ) -> Self { - NetworkBridge { network_service, authority_discovery_service, sync_oracle, metrics } - } -} - -#[overseer::subsystem(NetworkBridge, error = SubsystemError, prefix = self::overseer)] -impl NetworkBridge -where - Net: Network + Sync, - AD: validator_discovery::AuthorityDiscovery + Clone, -{ - fn start(mut self, ctx: Context) -> SpawnedSubsystem { - // The stream of networking events has to be created at initialization, otherwise the - // networking might open connections before the stream of events has been grabbed. - let network_stream = self.network_service.event_stream(); - - // Swallow error because failure is fatal to the node and we log with more precision - // within `run_network`. - let future = run_network(self, ctx, network_stream) - .map_err(|e| SubsystemError::with_origin("network-bridge", e)) - .boxed(); - SpawnedSubsystem { name: "network-bridge-subsystem", future } - } -} - -struct PeerData { +pub(crate) struct PeerData { /// The Latest view sent by the peer. view: View, version: ProtocolVersion, } -#[derive(Debug)] -enum UnexpectedAbort { - /// Received error from overseer: - SubsystemError(SubsystemError), - /// The stream of incoming events concluded. - EventStreamConcluded, -} - -impl From for UnexpectedAbort { - fn from(e: SubsystemError) -> Self { - UnexpectedAbort::SubsystemError(e) - } -} - -impl From for UnexpectedAbort { - fn from(e: OverseerError) -> Self { - UnexpectedAbort::SubsystemError(SubsystemError::from(e)) - } -} +/// Shared state between incoming and outgoing. 
#[derive(Default, Clone)] -struct Shared(Arc>); +pub(crate) struct Shared(Arc>); #[derive(Default)] struct SharedInner { @@ -180,942 +100,7 @@ struct SharedInner { collation_peers: HashMap, } -enum Mode { +pub(crate) enum Mode { Syncing(Box), Active, } - -#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)] -async fn handle_subsystem_messages( - mut ctx: Context, - mut network_service: N, - mut authority_discovery_service: AD, - shared: Shared, - sync_oracle: Box, - metrics: Metrics, -) -> Result<(), UnexpectedAbort> -where - N: Network, - AD: validator_discovery::AuthorityDiscovery + Clone, -{ - // This is kept sorted, descending, by block number. - let mut live_heads: Vec = Vec::with_capacity(MAX_VIEW_HEADS); - let mut finalized_number = 0; - let mut validator_discovery = validator_discovery::Service::::new(); - - let mut mode = Mode::Syncing(sync_oracle); - - loop { - futures::select! { - msg = ctx.recv().fuse() => match msg { - Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => { - let ActiveLeavesUpdate { activated, deactivated } = active_leaves; - gum::trace!( - target: LOG_TARGET, - action = "ActiveLeaves", - has_activated = activated.is_some(), - num_deactivated = %deactivated.len(), - ); - - for activated in activated { - let pos = live_heads - .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) - .unwrap_or_else(|i| i); - - live_heads.insert(pos, activated); - } - live_heads.retain(|h| !deactivated.contains(&h.hash)); - - // if we're done syncing, set the mode to `Mode::Active`. - // Otherwise, we don't need to send view updates. 
- { - let is_done_syncing = match mode { - Mode::Active => true, - Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(), - }; - - if is_done_syncing { - mode = Mode::Active; - - update_our_view( - &mut network_service, - &mut ctx, - &live_heads, - &shared, - finalized_number, - &metrics, - ); - } - } - } - Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { - gum::trace!( - target: LOG_TARGET, - action = "BlockFinalized" - ); - - debug_assert!(finalized_number < number); - - // we don't send the view updates here, but delay them until the next `ActiveLeaves` - // otherwise it might break assumptions of some of the subsystems - // that we never send the same `ActiveLeavesUpdate` - finalized_number = number; - } - Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => { - return Ok(()); - } - Ok(FromOrchestra::Communication { msg }) => match msg { - NetworkBridgeMessage::ReportPeer(peer, rep) => { - if !rep.is_benefit() { - gum::debug!( - target: LOG_TARGET, - ?peer, - ?rep, - action = "ReportPeer" - ); - } - - metrics.on_report_event(); - network_service.report_peer(peer, rep); - } - NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { - gum::trace!( - target: LOG_TARGET, - action = "DisconnectPeer", - ?peer, - peer_set = ?peer_set, - ); - - network_service.disconnect_peer(peer, peer_set); - } - NetworkBridgeMessage::SendValidationMessage(peers, msg) => { - gum::trace!( - target: LOG_TARGET, - action = "SendValidationMessages", - num_messages = 1usize, - ); - - match msg { - Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - NetworkBridgeMessage::SendValidationMessages(msgs) => { - gum::trace!( - target: LOG_TARGET, - action = "SendValidationMessages", - num_messages = %msgs.len(), - ); - - for (peers, msg) in msgs { - match msg { - Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, - peers, - 
WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - } - NetworkBridgeMessage::SendCollationMessage(peers, msg) => { - gum::trace!( - target: LOG_TARGET, - action = "SendCollationMessages", - num_messages = 1usize, - ); - - match msg { - Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - NetworkBridgeMessage::SendCollationMessages(msgs) => { - gum::trace!( - target: LOG_TARGET, - action = "SendCollationMessages", - num_messages = %msgs.len(), - ); - - for (peers, msg) in msgs { - match msg { - Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, - peers, - WireMessage::ProtocolMessage(msg), - &metrics, - ), - } - } - } - NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { - gum::trace!( - target: LOG_TARGET, - action = "SendRequests", - num_requests = %reqs.len(), - ); - - for req in reqs { - network_service - .start_request(&mut authority_discovery_service, req, if_disconnected) - .await; - } - } - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - failed, - } => { - gum::trace!( - target: LOG_TARGET, - action = "ConnectToValidators", - peer_set = ?peer_set, - ids = ?validator_ids, - "Received a validator connection request", - ); - - metrics.note_desired_peer_count(peer_set, validator_ids.len()); - - let (ns, ads) = validator_discovery.on_request( - validator_ids, - peer_set, - failed, - network_service, - authority_discovery_service, - ).await; - - network_service = ns; - authority_discovery_service = ads; - } - NetworkBridgeMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - } => { - gum::trace!( - target: LOG_TARGET, - action = "ConnectToPeers", - peer_set = ?peer_set, - ?validator_addrs, - "Received a resolved validator connection request", - ); - - metrics.note_desired_peer_count(peer_set, validator_addrs.len()); - - let all_addrs = validator_addrs.into_iter().flatten().collect(); - 
network_service = validator_discovery.on_resolved_request( - all_addrs, - peer_set, - network_service, - ).await; - } - NetworkBridgeMessage::NewGossipTopology { - session, - our_neighbors_x, - our_neighbors_y, - } => { - gum::debug!( - target: LOG_TARGET, - action = "NewGossipTopology", - neighbors_x = our_neighbors_x.len(), - neighbors_y = our_neighbors_y.len(), - "Gossip topology has changed", - ); - - let gossip_peers_x = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_x, - ).await; - - let gossip_peers_y = update_gossip_peers_1d( - &mut authority_discovery_service, - our_neighbors_y, - ).await; - - dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::NewGossipTopology( - NewGossipTopology { - session, - our_neighbors_x: gossip_peers_x, - our_neighbors_y: gossip_peers_y, - } - ), - ctx.sender(), - ); - } - } - Err(e) => return Err(e.into()), - }, - } - } -} - -async fn update_gossip_peers_1d( - ads: &mut AD, - neighbors: N, -) -> HashMap -where - AD: validator_discovery::AuthorityDiscovery, - N: IntoIterator, - N::IntoIter: std::iter::ExactSizeIterator, -{ - let neighbors = neighbors.into_iter(); - let mut peers = HashMap::with_capacity(neighbors.len()); - for (authority, validator_index) in neighbors { - let addr = get_peer_id_by_authority_id(ads, authority.clone()).await; - - if let Some(peer_id) = addr { - peers.insert(authority, TopologyPeerInfo { peer_ids: vec![peer_id], validator_index }); - } - } - - peers -} - -async fn handle_network_messages( - mut sender: impl overseer::NetworkBridgeSenderTrait, - mut network_service: impl Network, - network_stream: BoxStream<'static, NetworkEvent>, - mut authority_discovery_service: AD, - metrics: Metrics, - shared: Shared, -) -> Result<(), UnexpectedAbort> { - let mut network_stream = network_stream.fuse(); - loop { - match network_stream.next().await { - None => return Err(UnexpectedAbort::EventStreamConcluded), - Some(NetworkEvent::Dht(_)) | - 
Some(NetworkEvent::SyncConnected { .. }) | - Some(NetworkEvent::SyncDisconnected { .. }) => {}, - Some(NetworkEvent::NotificationStreamOpened { - remote: peer, - protocol, - role, - negotiated_fallback, - }) => { - let role = ObservedRole::from(role); - let (peer_set, version) = { - let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { - None => continue, - Some(p) => p, - }; - - if let Some(fallback) = negotiated_fallback { - match PeerSet::try_from_protocol_name(&fallback) { - None => { - gum::debug!( - target: LOG_TARGET, - fallback = &*fallback, - ?peer, - ?peer_set, - "Unknown fallback", - ); - - continue - }, - Some((p2, v2)) => { - if p2 != peer_set { - gum::debug!( - target: LOG_TARGET, - fallback = &*fallback, - fallback_peerset = ?p2, - protocol = &*protocol, - peerset = ?peer_set, - "Fallback mismatched peer-set", - ); - - continue - } - - (p2, v2) - }, - } - } else { - (peer_set, version) - } - }; - - gum::debug!( - target: LOG_TARGET, - action = "PeerConnected", - peer_set = ?peer_set, - version, - peer = ?peer, - role = ?role - ); - - let local_view = { - let mut shared = shared.0.lock(); - let peer_map = match peer_set { - PeerSet::Validation => &mut shared.validation_peers, - PeerSet::Collation => &mut shared.collation_peers, - }; - - match peer_map.entry(peer.clone()) { - hash_map::Entry::Occupied(_) => continue, - hash_map::Entry::Vacant(vacant) => { - vacant.insert(PeerData { view: View::default(), version }); - }, - } - - metrics.on_peer_connected(peer_set, version); - metrics.note_peer_count(peer_set, version, peer_map.len()); - - shared.local_view.clone().unwrap_or(View::default()) - }; - - let maybe_authority = - authority_discovery_service.get_authority_ids_by_peer_id(peer).await; - - match peer_set { - PeerSet::Validation => { - dispatch_validation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected( - peer.clone(), - role, - 1, - maybe_authority, - ), - NetworkBridgeEvent::PeerViewChange(peer.clone(), 
View::default()), - ], - &mut sender, - ) - .await; - - send_message( - &mut network_service, - vec![peer], - PeerSet::Validation, - version, - WireMessage::::ViewUpdate(local_view), - &metrics, - ); - }, - PeerSet::Collation => { - dispatch_collation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected( - peer.clone(), - role, - 1, - maybe_authority, - ), - NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), - ], - &mut sender, - ) - .await; - - send_message( - &mut network_service, - vec![peer], - PeerSet::Collation, - version, - WireMessage::::ViewUpdate(local_view), - &metrics, - ); - }, - } - }, - Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { - let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { - None => continue, - Some(peer_set) => peer_set, - }; - - gum::debug!( - target: LOG_TARGET, - action = "PeerDisconnected", - peer_set = ?peer_set, - peer = ?peer - ); - - let was_connected = { - let mut shared = shared.0.lock(); - let peer_map = match peer_set { - PeerSet::Validation => &mut shared.validation_peers, - PeerSet::Collation => &mut shared.collation_peers, - }; - - let w = peer_map.remove(&peer).is_some(); - - metrics.on_peer_disconnected(peer_set, version); - metrics.note_peer_count(peer_set, version, peer_map.len()); - - w - }; - - if was_connected && version == peer_set.get_default_version() { - match peer_set { - PeerSet::Validation => - dispatch_validation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut sender, - ) - .await, - PeerSet::Collation => - dispatch_collation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut sender, - ) - .await, - } - } - }, - Some(NetworkEvent::NotificationsReceived { remote, messages }) => { - let expected_versions = { - let mut versions = PerPeerSet::>::default(); - let shared = shared.0.lock(); - if let Some(peer_data) = shared.validation_peers.get(&remote) { - versions[PeerSet::Validation] = 
Some(peer_data.version); - } - - if let Some(peer_data) = shared.collation_peers.get(&remote) { - versions[PeerSet::Collation] = Some(peer_data.version); - } - - versions - }; - - // non-decoded, but version-checked validation messages. - let v_messages: Result, _> = messages - .iter() - .filter_map(|(protocol, msg_bytes)| { - // version doesn't matter because we always receive on the 'correct' - // protocol name, not the negotiated fallback. - let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; - if peer_set == PeerSet::Validation { - if expected_versions[PeerSet::Validation].is_none() { - return Some(Err(UNCONNECTED_PEERSET_COST)) - } - - Some(Ok(msg_bytes.clone())) - } else { - None - } - }) - .collect(); - - let v_messages = match v_messages { - Err(rep) => { - gum::debug!(target: LOG_TARGET, action = "ReportPeer"); - network_service.report_peer(remote, rep); - - continue - }, - Ok(v) => v, - }; - - // non-decoded, but version-checked colldation messages. - let c_messages: Result, _> = messages - .iter() - .filter_map(|(protocol, msg_bytes)| { - // version doesn't matter because we always receive on the 'correct' - // protocol name, not the negotiated fallback. 
- let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; - - if peer_set == PeerSet::Collation { - if expected_versions[PeerSet::Collation].is_none() { - return Some(Err(UNCONNECTED_PEERSET_COST)) - } - - Some(Ok(msg_bytes.clone())) - } else { - None - } - }) - .collect(); - - let c_messages = match c_messages { - Err(rep) => { - gum::debug!(target: LOG_TARGET, action = "ReportPeer"); - network_service.report_peer(remote, rep); - - continue - }, - Ok(v) => v, - }; - - if v_messages.is_empty() && c_messages.is_empty() { - continue - } - - gum::trace!( - target: LOG_TARGET, - action = "PeerMessages", - peer = ?remote, - num_validation_messages = %v_messages.len(), - num_collation_messages = %c_messages.len() - ); - - if !v_messages.is_empty() { - let (events, reports) = - if expected_versions[PeerSet::Validation] == Some(1) { - handle_v1_peer_messages::( - remote.clone(), - PeerSet::Validation, - &mut shared.0.lock().validation_peers, - v_messages, - &metrics, - ) - } else { - gum::warn!( - target: LOG_TARGET, - version = ?expected_versions[PeerSet::Validation], - "Major logic bug. Peer somehow has unsupported validation protocol version." - ); - - never!("Only version 1 is supported; peer set connection checked above; qed"); - - // If a peer somehow triggers this, we'll disconnect them - // eventually. - (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) - }; - - for report in reports { - network_service.report_peer(remote.clone(), report); - } - - dispatch_validation_events_to_all(events, &mut sender).await; - } - - if !c_messages.is_empty() { - let (events, reports) = - if expected_versions[PeerSet::Collation] == Some(1) { - handle_v1_peer_messages::( - remote.clone(), - PeerSet::Collation, - &mut shared.0.lock().collation_peers, - c_messages, - &metrics, - ) - } else { - gum::warn!( - target: LOG_TARGET, - version = ?expected_versions[PeerSet::Collation], - "Major logic bug. Peer somehow has unsupported collation protocol version." 
- ); - - never!("Only version 1 is supported; peer set connection checked above; qed"); - - // If a peer somehow triggers this, we'll disconnect them - // eventually. - (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) - }; - - for report in reports { - network_service.report_peer(remote.clone(), report); - } - - dispatch_collation_events_to_all(events, &mut sender).await; - } - }, - } - } -} - -/// Main driver, processing network events and messages from other subsystems. -/// -/// THIS IS A HACK. We need to ensure we never hold the mutex across a `.await` boundary -/// and `parking_lot` currently does not provide `Send`, which helps us enforce that. -/// If this breaks, we need to find another way to protect ourselves. -/// -/// ```compile_fail -/// #use parking_lot::MutexGuard; -/// #fn is_send(); -/// #is_send::(); -/// ``` -#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)] -async fn run_network( - bridge: NetworkBridge, - mut ctx: Context, - network_stream: BoxStream<'static, NetworkEvent>, -) -> SubsystemResult<()> -where - N: Network, - AD: validator_discovery::AuthorityDiscovery + Clone, -{ - let shared = Shared::default(); - - let NetworkBridge { network_service, authority_discovery_service, metrics, sync_oracle } = - bridge; - - let (remote, network_event_handler) = handle_network_messages( - ctx.sender().clone(), - network_service.clone(), - network_stream, - authority_discovery_service.clone(), - metrics.clone(), - shared.clone(), - ) - .remote_handle(); - - ctx.spawn("network-bridge-network-worker", Box::pin(remote))?; - - let subsystem_event_handler = handle_subsystem_messages( - ctx, - network_service, - authority_discovery_service, - shared, - sync_oracle, - metrics, - ); - - futures::pin_mut!(subsystem_event_handler); - - match futures::future::select(subsystem_event_handler, network_event_handler) - .await - .factor_first() - .0 - { - Ok(()) => Ok(()), - Err(UnexpectedAbort::SubsystemError(err)) => { - gum::warn!( - target: LOG_TARGET, 
- err = ?err, - "Shutting down Network Bridge due to error" - ); - - Err(SubsystemError::Context(format!( - "Received SubsystemError from overseer: {:?}", - err - ))) - }, - Err(UnexpectedAbort::EventStreamConcluded) => { - gum::info!( - target: LOG_TARGET, - "Shutting down Network Bridge: underlying request stream concluded" - ); - Err(SubsystemError::Context("Incoming network event stream concluded.".to_string())) - }, - } -} - -fn construct_view( - live_heads: impl DoubleEndedIterator, - finalized_number: BlockNumber, -) -> View { - View::new(live_heads.take(MAX_VIEW_HEADS), finalized_number) -} - -#[overseer::contextbounds(NetworkBridge, prefix = self::overseer)] -fn update_our_view( - net: &mut Net, - ctx: &mut Context, - live_heads: &[ActivatedLeaf], - shared: &Shared, - finalized_number: BlockNumber, - metrics: &Metrics, -) where - Net: Network, -{ - let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); - - let (validation_peers, collation_peers) = { - let mut shared = shared.0.lock(); - - // We only want to send a view update when the heads changed. - // A change in finalized block number only is _not_ sufficient. - // - // If this is the first view update since becoming active, but our view is empty, - // there is no need to send anything. 
- match shared.local_view { - Some(ref v) if v.check_heads_eq(&new_view) => return, - None if live_heads.is_empty() => { - shared.local_view = Some(new_view); - return - }, - _ => { - shared.local_view = Some(new_view.clone()); - }, - } - - ( - shared.validation_peers.keys().cloned().collect::>(), - shared.collation_peers.keys().cloned().collect::>(), - ) - }; - - send_validation_message_v1( - net, - validation_peers, - WireMessage::ViewUpdate(new_view.clone()), - metrics, - ); - - send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics); - - let our_view = OurView::new( - live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), - finalized_number, - ); - - dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::OurViewChange(our_view.clone()), - ctx.sender(), - ); - - dispatch_collation_event_to_all_unbounded( - NetworkBridgeEvent::OurViewChange(our_view), - ctx.sender(), - ); -} - -// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that -// peer-set. 
-fn handle_v1_peer_messages>( - peer: PeerId, - peer_set: PeerSet, - peers: &mut HashMap, - messages: Vec, - metrics: &Metrics, -) -> (Vec>, Vec) { - let peer_data = match peers.get_mut(&peer) { - None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]), - Some(d) => d, - }; - - let mut outgoing_events = Vec::with_capacity(messages.len()); - let mut reports = Vec::new(); - - for message in messages { - metrics.on_notification_received(peer_set, peer_data.version, message.len()); - let message = match WireMessage::::decode_all(&mut message.as_ref()) { - Err(_) => { - reports.push(MALFORMED_MESSAGE_COST); - continue - }, - Ok(m) => m, - }; - - outgoing_events.push(match message { - WireMessage::ViewUpdate(new_view) => { - if new_view.len() > MAX_VIEW_HEADS || - new_view.finalized_number < peer_data.view.finalized_number - { - reports.push(MALFORMED_VIEW_COST); - continue - } else if new_view.is_empty() { - reports.push(EMPTY_VIEW_COST); - continue - } else if new_view == peer_data.view { - continue - } else { - peer_data.view = new_view; - - NetworkBridgeEvent::PeerViewChange(peer.clone(), peer_data.view.clone()) - } - }, - WireMessage::ProtocolMessage(message) => - NetworkBridgeEvent::PeerMessage(peer.clone(), message.into()), - }) - } - - (outgoing_events, reports) -} - -fn send_validation_message_v1( - net: &mut impl Network, - peers: Vec, - message: WireMessage, - metrics: &Metrics, -) { - send_message(net, peers, PeerSet::Validation, 1, message, metrics); -} - -fn send_collation_message_v1( - net: &mut impl Network, - peers: Vec, - message: WireMessage, - metrics: &Metrics, -) { - send_message(net, peers, PeerSet::Collation, 1, message, metrics) -} - -async fn dispatch_validation_event_to_all( - event: NetworkBridgeEvent, - ctx: &mut impl overseer::NetworkBridgeSenderTrait, -) { - dispatch_validation_events_to_all(std::iter::once(event), ctx).await -} - -async fn dispatch_collation_event_to_all( - event: NetworkBridgeEvent, - ctx: &mut impl 
overseer::NetworkBridgeSenderTrait, -) { - dispatch_collation_events_to_all(std::iter::once(event), ctx).await -} - -fn dispatch_validation_event_to_all_unbounded( - event: NetworkBridgeEvent, - sender: &mut impl overseer::NetworkBridgeSenderTrait, -) { - event - .focus() - .ok() - .map(StatementDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); - event - .focus() - .ok() - .map(BitfieldDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); - event - .focus() - .ok() - .map(ApprovalDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); - event - .focus() - .ok() - .map(GossipSupportMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); -} - -fn dispatch_collation_event_to_all_unbounded( - event: NetworkBridgeEvent, - sender: &mut impl overseer::NetworkBridgeSenderTrait, -) { - if let Ok(msg) = event.focus() { - sender.send_unbounded_message(CollatorProtocolMessage::NetworkBridgeUpdate(msg)) - } -} - -async fn dispatch_validation_events_to_all( - events: I, - sender: &mut impl overseer::NetworkBridgeSenderTrait, -) where - I: IntoIterator>, - I::IntoIter: Send, -{ - for event in events { - sender - .send_messages(event.focus().map(StatementDistributionMessage::from)) - .await; - sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; - sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; - sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; - } -} - -async fn dispatch_collation_events_to_all( - events: I, - ctx: &mut impl overseer::NetworkBridgeSenderTrait, -) where - I: IntoIterator>, - I::IntoIter: Send, -{ - let messages_for = |event: NetworkBridgeEvent| { - event.focus().ok().map(|m| CollatorProtocolMessage::NetworkBridgeUpdate(m)) - }; - - ctx.send_messages(events.into_iter().flat_map(messages_for)).await -} diff --git a/node/network/bridge/src/metrics.rs 
b/node/network/bridge/src/metrics.rs index 4ecdd5bd6f13..0224c4960ac9 100644 --- a/node/network/bridge/src/metrics.rs +++ b/node/network/bridge/src/metrics.rs @@ -26,6 +26,7 @@ fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str { peer_set.get_protocol_name_static(version).unwrap_or("") } +#[allow(missing_docs)] impl Metrics { pub fn on_peer_connected(&self, peer_set: PeerSet, version: ProtocolVersion) { self.0.as_ref().map(|metrics| { diff --git a/node/network/bridge/src/network.rs b/node/network/bridge/src/network.rs index 538958602d25..e1310f57414c 100644 --- a/node/network/bridge/src/network.rs +++ b/node/network/bridge/src/network.rs @@ -35,7 +35,8 @@ use polkadot_primitives::v2::{AuthorityDiscoveryId, Block, Hash}; use crate::validator_discovery::AuthorityDiscovery; -use super::LOG_TARGET; +// network bridge network abstraction log target +const LOG_TARGET: &'static str = "parachain::network-bridge-net"; /// Send a message to the network. /// diff --git a/node/network/bridge/src/rx/mod.rs b/node/network/bridge/src/rx/mod.rs new file mode 100644 index 000000000000..f135b006f114 --- /dev/null +++ b/node/network/bridge/src/rx/mod.rs @@ -0,0 +1,874 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! 
The Network Bridge Subsystem - handles _incoming_ messages from the network, forwarded to the relevant subsystems. +use super::*; + +use always_assert::never; +use bytes::Bytes; +use futures::stream::BoxStream; +use parity_scale_codec::{Decode, DecodeAll}; + +use sc_network::Event as NetworkEvent; +use sp_consensus::SyncOracle; + +use polkadot_node_network_protocol::{ + self as net_protocol, + peer_set::{PeerSet, PerPeerSet}, + v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion, + UnifiedReputationChange as Rep, View, +}; + +use polkadot_node_subsystem::{ + errors::SubsystemError, + messages::{ + network_bridge_event::{NewGossipTopology, TopologyPeerInfo}, + ApprovalDistributionMessage, BitfieldDistributionMessage, CollatorProtocolMessage, + GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, + StatementDistributionMessage, + }, + overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, +}; + +use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex}; + +/// Peer set info for network initialization. +/// +/// To be added to [`NetworkConfiguration::extra_sets`]. +pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; + +use std::{ + collections::{hash_map, HashMap}, + iter::ExactSizeIterator, +}; + +use super::validator_discovery; + +/// Actual interfacing to the network based on the `Network` trait. +/// +/// Defines the `Network` trait with an implementation for an `Arc`. +use crate::network::{send_message, Network}; + +use crate::network::get_peer_id_by_authority_id; + +use super::metrics::Metrics; + +#[cfg(test)] +mod tests; + +// network bridge log target +const LOG_TARGET: &'static str = "parachain::network-bridge-rx"; + +/// The network bridge subsystem - network receiving side. +pub struct NetworkBridgeRx { + /// `Network` trait implementing type. 
+ network_service: N, + authority_discovery_service: AD, + sync_oracle: Box, + shared: Shared, + metrics: Metrics, +} + +impl NetworkBridgeRx { + /// Create a new network bridge subsystem with underlying network service and authority discovery service. + /// + /// This assumes that the network service has had the notifications protocol for the network + /// bridge already registered. See [`peers_sets_info`](peers_sets_info). + pub fn new( + network_service: N, + authority_discovery_service: AD, + sync_oracle: Box, + metrics: Metrics, + ) -> Self { + let shared = Shared::default(); + Self { network_service, authority_discovery_service, sync_oracle, shared, metrics } + } +} + +#[overseer::subsystem(NetworkBridgeRx, error = SubsystemError, prefix = self::overseer)] +impl NetworkBridgeRx +where + Net: Network + Sync, + AD: validator_discovery::AuthorityDiscovery + Clone + Sync, +{ + fn start(mut self, ctx: Context) -> SpawnedSubsystem { + // The stream of networking events has to be created at initialization, otherwise the + // networking might open connections before the stream of events has been grabbed. + let network_stream = self.network_service.event_stream(); + + // Swallow error because failure is fatal to the node and we log with more precision + // within `run_network`. 
+ let future = run_network_in(self, ctx, network_stream) + .map_err(|e| SubsystemError::with_origin("network-bridge", e)) + .boxed(); + SpawnedSubsystem { name: "network-bridge-subsystem", future } + } +} + +async fn update_gossip_peers_1d( + ads: &mut AD, + neighbors: N, +) -> HashMap +where + AD: validator_discovery::AuthorityDiscovery, + N: IntoIterator, + N::IntoIter: std::iter::ExactSizeIterator, +{ + let neighbors = neighbors.into_iter(); + let mut peers = HashMap::with_capacity(neighbors.len()); + for (authority, validator_index) in neighbors { + let addr = get_peer_id_by_authority_id(ads, authority.clone()).await; + + if let Some(peer_id) = addr { + peers.insert(authority, TopologyPeerInfo { peer_ids: vec![peer_id], validator_index }); + } + } + + peers +} + +async fn handle_network_messages( + mut sender: impl overseer::NetworkBridgeRxSenderTrait, + mut network_service: impl Network, + network_stream: BoxStream<'static, NetworkEvent>, + mut authority_discovery_service: AD, + metrics: Metrics, + shared: Shared, +) -> Result<(), Error> +where + AD: validator_discovery::AuthorityDiscovery + Send, +{ + let mut network_stream = network_stream.fuse(); + loop { + match network_stream.next().await { + None => return Err(Error::EventStreamConcluded), + Some(NetworkEvent::Dht(_)) | + Some(NetworkEvent::SyncConnected { .. }) | + Some(NetworkEvent::SyncDisconnected { .. 
}) => {}, + Some(NetworkEvent::NotificationStreamOpened { + remote: peer, + protocol, + role, + negotiated_fallback, + }) => { + let role = ObservedRole::from(role); + let (peer_set, version) = { + let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { + None => continue, + Some(p) => p, + }; + + if let Some(fallback) = negotiated_fallback { + match PeerSet::try_from_protocol_name(&fallback) { + None => { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + ?peer, + ?peer_set, + "Unknown fallback", + ); + + continue + }, + Some((p2, v2)) => { + if p2 != peer_set { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + fallback_peerset = ?p2, + protocol = &*protocol, + peerset = ?peer_set, + "Fallback mismatched peer-set", + ); + + continue + } + + (p2, v2) + }, + } + } else { + (peer_set, version) + } + }; + + gum::debug!( + target: LOG_TARGET, + action = "PeerConnected", + peer_set = ?peer_set, + version, + peer = ?peer, + role = ?role + ); + + let local_view = { + let mut shared = shared.0.lock(); + let peer_map = match peer_set { + PeerSet::Validation => &mut shared.validation_peers, + PeerSet::Collation => &mut shared.collation_peers, + }; + + match peer_map.entry(peer.clone()) { + hash_map::Entry::Occupied(_) => continue, + hash_map::Entry::Vacant(vacant) => { + vacant.insert(PeerData { view: View::default(), version }); + }, + } + + metrics.on_peer_connected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); + + shared.local_view.clone().unwrap_or(View::default()) + }; + + let maybe_authority = + authority_discovery_service.get_authority_ids_by_peer_id(peer).await; + + match peer_set { + PeerSet::Validation => { + dispatch_validation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected( + peer.clone(), + role, + 1, + maybe_authority, + ), + NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), + ], + &mut sender, + ) + .await; + + send_message( + &mut network_service, 
+ vec![peer], + PeerSet::Validation, + version, + WireMessage::::ViewUpdate(local_view), + &metrics, + ); + }, + PeerSet::Collation => { + dispatch_collation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected( + peer.clone(), + role, + 1, + maybe_authority, + ), + NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), + ], + &mut sender, + ) + .await; + + send_message( + &mut network_service, + vec![peer], + PeerSet::Collation, + version, + WireMessage::::ViewUpdate(local_view), + &metrics, + ); + }, + } + }, + Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { + let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) { + None => continue, + Some(peer_set) => peer_set, + }; + + gum::debug!( + target: LOG_TARGET, + action = "PeerDisconnected", + peer_set = ?peer_set, + peer = ?peer + ); + + let was_connected = { + let mut shared = shared.0.lock(); + let peer_map = match peer_set { + PeerSet::Validation => &mut shared.validation_peers, + PeerSet::Collation => &mut shared.collation_peers, + }; + + let w = peer_map.remove(&peer).is_some(); + + metrics.on_peer_disconnected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); + + w + }; + + if was_connected && version == peer_set.get_default_version() { + match peer_set { + PeerSet::Validation => + dispatch_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut sender, + ) + .await, + PeerSet::Collation => + dispatch_collation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut sender, + ) + .await, + } + } + }, + Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let expected_versions = { + let mut versions = PerPeerSet::>::default(); + let shared = shared.0.lock(); + if let Some(peer_data) = shared.validation_peers.get(&remote) { + versions[PeerSet::Validation] = Some(peer_data.version); + } + + if let Some(peer_data) = shared.collation_peers.get(&remote) { + 
versions[PeerSet::Collation] = Some(peer_data.version); + } + + versions + }; + + // non-decoded, but version-checked validation messages. + let v_messages: Result, _> = messages + .iter() + .filter_map(|(protocol, msg_bytes)| { + // version doesn't matter because we always receive on the 'correct' + // protocol name, not the negotiated fallback. + let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + if peer_set == PeerSet::Validation { + if expected_versions[PeerSet::Validation].is_none() { + return Some(Err(UNCONNECTED_PEERSET_COST)) + } + + Some(Ok(msg_bytes.clone())) + } else { + None + } + }) + .collect(); + + let v_messages = match v_messages { + Err(rep) => { + gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(remote, rep); + + continue + }, + Ok(v) => v, + }; + + // non-decoded, but version-checked colldation messages. + let c_messages: Result, _> = messages + .iter() + .filter_map(|(protocol, msg_bytes)| { + // version doesn't matter because we always receive on the 'correct' + // protocol name, not the negotiated fallback. 
+ let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?; + + if peer_set == PeerSet::Collation { + if expected_versions[PeerSet::Collation].is_none() { + return Some(Err(UNCONNECTED_PEERSET_COST)) + } + + Some(Ok(msg_bytes.clone())) + } else { + None + } + }) + .collect(); + + let c_messages = match c_messages { + Err(rep) => { + gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(remote, rep); + + continue + }, + Ok(v) => v, + }; + + if v_messages.is_empty() && c_messages.is_empty() { + continue + } + + gum::trace!( + target: LOG_TARGET, + action = "PeerMessages", + peer = ?remote, + num_validation_messages = %v_messages.len(), + num_collation_messages = %c_messages.len() + ); + + if !v_messages.is_empty() { + let (events, reports) = + if expected_versions[PeerSet::Validation] == Some(1) { + handle_v1_peer_messages::( + remote.clone(), + PeerSet::Validation, + &mut shared.0.lock().validation_peers, + v_messages, + &metrics, + ) + } else { + gum::warn!( + target: LOG_TARGET, + version = ?expected_versions[PeerSet::Validation], + "Major logic bug. Peer somehow has unsupported validation protocol version." + ); + + never!("Only version 1 is supported; peer set connection checked above; qed"); + + // If a peer somehow triggers this, we'll disconnect them + // eventually. + (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) + }; + + for report in reports { + network_service.report_peer(remote.clone(), report); + } + + dispatch_validation_events_to_all(events, &mut sender).await; + } + + if !c_messages.is_empty() { + let (events, reports) = + if expected_versions[PeerSet::Collation] == Some(1) { + handle_v1_peer_messages::( + remote.clone(), + PeerSet::Collation, + &mut shared.0.lock().collation_peers, + c_messages, + &metrics, + ) + } else { + gum::warn!( + target: LOG_TARGET, + version = ?expected_versions[PeerSet::Collation], + "Major logic bug. Peer somehow has unsupported collation protocol version." 
+ ); + + never!("Only version 1 is supported; peer set connection checked above; qed"); + + // If a peer somehow triggers this, we'll disconnect them + // eventually. + (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) + }; + + for report in reports { + network_service.report_peer(remote.clone(), report); + } + + dispatch_collation_events_to_all(events, &mut sender).await; + } + }, + } + } +} + +#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] +async fn run_incoming_orchestra_signals( + mut ctx: Context, + mut network_service: N, + mut authority_discovery_service: AD, + shared: Shared, + sync_oracle: Box, + metrics: Metrics, +) -> Result<(), Error> +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone, +{ + // This is kept sorted, descending, by block number. + let mut live_heads: Vec = Vec::with_capacity(MAX_VIEW_HEADS); + let mut finalized_number = 0; + + let mut mode = Mode::Syncing(sync_oracle); + loop { + match ctx.recv().fuse().await? { + FromOrchestra::Communication { + msg: + NetworkBridgeRxMessage::NewGossipTopology { + session, + our_neighbors_x, + our_neighbors_y, + }, + } => { + gum::debug!( + target: LOG_TARGET, + action = "NewGossipTopology", + neighbors_x = our_neighbors_x.len(), + neighbors_y = our_neighbors_y.len(), + "Gossip topology has changed", + ); + + let gossip_peers_x = + update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_x).await; + + let gossip_peers_y = + update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_y).await; + + dispatch_validation_event_to_all_unbounded( + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session, + our_neighbors_x: gossip_peers_x, + our_neighbors_y: gossip_peers_y, + }), + ctx.sender(), + ); + }, + FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => { + let ActiveLeavesUpdate { activated, deactivated } = active_leaves; + gum::trace!( + target: 
LOG_TARGET, + action = "ActiveLeaves", + has_activated = activated.is_some(), + num_deactivated = %deactivated.len(), + ); + + for activated in activated { + let pos = live_heads + .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) + .unwrap_or_else(|i| i); + + live_heads.insert(pos, activated); + } + live_heads.retain(|h| !deactivated.contains(&h.hash)); + + // if we're done syncing, set the mode to `Mode::Active`. + // Otherwise, we don't need to send view updates. + { + let is_done_syncing = match mode { + Mode::Active => true, + Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(), + }; + + if is_done_syncing { + mode = Mode::Active; + + update_our_view( + &mut network_service, + &mut ctx, + &live_heads, + &shared, + finalized_number, + &metrics, + ); + } + } + }, + FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { + gum::trace!(target: LOG_TARGET, action = "BlockFinalized"); + + debug_assert!(finalized_number < number); + + // we don't send the view updates here, but delay them until the next `ActiveLeaves` + // otherwise it might break assumptions of some of the subsystems + // that we never send the same `ActiveLeavesUpdate` + finalized_number = number; + }, + } + } +} + +/// Main driver, processing network events and overseer signals. +/// +/// THIS IS A HACK. We need to ensure we never hold the mutex across an `.await` boundary +/// and `parking_lot` currently does not provide `Send`, which helps us enforce that. +/// If this breaks, we need to find another way to protect ourselves. 
+/// +/// ```compile_fail +/// #use parking_lot::MutexGuard; +/// #fn is_send(); +/// #is_send::(); +/// ``` +#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] +async fn run_network_in( + bridge: NetworkBridgeRx, + mut ctx: Context, + network_stream: BoxStream<'static, NetworkEvent>, +) -> Result<(), Error> +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone, +{ + let NetworkBridgeRx { + network_service, + authority_discovery_service, + metrics, + sync_oracle, + shared, + } = bridge; + + let (task, network_event_handler) = handle_network_messages( + ctx.sender().clone(), + network_service.clone(), + network_stream, + authority_discovery_service.clone(), + metrics.clone(), + shared.clone(), + ) + .remote_handle(); + + ctx.spawn("network-bridge-in-network-worker", Box::pin(task))?; + futures::pin_mut!(network_event_handler); + + let orchestra_signal_handler = run_incoming_orchestra_signals( + ctx, + network_service, + authority_discovery_service, + shared, + sync_oracle, + metrics, + ); + + futures::pin_mut!(orchestra_signal_handler); + + futures::future::select(orchestra_signal_handler, network_event_handler) + .await + .factor_first() + .0?; + Ok(()) +} + +fn construct_view( + live_heads: impl DoubleEndedIterator, + finalized_number: BlockNumber, +) -> View { + View::new(live_heads.take(MAX_VIEW_HEADS), finalized_number) +} + +#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] +fn update_our_view( + net: &mut Net, + ctx: &mut Context, + live_heads: &[ActivatedLeaf], + shared: &Shared, + finalized_number: BlockNumber, + metrics: &Metrics, +) where + Net: Network, +{ + let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); + + let (validation_peers, collation_peers) = { + let mut shared = shared.0.lock(); + + // We only want to send a view update when the heads changed. + // A change in finalized block number only is _not_ sufficient. 
+ // + // If this is the first view update since becoming active, but our view is empty, + // there is no need to send anything. + match shared.local_view { + Some(ref v) if v.check_heads_eq(&new_view) => return, + None if live_heads.is_empty() => { + shared.local_view = Some(new_view); + return + }, + _ => { + shared.local_view = Some(new_view.clone()); + }, + } + + ( + shared.validation_peers.keys().cloned().collect::>(), + shared.collation_peers.keys().cloned().collect::>(), + ) + }; + + send_validation_message_v1( + net, + validation_peers, + WireMessage::ViewUpdate(new_view.clone()), + metrics, + ); + + send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics); + + let our_view = OurView::new( + live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), + finalized_number, + ); + + dispatch_validation_event_to_all_unbounded( + NetworkBridgeEvent::OurViewChange(our_view.clone()), + ctx.sender(), + ); + + dispatch_collation_event_to_all_unbounded( + NetworkBridgeEvent::OurViewChange(our_view), + ctx.sender(), + ); +} + +// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that +// peer-set. 
+fn handle_v1_peer_messages>( + peer: PeerId, + peer_set: PeerSet, + peers: &mut HashMap, + messages: Vec, + metrics: &Metrics, +) -> (Vec>, Vec) { + let peer_data = match peers.get_mut(&peer) { + None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]), + Some(d) => d, + }; + + let mut outgoing_events = Vec::with_capacity(messages.len()); + let mut reports = Vec::new(); + + for message in messages { + metrics.on_notification_received(peer_set, peer_data.version, message.len()); + let message = match WireMessage::::decode_all(&mut message.as_ref()) { + Err(_) => { + reports.push(MALFORMED_MESSAGE_COST); + continue + }, + Ok(m) => m, + }; + + outgoing_events.push(match message { + WireMessage::ViewUpdate(new_view) => { + if new_view.len() > MAX_VIEW_HEADS || + new_view.finalized_number < peer_data.view.finalized_number + { + reports.push(MALFORMED_VIEW_COST); + continue + } else if new_view.is_empty() { + reports.push(EMPTY_VIEW_COST); + continue + } else if new_view == peer_data.view { + continue + } else { + peer_data.view = new_view; + + NetworkBridgeEvent::PeerViewChange(peer.clone(), peer_data.view.clone()) + } + }, + WireMessage::ProtocolMessage(message) => + NetworkBridgeEvent::PeerMessage(peer.clone(), message.into()), + }) + } + + (outgoing_events, reports) +} + +fn send_validation_message_v1( + net: &mut impl Network, + peers: Vec, + message: WireMessage, + metrics: &Metrics, +) { + send_message(net, peers, PeerSet::Validation, 1, message, metrics); +} + +fn send_collation_message_v1( + net: &mut impl Network, + peers: Vec, + message: WireMessage, + metrics: &Metrics, +) { + send_message(net, peers, PeerSet::Collation, 1, message, metrics) +} + +async fn dispatch_validation_event_to_all( + event: NetworkBridgeEvent, + ctx: &mut impl overseer::NetworkBridgeRxSenderTrait, +) { + dispatch_validation_events_to_all(std::iter::once(event), ctx).await +} + +async fn dispatch_collation_event_to_all( + event: NetworkBridgeEvent, + ctx: &mut impl 
overseer::NetworkBridgeRxSenderTrait, +) { + dispatch_collation_events_to_all(std::iter::once(event), ctx).await +} + +fn dispatch_validation_event_to_all_unbounded( + event: NetworkBridgeEvent, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, +) { + event + .focus() + .ok() + .map(StatementDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + event + .focus() + .ok() + .map(BitfieldDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + event + .focus() + .ok() + .map(ApprovalDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + event + .focus() + .ok() + .map(GossipSupportMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); +} + +fn dispatch_collation_event_to_all_unbounded( + event: NetworkBridgeEvent, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, +) { + if let Ok(msg) = event.focus() { + sender.send_unbounded_message(CollatorProtocolMessage::NetworkBridgeUpdate(msg)) + } +} + +async fn dispatch_validation_events_to_all( + events: I, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, +) where + I: IntoIterator>, + I::IntoIter: Send, +{ + for event in events { + sender + .send_messages(event.focus().map(StatementDistributionMessage::from)) + .await; + sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; + sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; + } +} + +async fn dispatch_collation_events_to_all( + events: I, + ctx: &mut impl overseer::NetworkBridgeRxSenderTrait, +) where + I: IntoIterator>, + I::IntoIter: Send, +{ + let messages_for = |event: NetworkBridgeEvent| { + event.focus().ok().map(|m| CollatorProtocolMessage::NetworkBridgeUpdate(m)) + }; + + ctx.send_messages(events.into_iter().flat_map(messages_for)).await +} diff --git a/node/network/bridge/src/tests.rs 
b/node/network/bridge/src/rx/tests.rs similarity index 89% rename from node/network/bridge/src/tests.rs rename to node/network/bridge/src/rx/tests.rs index 80929580d165..d7e95a966bcc 100644 --- a/node/network/bridge/src/tests.rs +++ b/node/network/bridge/src/rx/tests.rs @@ -16,6 +16,8 @@ use super::*; use futures::{channel::oneshot, executor, stream::BoxStream}; +use polkadot_node_network_protocol::{self as net_protocol, OurView}; +use polkadot_node_subsystem::{messages::NetworkBridgeEvent, ActivatedLeaf}; use assert_matches::assert_matches; use async_trait::async_trait; @@ -44,7 +46,7 @@ use polkadot_node_subsystem_test_helpers::{ }; use polkadot_node_subsystem_util::metered; use polkadot_primitives::v2::AuthorityDiscoveryId; -use polkadot_primitives_test_helpers::dummy_collator_signature; + use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; @@ -214,18 +216,18 @@ fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAc #[derive(Clone)] struct TestSyncOracle { - flag: Arc, + is_major_syncing: Arc, done_syncing_sender: Arc>>>, } struct TestSyncOracleHandle { done_syncing_receiver: oneshot::Receiver<()>, - flag: Arc, + is_major_syncing: Arc, } impl TestSyncOracleHandle { fn set_done(&self) { - self.flag.store(false, Ordering::SeqCst); + self.is_major_syncing.store(false, Ordering::SeqCst); } async fn await_mode_switch(self) { @@ -235,7 +237,7 @@ impl TestSyncOracleHandle { impl SyncOracle for TestSyncOracle { fn is_major_syncing(&mut self) -> bool { - let is_major_syncing = self.flag.load(Ordering::SeqCst); + let is_major_syncing = self.is_major_syncing.load(Ordering::SeqCst); if !is_major_syncing { if let Some(sender) = self.done_syncing_sender.lock().take() { @@ -252,13 +254,16 @@ impl SyncOracle for TestSyncOracle { } // val - result of `is_major_syncing`. 
-fn make_sync_oracle(val: bool) -> (TestSyncOracle, TestSyncOracleHandle) { +fn make_sync_oracle(is_major_syncing: bool) -> (TestSyncOracle, TestSyncOracleHandle) { let (tx, rx) = oneshot::channel(); - let flag = Arc::new(AtomicBool::new(val)); + let is_major_syncing = Arc::new(AtomicBool::new(is_major_syncing)); ( - TestSyncOracle { flag: flag.clone(), done_syncing_sender: Arc::new(Mutex::new(Some(tx))) }, - TestSyncOracleHandle { flag, done_syncing_receiver: rx }, + TestSyncOracle { + is_major_syncing: is_major_syncing.clone(), + done_syncing_sender: Arc::new(Mutex::new(Some(tx))), + }, + TestSyncOracleHandle { is_major_syncing, done_syncing_receiver: rx }, ) } @@ -267,7 +272,7 @@ fn done_syncing_oracle() -> Box { Box::new(oracle) } -type VirtualOverseer = TestSubsystemContextHandle; +type VirtualOverseer = TestSubsystemContextHandle; struct TestHarness { network_handle: TestNetworkHandle, @@ -284,14 +289,15 @@ fn test_harness>( polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let network_stream = network.event_stream(); - let bridge = NetworkBridge { + let bridge = NetworkBridgeRx { network_service: network, authority_discovery_service: discovery, metrics: Metrics(None), sync_oracle, + shared: Shared::default(), }; - let network_bridge = run_network(bridge, context, network_stream) + let network_bridge = run_network_in(bridge, context, network_stream) .map_err(|_| panic!("subsystem execution failed")) .map(|_| ()); @@ -311,7 +317,7 @@ fn test_harness>( async fn assert_sends_validation_event_to_all( event: NetworkBridgeEvent, - virtual_overseer: &mut TestSubsystemContextHandle, + virtual_overseer: &mut TestSubsystemContextHandle, ) { // Ordering must be consistent across: // `fn dispatch_validation_event_to_all_unbounded` @@ -347,7 +353,7 @@ async fn assert_sends_validation_event_to_all( async fn assert_sends_collation_event_to_all( event: NetworkBridgeEvent, - virtual_overseer: &mut TestSubsystemContextHandle, + virtual_overseer: &mut 
TestSubsystemContextHandle, ) { assert_matches!( virtual_overseer.recv().await, @@ -485,6 +491,7 @@ fn do_not_send_view_update_until_synced() { let peer_a = PeerId::random(); let peer_b = PeerId::random(); + assert_ne!(peer_a, peer_b); network_handle .connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full) @@ -1082,117 +1089,6 @@ fn view_finalized_number_can_not_go_down() { }); } -#[test] -fn send_messages_to_peers() { - test_harness(done_syncing_oracle(), |test_harness| async move { - let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; - - let peer = PeerId::random(); - - network_handle - .connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full) - .await; - network_handle - .connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full) - .await; - - // bridge will inform about all connected peers. - { - assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), - &mut virtual_overseer, - ) - .await; - - assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), - &mut virtual_overseer, - ) - .await; - } - - { - assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None), - &mut virtual_overseer, - ) - .await; - - assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()), - &mut virtual_overseer, - ) - .await; - } - - // consume peer view changes - { - let _peer_view_changes = network_handle.next_network_actions(2).await; - } - - // send a validation protocol message. 
- - { - let approval_distribution_message = - protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new()); - - let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution( - approval_distribution_message.clone(), - ); - - virtual_overseer - .send(FromOrchestra::Communication { - msg: NetworkBridgeMessage::SendValidationMessage( - vec![peer.clone()], - Versioned::V1(message_v1.clone()), - ), - }) - .await; - - assert_eq!( - network_handle.next_network_action().await, - NetworkAction::WriteNotification( - peer.clone(), - PeerSet::Validation, - WireMessage::ProtocolMessage(message_v1).encode(), - ) - ); - } - - // send a collation protocol message. - - { - let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( - Sr25519Keyring::Alice.public().into(), - 0_u32.into(), - dummy_collator_signature(), - ); - - let message_v1 = - protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone()); - - virtual_overseer - .send(FromOrchestra::Communication { - msg: NetworkBridgeMessage::SendCollationMessage( - vec![peer.clone()], - Versioned::V1(message_v1.clone()), - ), - }) - .await; - - assert_eq!( - network_handle.next_network_action().await, - NetworkAction::WriteNotification( - peer.clone(), - PeerSet::Collation, - WireMessage::ProtocolMessage(message_v1).encode(), - ) - ); - } - virtual_overseer - }); -} - #[test] fn our_view_updates_decreasing_order_and_limited_to_max() { test_harness(done_syncing_oracle(), |test_harness| async move { diff --git a/node/network/bridge/src/tx/mod.rs b/node/network/bridge/src/tx/mod.rs new file mode 100644 index 000000000000..d58ccf8fbb95 --- /dev/null +++ b/node/network/bridge/src/tx/mod.rs @@ -0,0 +1,301 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Network Bridge Subsystem - handles _outgoing_ messages, from subsystem to the network. +use super::*; + +use polkadot_node_network_protocol::{peer_set::PeerSet, v1 as protocol_v1, PeerId, Versioned}; + +use polkadot_node_subsystem::{ + errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, FromOrchestra, + OverseerSignal, SpawnedSubsystem, +}; + +/// Peer set info for network initialization. +/// +/// To be added to [`NetworkConfiguration::extra_sets`]. +pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; + +use crate::validator_discovery; + +/// Actual interfacing to the network based on the `Network` trait. +/// +/// Defines the `Network` trait with an implementation for an `Arc`. +use crate::network::{send_message, Network}; + +use crate::metrics::Metrics; + +#[cfg(test)] +mod tests; + +// network bridge log target +const LOG_TARGET: &'static str = "parachain::network-bridge-tx"; + +/// The network bridge subsystem. +pub struct NetworkBridgeTx { + /// `Network` trait implementing type. + network_service: N, + authority_discovery_service: AD, + metrics: Metrics, +} + +impl NetworkBridgeTx { + /// Create a new network bridge subsystem with underlying network service and authority discovery service. 
+ /// + /// This assumes that the network service has had the notifications protocol for the network + /// bridge already registered. See [`peer_sets_info`](peer_sets_info). + pub fn new(network_service: N, authority_discovery_service: AD, metrics: Metrics) -> Self { + Self { network_service, authority_discovery_service, metrics } + } +} + +#[overseer::subsystem(NetworkBridgeTx, error = SubsystemError, prefix = self::overseer)] +impl NetworkBridgeTx +where + Net: Network + Sync, + AD: validator_discovery::AuthorityDiscovery + Clone + Sync, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = run_network_out(self, ctx) + .map_err(|e| SubsystemError::with_origin("network-bridge", e)) + .boxed(); + SpawnedSubsystem { name: "network-bridge-subsystem", future } + } +} + +#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)] +async fn handle_subsystem_messages( + mut ctx: Context, + mut network_service: N, + mut authority_discovery_service: AD, + metrics: Metrics, +) -> Result<(), Error> +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone, +{ + let mut validator_discovery = validator_discovery::Service::::new(); + + loop { + match ctx.recv().fuse().await?
{ + FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOrchestra::Signal(_) => { /* handled by incoming */ }, + FromOrchestra::Communication { msg } => { + (network_service, authority_discovery_service) = + handle_incoming_subsystem_communication( + &mut ctx, + network_service, + &mut validator_discovery, + authority_discovery_service.clone(), + msg, + &metrics, + ) + .await; + }, + } + } +} + +#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)] +async fn handle_incoming_subsystem_communication( + _ctx: &mut Context, + mut network_service: N, + validator_discovery: &mut validator_discovery::Service, + mut authority_discovery_service: AD, + msg: NetworkBridgeTxMessage, + metrics: &Metrics, +) -> (N, AD) +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone, +{ + match msg { + NetworkBridgeTxMessage::ReportPeer(peer, rep) => { + if !rep.is_benefit() { + gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer"); + } + + metrics.on_report_event(); + network_service.report_peer(peer, rep); + }, + NetworkBridgeTxMessage::DisconnectPeer(peer, peer_set) => { + gum::trace!( + target: LOG_TARGET, + action = "DisconnectPeer", + ?peer, + peer_set = ?peer_set, + ); + + network_service.disconnect_peer(peer, peer_set); + }, + NetworkBridgeTxMessage::SendValidationMessage(peers, msg) => { + gum::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = 1usize, + ); + + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + }, + NetworkBridgeTxMessage::SendValidationMessages(msgs) => { + gum::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = %msgs.len(), + ); + + for (peers, msg) in msgs { + match msg { + Versioned::V1(msg) => send_validation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + }, + 
NetworkBridgeTxMessage::SendCollationMessage(peers, msg) => { + gum::trace!( + target: LOG_TARGET, + action = "SendCollationMessages", + num_messages = 1usize, + ); + + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + }, + NetworkBridgeTxMessage::SendCollationMessages(msgs) => { + gum::trace!( + target: LOG_TARGET, + action = "SendCollationMessages", + num_messages = %msgs.len(), + ); + + for (peers, msg) in msgs { + match msg { + Versioned::V1(msg) => send_collation_message_v1( + &mut network_service, + peers, + WireMessage::ProtocolMessage(msg), + &metrics, + ), + } + } + }, + NetworkBridgeTxMessage::SendRequests(reqs, if_disconnected) => { + gum::trace!( + target: LOG_TARGET, + action = "SendRequests", + num_requests = %reqs.len(), + ); + + for req in reqs { + network_service + .start_request(&mut authority_discovery_service, req, if_disconnected) + .await; + } + }, + NetworkBridgeTxMessage::ConnectToValidators { validator_ids, peer_set, failed } => { + gum::trace!( + target: LOG_TARGET, + action = "ConnectToValidators", + peer_set = ?peer_set, + ids = ?validator_ids, + "Received a validator connection request", + ); + + metrics.note_desired_peer_count(peer_set, validator_ids.len()); + + let (network_service, ads) = validator_discovery + .on_request( + validator_ids, + peer_set, + failed, + network_service, + authority_discovery_service, + ) + .await; + + return (network_service, ads) + }, + NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => { + gum::trace!( + target: LOG_TARGET, + action = "ConnectToPeers", + peer_set = ?peer_set, + ?validator_addrs, + "Received a resolved validator connection request", + ); + + metrics.note_desired_peer_count(peer_set, validator_addrs.len()); + + let all_addrs = validator_addrs.into_iter().flatten().collect(); + let network_service = validator_discovery + .on_resolved_request(all_addrs, 
peer_set, network_service) + .await; + return (network_service, authority_discovery_service) + }, + } + (network_service, authority_discovery_service) +} + +#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)] +async fn run_network_out( + bridge: NetworkBridgeTx, + ctx: Context, +) -> Result<(), Error> +where + N: Network, + AD: validator_discovery::AuthorityDiscovery + Clone + Sync, +{ + let NetworkBridgeTx { network_service, authority_discovery_service, metrics } = bridge; + + handle_subsystem_messages(ctx, network_service, authority_discovery_service, metrics).await?; + + Ok(()) +} + +fn send_validation_message_v1( + net: &mut impl Network, + peers: Vec, + message: WireMessage, + metrics: &Metrics, +) { + send_message(net, peers, PeerSet::Validation, 1, message, metrics); +} + +fn send_collation_message_v1( + net: &mut impl Network, + peers: Vec, + message: WireMessage, + metrics: &Metrics, +) { + send_message(net, peers, PeerSet::Collation, 1, message, metrics) +} diff --git a/node/network/bridge/src/tx/tests.rs b/node/network/bridge/src/tx/tests.rs new file mode 100644 index 000000000000..63d9730e6599 --- /dev/null +++ b/node/network/bridge/src/tx/tests.rs @@ -0,0 +1,298 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +use super::*; +use futures::{executor, stream::BoxStream}; +use polkadot_node_subsystem_util::TimeoutExt; + +use async_trait::async_trait; +use parking_lot::Mutex; +use std::{borrow::Cow, collections::HashSet}; + +use sc_network::{Event as NetworkEvent, IfDisconnected}; + +use polkadot_node_network_protocol::{ + request_response::outgoing::Requests, ObservedRole, Versioned, +}; +use polkadot_node_subsystem::{FromOrchestra, OverseerSignal}; +use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle; +use polkadot_node_subsystem_util::metered; +use polkadot_primitives::v2::AuthorityDiscoveryId; +use polkadot_primitives_test_helpers::dummy_collator_signature; +use sc_network::Multiaddr; +use sp_keyring::Sr25519Keyring; + +const TIMEOUT: std::time::Duration = polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle::::TIMEOUT; + +use crate::{network::Network, validator_discovery::AuthorityDiscovery, Rep}; + +#[derive(Debug, PartialEq)] +pub enum NetworkAction { + /// Note a change in reputation for a peer. + ReputationChange(PeerId, Rep), + /// Disconnect a peer from the given peer-set. + DisconnectPeer(PeerId, PeerSet), + /// Write a notification to a given peer on the given peer-set. + WriteNotification(PeerId, PeerSet, Vec), +} + +// The subsystem's view of the network - only supports a single call to `event_stream`. +#[derive(Clone)] +struct TestNetwork { + net_events: Arc>>>, + action_tx: Arc>>, +} + +#[derive(Clone, Debug)] +struct TestAuthorityDiscovery; + +// The test's view of the network. This receives updates from the subsystem in the form +// of `NetworkAction`s. 
+struct TestNetworkHandle { + action_rx: metered::UnboundedMeteredReceiver, + net_tx: metered::MeteredSender, +} + +fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { + let (net_tx, net_rx) = metered::channel(10); + let (action_tx, action_rx) = metered::unbounded(); + + ( + TestNetwork { + net_events: Arc::new(Mutex::new(Some(net_rx))), + action_tx: Arc::new(Mutex::new(action_tx)), + }, + TestNetworkHandle { action_rx, net_tx }, + TestAuthorityDiscovery, + ) +} + +#[async_trait] +impl Network for TestNetwork { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { + self.net_events + .lock() + .take() + .expect("Subsystem made more than one call to `event_stream`") + .boxed() + } + + async fn set_reserved_peers( + &mut self, + _protocol: Cow<'static, str>, + _: HashSet, + ) -> Result<(), String> { + Ok(()) + } + + async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, _: Vec) {} + + async fn start_request( + &self, + _: &mut AD, + _: Requests, + _: IfDisconnected, + ) { + } + + fn report_peer(&self, who: PeerId, cost_benefit: Rep) { + self.action_tx + .lock() + .unbounded_send(NetworkAction::ReputationChange(who, cost_benefit)) + .unwrap(); + } + + fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { + self.action_tx + .lock() + .unbounded_send(NetworkAction::DisconnectPeer(who, peer_set)) + .unwrap(); + } + + fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { + self.action_tx + .lock() + .unbounded_send(NetworkAction::WriteNotification(who, peer_set, message)) + .unwrap(); + } +} + +#[async_trait] +impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery { + async fn get_addresses_by_authority_id( + &mut self, + _authority: AuthorityDiscoveryId, + ) -> Option> { + None + } + + async fn get_authority_ids_by_peer_id( + &mut self, + _peer_id: PeerId, + ) -> Option> { + None + } +} + +impl TestNetworkHandle { + // Get the next network action. 
+ async fn next_network_action(&mut self) -> NetworkAction { + self.action_rx.next().await.expect("subsystem concluded early") + } + + async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { + self.send_network_event(NetworkEvent::NotificationStreamOpened { + remote: peer, + protocol: peer_set.into_default_protocol_name(), + negotiated_fallback: None, + role: role.into(), + }) + .await; + } + + async fn send_network_event(&mut self, event: NetworkEvent) { + self.net_tx.send(event).await.expect("subsystem concluded early"); + } +} + +type VirtualOverseer = TestSubsystemContextHandle; + +struct TestHarness { + network_handle: TestNetworkHandle, + virtual_overseer: VirtualOverseer, +} + +fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + let pool = sp_core::testing::TaskExecutor::new(); + let (network, network_handle, discovery) = new_test_network(); + + let (context, virtual_overseer) = + polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + + let bridge_out = NetworkBridgeTx::new(network, discovery, Metrics(None)); + + let network_bridge_out_fut = run_network_out(bridge_out, context) + .map_err(|e| panic!("bridge-out subsystem execution failed {:?}", e)) + .map(|_| ()); + + let test_fut = test(TestHarness { network_handle, virtual_overseer }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(network_bridge_out_fut); + + let _ = executor::block_on(future::join( + async move { + let mut virtual_overseer = test_fut.await; + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + }, + network_bridge_out_fut, + )); +} + +#[test] +fn send_messages_to_peers() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; + + let peer = PeerId::random(); + + network_handle + .connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full) + .timeout(TIMEOUT) + .await + .expect("Timeout does not occur"); + + // the outgoing side 
does not consume network messages + // so the single item sink has to be free explicitly + + network_handle + .connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full) + .timeout(TIMEOUT) + .await + .expect("Timeout does not occur"); + + // send a validation protocol message. + + { + let approval_distribution_message = + protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new()); + + let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution( + approval_distribution_message.clone(), + ); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendValidationMessage( + vec![peer.clone()], + Versioned::V1(message_v1.clone()), + ), + }) + .timeout(TIMEOUT) + .await + .expect("Timeout does not occur"); + + assert_eq!( + network_handle + .next_network_action() + .timeout(TIMEOUT) + .await + .expect("Timeout does not occur"), + NetworkAction::WriteNotification( + peer.clone(), + PeerSet::Validation, + WireMessage::ProtocolMessage(message_v1).encode(), + ) + ); + } + + // send a collation protocol message. 
+ + { + let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( + Sr25519Keyring::Alice.public().into(), + 0_u32.into(), + dummy_collator_signature(), + ); + + let message_v1 = + protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone()); + + virtual_overseer + .send(FromOrchestra::Communication { + msg: NetworkBridgeTxMessage::SendCollationMessage( + vec![peer.clone()], + Versioned::V1(message_v1.clone()), + ), + }) + .await; + + assert_eq!( + network_handle + .next_network_action() + .timeout(TIMEOUT) + .await + .expect("Timeout does not occur"), + NetworkAction::WriteNotification( + peer.clone(), + PeerSet::Collation, + WireMessage::ProtocolMessage(message_v1).encode(), + ) + ); + } + virtual_overseer + }); +} diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 66b404551c52..c1a20a2a670b 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -39,7 +39,7 @@ use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement}; use polkadot_node_subsystem::{ jaeger, messages::{ - CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage, + CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, RuntimeApiMessage, }, overseer, FromOrchestra, OverseerSignal, PerLeafSpan, }; @@ -533,7 +533,7 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) { state.collator_pair.sign(&declare_signature_payload), ); - ctx.send_message(NetworkBridgeMessage::SendCollationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage( vec![peer], Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) @@ -551,7 +551,7 @@ async fn connect_to_validators( // ignore address resolution failure // will reissue a new request on new collation let (failed, _) = oneshot::channel(); - 
ctx.send_message(NetworkBridgeMessage::ConnectToValidators { + ctx.send_message(NetworkBridgeTxMessage::ConnectToValidators { validator_ids, peer_set: PeerSet::Collation, failed, @@ -608,7 +608,7 @@ async fn advertise_collation( let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent); - ctx.send_message(NetworkBridgeMessage::SendCollationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendCollationMessage( vec![peer.clone()], Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) @@ -751,7 +751,7 @@ async fn handle_incoming_peer_message( ); // If we are declared to, this is another collator, and we should disconnect. - ctx.send_message(NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation)) + ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation)) .await; }, AdvertiseCollation(_) => { @@ -761,14 +761,14 @@ async fn handle_incoming_peer_message( "AdvertiseCollation message is not expected on the collator side of the protocol", ); - ctx.send_message(NetworkBridgeMessage::ReportPeer( + ctx.send_message(NetworkBridgeTxMessage::ReportPeer( origin.clone(), COST_UNEXPECTED_MESSAGE, )) .await; // If we are advertised to, this is another collator, and we should disconnect. - ctx.send_message(NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation)) + ctx.send_message(NetworkBridgeTxMessage::DisconnectPeer(origin, PeerSet::Collation)) .await; }, CollationSeconded(relay_parent, statement) => { @@ -851,7 +851,7 @@ async fn handle_incoming_request( target: LOG_TARGET, "Dropping incoming request as peer has a request in flight already." 
); - ctx.send_message(NetworkBridgeMessage::ReportPeer(req.peer, COST_APPARENT_FLOOD)) + ctx.send_message(NetworkBridgeTxMessage::ReportPeer(req.peer, COST_APPARENT_FLOOD)) .await; return Ok(()) } diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index 41cabb39a600..4d95b7c807e2 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -369,8 +369,8 @@ async fn distribute_collation( if should_connect { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { .. } ) => {} @@ -424,8 +424,8 @@ async fn expect_declare_msg( ) { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendCollationMessage( to, Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), ) @@ -458,8 +458,8 @@ async fn expect_advertise_collation_msg( ) { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendCollationMessage( to, Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), ) @@ -570,7 +570,7 @@ fn advertise_and_send_collation() { .unwrap(); assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(bad_peer, _)) => { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(bad_peer, _)) => { assert_eq!(bad_peer, peer); } ); @@ -838,7 +838,7 @@ fn collators_reject_declare_messages() { assert_matches!( overseer_recv(virtual_overseer).await, - 
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::DisconnectPeer( p, PeerSet::Collation, )) if p == peer diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index bdf8904b7d07..66659e4b5bee 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -34,7 +34,7 @@ use polkadot_node_network_protocol::{ use polkadot_primitives::v2::CollatorPair; use polkadot_node_subsystem::{ - errors::SubsystemError, messages::NetworkBridgeMessage, overseer, SpawnedSubsystem, + errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, }; mod error; @@ -132,5 +132,5 @@ async fn modify_reputation( "reputation change for peer", ); - sender.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await; + sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await; } diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 592feaf9124a..997ad62a9804 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -48,7 +48,7 @@ use polkadot_node_subsystem::{ jaeger, messages::{ CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected, NetworkBridgeEvent, - NetworkBridgeMessage, RuntimeApiMessage, + NetworkBridgeTxMessage, RuntimeApiMessage, }, overseer, FromOrchestra, OverseerSignal, PerLeafSpan, SubsystemSender, }; @@ -632,7 +632,7 @@ fn collator_peer_id( async fn disconnect_peer(sender: &mut impl overseer::CollatorProtocolSenderTrait, peer_id: PeerId) { sender - .send_message(NetworkBridgeMessage::DisconnectPeer(peer_id, PeerSet::Collation)) + .send_message(NetworkBridgeTxMessage::DisconnectPeer(peer_id, PeerSet::Collation)) .await } @@ -712,7 +712,7 @@ async fn notify_collation_seconded( let wire_message = 
protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into()); sender - .send_message(NetworkBridgeMessage::SendCollationMessage( + .send_message(NetworkBridgeTxMessage::SendCollationMessage( vec![peer_id], Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)), )) @@ -800,7 +800,7 @@ async fn request_collation( ); sender - .send_message(NetworkBridgeMessage::SendRequests( + .send_message(NetworkBridgeTxMessage::SendRequests( vec![requests], IfDisconnected::ImmediateError, )) diff --git a/node/network/collator-protocol/src/validator_side/tests.rs b/node/network/collator-protocol/src/validator_side/tests.rs index 77c209361422..cfb8b967bb34 100644 --- a/node/network/collator-protocol/src/validator_side/tests.rs +++ b/node/network/collator-protocol/src/validator_side/tests.rs @@ -260,7 +260,7 @@ async fn assert_candidate_backing_second( async fn assert_collator_disconnect(virtual_overseer: &mut VirtualOverseer, expected_peer: PeerId) { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::DisconnectPeer( peer, peer_set, )) => { @@ -278,7 +278,7 @@ async fn assert_fetch_collation_request( ) -> ResponseSender { assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { let req = reqs.into_iter().next() .expect("There should be exactly one request"); @@ -431,8 +431,8 @@ fn collator_reporting_works() { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep), + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep), ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_REPORT_BAD); @@ -481,8 
+481,8 @@ fn collator_authentication_verification_works() { // it should be reported for sending a message with an invalid signature assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep), + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(peer, rep), ) => { assert_eq!(peer, peer_b); assert_eq!(rep, COST_INVALID_SIGNATURE); @@ -697,7 +697,7 @@ fn reject_connection_to_next_group() { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer( peer, rep, )) => { @@ -790,7 +790,7 @@ fn fetch_next_collation_on_invalid_collation() { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer( peer, rep, )) => { @@ -1005,7 +1005,7 @@ fn disconnect_if_wrong_declare() { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer( peer, rep, )) => { diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs index 03ce192697bb..a2b8cdcf7441 100644 --- a/node/network/dispute-distribution/src/sender/send_task.rs +++ b/node/network/dispute-distribution/src/sender/send_task.rs @@ -26,7 +26,7 @@ use polkadot_node_network_protocol::{ }, IfDisconnected, }; -use polkadot_node_subsystem::{messages::NetworkBridgeMessage, overseer}; +use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer}; use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo}; use polkadot_primitives::v2::{ AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex, @@ -270,7 +270,7 @@ async fn send_requests( 
statuses.insert(receiver, DeliveryStatus::Pending(remote_handle)); } - let msg = NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError); + let msg = NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError); ctx.send_message(msg).await; Ok(statuses) } diff --git a/node/network/dispute-distribution/src/tests/mod.rs b/node/network/dispute-distribution/src/tests/mod.rs index dd9cd1da9420..9c843f3e786b 100644 --- a/node/network/dispute-distribution/src/tests/mod.rs +++ b/node/network/dispute-distribution/src/tests/mod.rs @@ -44,7 +44,7 @@ use polkadot_node_primitives::{CandidateVotes, UncheckedDisputeMessage}; use polkadot_node_subsystem::{ messages::{ AllMessages, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult, - NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, + NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest, }, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal, Span, }; @@ -662,8 +662,8 @@ async fn check_sent_requests( // Sends to concerned validators: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { let reqs: Vec<_> = reqs.into_iter().map(|r| assert_matches!( diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index 1cebcc64f78e..df90914b6f58 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -22,7 +22,7 @@ //! which limits the amount of messages sent and received //! to be an order of sqrt of the validators. Our neighbors //! in this graph will be forwarded to the network bridge with -//! the `NetworkBridgeMessage::NewGossipTopology` message. +//! the `NetworkBridgeRxMessage::NewGossipTopology` message. 
use std::{ collections::{HashMap, HashSet}, @@ -45,8 +45,8 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_subsystem::{ messages::{ - GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage, - RuntimeApiRequest, + GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, NetworkBridgeTxMessage, + RuntimeApiMessage, RuntimeApiRequest, }, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; @@ -345,7 +345,7 @@ where gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); sender - .send_message(NetworkBridgeMessage::ConnectToResolvedValidators { + .send_message(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set: PeerSet::Validation, }) @@ -545,7 +545,7 @@ async fn update_gossip_topology( .collect(); sender - .send_message(NetworkBridgeMessage::NewGossipTopology { + .send_message(NetworkBridgeRxMessage::NewGossipTopology { session: session_index, our_neighbors_x: row_neighbors, our_neighbors_y: column_neighbors, diff --git a/node/network/gossip-support/src/tests.rs b/node/network/gossip-support/src/tests.rs index dbe44ea835d0..831f0aa94342 100644 --- a/node/network/gossip-support/src/tests.rs +++ b/node/network/gossip-support/src/tests.rs @@ -255,7 +255,7 @@ async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: Sessio assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::NewGossipTopology { + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { session: got_session, our_neighbors_x, our_neighbors_y, @@ -313,7 +313,7 @@ fn issues_a_connection_request_on_new_session() { assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set, }) => { @@ -392,7 +392,7 @@ fn 
issues_a_connection_request_on_new_session() { assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set, }) => { @@ -451,7 +451,7 @@ fn issues_connection_request_to_past_present_future() { assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set, }) => { @@ -553,7 +553,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set, }) => { @@ -618,7 +618,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set, }) => { diff --git a/node/network/protocol/src/request_response/outgoing.rs b/node/network/protocol/src/request_response/outgoing.rs index a9353965a48f..b93c4e93cd31 100644 --- a/node/network/protocol/src/request_response/outgoing.rs +++ b/node/network/protocol/src/request_response/outgoing.rs @@ -25,7 +25,7 @@ use polkadot_primitives::v2::AuthorityDiscoveryId; use super::{v1, IsRequest, Protocol}; -/// All requests that can be sent to the network bridge via `NetworkBridgeMessage::SendRequest`. +/// All requests that can be sent to the network bridge via `NetworkBridgeTxMessage::SendRequest`. #[derive(Debug)] pub enum Requests { /// Request an availability chunk from a node. 
diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 2abb765f392b..38d4022c633b 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -39,7 +39,7 @@ use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS}; use polkadot_node_subsystem::{ jaeger, messages::{ - CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeMessage, + CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, StatementDistributionMessage, }, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan, SpawnedSubsystem, @@ -1083,7 +1083,7 @@ async fn circulate_statement<'a, Context>( statement = ?stored.statement, "Sending statement", ); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( peers_to_send.iter().map(|(p, _)| p.clone()).collect(), payload, )) @@ -1123,8 +1123,11 @@ async fn send_statements_about( statement = ?statement.statement, "Sending statement", ); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)) - .await; + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer.clone()], + payload, + )) + .await; metrics.on_statement_distributed(); } @@ -1155,8 +1158,11 @@ async fn send_statements( statement = ?statement.statement, "Sending statement" ); - ctx.send_message(NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)) - .await; + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer.clone()], + payload, + )) + .await; metrics.on_statement_distributed(); } @@ -1167,7 +1173,7 @@ async fn report_peer( peer: PeerId, rep: Rep, ) { - sender.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await + sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await } /// If message contains a statement, then retrieve it, otherwise 
fork task to fetch it. @@ -1926,7 +1932,7 @@ impl StatementDistributionSubsystem { } }, RequesterMessage::SendRequest(req) => { - ctx.send_message(NetworkBridgeMessage::SendRequests( + ctx.send_message(NetworkBridgeTxMessage::SendRequests( vec![req], IfDisconnected::ImmediateError, )) diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/tests.rs index 9f5b4f6de326..49a6e211bbd6 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/tests.rs @@ -33,7 +33,7 @@ use polkadot_node_subsystem::{ ActivatedLeaf, LeafStatus, }; use polkadot_node_subsystem_test_helpers::mock::make_ferdie_keystore; -use polkadot_primitives::v2::{Hash, SessionInfo, ValidationCode}; +use polkadot_primitives::v2::{Hash, Id as ParaId, SessionInfo, ValidationCode}; use polkadot_primitives_test_helpers::{ dummy_committed_candidate_receipt, dummy_hash, AlwaysZeroRng, }; @@ -383,7 +383,7 @@ fn peer_view_update_sends_messages() { let candidate = { let mut c = dummy_committed_candidate_receipt(dummy_hash()); c.descriptor.relay_parent = hash_c; - c.descriptor.para_id = 1.into(); + c.descriptor.para_id = ParaId::from(1_u32); c }; let candidate_hash = candidate.hash(); @@ -549,7 +549,7 @@ fn peer_view_update_sends_messages() { assert_matches!( message, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( to, payload, )) => { @@ -570,7 +570,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { let candidate = { let mut c = dummy_committed_candidate_receipt(dummy_hash()); c.descriptor.relay_parent = hash_b; - c.descriptor.para_id = 1.into(); + c.descriptor.para_id = ParaId::from(1_u32); c }; @@ -680,7 +680,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { let message = handle.recv().await; assert_matches!( message, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + 
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( to, payload, )) => { @@ -847,8 +847,8 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {} ); @@ -861,8 +861,8 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( recipients, Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) @@ -1090,8 +1090,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1143,8 +1143,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // Let c fail once too: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1163,8 +1163,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // a fails again: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1184,8 +1184,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // 
Send invalid response (all other peers have been tried now): assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1211,16 +1211,16 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // Should get punished and never tried again: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) if p == peer_bad && r == COST_WRONG_HASH => {} ); // a is tried again (retried in reverse order): assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1240,8 +1240,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // c succeeds now: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError ) ) => { @@ -1262,22 +1262,22 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) if p == peer_a && r == COST_FETCH_FAIL => {} ); assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {} ); assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + 
AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {} ); @@ -1291,8 +1291,8 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( // Now messages should go out: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( mut recipients, Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::LargeStatement(meta) @@ -1638,8 +1638,8 @@ fn share_prioritizes_backing_group() { // Messages should go out: assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( mut recipients, Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::LargeStatement(meta) @@ -1843,7 +1843,7 @@ fn peer_cant_flood_with_large_statements() { let mut punished = false; for _ in 0..2 { match handle.recv().await { - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests( + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests( mut reqs, IfDisconnected::ImmediateError, )) => { @@ -1860,7 +1860,7 @@ fn peer_cant_flood_with_large_statements() { requested = true; }, - AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(p, r)) + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r)) if p == peer_a && r == COST_APPARENT_FLOOD => { punished = true; @@ -2088,8 +2088,8 @@ fn handle_multiple_seconded_statements() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) => { assert_eq!(p, peer_a); assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST); @@ 
-2109,8 +2109,8 @@ fn handle_multiple_seconded_statements() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( recipients, Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) @@ -2140,8 +2140,8 @@ fn handle_multiple_seconded_statements() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) => { assert_eq!(p, peer_b); assert_eq!(r, BENEFIT_VALID_STATEMENT); @@ -2191,8 +2191,8 @@ fn handle_multiple_seconded_statements() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) => { assert_eq!(p, peer_a); assert_eq!(r, BENEFIT_VALID_STATEMENT_FIRST); @@ -2211,8 +2211,8 @@ fn handle_multiple_seconded_statements() { assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendValidationMessage( + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::SendValidationMessage( recipients, Versioned::V1(protocol_v1::ValidationProtocol::StatementDistribution( protocol_v1::StatementDistributionMessage::Statement(r, s) @@ -2244,8 +2244,8 @@ fn handle_multiple_seconded_statements() { // the first when sending `Seconded` assert_matches!( handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(p, r) + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(p, r) ) => { assert_eq!(p, peer_b); assert_eq!(r, BENEFIT_VALID_STATEMENT); diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index 0b34339dfe7d..84ecdd1e8a89 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -85,6 +85,7 @@ pub 
fn dummy_overseer_builder<'a, Spawner, SupportsParachains>( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, SubsystemError, > @@ -126,6 +127,7 @@ pub fn one_for_all_overseer_builder<'a, Spawner, SupportsParachains, Sub>( Sub, Sub, Sub, + Sub, >, SubsystemError, > @@ -143,7 +145,8 @@ where + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> - + Subsystem, SubsystemError> + + Subsystem, SubsystemError> + + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> @@ -169,7 +172,8 @@ where .chain_api(subsystem.clone()) .collation_generation(subsystem.clone()) .collator_protocol(subsystem.clone()) - .network_bridge(subsystem.clone()) + .network_bridge_tx(subsystem.clone()) + .network_bridge_rx(subsystem.clone()) .provisioner(subsystem.clone()) .runtime_api(subsystem.clone()) .statement_distribution(subsystem.clone()) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 8c38f8a8299f..fefcc4dba0ce 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -83,8 +83,8 @@ use polkadot_node_subsystem_types::messages::{ BitfieldSigningMessage, CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage, - NetworkBridgeMessage, ProvisionerMessage, PvfCheckerMessage, RuntimeApiMessage, - StatementDistributionMessage, + NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProvisionerMessage, PvfCheckerMessage, + RuntimeApiMessage, StatementDistributionMessage, }; pub use polkadot_node_subsystem_types::{ errors::{SubsystemError, SubsystemResult}, @@ -108,9 +108,9 @@ use parity_util_mem::MemoryAllocationTracker; pub use orchestra as gen; pub use orchestra::{ contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket, - SignalsReceived, Spawner, Subsystem, 
SubsystemContext, SubsystemIncomingMessages, - SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, - ToOrchestra, + OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext, + SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters, + SubsystemSender, TimeoutExt, ToOrchestra, }; /// Store 2 days worth of blocks, not accounting for forks, @@ -389,7 +389,7 @@ pub async fn forward_events>(client: Arc

, mut hand /// # }; /// # use polkadot_node_subsystem_types::messages::{ /// # CandidateValidationMessage, CandidateBackingMessage, -/// # NetworkBridgeMessage, +/// # NetworkBridgeTxMessage, /// # }; /// /// struct ValidationSubsystem; @@ -477,7 +477,7 @@ pub struct Overseer { candidate_backing: CandidateBacking, #[subsystem(StatementDistributionMessage, sends: [ - NetworkBridgeMessage, + NetworkBridgeTxMessage, CandidateBackingMessage, RuntimeApiMessage, ])] @@ -488,12 +488,12 @@ pub struct Overseer { AvailabilityRecoveryMessage, ChainApiMessage, RuntimeApiMessage, - NetworkBridgeMessage, + NetworkBridgeTxMessage, ])] availability_distribution: AvailabilityDistribution, #[subsystem(AvailabilityRecoveryMessage, sends: [ - NetworkBridgeMessage, + NetworkBridgeTxMessage, RuntimeApiMessage, AvailabilityStoreMessage, ])] @@ -508,7 +508,7 @@ pub struct Overseer { #[subsystem(BitfieldDistributionMessage, sends: [ RuntimeApiMessage, - NetworkBridgeMessage, + NetworkBridgeTxMessage, ProvisionerMessage, ])] bitfield_distribution: BitfieldDistribution, @@ -530,7 +530,7 @@ pub struct Overseer { ])] availability_store: AvailabilityStore, - #[subsystem(NetworkBridgeMessage, sends: [ + #[subsystem(NetworkBridgeRxMessage, sends: [ BitfieldDistributionMessage, StatementDistributionMessage, ApprovalDistributionMessage, @@ -539,7 +539,10 @@ pub struct Overseer { CollationGenerationMessage, CollatorProtocolMessage, ])] - network_bridge: NetworkBridge, + network_bridge_rx: NetworkBridgeRx, + + #[subsystem(NetworkBridgeTxMessage, sends: [])] + network_bridge_tx: NetworkBridgeTx, #[subsystem(blocking, ChainApiMessage, sends: [])] chain_api: ChainApi, @@ -551,14 +554,14 @@ pub struct Overseer { collation_generation: CollationGeneration, #[subsystem(CollatorProtocolMessage, sends: [ - NetworkBridgeMessage, + NetworkBridgeTxMessage, RuntimeApiMessage, CandidateBackingMessage, ])] collator_protocol: CollatorProtocol, #[subsystem(ApprovalDistributionMessage, sends: [ - NetworkBridgeMessage, 
+ NetworkBridgeTxMessage, ApprovalVotingMessage, ])] approval_distribution: ApprovalDistribution, @@ -575,7 +578,8 @@ pub struct Overseer { approval_voting: ApprovalVoting, #[subsystem(GossipSupportMessage, sends: [ - NetworkBridgeMessage, + NetworkBridgeTxMessage, + NetworkBridgeRxMessage, // TODO RuntimeApiMessage, ChainSelectionMessage, ])] @@ -594,7 +598,7 @@ pub struct Overseer { #[subsystem(DisputeDistributionMessage, sends: [ RuntimeApiMessage, DisputeCoordinatorMessage, - NetworkBridgeMessage, + NetworkBridgeTxMessage, ])] dispute_distribution: DisputeDistribution, diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index a3f304466626..25a6d07e3934 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -29,7 +29,7 @@ use polkadot_node_subsystem_types::{ ActivatedLeaf, LeafStatus, }; use polkadot_primitives::v2::{ - CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, + CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, SessionIndex, ValidDisputeStatementKind, ValidatorIndex, }; @@ -864,8 +864,16 @@ fn test_availability_store_msg() -> AvailabilityStoreMessage { AvailabilityStoreMessage::QueryAvailableData(CandidateHash(Default::default()), sender) } -fn test_network_bridge_msg() -> NetworkBridgeMessage { - NetworkBridgeMessage::ReportPeer(PeerId::random(), UnifiedReputationChange::BenefitMinor("")) +fn test_network_bridge_tx_msg() -> NetworkBridgeTxMessage { + NetworkBridgeTxMessage::ReportPeer(PeerId::random(), UnifiedReputationChange::BenefitMinor("")) +} + +fn test_network_bridge_rx_msg() -> NetworkBridgeRxMessage { + NetworkBridgeRxMessage::NewGossipTopology { + session: SessionIndex::from(0_u32), + our_neighbors_x: HashMap::new(), + our_neighbors_y: HashMap::new(), + } } fn test_approval_distribution_msg() -> ApprovalDistributionMessage { @@ -913,7 +921,7 @@ fn test_chain_selection_msg() -> ChainSelectionMessage { // Checks that `stop`, `broadcast_signal` and 
`broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 21; + const NUM_SUBSYSTEMS: usize = 22; // -4 for BitfieldSigning, GossipSupport, AvailabilityDistribution and PvfCheckerSubsystem. const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 4; @@ -980,7 +988,10 @@ fn overseer_all_subsystems_receive_signals_and_messages() { .send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())) .await; handle - .send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())) + .send_msg_anon(AllMessages::NetworkBridgeTx(test_network_bridge_tx_msg())) + .await; + handle + .send_msg_anon(AllMessages::NetworkBridgeRx(test_network_bridge_rx_msg())) .await; handle.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await; handle @@ -1042,7 +1053,8 @@ fn context_holds_onto_message_until_enough_signals_received() { let (provisioner_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (runtime_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (availability_store_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); - let (network_bridge_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_rx_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_tx_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (chain_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (collator_protocol_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (collation_generation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); @@ -1064,7 +1076,8 @@ fn context_holds_onto_message_until_enough_signals_received() { let (provisioner_unbounded_tx, _) = metered::unbounded(); let (runtime_api_unbounded_tx, _) = metered::unbounded(); let (availability_store_unbounded_tx, _) = metered::unbounded(); - let (network_bridge_unbounded_tx, _) = metered::unbounded(); + let (network_bridge_tx_unbounded_tx, _) = metered::unbounded(); + 
let (network_bridge_rx_unbounded_tx, _) = metered::unbounded(); let (chain_api_unbounded_tx, _) = metered::unbounded(); let (collator_protocol_unbounded_tx, _) = metered::unbounded(); let (collation_generation_unbounded_tx, _) = metered::unbounded(); @@ -1087,7 +1100,8 @@ fn context_holds_onto_message_until_enough_signals_received() { provisioner: provisioner_bounded_tx.clone(), runtime_api: runtime_api_bounded_tx.clone(), availability_store: availability_store_bounded_tx.clone(), - network_bridge: network_bridge_bounded_tx.clone(), + network_bridge_tx: network_bridge_tx_bounded_tx.clone(), + network_bridge_rx: network_bridge_rx_bounded_tx.clone(), chain_api: chain_api_bounded_tx.clone(), collator_protocol: collator_protocol_bounded_tx.clone(), collation_generation: collation_generation_bounded_tx.clone(), @@ -1109,7 +1123,8 @@ fn context_holds_onto_message_until_enough_signals_received() { provisioner_unbounded: provisioner_unbounded_tx.clone(), runtime_api_unbounded: runtime_api_unbounded_tx.clone(), availability_store_unbounded: availability_store_unbounded_tx.clone(), - network_bridge_unbounded: network_bridge_unbounded_tx.clone(), + network_bridge_tx_unbounded: network_bridge_tx_unbounded_tx.clone(), + network_bridge_rx_unbounded: network_bridge_rx_unbounded_tx.clone(), chain_api_unbounded: chain_api_unbounded_tx.clone(), collator_protocol_unbounded: collator_protocol_unbounded_tx.clone(), collation_generation_unbounded: collation_generation_unbounded_tx.clone(), diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index c1f2352dd622..efc36de15423 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -51,7 +51,10 @@ pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem; pub use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide}; pub use polkadot_dispute_distribution::DisputeDistributionSubsystem; pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem; -pub use 
polkadot_network_bridge::NetworkBridge as NetworkBridgeSubsystem; +pub use polkadot_network_bridge::{ + Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem, + NetworkBridgeTx as NetworkBridgeTxSubsystem, +}; pub use polkadot_node_collation_generation::CollationGenerationSubsystem; pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem; @@ -158,7 +161,11 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( ProvisionerSubsystem, RuntimeApiSubsystem, AvailabilityStoreSubsystem, - NetworkBridgeSubsystem< + NetworkBridgeRxSubsystem< + Arc>, + AuthorityDiscoveryService, + >, + NetworkBridgeTxSubsystem< Arc>, AuthorityDiscoveryService, >, @@ -185,7 +192,19 @@ where let spawner = SpawnGlue(spawner); + let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; let builder = Overseer::builder() + .network_bridge_tx(NetworkBridgeTxSubsystem::new( + network_service.clone(), + authority_discovery_service.clone(), + network_bridge_metrics.clone(), + )) + .network_bridge_rx(NetworkBridgeRxSubsystem::new( + network_service.clone(), + authority_discovery_service.clone(), + Box::new(network_service.clone()), + network_bridge_metrics, + )) .availability_distribution(AvailabilityDistributionSubsystem::new( keystore.clone(), IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, @@ -237,12 +256,6 @@ where }; CollatorProtocolSubsystem::new(side) }) - .network_bridge(NetworkBridgeSubsystem::new( - network_service.clone(), - authority_discovery_service.clone(), - Box::new(network_service.clone()), - Metrics::register(registry)?, - )) .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?)) .runtime_api(RuntimeApiSubsystem::new( runtime_client.clone(), diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 6ea0caabaddc..cc263266366e 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ 
b/node/subsystem-test-helpers/src/lib.rs @@ -245,20 +245,37 @@ pub struct TestSubsystemContextHandle { } impl TestSubsystemContextHandle { + /// Fallback timeout value used to never block test execution + /// indefinitely. + pub const TIMEOUT: Duration = Duration::from_secs(120); + /// Send a message or signal to the subsystem. This resolves at the point in time when the /// subsystem has _read_ the message. pub async fn send(&mut self, from_overseer: FromOrchestra) { - self.tx.send(from_overseer).await.expect("Test subsystem no longer live"); + self.tx + .send(from_overseer) + .timeout(Self::TIMEOUT) + .await + .expect("`fn send` does not timeout") + .expect("Test subsystem no longer live"); } /// Receive the next message from the subsystem. pub async fn recv(&mut self) -> AllMessages { - self.try_recv().await.expect("Test subsystem no longer live") + self.try_recv() + .timeout(Self::TIMEOUT) + .await + .expect("`fn recv` does not timeout") + .expect("Test subsystem no longer live") } /// Receive the next message from the subsystem, or `None` if the channel has been closed. pub async fn try_recv(&mut self) -> Option { - self.rx.next().await + self.rx + .next() + .timeout(Self::TIMEOUT) + .await + .expect("`try_recv` does not timeout") } } diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index db74ab11cd4d..b79785ce406f 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -318,9 +318,36 @@ pub enum DisputeDistributionMessage { SendDispute(DisputeMessage), } -/// Messages received by the network bridge subsystem. +/// Messages received from other subsystems. #[derive(Debug)] -pub enum NetworkBridgeMessage { +pub enum NetworkBridgeRxMessage { + /// Inform the distribution subsystems about the new + /// gossip network topology formed. 
+ /// + /// The only reason to have this here, is the availability of the + /// authority discovery service, otherwise, the `GossipSupport` + /// subsystem would make more sense. + NewGossipTopology { + /// The session info this gossip topology is concerned with. + session: SessionIndex, + /// Ids of our neighbors in the X dimensions of the new gossip topology, + /// along with their validator indices within the session. + /// + /// We're not necessarily connected to all of them, but we should + /// try to be. + our_neighbors_x: HashMap, + /// Ids of our neighbors in the X dimensions of the new gossip topology, + /// along with their validator indices within the session. + /// + /// We're not necessarily connected to all of them, but we should + /// try to be. + our_neighbors_y: HashMap, + }, +} + +/// Messages received from other subsystems by the network bridge subsystem. +#[derive(Debug)] +pub enum NetworkBridgeTxMessage { /// Report a peer for their actions. ReportPeer(PeerId, UnifiedReputationChange), @@ -375,27 +402,9 @@ pub enum NetworkBridgeMessage { /// The peer set we want the connection on. peer_set: PeerSet, }, - /// Inform the distribution subsystems about the new - /// gossip network topology formed. - NewGossipTopology { - /// The session info this gossip topology is concerned with. - session: SessionIndex, - /// Ids of our neighbors in the X dimensions of the new gossip topology, - /// along with their validator indices within the session. - /// - /// We're not necessarily connected to all of them, but we should - /// try to be. - our_neighbors_x: HashMap, - /// Ids of our neighbors in the X dimensions of the new gossip topology, - /// along with their validator indices within the session. - /// - /// We're not necessarily connected to all of them, but we should - /// try to be. - our_neighbors_y: HashMap, - }, } -impl NetworkBridgeMessage { +impl NetworkBridgeTxMessage { /// If the current variant contains the relay parent hash, return it. 
pub fn relay_parent(&self) -> Option { match self { @@ -408,7 +417,6 @@ impl NetworkBridgeMessage { Self::ConnectToValidators { .. } => None, Self::ConnectToResolvedValidators { .. } => None, Self::SendRequests { .. } => None, - Self::NewGossipTopology { .. } => None, } } } diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index ba8a62d08118..dd9282b50fe5 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -28,16 +28,18 @@ use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest}, overseer, }; -use thiserror::Error; /// Sessions unavailable in state to cache. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, thiserror::Error)] pub enum SessionsUnavailableReason { /// Runtime API subsystem was unavailable. - RuntimeApiUnavailable(oneshot::Canceled), + #[error(transparent)] + RuntimeApiUnavailable(#[from] oneshot::Canceled), /// The runtime API itself returned an error. - RuntimeApi(RuntimeApiError), + #[error(transparent)] + RuntimeApi(#[from] RuntimeApiError), /// Missing session info from runtime API for given `SessionIndex`. + #[error("Missing session index {0:?}")] Missing(SessionIndex), } @@ -53,20 +55,16 @@ pub struct SessionsUnavailableInfo { } /// Sessions were unavailable to fetch from the state for some reason. -#[derive(Debug, Error, Clone)] +#[derive(Debug, thiserror::Error, Clone)] +#[error("Sessions unavailable: {kind:?}, info: {info:?}")] pub struct SessionsUnavailable { /// The error kind. + #[source] kind: SessionsUnavailableReason, /// The info about the session window, if any. info: Option, } -impl core::fmt::Display for SessionsUnavailable { - fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> { - write!(f, "Sessions unavailable: {:?}, info: {:?}", self.kind, self.info) - } -} - /// An indicated update of the rolling session window. 
#[derive(Debug, PartialEq, Clone)] pub enum SessionWindowUpdate {