diff --git a/Cargo.lock b/Cargo.lock
index 8ba8bad1f6b1..aecb54a7ff16 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5236,6 +5236,7 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-primitives",
"sc-network",
+ "strum 0.20.0",
]
[[package]]
@@ -8402,7 +8403,7 @@ dependencies = [
"lazy_static",
"sp-core",
"sp-runtime",
- "strum",
+ "strum 0.16.0",
]
[[package]]
@@ -8792,7 +8793,16 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6138f8f88a16d90134763314e3fc76fa3ed6a7db4725d6acf9a3ef95a3188d22"
dependencies = [
- "strum_macros",
+ "strum_macros 0.16.0",
+]
+
+[[package]]
+name = "strum"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c"
+dependencies = [
+ "strum_macros 0.20.1",
]
[[package]]
@@ -8807,6 +8817,18 @@ dependencies = [
"syn 1.0.58",
]
+[[package]]
+name = "strum_macros"
+version = "0.20.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee8bc6b87a5112aeeab1f4a9f7ab634fe6cbefc4850006df31267f4cfb9e3149"
+dependencies = [
+ "heck",
+ "proc-macro2 1.0.24",
+ "quote 1.0.7",
+ "syn 1.0.58",
+]
+
[[package]]
name = "substrate-bip39"
version = "0.4.2"
diff --git a/node/network/bridge/src/action.rs b/node/network/bridge/src/action.rs
new file mode 100644
index 000000000000..27cbeefbbd2a
--- /dev/null
+++ b/node/network/bridge/src/action.rs
@@ -0,0 +1,184 @@
+// Copyright 2020-2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+//
+
+use futures::channel::mpsc;
+
+use parity_scale_codec::Decode;
+use polkadot_node_network_protocol::{
+ peer_set::PeerSet, v1 as protocol_v1, PeerId, ReputationChange,
+};
+use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
+use polkadot_subsystem::messages::NetworkBridgeMessage;
+use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
+use sc_network::Event as NetworkEvent;
+
+use polkadot_node_network_protocol::ObservedRole;
+
+use super::{WireMessage, LOG_TARGET, MALFORMED_MESSAGE_COST};
+
+/// Internal type combining all actions a `NetworkBridge` might perform.
+///
+/// Both messages coming from the network (`NetworkEvent`) and messages coming from other
+/// subsystems (`FromOverseer`) will be converted to `Action` in `run_network` before being
+/// processed.
+#[derive(Debug)]
+pub(crate) enum Action {
+ /// Ask network to send a validation message.
+ SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>),
+
+ /// Ask network to send a collation message.
+ SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>),
+
+ /// Ask network to connect to validators.
+ ConnectToValidators {
+ validator_ids: Vec,
+ connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
+ },
+
+ /// Report a peer to the network implementation (decreasing/increasing its reputation).
+ ReportPeer(PeerId, ReputationChange),
+
+ /// A subsystem updates us on the relay chain leaves we consider active.
+ ///
+ /// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the
+ /// change.
+ ActiveLeaves(ActiveLeavesUpdate),
+
+ /// A subsystem updates our view on the latest finalized block.
+ ///
+ /// This information is used for view updates, see also `ActiveLeaves`.
+ BlockFinalized(BlockNumber),
+
+ /// Network tells us about a new peer.
+ PeerConnected(PeerSet, PeerId, ObservedRole),
+
+ /// Network tells us about a peer that left.
+ PeerDisconnected(PeerSet, PeerId),
+
+ /// Messages from the network targeted to other subsystems.
+ PeerMessages(
+ PeerId,
+ Vec>,
+ Vec>,
+ ),
+
+ Abort,
+ Nop,
+}
+
+impl From>> for Action {
+ #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
+ fn from(res: polkadot_subsystem::SubsystemResult>) -> Self {
+ match res {
+ Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
+ Action::ActiveLeaves(active_leaves)
+ }
+ Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
+ Action::BlockFinalized(number)
+ }
+ Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
+ Ok(FromOverseer::Communication { msg }) => match msg {
+ NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
+ NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
+ Action::SendValidationMessages(vec![(peers, msg)])
+ }
+ NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
+ Action::SendCollationMessages(vec![(peers, msg)])
+ }
+ NetworkBridgeMessage::SendValidationMessages(msgs) => {
+ Action::SendValidationMessages(msgs)
+ }
+ NetworkBridgeMessage::SendCollationMessages(msgs) => {
+ Action::SendCollationMessages(msgs)
+ }
+ NetworkBridgeMessage::ConnectToValidators {
+ validator_ids,
+ connected,
+ } => Action::ConnectToValidators {
+ validator_ids,
+ connected,
+ },
+ },
+ Err(e) => {
+ tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
+ Action::Abort
+ }
+ }
+ }
+}
+
+impl From> for Action {
+ #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
+ fn from(event: Option) -> Action {
+ match event {
+ None => {
+ tracing::info!(
+ target: LOG_TARGET,
+ "Shutting down Network Bridge: underlying event stream concluded"
+ );
+ Action::Abort
+ }
+ Some(NetworkEvent::Dht(_))
+ | Some(NetworkEvent::SyncConnected { .. })
+ | Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
+ Some(NetworkEvent::NotificationStreamOpened {
+ remote,
+ protocol,
+ role,
+ }) => {
+ let role = role.into();
+ PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| {
+ Action::PeerConnected(peer_set, remote, role)
+ })
+ }
+ Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => {
+ PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| {
+ Action::PeerDisconnected(peer_set, remote)
+ })
+ }
+ Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
+ let v_messages: Result, _> = messages
+ .iter()
+ .filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
+ .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
+ .collect();
+
+ let v_messages = match v_messages {
+ Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST),
+ Ok(v) => v,
+ };
+
+ let c_messages: Result, _> = messages
+ .iter()
+ .filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
+ .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
+ .collect();
+
+ match c_messages {
+ Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST),
+ Ok(c_messages) => {
+ if v_messages.is_empty() && c_messages.is_empty() {
+ Action::Nop
+ } else {
+ Action::PeerMessages(remote, v_messages, c_messages)
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs
index a0159479122e..cb10790f33cf 100644
--- a/node/network/bridge/src/lib.rs
+++ b/node/network/bridge/src/lib.rs
@@ -22,14 +22,9 @@
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
-use futures::future::BoxFuture;
-use futures::stream::BoxStream;
-use futures::channel::mpsc;
-
-use sc_network::Event as NetworkEvent;
use polkadot_subsystem::{
- ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
+ ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult, JaegerSpan,
};
use polkadot_subsystem::messages::{
@@ -37,27 +32,41 @@ use polkadot_subsystem::messages::{
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage,
};
-use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber};
+use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_node_network_protocol::{
- ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
+ ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
};
+/// Peer set infos for network initialization.
+///
+/// To be added to [`NetworkConfiguration::extra_sets`].
+pub use polkadot_node_network_protocol::peer_set::peer_sets_info;
+
use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
-use std::pin::Pin;
use std::sync::Arc;
mod validator_discovery;
+/// Internally used `Action` type.
+///
+/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are
+/// translated to `Action` before being processed by `run_network`.
+mod action;
+use action::Action;
+
+/// Actual interfacing to the network based on the `Network` trait.
+///
+/// Defines the `Network` trait with an implementation for an `Arc`.
+mod network;
+use network::{Network, send_message};
+
+
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;
-/// The protocol name for the validation peer-set.
-pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1";
-/// The protocol name for the collation peer-set.
-pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1";
const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message");
const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set");
@@ -67,7 +76,9 @@ const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent
// network bridge log target
const LOG_TARGET: &'static str = "network_bridge";
-/// Messages received on the network.
+/// Messages from and to the network.
+///
+/// As transmitted to and received from subsystems.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage {
/// A message from a peer on a specific protocol.
@@ -78,136 +89,10 @@ pub enum WireMessage {
ViewUpdate(View),
}
-/// Information about the extra peers set. Should be used during network configuration
-/// to register the protocol with the network service.
-pub fn peers_sets_info() -> Vec {
- vec![
- sc_network::config::NonDefaultSetConfig {
- notifications_protocol: VALIDATION_PROTOCOL_NAME.into(),
- set_config: sc_network::config::SetConfig {
- in_peers: 25,
- out_peers: 0,
- reserved_nodes: Vec::new(),
- non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
- },
- },
- sc_network::config::NonDefaultSetConfig {
- notifications_protocol: COLLATION_PROTOCOL_NAME.into(),
- set_config: sc_network::config::SetConfig {
- in_peers: 25,
- out_peers: 0,
- reserved_nodes: Vec::new(),
- non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
- },
- }
- ]
-}
-
-/// An action to be carried out by the network.
-#[derive(Debug, PartialEq)]
-pub enum NetworkAction {
- /// Note a change in reputation for a peer.
- ReputationChange(PeerId, ReputationChange),
- /// Write a notification to a given peer on the given peer-set.
- WriteNotification(PeerId, PeerSet, Vec),
-}
-
-/// An abstraction over networking for the purposes of this subsystem.
-pub trait Network: Send + 'static {
- /// Get a stream of all events occurring on the network. This may include events unrelated
- /// to the Polkadot protocol - the user of this function should filter only for events related
- /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
- /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
- fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
-
- /// Get access to an underlying sink for all network actions.
- fn action_sink<'a>(&'a mut self) -> Pin<
- Box + Send + 'a>
- >;
-
- /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
- fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange)
- -> BoxFuture>
- {
- async move {
- self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await
- }.boxed()
- }
-
- /// Write a notification to a peer on the given peer-set's protocol.
- fn write_notification(&mut self, who: PeerId, peer_set: PeerSet, message: Vec)
- -> BoxFuture>
- {
- async move {
- self.action_sink().send(NetworkAction::WriteNotification(who, peer_set, message)).await
- }.boxed()
- }
-}
-
-impl Network for Arc> {
- fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
- sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
- }
-
- #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
- fn action_sink<'a>(&'a mut self)
- -> Pin + Send + 'a>>
- {
- use futures::task::{Poll, Context};
-
- // wrapper around a NetworkService to make it act like a sink.
- struct ActionSink<'b>(&'b sc_network::NetworkService);
-
- impl<'b> Sink for ActionSink<'b> {
- type Error = SubsystemError;
-
- fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
- Poll::Ready(Ok(()))
- }
-
- fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
- match action {
- NetworkAction::ReputationChange(peer, cost_benefit) => {
- tracing::debug!(target: LOG_TARGET, "Changing reputation: {:?} for {}", cost_benefit, peer);
- self.0.report_peer(
- peer,
- cost_benefit,
- )
- }
- NetworkAction::WriteNotification(peer, peer_set, message) => {
- match peer_set {
- PeerSet::Validation => self.0.write_notification(
- peer,
- VALIDATION_PROTOCOL_NAME.into(),
- message,
- ),
- PeerSet::Collation => self.0.write_notification(
- peer,
- COLLATION_PROTOCOL_NAME.into(),
- message,
- ),
- }
- }
- }
-
- Ok(())
- }
-
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
- Poll::Ready(Ok(()))
- }
- }
-
- Box::pin(ActionSink(&**self))
- }
-}
/// The network bridge subsystem.
pub struct NetworkBridge {
+ /// `Network` trait implementing type.
network_service: N,
authority_discovery_service: AD,
}
@@ -256,114 +141,193 @@ struct PeerData {
view: View,
}
-#[derive(Debug)]
-enum Action {
- SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>),
- SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>),
- ConnectToValidators {
- validator_ids: Vec,
- connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
- },
- ReportPeer(PeerId, ReputationChange),
-
- ActiveLeaves(ActiveLeavesUpdate),
- BlockFinalized(BlockNumber),
-
- PeerConnected(PeerSet, PeerId, ObservedRole),
- PeerDisconnected(PeerSet, PeerId),
- PeerMessages(
- PeerId,
- Vec>,
- Vec>,
- ),
-
- Abort,
- Nop,
-}
+/// Main driver, processing network events and messages from other subsystems.
+#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
+async fn run_network(
+ mut network_service: N,
+ mut authority_discovery_service: AD,
+ mut ctx: impl SubsystemContext,
+) -> SubsystemResult<()>
+where
+ N: Network + validator_discovery::Network,
+ AD: validator_discovery::AuthorityDiscovery,
+{
+ let mut event_stream = network_service.event_stream().fuse();
-#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
-fn action_from_overseer_message(
- res: polkadot_subsystem::SubsystemResult>,
-) -> Action {
- match res {
- Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves)))
- => Action::ActiveLeaves(active_leaves),
- Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)))
- => Action::BlockFinalized(number),
- Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
- Ok(FromOverseer::Communication { msg }) => match msg {
- NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
- NetworkBridgeMessage::SendValidationMessage(peers, msg)
- => Action::SendValidationMessages(vec![(peers, msg)]),
- NetworkBridgeMessage::SendCollationMessage(peers, msg)
- => Action::SendCollationMessages(vec![(peers, msg)]),
- NetworkBridgeMessage::SendValidationMessages(msgs)
- => Action::SendValidationMessages(msgs),
- NetworkBridgeMessage::SendCollationMessages(msgs)
- => Action::SendCollationMessages(msgs),
- NetworkBridgeMessage::ConnectToValidators { validator_ids, connected }
- => Action::ConnectToValidators { validator_ids, connected },
- },
- Err(e) => {
- tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
- Action::Abort
- }
- }
-}
+ // Most recent heads are at the back.
+ let mut live_heads: Vec<(Hash, Arc)> = Vec::with_capacity(MAX_VIEW_HEADS);
+ let mut local_view = View::default();
+ let mut finalized_number = 0;
-#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
-fn action_from_network_message(event: Option) -> Action {
- match event {
- None => {
- tracing::info!(target: LOG_TARGET, "Shutting down Network Bridge: underlying event stream concluded");
- Action::Abort
- }
- Some(NetworkEvent::Dht(_)) |
- Some(NetworkEvent::SyncConnected { .. }) |
- Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
- Some(NetworkEvent::NotificationStreamOpened { remote, protocol, role }) => {
- let role = role.into();
- match protocol {
- x if x == VALIDATION_PROTOCOL_NAME
- => Action::PeerConnected(PeerSet::Validation, remote, role),
- x if x == COLLATION_PROTOCOL_NAME
- => Action::PeerConnected(PeerSet::Collation, remote, role),
- _ => Action::Nop,
+ let mut validation_peers: HashMap = HashMap::new();
+ let mut collation_peers: HashMap = HashMap::new();
+
+ let mut validator_discovery = validator_discovery::Service::::new();
+
+ loop {
+
+ let action = {
+ let subsystem_next = ctx.recv().fuse();
+ let mut net_event_next = event_stream.next().fuse();
+ futures::pin_mut!(subsystem_next);
+
+ futures::select! {
+ subsystem_msg = subsystem_next => Action::from(subsystem_msg),
+ net_event = net_event_next => Action::from(net_event),
}
- }
- Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => {
- match protocol {
- x if x == VALIDATION_PROTOCOL_NAME
- => Action::PeerDisconnected(PeerSet::Validation, remote),
- x if x == COLLATION_PROTOCOL_NAME
- => Action::PeerDisconnected(PeerSet::Collation, remote),
- _ => Action::Nop,
+ };
+
+ match action {
+ Action::Nop => {}
+ Action::Abort => return Ok(()),
+
+ Action::SendValidationMessages(msgs) => {
+ for (peers, msg) in msgs {
+ send_message(
+ &mut network_service,
+ peers,
+ PeerSet::Validation,
+ WireMessage::ProtocolMessage(msg),
+ ).await?
+ }
}
- }
- Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
- let v_messages: Result, _> = messages.iter()
- .filter(|(protocol, _)| protocol == &VALIDATION_PROTOCOL_NAME)
- .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
- .collect();
-
- let v_messages = match v_messages {
- Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST),
- Ok(v) => v,
- };
-
- let c_messages: Result, _> = messages.iter()
- .filter(|(protocol, _)| protocol == &COLLATION_PROTOCOL_NAME)
- .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
- .collect();
-
- match c_messages {
- Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST),
- Ok(c_messages) => if v_messages.is_empty() && c_messages.is_empty() {
- Action::Nop
- } else {
- Action::PeerMessages(remote, v_messages, c_messages)
- },
+
+ Action::SendCollationMessages(msgs) => {
+ for (peers, msg) in msgs {
+ send_message(
+ &mut network_service,
+ peers,
+ PeerSet::Collation,
+ WireMessage::ProtocolMessage(msg),
+ ).await?
+ }
+ }
+
+ Action::ConnectToValidators {
+ validator_ids,
+ connected,
+ } => {
+ let (ns, ads) = validator_discovery.on_request(
+ validator_ids,
+ connected,
+ network_service,
+ authority_discovery_service,
+ ).await;
+ network_service = ns;
+ authority_discovery_service = ads;
+ },
+
+ Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,
+
+ Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
+ live_heads.extend(activated);
+ live_heads.retain(|h| !deactivated.contains(&h.0));
+
+ update_our_view(
+ &mut network_service,
+ &mut ctx,
+ &live_heads,
+ &mut local_view,
+ finalized_number,
+ &validation_peers,
+ &collation_peers,
+ ).await?;
}
+
+ Action::BlockFinalized(number) => {
+ debug_assert!(finalized_number < number);
+
+ // we don't send the view updates here, but delay them until the next `Action::ActiveLeaves`
+ // otherwise it might break assumptions of some of the subsystems
+ // that we never send the same `ActiveLeavesUpdate`
+ // this is fine, we will get `Action::ActiveLeaves` on block finalization anyway
+ finalized_number = number;
+ },
+
+ Action::PeerConnected(peer_set, peer, role) => {
+ let peer_map = match peer_set {
+ PeerSet::Validation => &mut validation_peers,
+ PeerSet::Collation => &mut collation_peers,
+ };
+
+ validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;
+
+ match peer_map.entry(peer.clone()) {
+ hash_map::Entry::Occupied(_) => continue,
+ hash_map::Entry::Vacant(vacant) => {
+ let _ = vacant.insert(PeerData {
+ view: View::default(),
+ });
+
+ match peer_set {
+ PeerSet::Validation => dispatch_validation_events_to_all(
+ vec![
+ NetworkBridgeEvent::PeerConnected(peer.clone(), role),
+ NetworkBridgeEvent::PeerViewChange(
+ peer,
+ View::default(),
+ ),
+ ],
+ &mut ctx,
+ ).await,
+ PeerSet::Collation => dispatch_collation_events_to_all(
+ vec![
+ NetworkBridgeEvent::PeerConnected(peer.clone(), role),
+ NetworkBridgeEvent::PeerViewChange(
+ peer,
+ View::default(),
+ ),
+ ],
+ &mut ctx,
+ ).await,
+ }
+ }
+ }
+ }
+ Action::PeerDisconnected(peer_set, peer) => {
+ let peer_map = match peer_set {
+ PeerSet::Validation => &mut validation_peers,
+ PeerSet::Collation => &mut collation_peers,
+ };
+
+ validator_discovery.on_peer_disconnected(&peer);
+
+ if peer_map.remove(&peer).is_some() {
+ match peer_set {
+ PeerSet::Validation => dispatch_validation_event_to_all(
+ NetworkBridgeEvent::PeerDisconnected(peer),
+ &mut ctx,
+ ).await,
+ PeerSet::Collation => dispatch_collation_event_to_all(
+ NetworkBridgeEvent::PeerDisconnected(peer),
+ &mut ctx,
+ ).await,
+ }
+ }
+ },
+ Action::PeerMessages(peer, v_messages, c_messages) => {
+ if !v_messages.is_empty() {
+ let events = handle_peer_messages(
+ peer.clone(),
+ &mut validation_peers,
+ v_messages,
+ &mut network_service,
+ ).await?;
+
+ dispatch_validation_events_to_all(events, &mut ctx).await;
+ }
+
+ if !c_messages.is_empty() {
+ let events = handle_peer_messages(
+ peer.clone(),
+ &mut collation_peers,
+ c_messages,
+ &mut network_service,
+ ).await?;
+
+ dispatch_collation_events_to_all(events, &mut ctx).await;
+ }
+ },
}
}
}
@@ -497,41 +461,6 @@ async fn send_collation_message(
send_message(net, peers, PeerSet::Collation, message).await
}
-async fn send_message(
- net: &mut impl Network,
- peers: I,
- peer_set: PeerSet,
- message: WireMessage,
-) -> SubsystemResult<()>
- where
- M: Encode + Clone,
- I: IntoIterator- ,
- I::IntoIter: ExactSizeIterator,
-{
- let mut message_producer = stream::iter({
- let peers = peers.into_iter();
- let n_peers = peers.len();
- let mut message = Some(message.encode());
-
- peers.enumerate().map(move |(i, peer)| {
- // optimization: avoid cloning the message for the last peer in the
- // list. The message payload can be quite large. If the underlying
- // network used `Bytes` this would not be necessary.
- let message = if i == n_peers - 1 {
- message.take()
- .expect("Only taken in last iteration of loop, never afterwards; qed")
- } else {
- message.as_ref()
- .expect("Only taken in last iteration of loop, we are not there yet; qed")
- .clone()
- };
-
- Ok(NetworkAction::WriteNotification(peer, peer_set, message))
- })
- });
-
- net.action_sink().send_all(&mut message_producer).await
-}
async fn dispatch_validation_event_to_all(
event: NetworkBridgeEvent
,
@@ -597,195 +526,7 @@ async fn dispatch_collation_events_to_all(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}
-#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
-async fn run_network(
- mut network_service: N,
- mut authority_discovery_service: AD,
- mut ctx: impl SubsystemContext,
-) -> SubsystemResult<()>
-where
- N: Network + validator_discovery::Network,
- AD: validator_discovery::AuthorityDiscovery,
-{
- let mut event_stream = network_service.event_stream().fuse();
-
- // Most recent heads are at the back.
- let mut live_heads: Vec<(Hash, Arc)> = Vec::with_capacity(MAX_VIEW_HEADS);
- let mut local_view = View::default();
- let mut finalized_number = 0;
-
- let mut validation_peers: HashMap = HashMap::new();
- let mut collation_peers: HashMap = HashMap::new();
-
- let mut validator_discovery = validator_discovery::Service::::new();
-
- loop {
-
- let action = {
- let subsystem_next = ctx.recv().fuse();
- let mut net_event_next = event_stream.next().fuse();
- futures::pin_mut!(subsystem_next);
-
- futures::select! {
- subsystem_msg = subsystem_next => action_from_overseer_message(subsystem_msg),
- net_event = net_event_next => action_from_network_message(net_event),
- }
- };
-
- match action {
- Action::Nop => {}
- Action::Abort => return Ok(()),
-
- Action::SendValidationMessages(msgs) => {
- for (peers, msg) in msgs {
- send_message(
- &mut network_service,
- peers,
- PeerSet::Validation,
- WireMessage::ProtocolMessage(msg),
- ).await?
- }
- }
-
- Action::SendCollationMessages(msgs) => {
- for (peers, msg) in msgs {
- send_message(
- &mut network_service,
- peers,
- PeerSet::Collation,
- WireMessage::ProtocolMessage(msg),
- ).await?
- }
- }
-
- Action::ConnectToValidators {
- validator_ids,
- connected,
- } => {
- let (ns, ads) = validator_discovery.on_request(
- validator_ids,
- connected,
- network_service,
- authority_discovery_service,
- ).await;
- network_service = ns;
- authority_discovery_service = ads;
- },
-
- Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,
-
- Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
- live_heads.extend(activated);
- live_heads.retain(|h| !deactivated.contains(&h.0));
-
- update_our_view(
- &mut network_service,
- &mut ctx,
- &live_heads,
- &mut local_view,
- finalized_number,
- &validation_peers,
- &collation_peers,
- ).await?;
- }
-
- Action::BlockFinalized(number) => {
- debug_assert!(finalized_number < number);
-
- // we don't send the view updates here, but delay them until the next `Action::ActiveLeaves`
- // otherwise it might break assumptions of some of the subsystems
- // that we never send the same `ActiveLeavesUpdate`
- // this is fine, we will get `Action::ActiveLeaves` on block finalization anyway
- finalized_number = number;
- },
-
- Action::PeerConnected(peer_set, peer, role) => {
- let peer_map = match peer_set {
- PeerSet::Validation => &mut validation_peers,
- PeerSet::Collation => &mut collation_peers,
- };
- validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;
-
- match peer_map.entry(peer.clone()) {
- hash_map::Entry::Occupied(_) => continue,
- hash_map::Entry::Vacant(vacant) => {
- let _ = vacant.insert(PeerData {
- view: View::default(),
- });
-
- match peer_set {
- PeerSet::Validation => dispatch_validation_events_to_all(
- vec![
- NetworkBridgeEvent::PeerConnected(peer.clone(), role),
- NetworkBridgeEvent::PeerViewChange(
- peer,
- View::default(),
- ),
- ],
- &mut ctx,
- ).await,
- PeerSet::Collation => dispatch_collation_events_to_all(
- vec![
- NetworkBridgeEvent::PeerConnected(peer.clone(), role),
- NetworkBridgeEvent::PeerViewChange(
- peer,
- View::default(),
- ),
- ],
- &mut ctx,
- ).await,
- }
- }
- }
- }
- Action::PeerDisconnected(peer_set, peer) => {
- let peer_map = match peer_set {
- PeerSet::Validation => &mut validation_peers,
- PeerSet::Collation => &mut collation_peers,
- };
-
- validator_discovery.on_peer_disconnected(&peer);
-
- if peer_map.remove(&peer).is_some() {
- match peer_set {
- PeerSet::Validation => dispatch_validation_event_to_all(
- NetworkBridgeEvent::PeerDisconnected(peer),
- &mut ctx,
- ).await,
- PeerSet::Collation => dispatch_collation_event_to_all(
- NetworkBridgeEvent::PeerDisconnected(peer),
- &mut ctx,
- ).await,
- }
- }
- },
- Action::PeerMessages(peer, v_messages, c_messages) => {
- if !v_messages.is_empty() {
- let events = handle_peer_messages(
- peer.clone(),
- &mut validation_peers,
- v_messages,
- &mut network_service,
- ).await?;
-
- dispatch_validation_events_to_all(events, &mut ctx).await;
- }
-
- if !c_messages.is_empty() {
- let events = handle_peer_messages(
- peer.clone(),
- &mut collation_peers,
- c_messages,
- &mut network_service,
- ).await?;
-
- dispatch_collation_events_to_all(events, &mut ctx).await;
- }
- },
- }
- }
-}
#[cfg(test)]
@@ -793,21 +534,30 @@ mod tests {
use super::*;
use futures::channel::mpsc;
use futures::executor;
+ use futures::stream::BoxStream;
+ use std::pin::Pin;
+ use std::sync::Arc;
use std::borrow::Cow;
- use std::sync::Arc;
use std::collections::HashSet;
use async_trait::async_trait;
use parking_lot::Mutex;
use assert_matches::assert_matches;
+ use sc_network::Event as NetworkEvent;
+
use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
+ use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use polkadot_node_subsystem_test_helpers::{
SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
};
use polkadot_node_network_protocol::view;
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;
+ use polkadot_primitives::v1::AuthorityDiscoveryId;
+ use polkadot_node_network_protocol::ObservedRole;
+
+ use crate::network::{Network, NetworkAction};
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct TestNetwork {
@@ -845,13 +595,6 @@ mod tests {
)
}
- fn peer_set_protocol(peer_set: PeerSet) -> std::borrow::Cow<'static, str> {
- match peer_set {
- PeerSet::Validation => VALIDATION_PROTOCOL_NAME.into(),
- PeerSet::Collation => COLLATION_PROTOCOL_NAME.into(),
- }
- }
-
impl Network for TestNetwork {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
self.net_events.lock()
@@ -908,7 +651,7 @@ mod tests {
async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
- protocol: peer_set_protocol(peer_set),
+ protocol: peer_set.into_protocol_name(),
role: role.into(),
}).await;
}
@@ -916,14 +659,14 @@ mod tests {
async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) {
self.send_network_event(NetworkEvent::NotificationStreamClosed {
remote: peer,
- protocol: peer_set_protocol(peer_set),
+ protocol: peer_set.into_protocol_name(),
}).await;
}
async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec) {
self.send_network_event(NetworkEvent::NotificationsReceived {
remote: peer,
- messages: vec![(peer_set_protocol(peer_set), message.into())],
+ messages: vec![(peer_set.into_protocol_name(), message.into())],
}).await;
}
diff --git a/node/network/bridge/src/network.rs b/node/network/bridge/src/network.rs
new file mode 100644
index 000000000000..c6a2bc4bdd2d
--- /dev/null
+++ b/node/network/bridge/src/network.rs
@@ -0,0 +1,183 @@
+// Copyright 2020-2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use futures::stream::BoxStream;
+
+use parity_scale_codec::Encode;
+
+use sc_network::Event as NetworkEvent;
+
+use super::LOG_TARGET;
+use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId, ReputationChange};
+use polkadot_primitives::v1::{Block, Hash};
+use polkadot_subsystem::{SubsystemError, SubsystemResult};
+
+/// Send a message to the network.
+///
+/// This function is only used internally by the network-bridge, which is responsible to only send
+/// messages that are compatible with the passed peer set, as that is currently not enforced by
+/// this function. These are messages of type `WireMessage` parameterized on the matching type.
+pub(crate) async fn send_message(
+ net: &mut impl Network,
+ peers: I,
+ peer_set: PeerSet,
+ message: M,
+) -> SubsystemResult<()>
+where
+ M: Encode + Clone,
+ I: IntoIterator- ,
+ I::IntoIter: ExactSizeIterator,
+{
+ let mut message_producer = stream::iter({
+ let peers = peers.into_iter();
+ let n_peers = peers.len();
+ let mut message = Some(message.encode());
+
+ peers.enumerate().map(move |(i, peer)| {
+ // optimization: avoid cloning the message for the last peer in the
+ // list. The message payload can be quite large. If the underlying
+ // network used `Bytes` this would not be necessary.
+ let message = if i == n_peers - 1 {
+ message
+ .take()
+ .expect("Only taken in last iteration of loop, never afterwards; qed")
+ } else {
+ message
+ .as_ref()
+ .expect("Only taken in last iteration of loop, we are not there yet; qed")
+ .clone()
+ };
+
+ Ok(NetworkAction::WriteNotification(peer, peer_set, message))
+ })
+ });
+
+ net.action_sink().send_all(&mut message_producer).await
+}
+
+/// An action to be carried out by the network.
+///
+/// This type is used for implementing `Sink` in order to cummunicate asynchronously with the
+/// underlying network implementation in the `Network` trait.
+#[derive(Debug, PartialEq)]
+pub enum NetworkAction {
+ /// Note a change in reputation for a peer.
+ ReputationChange(PeerId, ReputationChange),
+ /// Write a notification to a given peer on the given peer-set.
+ WriteNotification(PeerId, PeerSet, Vec
),
+}
+
+/// An abstraction over networking for the purposes of this subsystem.
+///
+pub trait Network: Send + 'static {
+ /// Get a stream of all events occurring on the network. This may include events unrelated
+ /// to the Polkadot protocol - the user of this function should filter only for events related
+ /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
+ /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
+ fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
+
+ /// Get access to an underlying sink for all network actions.
+ fn action_sink<'a>(
+ &'a mut self,
+ ) -> Pin + Send + 'a>>;
+
+ /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
+ fn report_peer(
+ &mut self,
+ who: PeerId,
+ cost_benefit: ReputationChange,
+ ) -> BoxFuture> {
+ async move {
+ self.action_sink()
+ .send(NetworkAction::ReputationChange(who, cost_benefit))
+ .await
+ }
+ .boxed()
+ }
+
+ /// Write a notification to a peer on the given peer-set's protocol.
+ fn write_notification(
+ &mut self,
+ who: PeerId,
+ peer_set: PeerSet,
+ message: Vec,
+ ) -> BoxFuture> {
+ async move {
+ self.action_sink()
+ .send(NetworkAction::WriteNotification(who, peer_set, message))
+ .await
+ }
+ .boxed()
+ }
+}
+
+impl Network for Arc> {
+ fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
+ sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
+ }
+
+ #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
+ fn action_sink<'a>(
+ &'a mut self,
+ ) -> Pin + Send + 'a>> {
+ use futures::task::{Context, Poll};
+
+ // wrapper around a NetworkService to make it act like a sink.
+ struct ActionSink<'b>(&'b sc_network::NetworkService);
+
+ impl<'b> Sink for ActionSink<'b> {
+ type Error = SubsystemError;
+
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
+ match action {
+ NetworkAction::ReputationChange(peer, cost_benefit) => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ "Changing reputation: {:?} for {}",
+ cost_benefit,
+ peer
+ );
+ self.0.report_peer(peer, cost_benefit)
+ }
+ NetworkAction::WriteNotification(peer, peer_set, message) => self
+ .0
+ .write_notification(peer, peer_set.into_protocol_name(), message),
+ }
+
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ Box::pin(ActionSink(&**self))
+ }
+}
diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs
index 89e72a7aa9cf..926aa3706649 100644
--- a/node/network/bridge/src/validator_discovery.rs
+++ b/node/network/bridge/src/validator_discovery.rs
@@ -28,6 +28,7 @@ use sc_network::multiaddr::{Multiaddr, Protocol};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
+use polkadot_node_network_protocol::peer_set::PeerSet;
const LOG_TARGET: &str = "validator_discovery";
@@ -276,24 +277,24 @@ impl Service {
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service.add_peers_to_reserved_set(
- super::COLLATION_PROTOCOL_NAME.into(),
+ PeerSet::Collation.into_protocol_name(),
multiaddr_to_add.clone(),
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
if let Err(e) = network_service.add_peers_to_reserved_set(
- super::VALIDATION_PROTOCOL_NAME.into(),
+ PeerSet::Validation.into_protocol_name(),
multiaddr_to_add,
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service.remove_peers_from_reserved_set(
- super::COLLATION_PROTOCOL_NAME.into(),
+ PeerSet::Collation.into_protocol_name(),
multiaddr_to_remove.clone()
).await;
let _ = network_service.remove_peers_from_reserved_set(
- super::VALIDATION_PROTOCOL_NAME.into(),
+ PeerSet::Validation.into_protocol_name(),
multiaddr_to_remove
).await;
diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml
index 148cc14efd20..f06f2ccd4e06 100644
--- a/node/network/protocol/Cargo.toml
+++ b/node/network/protocol/Cargo.toml
@@ -11,3 +11,4 @@ polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+strum = { version = "0.20", features = ["derive"] }
diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs
index 35dc3b3a1702..c833ba4b1515 100644
--- a/node/network/protocol/src/lib.rs
+++ b/node/network/protocol/src/lib.rs
@@ -29,6 +29,10 @@ pub use polkadot_node_jaeger::JaegerSpan;
#[doc(hidden)]
pub use std::sync::Arc;
+
+/// Peer-sets and protocols used for parachains.
+pub mod peer_set;
+
/// A unique identifier of a request.
pub type RequestId = u64;
@@ -47,15 +51,6 @@ impl fmt::Display for WrongVariant {
impl std::error::Error for WrongVariant {}
-/// The peer-sets that the network manages. Different subsystems will use different peer-sets.
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum PeerSet {
- /// The validation peer-set is responsible for all messages related to candidate validation and communication among validators.
- Validation,
- /// The collation peer-set is used for validator<>collator communication.
- Collation,
-}
-
/// The advertised role of a node.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ObservedRole {
diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs
new file mode 100644
index 000000000000..3554aea5c0c3
--- /dev/null
+++ b/node/network/protocol/src/peer_set.rs
@@ -0,0 +1,90 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! All peersets and protocols used for parachains.
+
+use sc_network::config::{NonDefaultSetConfig, SetConfig};
+use std::borrow::Cow;
+use strum::{EnumIter, IntoEnumIterator};
+
+/// The peer-sets and thus the protocols which are used for the network.
+#[derive(Debug, Clone, Copy, PartialEq, EnumIter)]
+pub enum PeerSet {
+ /// The validation peer-set is responsible for all messages related to candidate validation and communication among validators.
+ Validation,
+ /// The collation peer-set is used for validator<>collator communication.
+ Collation,
+}
+
+impl PeerSet {
+ /// Get `sc_network` peer set configurations for each peerset.
+ ///
+ /// Those should be used in the network configuration to register the protocols with the
+ /// network service.
+ pub fn get_info(self) -> NonDefaultSetConfig {
+ let protocol = self.into_protocol_name();
+ match self {
+ PeerSet::Validation => NonDefaultSetConfig {
+ notifications_protocol: protocol,
+ set_config: sc_network::config::SetConfig {
+ in_peers: 25,
+ out_peers: 0,
+ reserved_nodes: Vec::new(),
+ non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
+ },
+ },
+ PeerSet::Collation => NonDefaultSetConfig {
+ notifications_protocol: protocol,
+ set_config: SetConfig {
+ in_peers: 25,
+ out_peers: 0,
+ reserved_nodes: Vec::new(),
+ non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
+ },
+ },
+ }
+ }
+
+ /// Get the protocol name associated with each peer set as static str.
+ pub const fn get_protocol_name_static(self) -> &'static str {
+ match self {
+ PeerSet::Validation => "/polkadot/validation/1",
+ PeerSet::Collation => "/polkadot/collation/1",
+ }
+ }
+
+ /// Convert a peer set into a protocol name as understood by Substrate.
+ pub fn into_protocol_name(self) -> Cow<'static, str> {
+ self.get_protocol_name_static().into()
+ }
+
+ /// Try parsing a protocol name into a peer set.
+ pub fn try_from_protocol_name(name: &Cow<'static, str>) -> Option {
+ match name {
+ n if n == &PeerSet::Validation.into_protocol_name() => Some(PeerSet::Validation),
+ n if n == &PeerSet::Collation.into_protocol_name() => Some(PeerSet::Collation),
+ _ => None,
+ }
+ }
+}
+
+/// Get `NonDefaultSetConfig`s for all available peer sets.
+///
+/// Should be used during network configuration (added to [`NetworkConfiguration::extra_sets`])
+/// or shortly after startup to register the protocols with the network service.
+pub fn peer_sets_info() -> Vec {
+ PeerSet::iter().map(PeerSet::get_info).collect()
+}
diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs
index ed86833888ad..0b37e3ae48f6 100644
--- a/node/service/src/lib.rs
+++ b/node/service/src/lib.rs
@@ -567,7 +567,7 @@ pub fn new_full(
// Substrate nodes.
config.network.extra_sets.push(grandpa::grandpa_peers_set_config());
#[cfg(feature = "real-overseer")]
- config.network.extra_sets.extend(polkadot_network_bridge::peers_sets_info());
+ config.network.extra_sets.extend(polkadot_network_bridge::peer_sets_info());
let (network, network_status_sinks, system_rpc_tx, network_starter) =
service::build_network(service::BuildNetworkParams {
diff --git a/roadmap/implementers-guide/src/node/utility/network-bridge.md b/roadmap/implementers-guide/src/node/utility/network-bridge.md
index 9f51094336f4..abcff82a16b6 100644
--- a/roadmap/implementers-guide/src/node/utility/network-bridge.md
+++ b/roadmap/implementers-guide/src/node/utility/network-bridge.md
@@ -32,7 +32,7 @@ Output:
This network bridge sends messages of these types over the network.
```rust
-enum ProtocolMessage {
+enum WireMessage {
ProtocolMessage(M),
ViewUpdate(View),
}
@@ -41,8 +41,8 @@ enum ProtocolMessage {
and instantiates this type twice, once using the [`ValidationProtocolV1`][VP1] message type, and once with the [`CollationProtocolV1`][CP1] message type.
```rust
-type ValidationV1Message = ProtocolMessage;
-type CollationV1Message = ProtocolMessage;
+type ValidationV1Message = WireMessage;
+type CollationV1Message = WireMessage;
```
### Startup