From 54873ea3757d3203fdcc3643966358fc95ead810 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 17 Jun 2020 19:31:37 -0400 Subject: [PATCH 01/33] network bridge skeleton --- Cargo.lock | 17 ++++++++ Cargo.toml | 2 + node/network/bridge/Cargo.toml | 18 ++++++++ node/network/bridge/src/lib.rs | 76 ++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+) create mode 100644 node/network/bridge/Cargo.toml create mode 100644 node/network/bridge/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6e944dfa9351..8e47953cab75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4252,6 +4252,23 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "polkadot-network-bridge" +version = "0.1.0" +dependencies = [ + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "polkadot-node-messages", + "polkadot-node-primitives", + "polkadot-overseer", + "polkadot-primitives", + "sc-network", + "sp-runtime", + "streamunordered", +] + [[package]] name = "polkadot-network-test" version = "0.8.10" diff --git a/Cargo.toml b/Cargo.toml index 8a43ec6ec032..254a8749c3f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,8 @@ members = [ "node/primitives", "node/service", + "node/network/bridge", + "parachain/test-parachains", "parachain/test-parachains/adder", "parachain/test-parachains/adder/collator", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml new file mode 100644 index 000000000000..bb502aa8b2e5 --- /dev/null +++ b/node/network/bridge/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "polkadot-network-bridge" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.8" +futures-timer = "3.0.2" +streamunordered = "0.5.1" +polkadot-primitives = { path = "../../../primitives" } +node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } +messages = { package = "polkadot-node-messages", path = "../../messages" } +parity-scale-codec = "1.3.0" +overseer = { package = "polkadot-overseer", path = "../../overseer" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs new file mode 100644 index 000000000000..49a4f0e49f89 --- /dev/null +++ b/node/network/bridge/src/lib.rs @@ -0,0 +1,76 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Network Bridge Subsystem - protocol multiplexer for Polkadot. + +use parity_scale_codec::{Encode, Decode}; + +use sc_network::{ + ObservedRole, ReputationChange, PeerId, config::ProtocolId as SubstrateProtocolId, +}; +use sp_runtime::ConsensusEngineId; + +use messages::{NetworkBridgeEvent, NetworkBridgeMessage}; +use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem}; +use node_primitives::{ProtocolId, View}; +use polkadot_primitives::{Block, Hash}; + +use std::collections::HashMap; +use std::sync::Arc; + +const MAX_VIEW_HEADS: usize = 5; + +/// The engine ID of the polkadot network protocol. +pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2"; +/// The protocol name. +pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2"; + +/// Messages received on the network. +#[derive(Encode, Decode)] +pub enum Message { + /// A message from a peer on a specific protocol. + #[codec(index = "1")] + ProtocolMessage(ProtocolId, Vec), + /// A view update from a peer. + #[codec(index = "2")] + ViewUpdate(View), +} + +/// The network bridge subsystem. +pub struct NetworkBridge(Arc>); + +impl NetworkBridge { + /// Create a new network bridge subsystem with underlying network service. + pub fn new(net_service: Arc>) -> Self { + NetworkBridge(net_service) + } +} + +impl Subsystem for NetworkBridge { + fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { + unimplemented!(); + // TODO [now]: Spawn substrate-network notifications protocol & event stream. + } +} + +struct PeerData { + /// Latest view sent by the peer. + view: View, +} + +struct ProtocolHandler { + peers: HashMap, +} From 5eea3bfe3deb43fc955062bb57bd40cb0c3da8fa Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 17 Jun 2020 19:31:46 -0400 Subject: [PATCH 02/33] move some primitives around and add debug impls --- node/messages/src/lib.rs | 21 +++++++++++++-------- node/primitives/src/lib.rs | 10 ++++++++++ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/node/messages/src/lib.rs b/node/messages/src/lib.rs index 4d6da67d84a2..9f853c6a68a1 100644 --- a/node/messages/src/lib.rs +++ b/node/messages/src/lib.rs @@ -24,14 +24,14 @@ use futures::channel::{mpsc, oneshot}; -use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId}; +use sc_network::{ObservedRole, ReputationChange, PeerId}; use polkadot_primitives::{BlockNumber, Hash, Signature}; use polkadot_primitives::parachain::{ AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, }; use polkadot_node_primitives::{ - MisbehaviorReport, SignedStatement, + MisbehaviorReport, SignedStatement, View, ProtocolId, }; /// Signals sent by an overseer to a subsystem. @@ -90,12 +90,8 @@ pub enum CandidateValidationMessage { ), } -/// Chain heads. -/// -/// Up to `N` (5?) chain heads. -pub struct View(pub Vec); - /// Events from network. +#[derive(Debug)] pub enum NetworkBridgeEvent { /// A peer has connected. PeerConnected(PeerId, ObservedRole), @@ -114,7 +110,8 @@ pub enum NetworkBridgeEvent { } /// Messages received by the network bridge subsystem. -pub enum NetworkBridgeSubsystemMessage { +#[derive(Debug)] +pub enum NetworkBridgeMessage { /// Register an event producer on startup. RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), @@ -126,6 +123,7 @@ pub enum NetworkBridgeSubsystemMessage { } /// Availability Distribution Message. +#[derive(Debug)] pub enum AvailabilityDistributionMessage { /// Distribute an availability chunk to other validators. DistributeChunk(Hash, ErasureChunk), @@ -138,6 +136,7 @@ pub enum AvailabilityDistributionMessage { } /// Bitfield distribution message. +#[derive(Debug)] pub enum BitfieldDistributionMessage { /// Distribute a bitfield via gossip to other validators. DistributeBitfield(Hash, SignedAvailabilityBitfield), @@ -147,6 +146,7 @@ pub enum BitfieldDistributionMessage { } /// Availability store subsystem message. +#[derive(Debug)] pub enum AvailabilityStoreMessage { /// Query a `PoVBlock` from the AV store. QueryPoV(Hash, oneshot::Sender>), @@ -159,6 +159,7 @@ pub enum AvailabilityStoreMessage { } /// A request to the Runtime API subsystem. +#[derive(Debug)] pub enum RuntimeApiRequest { /// Get the current validator set. Validators(oneshot::Sender>), @@ -171,12 +172,14 @@ pub enum RuntimeApiRequest { } /// A message to the Runtime API subsystem. +#[derive(Debug)] pub enum RuntimeApiMessage { /// Make a request of the runtime API against the post-state of the given relay-parent. Request(Hash, RuntimeApiRequest), } /// Statement distribution message. +#[derive(Debug)] pub enum StatementDistributionMessage { /// We have originated a signed statement in the context of /// given relay-parent hash and it should be distributed to other validators. @@ -184,6 +187,7 @@ pub enum StatementDistributionMessage { } /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. +#[derive(Debug)] pub enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), @@ -198,6 +202,7 @@ pub enum ProvisionableData { /// Message to the Provisioner. /// /// In all cases, the Hash is that of the relay parent. +#[derive(Debug)] pub enum ProvisionerMessage { /// This message allows potential block authors to be kept updated with all new authorship data /// as it becomes available. diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 78a87cda39a9..15e99df0b0e8 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -92,6 +92,7 @@ impl SignedStatement { } /// A misbehaviour report. +#[derive(Debug)] pub enum MisbehaviorReport { /// These validator nodes disagree on this candidate's validity, please figure it out /// @@ -107,3 +108,12 @@ pub enum MisbehaviorReport { /// This peer has seconded more than one parachain candidate for this relay parent head DoubleVote(CandidateReceipt, SignedStatement, SignedStatement), } + +/// A unique identifier for a network protocol. +pub type ProtocolId = [u8; 4]; + +/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads. +/// +/// Up to `N` (5?) chain heads. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct View(pub Vec); From 27456b580247e457f21e0c77e4f5a756cc50d287 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 17 Jun 2020 19:47:11 -0400 Subject: [PATCH 03/33] protocol registration glue & abstract network interface --- node/network/bridge/src/lib.rs | 48 +++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 49a4f0e49f89..a2a9ee2d61f3 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -17,9 +17,12 @@ //! The Network Bridge Subsystem - protocol multiplexer for Polkadot. use parity_scale_codec::{Encode, Decode}; +use futures::prelude::*; +use futures::stream::BoxStream; use sc_network::{ ObservedRole, ReputationChange, PeerId, config::ProtocolId as SubstrateProtocolId, + Event as NetworkEvent, }; use sp_runtime::ConsensusEngineId; @@ -49,17 +52,54 @@ pub enum Message { ViewUpdate(View), } +/// Information about the notifications protocol. Should be used during network configuration +/// or shortly after startup to register the protocol with the network service. +pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) { + (POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into()) +} + +/// An abstraction over networking for the purposes of this subsystem. +pub trait Network: Clone + Send + 'static { + /// Get a stream of all events occurring on the network. This may include events unrelated + /// to the Polkadot protocol - the user of this function should filter only for events related + /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). + fn event_stream(&self) -> BoxStream; + + /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange); + + /// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic. + fn write_notification(&self, who: PeerId, message: Vec); +} + +impl Network for Arc> { + fn event_stream(&self) -> BoxStream { + sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() + } + + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + sc_network::NetworkService::report_peer(self, who, cost_benefit) + } + + fn write_notification(&self, who: PeerId, message: Vec) { + sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message) + } +} + /// The network bridge subsystem. -pub struct NetworkBridge(Arc>); +pub struct NetworkBridge(N); -impl NetworkBridge { +impl NetworkBridge { /// Create a new network bridge subsystem with underlying network service. - pub fn new(net_service: Arc>) -> Self { + /// + /// This assumes that the network service has had the notifications protocol for the network + /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). + pub fn new(net_service: N) -> Self { NetworkBridge(net_service) } } -impl Subsystem for NetworkBridge { +impl Subsystem for NetworkBridge { fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { unimplemented!(); // TODO [now]: Spawn substrate-network notifications protocol & event stream. From 3bf17fa09c5212d073b0f7678fbe399ffdc4ac68 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 17 Jun 2020 20:49:31 -0400 Subject: [PATCH 04/33] add send_msgs to subsystemctx --- node/overseer/src/lib.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 0d3c9b7b5095..01c5e4685468 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -65,7 +65,7 @@ use futures::channel::{mpsc, oneshot}; use futures::{ pending, poll, select, future::{BoxFuture, RemoteHandle}, - stream::FuturesUnordered, + stream::{self, FuturesUnordered}, task::{Spawn, SpawnError, SpawnExt}, Future, FutureExt, SinkExt, StreamExt, }; @@ -330,6 +330,16 @@ impl SubsystemContext { Ok(()) } + /// Send multiple direct messages to other `Subsystem`s, routed based on message type. + pub async fn send_msgs(&mut self, msgs: impl IntoIterator) + -> SubsystemResult<()> + { + let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); + self.tx.send_all(&mut msgs).await?; + + Ok(()) + } + fn new(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { Self { rx, From e05abd2017fbad142bc1567100966f385335115c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 17 Jun 2020 20:51:41 -0400 Subject: [PATCH 05/33] select logic --- node/network/bridge/src/lib.rs | 50 ++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index a2a9ee2d61f3..ef91e94a0d90 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -26,7 +26,7 @@ use sc_network::{ }; use sp_runtime::ConsensusEngineId; -use messages::{NetworkBridgeEvent, NetworkBridgeMessage}; +use messages::{NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal}; use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem}; use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; @@ -101,8 +101,7 @@ impl NetworkBridge { impl Subsystem for NetworkBridge { fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { - unimplemented!(); - // TODO [now]: Spawn substrate-network notifications protocol & event stream. + SpawnedSubsystem(run_network(self.0.clone(), ctx).boxed()) } } @@ -111,6 +110,47 @@ struct PeerData { view: View, } -struct ProtocolHandler { - peers: HashMap, +async fn run_network(net: impl Network, mut ctx: SubsystemContext) { + let mut event_stream = net.event_stream().fuse(); + + // TODO [now] + // let peers = HashMap::new(); + // let event_listeners = HashMap::new(); + + loop { + let subsystem_next = ctx.recv().fuse(); + let mut net_event_next = event_stream.next().fuse(); + futures::pin_mut!(subsystem_next); + + futures::select! { + subsystem_msg = subsystem_next => match subsystem_msg { + Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) => { + // TODO [now]: update local view and send view update to peers. + } + Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) => { + // TODO [now]: update local view and send view update to peers. + } + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, + Ok(FromOverseer::Communication { msg }) => match msg { + NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) => { + // TODO [now]: add event producer. + } + NetworkBridgeMessage::ReportPeer(peer, rep) => { + // TODO [now]: report a peer to network service. + } + NetworkBridgeMessage::SendMessage(peers, protocol, message) => { + // TODO [now]: Send the message to all peers with `write_notification`. + } + }, + Err(e) => { + // TODO [now]: log error. + return; + } + }, + net_event = net_event_next => { + // TODO [now]: Update peer tracker, filter out anything not to do with this + // engine, and transform all updates to be sent to the overseer. + }, + } + } } From ba005c9ca71d0fc8ab60763d97dfbfbe0b998bfd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 18 Jun 2020 12:27:47 -0400 Subject: [PATCH 06/33] transform different events into actions and handle --- node/network/bridge/src/lib.rs | 168 +++++++++++++++++++++++++++------ 1 file changed, 137 insertions(+), 31 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index ef91e94a0d90..c77abfd66544 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -21,12 +21,14 @@ use futures::prelude::*; use futures::stream::BoxStream; use sc_network::{ - ObservedRole, ReputationChange, PeerId, config::ProtocolId as SubstrateProtocolId, + ObservedRole, ReputationChange, PeerId, Event as NetworkEvent, }; use sp_runtime::ConsensusEngineId; -use messages::{NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal}; +use messages::{ + NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal, AllMessages, +}; use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem}; use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; @@ -43,7 +45,7 @@ pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2"; /// Messages received on the network. #[derive(Encode, Decode)] -pub enum Message { +pub enum WireMessage { /// A message from a peer on a specific protocol. #[codec(index = "1")] ProtocolMessage(ProtocolId, Vec), @@ -110,47 +112,151 @@ struct PeerData { view: View, } +enum Action { + RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), + SendMessage(Vec, ProtocolId, Vec), + ReportPeer(PeerId, ReputationChange), + StartWork(Hash), + StopWork(Hash), + + PeerConnected(PeerId, ObservedRole), + PeerDisconnected(PeerId), + PeerMalformedMessage(PeerId), + PeerMessages(PeerId, Vec), + PeerViewChange(PeerId, View), + + Abort, +} + +fn action_from_overseer_message( + res: overseer::SubsystemResult>, +) -> Action { + match res { + Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) + => Action::StartWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) + => Action::StopWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, + Ok(FromOverseer::Communication { msg }) => match msg { + NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) + => Action::RegisterEventProducer(protocol_id, message_producer), + NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), + NetworkBridgeMessage::SendMessage(peers, protocol, message) + => Action::SendMessage(peers, protocol, message), + }, + Err(e) => { + log::warn!("Shutting down Network Bridge due to error {:?}", e); + Action::Abort + } + } +} + +fn action_from_network_message(event: Option) -> Option { + match event { + None => { + log::warn!("Shutting down Network Bridge: underlying event stream concluded"); + Some(Action::Abort) + } + Some(NetworkEvent::Dht(_)) => None, + Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => { + if engine_id == POLKADOT_ENGINE_ID { + Some(Action::PeerConnected(remote, role)) + } else { + None + } + } + Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => { + if engine_id == POLKADOT_ENGINE_ID { + Some(Action::PeerDisconnected(remote)) + } else { + None + } + } + Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let v: Result, _> = messages.iter() + .filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + match v { + Err(_) => Some(Action::PeerMalformedMessage(remote)), + Ok(v) => if v.is_empty() { + None + } else { + Some(Action::PeerMessages(remote, v)) + } + } + } + } +} + async fn run_network(net: impl Network, mut ctx: SubsystemContext) { let mut event_stream = net.event_stream().fuse(); + let mut local_view = Vec::with_capacity(MAX_VIEW_HEADS); - // TODO [now] - // let peers = HashMap::new(); - // let event_listeners = HashMap::new(); + //let mut peers = HashMap::new(); + let mut event_producers = HashMap::new(); loop { let subsystem_next = ctx.recv().fuse(); let mut net_event_next = event_stream.next().fuse(); futures::pin_mut!(subsystem_next); - futures::select! { - subsystem_msg = subsystem_next => match subsystem_msg { - Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) => { - // TODO [now]: update local view and send view update to peers. - } - Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) => { - // TODO [now]: update local view and send view update to peers. + let action = futures::select! { + subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)), + net_event = net_event_next => action_from_network_message(net_event), + }; + + let action = match action { + None => continue, + Some(a) => a, + }; + + match action { + Action::RegisterEventProducer(protocol_id, event_producer) => { + // insert only if none present. + event_producers.entry(protocol_id).or_insert(event_producer); + } + Action::SendMessage(peers, protocol, message) => { + let message = WireMessage::ProtocolMessage(protocol, message).encode(); + + for peer in peers.iter().skip(1).cloned() { + net.write_notification(peer, message.clone()); } - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, - Ok(FromOverseer::Communication { msg }) => match msg { - NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) => { - // TODO [now]: add event producer. - } - NetworkBridgeMessage::ReportPeer(peer, rep) => { - // TODO [now]: report a peer to network service. - } - NetworkBridgeMessage::SendMessage(peers, protocol, message) => { - // TODO [now]: Send the message to all peers with `write_notification`. - } - }, - Err(e) => { - // TODO [now]: log error. - return; + + if let Some(peer) = peers.first() { + net.write_notification(peer.clone(), message); } + } + Action::ReportPeer(peer, rep) => { + net.report_peer(peer, rep) + } + Action::StartWork(relay_parent) => { + local_view.push(relay_parent); + // TODO [now]: send view change. + } + Action::StopWork(relay_parent) => { + local_view.retain(|h| h != &relay_parent) + // TODO [now]: send view change. + } + + Action::PeerConnected(peer, role) => { + + } + Action::PeerDisconnected(peer) => { + + }, + Action::PeerMalformedMessage(peer) => { + }, - net_event = net_event_next => { - // TODO [now]: Update peer tracker, filter out anything not to do with this - // engine, and transform all updates to be sent to the overseer. + Action::PeerMessages(peer, messages) => { + + }, + Action::PeerViewChange(peer, new_view) => { + }, + + Action::Abort => return, } } } From 7572bbab43e8e35b2eb492bcfaa5bb20babe77e9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 18 Jun 2020 14:36:46 -0400 Subject: [PATCH 07/33] implement remaining network bridge state machine --- node/messages/src/lib.rs | 2 +- node/network/bridge/src/lib.rs | 182 +++++++++++++++++++++++++++------ 2 files changed, 154 insertions(+), 30 deletions(-) diff --git a/node/messages/src/lib.rs b/node/messages/src/lib.rs index 9f853c6a68a1..79b28f107b3f 100644 --- a/node/messages/src/lib.rs +++ b/node/messages/src/lib.rs @@ -91,7 +91,7 @@ pub enum CandidateValidationMessage { } /// Events from network. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum NetworkBridgeEvent { /// A peer has connected. PeerConnected(PeerId, ObservedRole), diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index c77abfd66544..9d33a7e47855 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -33,7 +33,7 @@ use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem}; use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; -use std::collections::HashMap; +use std::collections::hash_map::{HashMap, Entry}; use std::sync::Arc; const MAX_VIEW_HEADS: usize = 5; @@ -43,6 +43,13 @@ pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2"; /// The protocol name. pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2"; +const MALFORMED_MESSAGE_COST: ReputationChange + = ReputationChange::new(-500, "Malformed Network-bridge message"); +const UNKNOWN_PROTO_COST: ReputationChange + = ReputationChange::new(-50, "Message sent to unknown protocol"); +const MALFORMED_VIEW_COST: ReputationChange + = ReputationChange::new(-500, "Malformed view"); + /// Messages received on the network. #[derive(Encode, Decode)] pub enum WireMessage { @@ -61,7 +68,7 @@ pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'st } /// An abstraction over networking for the purposes of this subsystem. -pub trait Network: Clone + Send + 'static { +pub trait Network: Clone + Send + Sync + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). @@ -121,9 +128,7 @@ enum Action { PeerConnected(PeerId, ObservedRole), PeerDisconnected(PeerId), - PeerMalformedMessage(PeerId), PeerMessages(PeerId, Vec), - PeerViewChange(PeerId, View), Abort, } @@ -179,7 +184,7 @@ fn action_from_network_message(event: Option) -> Option { .collect(); match v { - Err(_) => Some(Action::PeerMalformedMessage(remote)), + Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)), Ok(v) => if v.is_empty() { None } else { @@ -190,26 +195,61 @@ fn action_from_network_message(event: Option) -> Option { } } +fn construct_view(live_heads: &[Hash]) -> View { + View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect()) +} + +async fn dispatch_update_to_all( + update: NetworkBridgeEvent, + event_producers: impl IntoIterator AllMessages>, + ctx: &mut SubsystemContext, +) -> overseer::SubsystemResult<()> { + // collect messages here to avoid the borrow lasting across await boundary. + let messages: Vec<_> = event_producers.into_iter() + .map(|producer| producer(update.clone())) + .collect(); + + ctx.send_msgs(messages).await +} + async fn run_network(net: impl Network, mut ctx: SubsystemContext) { let mut event_stream = net.event_stream().fuse(); - let mut local_view = Vec::with_capacity(MAX_VIEW_HEADS); - //let mut peers = HashMap::new(); + // Most recent heads are at the back. + let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS); + let mut local_view = View(Vec::new()); + + let mut peers = HashMap::new(); let mut event_producers = HashMap::new(); loop { - let subsystem_next = ctx.recv().fuse(); - let mut net_event_next = event_stream.next().fuse(); - futures::pin_mut!(subsystem_next); - - let action = futures::select! { - subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)), - net_event = net_event_next => action_from_network_message(net_event), + let action = { + let subsystem_next = ctx.recv().fuse(); + let mut net_event_next = event_stream.next().fuse(); + futures::pin_mut!(subsystem_next); + + let action = futures::select! { + subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)), + net_event = net_event_next => action_from_network_message(net_event), + }; + + match action { + Some(a) => a, + None => continue, + } }; - let action = match action { - None => continue, - Some(a) => a, + let update_view = |peers: &HashMap, live_heads, local_view: &mut View| { + let new_view = construct_view(live_heads); + if *local_view == new_view { return None } + *local_view = new_view.clone(); + + let message = WireMessage::ViewUpdate(new_view).encode(); + for peer in peers.keys().cloned() { + net.write_notification(peer, message.clone()) + } + + Some(NetworkBridgeEvent::OurViewChange(local_view.clone())) }; match action { @@ -232,28 +272,112 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext { - local_view.push(relay_parent); - // TODO [now]: send view change. + live_heads.push(relay_parent); + if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) { + if let Err(_) = dispatch_update_to_all( + view_update, + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return + } + } } Action::StopWork(relay_parent) => { - local_view.retain(|h| h != &relay_parent) - // TODO [now]: send view change. + live_heads.retain(|h| h != &relay_parent); + if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) { + if let Err(_) = dispatch_update_to_all( + view_update, + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return + } + } } Action::PeerConnected(peer, role) => { - + match peers.entry(peer.clone()) { + Entry::Occupied(_) => continue, + Entry::Vacant(vacant) => { + vacant.insert(PeerData { + view: View(Vec::new()), + }); + + if let Err(_) = dispatch_update_to_all( + NetworkBridgeEvent::PeerConnected(peer, role), + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return + } + } + } } Action::PeerDisconnected(peer) => { - - }, - Action::PeerMalformedMessage(peer) => { - + if peers.remove(&peer).is_some() { + if let Err(_) = dispatch_update_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return + } + } }, Action::PeerMessages(peer, messages) => { + let peer_data = match peers.get_mut(&peer) { + None => continue, + Some(d) => d, + }; + + let mut outgoing_messages = Vec::with_capacity(messages.len()); + for message in messages { + match message { + WireMessage::ViewUpdate(new_view) => { + if new_view.0.len() > MAX_VIEW_HEADS { + net.report_peer(peer.clone(), MALFORMED_VIEW_COST); + continue + } + + if new_view == peer_data.view { continue } + peer_data.view = new_view; + + let update = NetworkBridgeEvent::PeerViewChange( + peer.clone(), + peer_data.view.clone(), + ); + + outgoing_messages.extend( + event_producers.values().map(|producer| producer(update.clone())) + ); + } + WireMessage::ProtocolMessage(protocol, message) => { + let message = match event_producers.get(&protocol) { + Some(producer) => Some(producer( + NetworkBridgeEvent::PeerMessage(peer.clone(), message) + )), + None => { + net.report_peer(peer.clone(), UNKNOWN_PROTO_COST); + None + } + }; + + if let Some(message) = message { + outgoing_messages.push(message); + } + } + } + } - }, - Action::PeerViewChange(peer, new_view) => { - + if let Err(_) = ctx.send_msgs(outgoing_messages).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return + } }, Action::Abort => return, From 24a91c4db17d6203fb2149907e2a4a749f41ba63 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 22 Jun 2020 10:52:51 -0400 Subject: [PATCH 08/33] start test skeleton --- node/network/bridge/Cargo.toml | 3 +++ node/network/bridge/src/lib.rs | 49 ++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index bb502aa8b2e5..690e9b83118d 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -16,3 +16,6 @@ parity-scale-codec = "1.3.0" overseer = { package = "polkadot-overseer", path = "../../overseer" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[dev-dependencies] +parking_lot = "0.10.0" diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 9d33a7e47855..d82ca538ff60 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -384,3 +384,52 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext), + } + + #[derive(Clone)] + struct TestNetwork { + net_events: Arc>>>, + outgoing_tx: mpsc::UnboundedSender, + } + + struct TestHost { + outgoing_rx: mpsc::UnboundedReceiver, + net_tx: mpsc::UnboundedSender, + } + + fn new_test_host() -> ( + TestNetwork, + mpsc::UnboundedSender, + mpsc::UnboundedReceiver, + { + let (net_tx, net_rx) = mpsc::unbounded(); + let (outgoing_tx, outgoing_rx) = mpsc::unbounded(); + + let net = TestNetwork { + net_events: Arc::new(Mutex::new(Some(net_rx))), + outgoing_tx, + }; + + (net, net_tx, outgoing_rx) + }) + + impl TestNetwork { + + } + + + // TODO [now]: our view updates are sent. + // TODO [now]: peer view updates get sent via overseer. + + // TODO [now]: peer messages are sent via event producer +} From 064a00c244c6c1c6a240e6998850fbe5ddc52edc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 22 Jun 2020 11:31:29 -0400 Subject: [PATCH 09/33] make network methods asynchronous --- Cargo.lock | 1 + node/network/bridge/src/lib.rs | 71 ++++++++++++++++++++-------------- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e47953cab75..5e54a3ad554d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4260,6 +4260,7 @@ dependencies = [ "futures-timer 3.0.2", "log 0.4.8", "parity-scale-codec", + "parking_lot 0.10.2", "polkadot-node-messages", "polkadot-node-primitives", "polkadot-overseer", diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index d82ca538ff60..c71abd9f33e4 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -18,6 +18,7 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; +use futures::future::BoxFuture; use futures::stream::BoxStream; use sc_network::{ @@ -75,10 +76,10 @@ pub trait Network: Clone + Send + Sync + 'static { fn event_stream(&self) -> BoxStream; /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. - fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange); + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()>; /// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic. - fn write_notification(&self, who: PeerId, message: Vec); + fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()>; } impl Network for Arc> { @@ -86,12 +87,14 @@ impl Network for Arc> { sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } - fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - sc_network::NetworkService::report_peer(self, who, cost_benefit) + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { + sc_network::NetworkService::report_peer(self, who, cost_benefit); + future::ready(()).boxed() } - fn write_notification(&self, who: PeerId, message: Vec) { - sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message) + fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()> { + sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message); + future::ready(()).boxed() } } @@ -212,6 +215,27 @@ async fn dispatch_update_to_all( ctx.send_msgs(messages).await } +async fn update_view( + peers: &HashMap, + live_heads: &[Hash], + net: &impl Network, + local_view: &mut View, +) -> Option { + let new_view = construct_view(live_heads); + if *local_view == new_view { return None } + *local_view = new_view.clone(); + + let message = WireMessage::ViewUpdate(new_view.clone()).encode(); + + let write_all = peers.keys().cloned().map(|peer| { + net.write_notification(peer, message.clone()) + }); + + future::join_all(write_all).await; + + Some(NetworkBridgeEvent::OurViewChange(local_view.clone())) +} + async fn run_network(net: impl Network, mut ctx: SubsystemContext) { let mut event_stream = net.event_stream().fuse(); @@ -239,19 +263,6 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext, live_heads, local_view: &mut View| { - let new_view = construct_view(live_heads); - if *local_view == new_view { return None } - *local_view = new_view.clone(); - - let message = WireMessage::ViewUpdate(new_view).encode(); - for peer in peers.keys().cloned() { - net.write_notification(peer, message.clone()) - } - - Some(NetworkBridgeEvent::OurViewChange(local_view.clone())) - }; - match action { Action::RegisterEventProducer(protocol_id, event_producer) => { // insert only if none present. @@ -261,19 +272,21 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext { - net.report_peer(peer, rep) + net.report_peer(peer, rep).await } Action::StartWork(relay_parent) => { live_heads.push(relay_parent); - if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) { + if let Some(view_update) + = update_view(&peers, &live_heads, &net, &mut local_view).await + { if let Err(_) = dispatch_update_to_all( view_update, event_producers.values(), @@ -286,7 +299,9 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext { live_heads.retain(|h| h != &relay_parent); - if let Some(view_update) = update_view(&peers, &live_heads, &mut local_view) { + if let Some(view_update) + = update_view(&peers, &live_heads, &net, &mut local_view).await + { if let Err(_) = dispatch_update_to_all( view_update, event_producers.values(), @@ -340,7 +355,7 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext { if new_view.0.len() > MAX_VIEW_HEADS { - net.report_peer(peer.clone(), MALFORMED_VIEW_COST); + net.report_peer(peer.clone(), MALFORMED_VIEW_COST).await; continue } @@ -362,7 +377,7 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext { - net.report_peer(peer.clone(), UNKNOWN_PROTO_COST); + net.report_peer(peer.clone(), UNKNOWN_PROTO_COST).await; None } }; @@ -411,7 +426,7 @@ mod tests { TestNetwork, mpsc::UnboundedSender, mpsc::UnboundedReceiver, - { + ) { let (net_tx, net_rx) = mpsc::unbounded(); let (outgoing_tx, outgoing_rx) = mpsc::unbounded(); @@ -421,7 +436,7 @@ mod tests { }; (net, net_tx, outgoing_rx) - }) + } impl TestNetwork { From c1160613a40863e73ed4b441ff6554a03a0ad521 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 17:59:01 -0400 Subject: [PATCH 10/33] extract subsystem out to subsystem crate --- Cargo.lock | 31 ++-- Cargo.toml | 2 +- node/network/bridge/Cargo.toml | 2 +- node/overseer/Cargo.toml | 2 +- node/primitives/Cargo.toml | 1 + node/{messages => subsystem}/Cargo.toml | 5 +- node/subsystem/src/lib.rs | 142 ++++++++++++++++++ .../src/lib.rs => subsystem/src/messages.rs} | 27 ---- 8 files changed, 171 insertions(+), 41 deletions(-) rename node/{messages => subsystem}/Cargo.toml (77%) create mode 100644 node/subsystem/src/lib.rs rename node/{messages/src/lib.rs => subsystem/src/messages.rs} (89%) diff --git a/Cargo.lock b/Cargo.lock index 5e54a3ad554d..6ce48e6c15d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,17 @@ dependencies = [ "webpki-roots 0.19.0", ] +[[package]] +name = "async-trait" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92" +dependencies = [ + "proc-macro2 1.0.17", + "quote 1.0.6", + "syn 1.0.27", +] + [[package]] name = "atty" version = "0.2.14" @@ -4261,8 +4272,8 @@ dependencies = [ "log 0.4.8", "parity-scale-codec", "parking_lot 0.10.2", - "polkadot-node-messages", "polkadot-node-primitives", + "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", "sc-network", @@ -4292,24 +4303,26 @@ dependencies = [ ] [[package]] -name = "polkadot-node-messages" +name = "polkadot-node-primitives" version = "0.1.0" dependencies = [ - "futures 0.3.5", - "polkadot-node-primitives", + "async-trait", + "parity-scale-codec", "polkadot-primitives", "polkadot-statement-table", - "sc-network", + "sp-runtime", ] [[package]] -name = "polkadot-node-primitives" +name = "polkadot-node-subsystem" version = "0.1.0" dependencies = [ - "parity-scale-codec", + "async-trait", + "futures 0.3.5", + "polkadot-node-primitives", "polkadot-primitives", "polkadot-statement-table", - "sp-runtime", + "sc-network", ] [[package]] @@ -4321,7 +4334,7 @@ dependencies = [ "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", - "polkadot-node-messages", + "polkadot-node-subsystem", "polkadot-primitives", "sc-client-api", "streamunordered", diff --git a/Cargo.toml b/Cargo.toml index 254a8749c3f9..f5fcd0335eb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,10 +42,10 @@ members = [ "service", "validation", - "node/messages", "node/overseer", "node/primitives", "node/service", + "node/subsystem", "node/network/bridge", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 690e9b83118d..ce9e9545774d 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -11,11 +11,11 @@ futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../../../primitives" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } -messages = { package = "polkadot-node-messages", path = "../../messages" } parity-scale-codec = "1.3.0" overseer = { package = "polkadot-overseer", path = "../../overseer" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } [dev-dependencies] parking_lot = "0.10.0" diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 88626e2e05f3..dba52e7f4802 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -11,7 +11,7 @@ futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../../primitives" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } -messages = { package = "polkadot-node-messages", path = "../messages" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/node/primitives/Cargo.toml b/node/primitives/Cargo.toml index f317565b2e99..b2bc9231ae74 100644 --- a/node/primitives/Cargo.toml +++ b/node/primitives/Cargo.toml @@ -10,3 +10,4 @@ polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] } runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +async-trait = "0.1" diff --git a/node/messages/Cargo.toml b/node/subsystem/Cargo.toml similarity index 77% rename from node/messages/Cargo.toml rename to node/subsystem/Cargo.toml index 9edb5a051987..43712319cb71 100644 --- a/node/messages/Cargo.toml +++ b/node/subsystem/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "polkadot-node-messages" +name = "polkadot-node-subsystem" version = "0.1.0" authors = ["Parity Technologies "] edition = "2018" -description = "Message types used by Subsystems" +description = "Subsystem traits and message definitions" [dependencies] polkadot-primitives = { path = "../../primitives" } @@ -11,3 +11,4 @@ polkadot-statement-table = { path = "../../statement-table" } polkadot-node-primitives = { path = "../primitives" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.5" +async-trait = "0.1" diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs new file mode 100644 index 000000000000..d7342b9087fd --- /dev/null +++ b/node/subsystem/src/lib.rs @@ -0,0 +1,142 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Subsystem trait definitions and message types. +//! +//! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components +//! that communicate via message-passing. They are coordinated by an overseer, provided by a +//! separate crate. + +use std::pin::Pin; + +use futures::prelude::*; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; + +use polkadot_primitives::Hash; +use async_trait::async_trait; + +use crate::messages::AllMessages; + +pub mod messages; + +/// Signals sent by an overseer to a subsystem. +#[derive(PartialEq, Clone, Debug)] +pub enum OverseerSignal { + /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. + StartWork(Hash), + /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. + StopWork(Hash), + /// Conclude the work of the `Overseer` and all `Subsystem`s. + Conclude, +} + +/// A message type that a subsystem receives from an overseer. +/// It wraps signals from an overseer and messages that are circulating +/// between subsystems. +/// +/// It is generic over over the message type `M` that a particular `Subsystem` may use. +#[derive(Debug)] +pub enum FromOverseer { + /// Signal from the `Overseer`. + Signal(OverseerSignal), + + /// Some other `Subsystem`'s message. + Communication { + msg: M, + }, +} + +/// An error type that describes faults that may happen +/// +/// These are: +/// * Channels being closed +/// * Subsystems dying when they are not expected to +/// * Subsystems not dying when they are told to die +/// * etc. +#[derive(Debug)] +pub struct SubsystemError; + +impl From for SubsystemError { + fn from(_: mpsc::SendError) -> Self { + Self + } +} + +impl From for SubsystemError { + fn from(_: oneshot::Canceled) -> Self { + Self + } +} + +impl From for SubsystemError { + fn from(_: futures::task::SpawnError) -> Self { + Self + } +} + +/// An asynchronous subsystem task.. +/// +/// In essence it's just a newtype wrapping a `BoxFuture`. +pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>); + +/// A `Result` type that wraps [`SubsystemError`]. +/// +/// [`SubsystemError`]: struct.SubsystemError.html +pub type SubsystemResult = Result; + +/// A context type that is given to the [`Subsystem`] upon spawning. +/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s +/// or spawn jobs. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`SubsystemJob`]: trait.SubsystemJob.html +#[async_trait] +pub trait SubsystemContext { + /// The message type of this context. + type Message: Send; + + /// Try to asynchronously receive a message. + /// + /// This has to be used with caution, if you loop over this without + /// using `pending!()` macro you will end up with a busy loop! + async fn try_recv(&mut self) -> Result>, ()>; + + /// Receive a message. + async fn recv(&mut self) -> SubsystemResult>; + + /// Spawn a child task on the executor. + async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()>; + + /// Send a direct message to some other `Subsystem`, routed based on message type. + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>; + + /// Send multiple direct messages to other `Subsystem`s, routed based on message type. + async fn send_messages(&mut self, msgs: impl IntoIterator) -> SubsystemResult<()>; +} + +/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. +/// +/// It is generic over the message type circulating in the system. +/// The idea that we want some type contaning persistent state that +/// can spawn actually running subsystems when asked to. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +pub trait Subsystem { + /// Start this `Subsystem` and return `SpawnedSubsystem`. + fn start(&mut self, ctx: C) -> SpawnedSubsystem; +} diff --git a/node/messages/src/lib.rs b/node/subsystem/src/messages.rs similarity index 89% rename from node/messages/src/lib.rs rename to node/subsystem/src/messages.rs index 79b28f107b3f..42203a13dea8 100644 --- a/node/messages/src/lib.rs +++ b/node/subsystem/src/messages.rs @@ -34,17 +34,6 @@ use polkadot_node_primitives::{ MisbehaviorReport, SignedStatement, View, ProtocolId, }; -/// Signals sent by an overseer to a subsystem. -#[derive(PartialEq, Clone, Debug)] -pub enum OverseerSignal { - /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. - StartWork(Hash), - /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. - StopWork(Hash), - /// Conclude the work of the `Overseer` and all `Subsystem`s. - Conclude, -} - /// A notification of a new backed candidate. #[derive(Debug)] pub struct NewBackedCandidate(pub BackedCandidate); @@ -219,19 +208,3 @@ pub enum AllMessages { /// Message for the candidate backing subsystem. CandidateBacking(CandidateBackingMessage), } - -/// A message type that a subsystem receives from an overseer. -/// It wraps signals from an overseer and messages that are circulating -/// between subsystems. -/// -/// It is generic over over the message type `M` that a particular `Subsystem` may use. -#[derive(Debug)] -pub enum FromOverseer { - /// Signal from the `Overseer`. - Signal(OverseerSignal), - - /// Some other `Subsystem`'s message. - Communication { - msg: M, - }, -} From e6162deeabdd5b8bf88dbf948391290bd21e6a40 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 18:28:56 -0400 Subject: [PATCH 11/33] port over overseer to subsystem context trait --- Cargo.lock | 1 + node/overseer/Cargo.toml | 1 + node/overseer/src/lib.rs | 161 ++++++++++++++------------------------ node/subsystem/src/lib.rs | 8 +- 4 files changed, 67 insertions(+), 104 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ce48e6c15d9..639b23c04318 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4329,6 +4329,7 @@ dependencies = [ name = "polkadot-overseer" version = "0.1.0" dependencies = [ + "async-trait", "femme", "futures 0.3.5", "futures-timer 3.0.2", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index dba52e7f4802..6c6ce304e6d4 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -12,6 +12,7 @@ streamunordered = "0.5.1" polkadot-primitives = { path = "../../primitives" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } +async-trait = "0.1" [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 01c5e4685468..a0026812ffb3 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -66,7 +66,7 @@ use futures::{ pending, poll, select, future::{BoxFuture, RemoteHandle}, stream::{self, FuturesUnordered}, - task::{Spawn, SpawnError, SpawnExt}, + task::{Spawn, SpawnExt}, Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; @@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; -pub use messages::{ - OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages, - FromOverseer, +use polkadot_subsystem::messages::{ + CandidateValidationMessage, CandidateBackingMessage, AllMessages +}; +pub use polkadot_subsystem::{ + Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, + SpawnedSubsystem, }; -/// An error type that describes faults that may happen -/// -/// These are: -/// * Channels being closed -/// * Subsystems dying when they are not expected to -/// * Subsystems not dying when they are told to die -/// * etc. -#[derive(Debug)] -pub struct SubsystemError; - -impl From for SubsystemError { - fn from(_: mpsc::SendError) -> Self { - Self - } -} - -impl From for SubsystemError { - fn from(_: oneshot::Canceled) -> Self { - Self - } -} - -impl From for SubsystemError { - fn from(_: SpawnError) -> Self { - Self - } -} - -/// A `Result` type that wraps [`SubsystemError`]. -/// -/// [`SubsystemError`]: struct.SubsystemError.html -pub type SubsystemResult = Result; - -/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`]. -/// -/// In essence it's just a newtype wrapping a `BoxFuture`. -/// -/// [`Overseer`]: struct.Overseer.html -pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>); // A capacity of bounded channels inside the overseer. const CHANNEL_CAPACITY: usize = 1024; @@ -278,7 +242,7 @@ impl Debug for ToOverseer { /// A running instance of some [`Subsystem`]. /// /// [`Subsystem`]: trait.Subsystem.html -struct SubsystemInstance { +struct SubsystemInstance { tx: mpsc::Sender>, } @@ -289,17 +253,17 @@ struct SubsystemInstance { /// [`Overseer`]: struct.Overseer.html /// [`Subsystem`]: trait.Subsystem.html /// [`SubsystemJob`]: trait.SubsystemJob.html -pub struct SubsystemContext{ +#[derive(Debug)] +pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, tx: mpsc::Sender, } -impl SubsystemContext { - /// Try to asyncronously receive a message. - /// - /// This has to be used with caution, if you loop over this without - /// using `pending!()` macro you will end up with a busy loop! - pub async fn try_recv(&mut self) -> Result>, ()> { +#[async_trait::async_trait] +impl SubsystemContext for OverseerSubsystemContext { + type Message = M; + + async fn try_recv(&mut self) -> Result>, ()> { match poll!(self.rx.next()) { Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), @@ -307,13 +271,11 @@ impl SubsystemContext { } } - /// Receive a message. - pub async fn recv(&mut self) -> SubsystemResult> { + async fn recv(&mut self) -> SubsystemResult> { self.rx.next().await.ok_or(SubsystemError) } - /// Spawn a child task on the executor. - pub async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { + async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { let (tx, rx) = oneshot::channel(); self.tx.send(ToOverseer::SpawnJob { s, @@ -323,43 +285,25 @@ impl SubsystemContext { rx.await? } - /// Send a direct message to some other `Subsystem`, routed based on message type. - pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; Ok(()) } - /// Send multiple direct messages to other `Subsystem`s, routed based on message type. - pub async fn send_msgs(&mut self, msgs: impl IntoIterator) - -> SubsystemResult<()> + async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + where T: IntoIterator + Send, T::IntoIter: Send { let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); self.tx.send_all(&mut msgs).await?; Ok(()) } - - fn new(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { - Self { - rx, - tx, - } - } } -/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. -/// -/// It is generic over the message type circulating in the system. -/// The idea that we want some type contaning persistent state that -/// can spawn actually running subsystems when asked to. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`Subsystem`]: trait.Subsystem.html -pub trait Subsystem { - /// Start this `Subsystem` and return `SpawnedSubsystem`. - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem; -} +/// A subsystem compatible with the overseer - one which can be run in the context of the +/// overseer. +pub type CompatibleSubsystem = Box> + Send>; /// A subsystem that we oversee. /// @@ -369,8 +313,8 @@ pub trait Subsystem { /// /// [`Subsystem`]: trait.Subsystem.html #[allow(dead_code)] -struct OverseenSubsystem { - subsystem: Box + Send>, +struct OverseenSubsystem { + subsystem: CompatibleSubsystem, instance: Option>, } @@ -457,10 +401,13 @@ where /// # }; /// /// struct ValidationSubsystem; - /// impl Subsystem for ValidationSubsystem { + /// + /// impl Subsystem for ValidationSubsystem + /// where C: SubsystemContext + /// { /// fn start( /// &mut self, - /// mut ctx: SubsystemContext, + /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { /// loop { @@ -471,10 +418,12 @@ where /// } /// /// struct CandidateBackingSubsystem; - /// impl Subsystem for CandidateBackingSubsystem { + /// impl Subsystem for CandidateBackingSubsystem + /// where C: SubsystemContext + /// { /// fn start( /// &mut self, - /// mut ctx: SubsystemContext, + /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { /// loop { @@ -508,8 +457,8 @@ where /// ``` pub fn new( leaves: impl IntoIterator, - validation: Box + Send>, - candidate_backing: Box + Send>, + validation: CompatibleSubsystem, + candidate_backing: CompatibleSubsystem, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -698,15 +647,15 @@ where } } -fn spawn( +fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>, streams: &mut StreamUnordered>, - mut s: Box + Send>, + mut s: CompatibleSubsystem, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = SubsystemContext::new(to_rx, from_tx); + let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; let f = s.start(ctx); let handle = spawner.spawn_with_handle(f.0)?; @@ -733,8 +682,10 @@ mod tests { struct TestSubsystem1(mpsc::Sender); - impl Subsystem for TestSubsystem1 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem1 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { let mut i = 0; @@ -756,14 +707,16 @@ mod tests { struct TestSubsystem2(mpsc::Sender); - impl Subsystem for TestSubsystem2 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem2 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { let mut c: usize = 0; loop { if c < 10 { let (tx, _) = oneshot::channel(); - ctx.send_msg( + ctx.send_message( AllMessages::CandidateValidation( CandidateValidationMessage::Validate( Default::default(), @@ -796,8 +749,10 @@ mod tests { struct TestSubsystem4; - impl Subsystem for TestSubsystem4 { - fn start(&mut self, mut _ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem4 + where C: SubsystemContext + { + fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { // Do nothing and exit. })) @@ -881,8 +836,10 @@ mod tests { struct TestSubsystem5(mpsc::Sender); - impl Subsystem for TestSubsystem5 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem5 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { @@ -905,8 +862,10 @@ mod tests { struct TestSubsystem6(mpsc::Sender); - impl Subsystem for TestSubsystem6 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem6 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index d7342b9087fd..ca8a91dc97d5 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -105,8 +105,9 @@ pub type SubsystemResult = Result; /// [`Overseer`]: struct.Overseer.html /// [`SubsystemJob`]: trait.SubsystemJob.html #[async_trait] -pub trait SubsystemContext { - /// The message type of this context. +pub trait SubsystemContext: Send + 'static { + /// The message type of this context. Subsystems launched with this context will expect + /// to receive messages of this type. type Message: Send; /// Try to asynchronously receive a message. @@ -125,7 +126,8 @@ pub trait SubsystemContext { async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>; /// Send multiple direct messages to other `Subsystem`s, routed based on message type. - async fn send_messages(&mut self, msgs: impl IntoIterator) -> SubsystemResult<()>; + async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + where T: IntoIterator + Send, T::IntoIter: Send; } /// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. From 844a9d1683503c2f7b8b4c1c7a0661fc091302d2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 18:32:11 -0400 Subject: [PATCH 12/33] fix minimal example --- node/overseer/examples/minimal-example.rs | 25 ++++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 77b99a3a3b3f..0edc87a6b8db 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -28,16 +28,17 @@ use futures_timer::Delay; use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; -use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem}; +use polkadot_overseer::Overseer; -use messages::{ - AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage +use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; +use polkadot_subsystem::messages::{ + AllMessages, CandidateBackingMessage, CandidateValidationMessage }; struct Subsystem1; impl Subsystem1 { - async fn run(mut ctx: SubsystemContext) { + async fn run(mut ctx: impl SubsystemContext) { loop { match ctx.try_recv().await { Ok(Some(msg)) => { @@ -56,7 +57,7 @@ impl Subsystem1 { Delay::new(Duration::from_secs(1)).await; let (tx, _) = oneshot::channel(); - ctx.send_msg(AllMessages::CandidateValidation( + ctx.send_message(AllMessages::CandidateValidation( CandidateValidationMessage::Validate( Default::default(), Default::default(), @@ -70,8 +71,10 @@ impl Subsystem1 { } } -impl Subsystem for Subsystem1 { - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for Subsystem1 + where C: SubsystemContext +{ + fn start(&mut self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) @@ -81,7 +84,7 @@ impl Subsystem for Subsystem1 { struct Subsystem2; impl Subsystem2 { - async fn run(mut ctx: SubsystemContext) { + async fn run(mut ctx: impl SubsystemContext) { ctx.spawn(Box::pin(async { loop { log::info!("Job tick"); @@ -105,8 +108,10 @@ impl Subsystem2 { } } -impl Subsystem for Subsystem2 { - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for Subsystem2 + where C: SubsystemContext +{ + fn start(&mut self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) From cae15618ef4c67b01876147fa3542720e203940f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 18:33:25 -0400 Subject: [PATCH 13/33] fix overseer doc test --- node/overseer/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index a0026812ffb3..6f801989188d 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -395,9 +395,10 @@ where /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures_timer::Delay; - /// # use polkadot_overseer::{ - /// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext, - /// # CandidateValidationMessage, CandidateBackingMessage, + /// # use polkadot_overseer::Overseer; + /// # use polkadot_subsystem::{ + /// # Subsystem, SpawnedSubsystem, SubsystemContext, + /// # messages::{CandidateValidationMessage, CandidateBackingMessage}, /// # }; /// /// struct ValidationSubsystem; From 2c9be735ddd1463be9b69d6f65049a87dbe3d9b1 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 18:43:21 -0400 Subject: [PATCH 14/33] update network-bridge crate --- Cargo.lock | 1 - node/network/bridge/Cargo.toml | 1 - node/network/bridge/src/lib.rs | 24 ++++++++++++------------ 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 639b23c04318..9e9b7222d953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4274,7 +4274,6 @@ dependencies = [ "parking_lot 0.10.2", "polkadot-node-primitives", "polkadot-node-subsystem", - "polkadot-overseer", "polkadot-primitives", "sc-network", "sp-runtime", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index ce9e9545774d..9d9faa05a486 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -12,7 +12,6 @@ streamunordered = "0.5.1" polkadot-primitives = { path = "../../../primitives" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } parity-scale-codec = "1.3.0" -overseer = { package = "polkadot-overseer", path = "../../overseer" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index c71abd9f33e4..1aa60ceac73b 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -27,10 +27,8 @@ use sc_network::{ }; use sp_runtime::ConsensusEngineId; -use messages::{ - NetworkBridgeEvent, NetworkBridgeMessage, FromOverseer, OverseerSignal, AllMessages, -}; -use overseer::{Subsystem, SubsystemContext, SpawnedSubsystem}; +use polkadot_subsystem::{FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem}; +use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages,}; use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; @@ -111,8 +109,10 @@ impl NetworkBridge { } } -impl Subsystem for NetworkBridge { - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for NetworkBridge + where C: SubsystemContext +{ + fn start(&mut self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(run_network(self.0.clone(), ctx).boxed()) } } @@ -137,7 +137,7 @@ enum Action { } fn action_from_overseer_message( - res: overseer::SubsystemResult>, + res: polkadot_subsystem::SubsystemResult>, ) -> Action { match res { Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) @@ -205,14 +205,14 @@ fn construct_view(live_heads: &[Hash]) -> View { async fn dispatch_update_to_all( update: NetworkBridgeEvent, event_producers: impl IntoIterator AllMessages>, - ctx: &mut SubsystemContext, -) -> overseer::SubsystemResult<()> { + ctx: &mut impl SubsystemContext, +) -> polkadot_subsystem::SubsystemResult<()> { // collect messages here to avoid the borrow lasting across await boundary. let messages: Vec<_> = event_producers.into_iter() .map(|producer| producer(update.clone())) .collect(); - ctx.send_msgs(messages).await + ctx.send_messages(messages).await } async fn update_view( @@ -236,7 +236,7 @@ async fn update_view( Some(NetworkBridgeEvent::OurViewChange(local_view.clone())) } -async fn run_network(net: impl Network, mut ctx: SubsystemContext) { +async fn run_network(net: impl Network, mut ctx: impl SubsystemContext) { let mut event_stream = net.event_stream().fuse(); // Most recent heads are at the back. @@ -389,7 +389,7 @@ async fn run_network(net: impl Network, mut ctx: SubsystemContext Date: Wed, 24 Jun 2020 20:10:59 -0400 Subject: [PATCH 15/33] write a subsystem test-helpers crate --- Cargo.lock | 9 ++ Cargo.toml | 4 +- node/test-helpers/subsystem/Cargo.toml | 11 +++ node/test-helpers/subsystem/src/lib.rs | 109 +++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 node/test-helpers/subsystem/Cargo.toml create mode 100644 node/test-helpers/subsystem/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 9e9b7222d953..57b8b8934c98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4695,6 +4695,15 @@ dependencies = [ "sp-core", ] +[[package]] +name = "polkadot-subsystem-test-helpers" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures 0.3.5", + "polkadot-node-subsystem", +] + [[package]] name = "polkadot-test-runtime" version = "0.8.10" diff --git a/Cargo.toml b/Cargo.toml index f5fcd0335eb8..697992e68f69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,12 +42,12 @@ members = [ "service", "validation", + "node/network/bridge", "node/overseer", "node/primitives", "node/service", "node/subsystem", - - "node/network/bridge", + "node/test-helpers/subsystem", "parachain/test-parachains", "parachain/test-parachains/adder", diff --git a/node/test-helpers/subsystem/Cargo.toml b/node/test-helpers/subsystem/Cargo.toml new file mode 100644 index 000000000000..c6acd42d1b06 --- /dev/null +++ b/node/test-helpers/subsystem/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "polkadot-subsystem-test-helpers" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +description = "Helpers for testing subsystems" + +[dependencies] +futures = "0.3.5" +async-trait = "0.1" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/node/test-helpers/subsystem/src/lib.rs b/node/test-helpers/subsystem/src/lib.rs new file mode 100644 index 000000000000..72fc93eba459 --- /dev/null +++ b/node/test-helpers/subsystem/src/lib.rs @@ -0,0 +1,109 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Utilities for testing subsystems. + +use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError}; +use polkadot_subsystem::messages::AllMessages; + +use futures::prelude::*; +use futures::channel::mpsc; +use futures::task::{Spawn, SpawnExt}; +use futures::poll; + +use std::pin::Pin; +use std::task::Poll; + +/// A test subsystem context. +pub struct TestSubsystemContext { + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver>, + spawn: S, +} + +#[async_trait::async_trait] +impl SubsystemContext for TestSubsystemContext { + type Message = M; + + async fn try_recv(&mut self) -> Result>, ()> { + match poll!(self.rx.next()) { + Poll::Ready(Some(msg)) => Ok(Some(msg)), + Poll::Ready(None) => Err(()), + Poll::Pending => Ok(None), + } + } + + async fn recv(&mut self) -> SubsystemResult> { + self.rx.next().await.ok_or(SubsystemError) + } + + async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { + self.spawn.spawn(s).map_err(Into::into) + } + + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + self.tx.unbounded_send(msg).expect("test overseer no longer live"); + Ok(()) + } + + async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + where T: IntoIterator + Send, T::IntoIter: Send + { + for msg in msgs { + self.tx.unbounded_send(msg).expect("test overseer no longer live"); + } + + Ok(()) + } +} + +/// A handle for interacting with the subsystem context. +pub struct TestSubsystemContextHandle { + tx: mpsc::UnboundedSender>, + rx: mpsc::UnboundedReceiver, +} + +impl TestSubsystemContextHandle { + /// Send a message or signal to the subsystem. + pub fn send(&self, from_overseer: FromOverseer) { + self.tx.unbounded_send(from_overseer).expect("Test subsystem no longer live"); + } + + /// Receive the next message from the subsystem. + pub async fn recv(&mut self) -> AllMessages { + self.rx.next().await.expect("Test subsystem no longer live") + } +} + +/// Make a test subsystem context. +pub fn make_subsystem_context(spawn: S) + -> (TestSubsystemContext, TestSubsystemContextHandle) +{ + let (overseer_tx, overseer_rx) = mpsc::unbounded(); + let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); + + ( + TestSubsystemContext { + tx: all_messages_tx, + rx: overseer_rx, + spawn, + }, + TestSubsystemContextHandle { + tx: overseer_tx, + rx: all_messages_rx + }, + ) +} From 8ac62691a9c7aa81adcd3e90161702d35c65325f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 24 Jun 2020 20:30:14 -0400 Subject: [PATCH 16/33] write a network test helper for network-bridge --- Cargo.lock | 1 + node/network/bridge/Cargo.toml | 1 + node/network/bridge/src/lib.rs | 97 ++++++++++++++++++++++++++++------ 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57b8b8934c98..1019b2fa579e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4275,6 +4275,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", + "polkadot-subsystem-test-helpers", "sc-network", "sp-runtime", "streamunordered", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 9d9faa05a486..809ec0cc5c62 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -18,3 +18,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys [dev-dependencies] parking_lot = "0.10.0" +subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 1aa60ceac73b..1bd13f9ad853 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -403,42 +403,109 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext), } + // The subsystem's view of the network - only supports a single call to `event_stream`. #[derive(Clone)] struct TestNetwork { net_events: Arc>>>, - outgoing_tx: mpsc::UnboundedSender, + action_tx: mpsc::UnboundedSender, } - struct TestHost { - outgoing_rx: mpsc::UnboundedReceiver, + // The test's view of the network. This receives updates from the subsystem in the form + // of `NetworkAction`s. + struct TestNetworkHandle { + action_rx: mpsc::UnboundedReceiver, net_tx: mpsc::UnboundedSender, } - fn new_test_host() -> ( + fn new_test_network() -> ( TestNetwork, - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, + TestNetworkHandle, ) { let (net_tx, net_rx) = mpsc::unbounded(); - let (outgoing_tx, outgoing_rx) = mpsc::unbounded(); + let (action_tx, action_rx) = mpsc::unbounded(); - let net = TestNetwork { - net_events: Arc::new(Mutex::new(Some(net_rx))), - outgoing_tx, - }; + ( + TestNetwork { + net_events: Arc::new(Mutex::new(Some(net_rx))), + action_tx, + }, + TestNetworkHandle { + action_rx, + net_tx, + }, + ) + } + + impl Network for TestNetwork { + fn event_stream(&self) -> BoxStream { + self.net_events.lock() + .take() + .expect("Subsystem made more than one call to `event_stream`") + .boxed() + } + + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { + let x = async move { + self.action_tx.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit)) + .expect("test hung up on network"); + }; + + x.boxed() + } + + fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()> { + let x = async move { + self.action_tx.unbounded_send(NetworkAction::Message(who, message)) + .expect("test hung up on network"); + }; - (net, net_tx, outgoing_rx) + x.boxed() + } + } + + impl TestNetworkHandle { + async fn next_network_action(&mut self) -> NetworkAction { + self.action_rx.next().await.expect("subsystem concluded early") + } + + fn connect_peer(&self, peer: PeerId, role: ObservedRole) { + self.send_network_event(NetworkEvent::NotificationStreamOpened { + remote: peer, + engine_id: POLKADOT_ENGINE_ID, + role, + }); + } + + fn disconnect_peer(&self, peer: PeerId) { + self.send_network_event(NetworkEvent::NotificationStreamClosed { + remote: peer, + engine_id: POLKADOT_ENGINE_ID, + }); + } + + fn peer_message(&self, peer: PeerId, message: Vec) { + self.send_network_event(NetworkEvent::NotificationsReceived { + remote: peer, + messages: vec![(POLKADOT_ENGINE_ID, message.into())], + }); + } + + fn send_network_event(&self, event: NetworkEvent) { + self.net_tx.unbounded_send(event).expect("subsystem concluded early"); + } } - impl TestNetwork { + #[test] + fn sends_view_updates_to_peers() { } From 51ff607e45b5d57c711d80b2df6a38218d3dcc41 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 17:30:44 -0400 Subject: [PATCH 17/33] set up (broken) view test --- node/network/bridge/src/lib.rs | 54 ++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 1bd13f9ad853..71764679dbb8 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -228,6 +228,7 @@ async fn update_view( let message = WireMessage::ViewUpdate(new_view.clone()).encode(); let write_all = peers.keys().cloned().map(|peer| { + println!("Sending view update message to peer {:?}", peer); net.write_notification(peer, message.clone()) }); @@ -283,6 +284,7 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { + println!("New relay parent. Updating view."); live_heads.push(relay_parent); if let Some(view_update) = update_view(&peers, &live_heads, &net, &mut local_view).await @@ -314,6 +316,7 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { + println!("New peer {:?}", peer); match peers.entry(peer.clone()) { Entry::Occupied(_) => continue, Entry::Vacant(vacant) => { @@ -404,9 +407,11 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext), @@ -473,10 +478,21 @@ mod tests { } impl TestNetworkHandle { + // Get the next network action. async fn next_network_action(&mut self) -> NetworkAction { self.action_rx.next().await.expect("subsystem concluded early") } + // Wait for the next N network actions. + async fn next_network_actions(&mut self, n: usize) -> Vec { + let mut v = Vec::with_capacity(n); + for _ in 0..n { + v.push(self.next_network_action().await); + } + + v + } + fn connect_peer(&self, peer: PeerId, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, @@ -504,9 +520,47 @@ mod tests { } } + fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool { + actions.iter().find(|&x| x == action).is_some() + } + #[test] fn sends_view_updates_to_peers() { + let pool = ThreadPool::new().unwrap(); + + let (network, mut network_handle) = new_test_network(); + let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); + pool.spawn_ok(run_network(network, context)); + + let protocol_id: ProtocolId = *b"test"; + + executor::block_on(async move { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + network_handle.connect_peer(peer_a.clone(), ObservedRole::Full); + network_handle.connect_peer(peer_b.clone(), ObservedRole::Full); + + let hash_a = Hash::from([1; 32]); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer(protocol_id, ) + }); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))); + + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); + assert!(network_actions_contains( + &actions, + &NetworkAction::Message(peer_a, wire_message.clone()), + )); + assert!(network_actions_contains( + &actions, + &NetworkAction::Message(peer_b, wire_message.clone()), + )); + }); } From 960029b64cbe2138186f1ca2ef4f934e6d263f38 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 18:33:29 -0400 Subject: [PATCH 18/33] Revamp network to be more async-friendly and not require Sync --- node/network/bridge/src/lib.rs | 218 +++++++++++++++++++++++---------- 1 file changed, 150 insertions(+), 68 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 71764679dbb8..0f5799e44a22 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -27,12 +27,16 @@ use sc_network::{ }; use sp_runtime::ConsensusEngineId; -use polkadot_subsystem::{FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem}; -use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages,}; +use polkadot_subsystem::{ + FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, + SubsystemResult, +}; +use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages}; use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; use std::collections::hash_map::{HashMap, Entry}; +use std::pin::Pin; use std::sync::Arc; const MAX_VIEW_HEADS: usize = 5; @@ -66,33 +70,92 @@ pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'st (POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into()) } +/// An action to be carried out by the network. +#[derive(PartialEq)] +pub enum NetworkAction { + /// Note a change in reputation for a peer. + ReputationChange(PeerId, ReputationChange), + /// Write a notification to a given peer. + WriteNotification(PeerId, Vec), +} + /// An abstraction over networking for the purposes of this subsystem. -pub trait Network: Clone + Send + Sync + 'static { +pub trait Network: Clone + Send + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). - fn event_stream(&self) -> BoxStream; + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; + + /// Get access to an underlying sink for all network actions. + fn action_sink<'a>(&'a mut self) -> Pin< + Box + Send + 'a> + >; /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. - fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()>; + fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange) + -> BoxFuture> + { + async move { + self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await + }.boxed() + } /// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic. - fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()>; + fn write_notification(&mut self, who: PeerId, message: Vec) + -> BoxFuture> + { + async move { + self.action_sink().send(NetworkAction::WriteNotification(who, message)).await + }.boxed() + } } impl Network for Arc> { - fn event_stream(&self) -> BoxStream { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } - fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { - sc_network::NetworkService::report_peer(self, who, cost_benefit); - future::ready(()).boxed() - } + fn action_sink<'a>(&'a mut self) + -> Pin + Send + 'a>> + { + use futures::task::{Poll, Context}; + + // wrapper around a NetworkService to make it act like a sink. + struct ActionSink<'b>(&'b sc_network::NetworkService); - fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()> { - sc_network::NetworkService::write_notification(self, who, POLKADOT_ENGINE_ID, message); - future::ready(()).boxed() + impl<'b> Sink for ActionSink<'b> { + type Error = SubsystemError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { + match action { + NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer( + peer, + cost_benefit, + ), + NetworkAction::WriteNotification(peer, message) => self.0.write_notification( + peer, + POLKADOT_ENGINE_ID, + message, + ), + } + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + } + + Box::pin(ActionSink(&**self)) } } @@ -113,7 +176,9 @@ impl Subsystem for NetworkBridge where C: SubsystemContext { fn start(&mut self, ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(run_network(self.0.clone(), ctx).boxed()) + // Swallow error because failure is fatal to the node and we log with more precision + // within `run_network`. + SpawnedSubsystem(run_network(self.0.clone(), ctx).map(|_| ()).boxed()) } } @@ -218,26 +283,27 @@ async fn dispatch_update_to_all( async fn update_view( peers: &HashMap, live_heads: &[Hash], - net: &impl Network, + net: &mut impl Network, local_view: &mut View, -) -> Option { +) -> SubsystemResult> { let new_view = construct_view(live_heads); - if *local_view == new_view { return None } + if *local_view == new_view { return Ok(None) } *local_view = new_view.clone(); let message = WireMessage::ViewUpdate(new_view.clone()).encode(); - let write_all = peers.keys().cloned().map(|peer| { - println!("Sending view update message to peer {:?}", peer); - net.write_notification(peer, message.clone()) - }); + let notifications = peers.keys().cloned() + .map(move |peer| Ok(NetworkAction::WriteNotification(peer, message.clone()))); - future::join_all(write_all).await; + net.action_sink().send_all(&mut stream::iter(notifications)).await?; - Some(NetworkBridgeEvent::OurViewChange(local_view.clone())) + Ok(Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))) } -async fn run_network(net: impl Network, mut ctx: impl SubsystemContext) { +async fn run_network( + mut net: N, + mut ctx: impl SubsystemContext, +) -> SubsystemResult<()> { let mut event_stream = net.event_stream().fuse(); // Most recent heads are at the back. @@ -270,47 +336,64 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { - let message = WireMessage::ProtocolMessage(protocol, message).encode(); - - for peer in peers.iter().skip(1).cloned() { - net.write_notification(peer, message.clone()).await; - } - - if let Some(peer) = peers.first() { - net.write_notification(peer.clone(), message).await; - } + let mut message_producer = stream::iter({ + let n_peers = peers.len(); + let mut message = Some( + WireMessage::ProtocolMessage(protocol, message).encode() + ); + + peers.iter().cloned().enumerate().map(move |(i, peer)| { + // optimization: avoid cloning the message for the last peer in the + // list. The message payload can be quite large. If the underlying + // network used `Bytes` this would not be necessary. + let message =if i == n_peers - 1 { + message.take() + .expect("Only taken in last iteration of loop, never afterwards; qed") + } else { + message.as_ref() + .expect("Only taken in last iteratio of loop, we are not there yet; qed") + .clone() + }; + + Ok(NetworkAction::WriteNotification(peer, message)) + }) + }); + + net.action_sink().send_all(&mut message_producer).await?; } Action::ReportPeer(peer, rep) => { - net.report_peer(peer, rep).await + // let binding required here to avoid unnecessary sync bound. + let report_fut = net.report_peer(peer, rep); + report_fut.await? } Action::StartWork(relay_parent) => { println!("New relay parent. Updating view."); live_heads.push(relay_parent); if let Some(view_update) - = update_view(&peers, &live_heads, &net, &mut local_view).await + = update_view(&peers, &live_heads, &mut net, &mut local_view).await? { - if let Err(_) = dispatch_update_to_all( + if let Err(e) = dispatch_update_to_all( view_update, event_producers.values(), &mut ctx, ).await { log::warn!("Aborting - Failure to dispatch messages to overseer"); - return + return Err(e) } } } Action::StopWork(relay_parent) => { live_heads.retain(|h| h != &relay_parent); if let Some(view_update) - = update_view(&peers, &live_heads, &net, &mut local_view).await + = update_view(&peers, &live_heads, &mut net, &mut local_view).await? { - if let Err(_) = dispatch_update_to_all( + if let Err(e) = dispatch_update_to_all( view_update, event_producers.values(), &mut ctx, ).await { log::warn!("Aborting - Failure to dispatch messages to overseer"); - return + return Err(e) } } } @@ -324,26 +407,26 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { if peers.remove(&peer).is_some() { - if let Err(_) = dispatch_update_to_all( + if let Err(e) = dispatch_update_to_all( NetworkBridgeEvent::PeerDisconnected(peer), event_producers.values(), &mut ctx, ).await { log::warn!("Aborting - Failure to dispatch messages to overseer"); - return + return Err(e) } } }, @@ -358,7 +441,12 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { if new_view.0.len() > MAX_VIEW_HEADS { - net.report_peer(peer.clone(), MALFORMED_VIEW_COST).await; + let report_fut = net.report_peer( + peer.clone(), + MALFORMED_VIEW_COST, + ); + + report_fut.await?; continue } @@ -380,7 +468,12 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext { - net.report_peer(peer.clone(), UNKNOWN_PROTO_COST).await; + let report_fut = net.report_peer( + peer.clone(), + UNKNOWN_PROTO_COST, + ); + + report_fut.await?; None } }; @@ -392,13 +485,14 @@ async fn run_network(net: impl Network, mut ctx: impl SubsystemContext return, + Action::Abort => return Ok(()), } } } @@ -411,12 +505,6 @@ mod tests { use std::sync::Arc; use parking_lot::Mutex; - #[derive(PartialEq)] - enum NetworkAction { - ReputationChange(PeerId, ReputationChange), - Message(PeerId, Vec), - } - // The subsystem's view of the network - only supports a single call to `event_stream`. #[derive(Clone)] struct TestNetwork { @@ -451,14 +539,14 @@ mod tests { } impl Network for TestNetwork { - fn event_stream(&self) -> BoxStream { + fn event_stream(&mut self) -> BoxStream { self.net_events.lock() .take() .expect("Subsystem made more than one call to `event_stream`") .boxed() } - fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { + fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { let x = async move { self.action_tx.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit)) .expect("test hung up on network"); @@ -467,9 +555,9 @@ mod tests { x.boxed() } - fn write_notification(&self, who: PeerId, message: Vec) -> BoxFuture<()> { + fn write_notification(&mut self, who: PeerId, message: Vec) -> BoxFuture<()> { let x = async move { - self.action_tx.unbounded_send(NetworkAction::Message(who, message)) + self.action_tx.unbounded_send(NetworkAction::WriteNotification(who, message)) .expect("test hung up on network"); }; @@ -532,8 +620,6 @@ mod tests { let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); pool.spawn_ok(run_network(network, context)); - let protocol_id: ProtocolId = *b"test"; - executor::block_on(async move { let peer_a = PeerId::random(); let peer_b = PeerId::random(); @@ -543,22 +629,18 @@ mod tests { let hash_a = Hash::from([1; 32]); - virtual_overseer.send(FromOverseer::Communication { - msg: NetworkBridgeMessage::RegisterEventProducer(protocol_id, ) - }); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))); let actions = network_handle.next_network_actions(2).await; let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); assert!(network_actions_contains( &actions, - &NetworkAction::Message(peer_a, wire_message.clone()), + &NetworkAction::WriteNotification(peer_a, wire_message.clone()), )); assert!(network_actions_contains( &actions, - &NetworkAction::Message(peer_b, wire_message.clone()), + &NetworkAction::WriteNotification(peer_b, wire_message.clone()), )); }); } From eb52f9a205c51b9002b29dee0919985d66b72b0f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 18:33:39 -0400 Subject: [PATCH 19/33] fix spacing --- node/network/bridge/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 0f5799e44a22..223de020aabb 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -346,7 +346,7 @@ async fn run_network( // optimization: avoid cloning the message for the last peer in the // list. The message payload can be quite large. If the underlying // network used `Bytes` this would not be necessary. - let message =if i == n_peers - 1 { + let message = if i == n_peers - 1 { message.take() .expect("Only taken in last iteration of loop, never afterwards; qed") } else { From f6526c4d9321974a4e63075b62e1f05413945b5e Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 18:40:47 -0400 Subject: [PATCH 20/33] fix test compilation --- node/network/bridge/src/lib.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 223de020aabb..3c2630d3dc15 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -502,6 +502,7 @@ mod tests { use super::*; use futures::channel::mpsc; use futures::executor::{self, ThreadPool}; + use std::sync::Arc; use parking_lot::Mutex; @@ -539,29 +540,17 @@ mod tests { } impl Network for TestNetwork { - fn event_stream(&mut self) -> BoxStream { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { self.net_events.lock() .take() .expect("Subsystem made more than one call to `event_stream`") .boxed() } - fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange) -> BoxFuture<()> { - let x = async move { - self.action_tx.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit)) - .expect("test hung up on network"); - }; - - x.boxed() - } - - fn write_notification(&mut self, who: PeerId, message: Vec) -> BoxFuture<()> { - let x = async move { - self.action_tx.unbounded_send(NetworkAction::WriteNotification(who, message)) - .expect("test hung up on network"); - }; - - x.boxed() + fn action_sink<'a>(&'a mut self) + -> Pin + Send + 'a>> + { + Box::pin((&mut self.action_tx).sink_map_err(Into::into)) } } @@ -618,7 +607,11 @@ mod tests { let (network, mut network_handle) = new_test_network(); let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); - pool.spawn_ok(run_network(network, context)); + pool.spawn_ok( + run_network(network, context) + .map_err(|_| panic!("subsystem execution failed")) + .map(|_| ()) + ); executor::block_on(async move { let peer_a = PeerId::random(); From 0f5a1b109e85cf4041da163c468ab8c299801c5b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 19:39:17 -0400 Subject: [PATCH 21/33] insert side-channel for actions --- Cargo.lock | 1 + node/network/bridge/Cargo.toml | 1 + node/network/bridge/src/lib.rs | 95 +++++++++++++++++++++++++++++----- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1019b2fa579e..9d9d67a34c4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4267,6 +4267,7 @@ dependencies = [ name = "polkadot-network-bridge" version = "0.1.0" dependencies = [ + "assert_matches", "futures 0.3.5", "futures-timer 3.0.2", "log 0.4.8", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 809ec0cc5c62..4f6c8631e2f9 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -19,3 +19,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys [dev-dependencies] parking_lot = "0.10.0" subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } +assert_matches = "1.3.0" diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 3c2630d3dc15..a4037c2afa72 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -54,7 +54,7 @@ const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); /// Messages received on the network. -#[derive(Encode, Decode)] +#[derive(Debug, Encode, Decode, Clone)] pub enum WireMessage { /// A message from a peer on a specific protocol. #[codec(index = "1")] @@ -178,7 +178,7 @@ impl Subsystem for NetworkBridge fn start(&mut self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. - SpawnedSubsystem(run_network(self.0.clone(), ctx).map(|_| ()).boxed()) + SpawnedSubsystem(run_network(self.0.clone(), ctx, |_| ()).map(|_| ()).boxed()) } } @@ -187,6 +187,7 @@ struct PeerData { view: View, } +#[derive(Debug)] enum Action { RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), SendMessage(Vec, ProtocolId, Vec), @@ -227,7 +228,7 @@ fn action_from_overseer_message( fn action_from_network_message(event: Option) -> Option { match event { None => { - log::warn!("Shutting down Network Bridge: underlying event stream concluded"); + log::info!("Shutting down Network Bridge: underlying event stream concluded"); Some(Action::Abort) } Some(NetworkEvent::Dht(_)) => None, @@ -303,6 +304,7 @@ async fn update_view( async fn run_network( mut net: N, mut ctx: impl SubsystemContext, + action_inspector: impl Fn(&Action), // side-channel for tests to inspect internals ) -> SubsystemResult<()> { let mut event_stream = net.event_stream().fuse(); @@ -330,6 +332,8 @@ async fn run_network( } }; + action_inspector(&action); + match action { Action::RegisterEventProducer(protocol_id, event_producer) => { // insert only if none present. @@ -505,6 +509,7 @@ mod tests { use std::sync::Arc; use parking_lot::Mutex; + use assert_matches::assert_matches; // The subsystem's view of the network - only supports a single call to `event_stream`. #[derive(Clone)] @@ -520,6 +525,47 @@ mod tests { net_tx: mpsc::UnboundedSender, } + // a record of an action internal to the network. + #[derive(Debug)] + enum InternalActionRecord { + RegisterEventProducer(ProtocolId), + SendMessage(Vec, ProtocolId, Vec), + ReportPeer(PeerId, ReputationChange), + StartWork(Hash), + StopWork(Hash), + + PeerConnected(PeerId, ObservedRole), + PeerDisconnected(PeerId), + PeerMessages(PeerId, Vec), + + Abort, + } + + impl<'a> From<&'a Action> for InternalActionRecord { + fn from(action: &'a Action) -> Self { + match *action { + Action::RegisterEventProducer(protocol, _) + => InternalActionRecord::RegisterEventProducer(protocol), + Action::SendMessage(ref peers, protocol, ref message) + => InternalActionRecord::SendMessage(peers.clone(), protocol, message.clone()), + Action::ReportPeer(ref peer, rep) + => InternalActionRecord::ReportPeer(peer.clone(), rep.clone()), + + Action::StartWork(hash) => InternalActionRecord::StartWork(hash), + Action::StopWork(hash) => InternalActionRecord::StopWork(hash), + + Action::PeerConnected(ref peer, ref role) + => InternalActionRecord::PeerConnected(peer.clone(), role.clone()), + Action::PeerDisconnected(ref peer) + => InternalActionRecord::PeerDisconnected(peer.clone()), + Action::PeerMessages(ref peer, ref messages) + => InternalActionRecord::PeerMessages(peer.clone(), messages.clone()), + + Action::Abort => InternalActionRecord::Abort, + } + } + } + fn new_test_network() -> ( TestNetwork, TestNetworkHandle, @@ -597,6 +643,8 @@ mod tests { } } + // network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so + // we need to use this to prevent fragile reliance on peer ordering. fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool { actions.iter().find(|&x| x == action).is_some() } @@ -606,20 +654,38 @@ mod tests { let pool = ThreadPool::new().unwrap(); let (network, mut network_handle) = new_test_network(); - let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone()); - pool.spawn_ok( - run_network(network, context) - .map_err(|_| panic!("subsystem execution failed")) - .map(|_| ()) - ); - - executor::block_on(async move { + let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); + let (action_tx, mut action_rx) = mpsc::unbounded::(); + + let network_bridge = run_network( + network, + context, + move |action| { let _ = action_tx.unbounded_send(action.into()); }, + ) + .map_err(|_| panic!("subsystem execution failed")) + .map(|_| ()); + + let test_fut = async move { let peer_a = PeerId::random(); let peer_b = PeerId::random(); network_handle.connect_peer(peer_a.clone(), ObservedRole::Full); network_handle.connect_peer(peer_b.clone(), ObservedRole::Full); + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { + assert_eq!(p, peer_a); + } + ); + + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { + assert_eq!(p, peer_b); + } + ); + let hash_a = Hash::from([1; 32]); virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))); @@ -635,7 +701,12 @@ mod tests { &actions, &NetworkAction::WriteNotification(peer_b, wire_message.clone()), )); - }); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(network_bridge); + + executor::block_on(future::select(test_fut, network_bridge)); } From 8f75746ebf9e345522932e7af5462acdfa015591 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 19:47:57 -0400 Subject: [PATCH 22/33] Add some more message types to AllMessages --- node/overseer/src/lib.rs | 6 ++++++ node/subsystem/src/messages.rs | 14 ++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 6f801989188d..8fb8706be429 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -640,6 +640,12 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + _ => { + // TODO: temporary catch-all until all subsystems are integrated with overseer. + // The overseer is not complete until this is an exhaustive match with all + // messages targeting an included subsystem. + // https://github.com/paritytech/polkadot/issues/1317 + } } } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 42203a13dea8..7fc5007ad6c0 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -207,4 +207,18 @@ pub enum AllMessages { CandidateValidation(CandidateValidationMessage), /// Message for the candidate backing subsystem. CandidateBacking(CandidateBackingMessage), + /// Message for the candidate selection subsystem. + CandidateSelection(CandidateSelectionMessage), + /// Message for the statement distribution subsystem. + StatementDistribution(StatementDistributionMessage), + /// Message for the availability distribution subsystem. + AvailabilityDistribution(AvailabilityDistributionMessage), + /// Message for the bitfield distribution subsystem. + BitfieldDistribution(BitfieldDistributionMessage), + /// Message for the Provisioner subsystem. + Provisioner(ProvisionerMessage), + /// Message for the Runtime API subsystem. + RuntimeApi(RuntimeApiMessage), + /// Message for the availability store subsystem. + AvailabilityStore(AvailabilityStoreMessage), } From 940216bd19d6160e158147b2738009ae13749bdc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Jun 2020 19:53:42 -0400 Subject: [PATCH 23/33] introduce a test harness --- node/network/bridge/src/lib.rs | 41 +++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index a4037c2afa72..d86f65642554 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -649,13 +649,18 @@ mod tests { actions.iter().find(|&x| x == action).is_some() } - #[test] - fn sends_view_updates_to_peers() { + struct TestHarness { + network_handle: TestNetworkHandle, + virtual_overseer: subsystem_test::TestSubsystemContextHandle, + action_rx: mpsc::UnboundedReceiver, + } + + fn test_harness>(test: impl FnOnce(TestHarness) -> T) { let pool = ThreadPool::new().unwrap(); - let (network, mut network_handle) = new_test_network(); + let (network, network_handle) = new_test_network(); let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); - let (action_tx, mut action_rx) = mpsc::unbounded::(); + let (action_tx, action_rx) = mpsc::unbounded::(); let network_bridge = run_network( network, @@ -665,7 +670,23 @@ mod tests { .map_err(|_| panic!("subsystem execution failed")) .map(|_| ()); - let test_fut = async move { + let test_fut = test(TestHarness { + network_handle, + virtual_overseer, + action_rx, + }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(network_bridge); + + executor::block_on(future::select(test_fut, network_bridge)); + } + + #[test] + fn sends_view_updates_to_peers() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, virtual_overseer, mut action_rx } = test_harness; + let peer_a = PeerId::random(); let peer_b = PeerId::random(); @@ -701,16 +722,14 @@ mod tests { &actions, &NetworkAction::WriteNotification(peer_b, wire_message.clone()), )); - }; + }); + } - futures::pin_mut!(test_fut); - futures::pin_mut!(network_bridge); + #[test] + fn peer_view_updates_sent_via_overseer() { - executor::block_on(future::select(test_fut, network_bridge)); } - - // TODO [now]: our view updates are sent. // TODO [now]: peer view updates get sent via overseer. // TODO [now]: peer messages are sent via event producer From 859bd8bcb6be97f5199b78cd8a75b7272f30b0bb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 12:07:32 -0400 Subject: [PATCH 24/33] add some tests --- node/network/bridge/src/lib.rs | 283 +++++++++++++++++++++++++++++++-- node/subsystem/src/messages.rs | 2 + 2 files changed, 275 insertions(+), 10 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index d86f65642554..4b3fee2ee808 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -35,7 +35,8 @@ use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, All use node_primitives::{ProtocolId, View}; use polkadot_primitives::{Block, Hash}; -use std::collections::hash_map::{HashMap, Entry}; +use std::collections::btree_map::{BTreeMap, Entry as BEntry}; +use std::collections::hash_map::{HashMap, Entry as HEntry}; use std::pin::Pin; use std::sync::Arc; @@ -185,6 +186,8 @@ impl Subsystem for NetworkBridge struct PeerData { /// Latest view sent by the peer. view: View, + /// The role of the peer. + role: ObservedRole, } #[derive(Debug)] @@ -312,8 +315,8 @@ async fn run_network( let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS); let mut local_view = View(Vec::new()); - let mut peers = HashMap::new(); - let mut event_producers = HashMap::new(); + let mut peers: HashMap = HashMap::new(); + let mut event_producers = BTreeMap::new(); loop { let action = { @@ -337,7 +340,23 @@ async fn run_network( match action { Action::RegisterEventProducer(protocol_id, event_producer) => { // insert only if none present. - event_producers.entry(protocol_id).or_insert(event_producer); + if let BEntry::Vacant(entry) = event_producers.entry(protocol_id) { + let event_producer = entry.insert(event_producer); + + // send the event producer information on all connected peers. + let mut messages = Vec::with_capacity(peers.len() * 2); + for (peer, data) in &peers { + messages.push(event_producer( + NetworkBridgeEvent::PeerConnected(peer.clone(), data.role.clone()) + )); + + messages.push(event_producer( + NetworkBridgeEvent::PeerViewChange(peer.clone(), data.view.clone()) + )); + } + + ctx.send_messages(messages).await?; + } } Action::SendMessage(peers, protocol, message) => { let mut message_producer = stream::iter({ @@ -371,7 +390,6 @@ async fn run_network( report_fut.await? } Action::StartWork(relay_parent) => { - println!("New relay parent. Updating view."); live_heads.push(relay_parent); if let Some(view_update) = update_view(&peers, &live_heads, &mut net, &mut local_view).await? @@ -403,12 +421,12 @@ async fn run_network( } Action::PeerConnected(peer, role) => { - println!("New peer {:?}", peer); match peers.entry(peer.clone()) { - Entry::Occupied(_) => continue, - Entry::Vacant(vacant) => { + HEntry::Occupied(_) => continue, + HEntry::Vacant(vacant) => { vacant.insert(PeerData { view: View(Vec::new()), + role: role.clone(), }); if let Err(e) = dispatch_update_to_all( @@ -511,6 +529,8 @@ mod tests { use parking_lot::Mutex; use assert_matches::assert_matches; + use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; + // The subsystem's view of the network - only supports a single call to `event_stream`. #[derive(Clone)] struct TestNetwork { @@ -727,10 +747,253 @@ mod tests { #[test] fn peer_view_updates_sent_via_overseer() { + test_harness(|test_harness| async move { + let TestHarness { + network_handle, + mut virtual_overseer, + mut action_rx, + } = test_harness; + + let peer = PeerId::random(); + + let proto_statement = *b"abcd"; + let proto_bitfield = *b"wxyz"; + + network_handle.connect_peer(peer.clone(), ObservedRole::Full); + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { + assert_eq!(p, peer); + } + ); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_statement, + |event| AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_bitfield, + |event| AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }); + + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::RegisterEventProducer(x) if x == proto_statement + ); + + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield + ); + + let view = View(vec![Hash::from([1u8; 32])]); + + // bridge will inform about all previously-connected peers. + { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + } + + network_handle.peer_message( + peer.clone(), + WireMessage::ViewUpdate(view.clone()).encode(), + ); + // statement distribution message comes first because handlers are ordered by + // protocol ID. + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(v, view); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(v, view); + } + ); + }); } - // TODO [now]: peer view updates get sent via overseer. + #[test] + fn peer_messages_sent_via_overseer() { + test_harness(|test_harness| async move { + let TestHarness { + network_handle, + mut virtual_overseer, + mut action_rx, + } = test_harness; + + let peer = PeerId::random(); + + let proto_statement = *b"abcd"; + let proto_bitfield = *b"wxyz"; + + network_handle.connect_peer(peer.clone(), ObservedRole::Full); + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { + assert_eq!(p, peer); + } + ); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_statement, + |event| AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_bitfield, + |event| AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }); + + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::RegisterEventProducer(x) if x == proto_statement + ); + + assert_matches!( + action_rx.next().await.unwrap(), + InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield + ); + + // bridge will inform about all previously-connected peers. + { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + } + + let payload = vec![1, 2, 3]; + + network_handle.peer_message( + peer.clone(), + WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(), + ); + + network_handle.disconnect_peer(peer.clone()); + + // statement distribution message comes first because handlers are ordered by + // protocol ID, and then a disconnection event comes - indicating that the message + // was only sent to the correct protocol. - // TODO [now]: peer messages are sent via event producer + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(p, m) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(m, payload); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerDisconnected(p) + ) + ) => { + assert_eq!(p, peer); + } + ); + }); + } } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 7fc5007ad6c0..406b485dc168 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -173,6 +173,8 @@ pub enum StatementDistributionMessage { /// We have originated a signed statement in the context of /// given relay-parent hash and it should be distributed to other validators. Share(Hash, SignedStatement), + /// Event from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), } /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. From 4343f5c37ab3244b18cc5c1274c83fccff135917 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 12:59:23 -0400 Subject: [PATCH 25/33] ensure service compiles and passes tests --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a6be6df830d2..10d868af4c3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -207,9 +207,9 @@ version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92" dependencies = [ - "proc-macro2 1.0.17", - "quote 1.0.6", - "syn 1.0.27", + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.31", ] [[package]] From 98c4c547358370065f8e1b7bfe66caf3184246ef Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 12:59:34 -0400 Subject: [PATCH 26/33] fix typo --- node/network/bridge/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 4b3fee2ee808..06e345c2ae9e 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -374,7 +374,7 @@ async fn run_network( .expect("Only taken in last iteration of loop, never afterwards; qed") } else { message.as_ref() - .expect("Only taken in last iteratio of loop, we are not there yet; qed") + .expect("Only taken in last iteration of loop, we are not there yet; qed") .clone() }; From d1948382eb600188da892cba5771b9ca975bdd18 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 19:41:38 -0400 Subject: [PATCH 27/33] fix service-new compilation --- Cargo.lock | 1 + node/service/Cargo.toml | 1 + node/service/src/lib.rs | 20 ++++++++++++-------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10d868af4c3d..db695d66502b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4646,6 +4646,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.9.0", "polkadot-network", + "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", "polkadot-rpc", diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 74069f0233af..f1a56acfad95 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -15,6 +15,7 @@ hex-literal = "0.2.1" polkadot-primitives = { path = "../../primitives" } polkadot-runtime = { path = "../../runtime/polkadot" } polkadot-overseer = { path = "../overseer" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } kusama-runtime = { path = "../../runtime/kusama" } westend-runtime = { path = "../../runtime/westend" } polkadot-network = { path = "../../network", optional = true } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 620850b3bd64..9b917aba1b7a 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -29,10 +29,10 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; -use polkadot_overseer::{ - self as overseer, - BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem, - CandidateValidationMessage, CandidateBackingMessage, +use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; +use polkadot_subsystem::{ + Subsystem, SubsystemContext, SpawnedSubsystem, + messages::{CandidateValidationMessage, CandidateBackingMessage}, }; pub use service::{ AbstractService, Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, @@ -269,8 +269,10 @@ macro_rules! new_full_start { struct CandidateValidationSubsystem; -impl Subsystem for CandidateValidationSubsystem { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for CandidateValidationSubsystem + where C: SubsystemContext +{ + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) @@ -279,8 +281,10 @@ impl Subsystem for CandidateValidationSubsystem { struct CandidateBackingSubsystem; -impl Subsystem for CandidateBackingSubsystem { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for CandidateBackingSubsystem + where C: SubsystemContext +{ + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) From d468a2b4273faea8ce3cdfef676793e0460b5117 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 21:22:09 -0400 Subject: [PATCH 28/33] Subsystem test helpers send messages synchronously --- Cargo.lock | 1 + node/test-helpers/subsystem/Cargo.toml | 1 + node/test-helpers/subsystem/src/lib.rs | 142 +++++++++++++++++++++++-- 3 files changed, 133 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db695d66502b..90a541c8805b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4702,6 +4702,7 @@ version = "0.1.0" dependencies = [ "async-trait", "futures 0.3.5", + "parking_lot 0.10.2", "polkadot-node-subsystem", ] diff --git a/node/test-helpers/subsystem/Cargo.toml b/node/test-helpers/subsystem/Cargo.toml index c6acd42d1b06..0fc26a24ea14 100644 --- a/node/test-helpers/subsystem/Cargo.toml +++ b/node/test-helpers/subsystem/Cargo.toml @@ -9,3 +9,4 @@ description = "Helpers for testing subsystems" futures = "0.3.5" async-trait = "0.1" polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +parking_lot = "0.10.0" diff --git a/node/test-helpers/subsystem/src/lib.rs b/node/test-helpers/subsystem/src/lib.rs index 72fc93eba459..c99a33c78d9b 100644 --- a/node/test-helpers/subsystem/src/lib.rs +++ b/node/test-helpers/subsystem/src/lib.rs @@ -23,14 +23,134 @@ use futures::prelude::*; use futures::channel::mpsc; use futures::task::{Spawn, SpawnExt}; use futures::poll; +use parking_lot::Mutex; +use std::convert::Infallible; use std::pin::Pin; -use std::task::Poll; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +enum SinkState { + Empty { + read_waker: Option, + }, + Item { + item: T, + ready_waker: Option, + flush_waker: Option, + }, +} + +/// The sink half of a single-item sink that does not resolve until the item has been read. +pub struct SingleItemSink(Arc>>); + +/// The stream half of a single-item sink. +pub struct SingleItemStream(Arc>>); + +impl Sink for SingleItemSink { + type Error = Infallible; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + let mut state = self.0.lock(); + match *state { + SinkState::Empty { .. } => Poll::Ready(Ok(())), + SinkState::Item { ref mut ready_waker, .. } => { + *ready_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } + + fn start_send( + self: Pin<&mut Self>, + item: T, + ) -> Result<(), Infallible> { + let mut state = self.0.lock(); + + match *state { + SinkState::Empty { ref mut read_waker } => { + if let Some(waker) = read_waker.take() { + waker.wake(); + } + } + _ => panic!("start_send called outside of empty sink state ensured by poll_ready"), + } + + *state = SinkState::Item { + item, + ready_waker: None, + flush_waker: None, + }; + + Ok(()) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + let mut state = self.0.lock(); + match *state { + SinkState::Empty { .. } => Poll::Ready(Ok(())), + SinkState::Item { ref mut flush_waker, .. } => { + *flush_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + self.poll_flush(cx) + } +} + +impl Stream for SingleItemStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut state = self.0.lock(); + + let read_waker = Some(cx.waker().clone()); + + match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) { + SinkState::Empty { .. } => Poll::Pending, + SinkState::Item { item, ready_waker, flush_waker } => { + if let Some(waker) = ready_waker { + waker.wake(); + } + + if let Some(waker) = flush_waker { + waker.wake(); + } + + Poll::Ready(Some(item)) + } + } + } +} + +/// Create a single-item Sink/Stream pair. +/// +/// The sink's send methods resolve at the point which the stream reads the item, +/// not when the item is buffered. +pub fn single_item_sink() -> (SingleItemSink, SingleItemStream) { + let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None })); + ( + SingleItemSink(inner.clone()), + SingleItemStream(inner), + ) +} /// A test subsystem context. pub struct TestSubsystemContext { tx: mpsc::UnboundedSender, - rx: mpsc::UnboundedReceiver>, + rx: SingleItemStream>, spawn: S, } @@ -55,16 +175,15 @@ impl SubsystemContext for TestSubs } async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { - self.tx.unbounded_send(msg).expect("test overseer no longer live"); + self.tx.send(msg).await.expect("test overseer no longer live"); Ok(()) } async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> where T: IntoIterator + Send, T::IntoIter: Send { - for msg in msgs { - self.tx.unbounded_send(msg).expect("test overseer no longer live"); - } + let mut iter = stream::iter(msgs.into_iter().map(Ok)); + self.tx.send_all(&mut iter).await.expect("test overseer no longer live"); Ok(()) } @@ -72,14 +191,15 @@ impl SubsystemContext for TestSubs /// A handle for interacting with the subsystem context. pub struct TestSubsystemContextHandle { - tx: mpsc::UnboundedSender>, + tx: SingleItemSink>, rx: mpsc::UnboundedReceiver, } impl TestSubsystemContextHandle { - /// Send a message or signal to the subsystem. - pub fn send(&self, from_overseer: FromOverseer) { - self.tx.unbounded_send(from_overseer).expect("Test subsystem no longer live"); + /// Send a message or signal to the subsystem. This resolves at the point in time where the + /// subsystem has _read_ the message. + pub async fn send(&mut self, from_overseer: FromOverseer) { + self.tx.send(from_overseer).await.expect("Test subsystem no longer live"); } /// Receive the next message from the subsystem. @@ -92,7 +212,7 @@ impl TestSubsystemContextHandle { pub fn make_subsystem_context(spawn: S) -> (TestSubsystemContext, TestSubsystemContextHandle) { - let (overseer_tx, overseer_rx) = mpsc::unbounded(); + let (overseer_tx, overseer_rx) = single_item_sink(); let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); ( From b676adcfa19c895c38f39a6c2dec9d7745fb8f49 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 21:22:18 -0400 Subject: [PATCH 29/33] remove smelly action inspector --- node/network/bridge/src/lib.rs | 172 ++++++++------------------------- node/subsystem/src/lib.rs | 6 ++ 2 files changed, 48 insertions(+), 130 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 06e345c2ae9e..49cc5dee031c 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -81,7 +81,7 @@ pub enum NetworkAction { } /// An abstraction over networking for the purposes of this subsystem. -pub trait Network: Clone + Send + 'static { +pub trait Network: Send + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). @@ -161,7 +161,7 @@ impl Network for Arc> { } /// The network bridge subsystem. -pub struct NetworkBridge(N); +pub struct NetworkBridge(Option); impl NetworkBridge { /// Create a new network bridge subsystem with underlying network service. @@ -169,17 +169,25 @@ impl NetworkBridge { /// This assumes that the network service has had the notifications protocol for the network /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). pub fn new(net_service: N) -> Self { - NetworkBridge(net_service) + NetworkBridge(Some(net_service)) } } impl Subsystem for NetworkBridge where C: SubsystemContext { - fn start(&mut self, ctx: C) -> SpawnedSubsystem { - // Swallow error because failure is fatal to the node and we log with more precision - // within `run_network`. - SpawnedSubsystem(run_network(self.0.clone(), ctx, |_| ()).map(|_| ()).boxed()) + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem(match self.0.take() { + None => async move { for _ in ctx.recv().await { } }.boxed(), + Some(net) => { + // Swallow error because failure is fatal to the node and we log with more precision + // within `run_network`. + run_network(net, ctx).map(|_| ()).boxed() + } + }) + + + } } @@ -307,7 +315,6 @@ async fn update_view( async fn run_network( mut net: N, mut ctx: impl SubsystemContext, - action_inspector: impl Fn(&Action), // side-channel for tests to inspect internals ) -> SubsystemResult<()> { let mut event_stream = net.event_stream().fuse(); @@ -335,8 +342,6 @@ async fn run_network( } }; - action_inspector(&action); - match action { Action::RegisterEventProducer(protocol_id, event_producer) => { // insert only if none present. @@ -530,11 +535,11 @@ mod tests { use assert_matches::assert_matches; use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; + use subsystem_test::{SingleItemSink, SingleItemStream}; // The subsystem's view of the network - only supports a single call to `event_stream`. - #[derive(Clone)] struct TestNetwork { - net_events: Arc>>>, + net_events: Arc>>>, action_tx: mpsc::UnboundedSender, } @@ -542,55 +547,14 @@ mod tests { // of `NetworkAction`s. struct TestNetworkHandle { action_rx: mpsc::UnboundedReceiver, - net_tx: mpsc::UnboundedSender, - } - - // a record of an action internal to the network. - #[derive(Debug)] - enum InternalActionRecord { - RegisterEventProducer(ProtocolId), - SendMessage(Vec, ProtocolId, Vec), - ReportPeer(PeerId, ReputationChange), - StartWork(Hash), - StopWork(Hash), - - PeerConnected(PeerId, ObservedRole), - PeerDisconnected(PeerId), - PeerMessages(PeerId, Vec), - - Abort, - } - - impl<'a> From<&'a Action> for InternalActionRecord { - fn from(action: &'a Action) -> Self { - match *action { - Action::RegisterEventProducer(protocol, _) - => InternalActionRecord::RegisterEventProducer(protocol), - Action::SendMessage(ref peers, protocol, ref message) - => InternalActionRecord::SendMessage(peers.clone(), protocol, message.clone()), - Action::ReportPeer(ref peer, rep) - => InternalActionRecord::ReportPeer(peer.clone(), rep.clone()), - - Action::StartWork(hash) => InternalActionRecord::StartWork(hash), - Action::StopWork(hash) => InternalActionRecord::StopWork(hash), - - Action::PeerConnected(ref peer, ref role) - => InternalActionRecord::PeerConnected(peer.clone(), role.clone()), - Action::PeerDisconnected(ref peer) - => InternalActionRecord::PeerDisconnected(peer.clone()), - Action::PeerMessages(ref peer, ref messages) - => InternalActionRecord::PeerMessages(peer.clone(), messages.clone()), - - Action::Abort => InternalActionRecord::Abort, - } - } + net_tx: SingleItemSink, } fn new_test_network() -> ( TestNetwork, TestNetworkHandle, ) { - let (net_tx, net_rx) = mpsc::unbounded(); + let (net_tx, net_rx) = subsystem_test::single_item_sink(); let (action_tx, action_rx) = mpsc::unbounded(); ( @@ -636,30 +600,30 @@ mod tests { v } - fn connect_peer(&self, peer: PeerId, role: ObservedRole) { + async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, engine_id: POLKADOT_ENGINE_ID, role, - }); + }).await; } - fn disconnect_peer(&self, peer: PeerId) { + async fn disconnect_peer(&mut self, peer: PeerId) { self.send_network_event(NetworkEvent::NotificationStreamClosed { remote: peer, engine_id: POLKADOT_ENGINE_ID, - }); + }).await; } - fn peer_message(&self, peer: PeerId, message: Vec) { + async fn peer_message(&mut self, peer: PeerId, message: Vec) { self.send_network_event(NetworkEvent::NotificationsReceived { remote: peer, messages: vec![(POLKADOT_ENGINE_ID, message.into())], - }); + }).await; } - fn send_network_event(&self, event: NetworkEvent) { - self.net_tx.unbounded_send(event).expect("subsystem concluded early"); + async fn send_network_event(&mut self, event: NetworkEvent) { + self.net_tx.send(event).await.expect("subsystem concluded early"); } } @@ -672,7 +636,6 @@ mod tests { struct TestHarness { network_handle: TestNetworkHandle, virtual_overseer: subsystem_test::TestSubsystemContextHandle, - action_rx: mpsc::UnboundedReceiver, } fn test_harness>(test: impl FnOnce(TestHarness) -> T) { @@ -680,12 +643,10 @@ mod tests { let (network, network_handle) = new_test_network(); let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); - let (action_tx, action_rx) = mpsc::unbounded::(); let network_bridge = run_network( network, context, - move |action| { let _ = action_tx.unbounded_send(action.into()); }, ) .map_err(|_| panic!("subsystem execution failed")) .map(|_| ()); @@ -693,7 +654,6 @@ mod tests { let test_fut = test(TestHarness { network_handle, virtual_overseer, - action_rx, }); futures::pin_mut!(test_fut); @@ -705,31 +665,17 @@ mod tests { #[test] fn sends_view_updates_to_peers() { test_harness(|test_harness| async move { - let TestHarness { mut network_handle, virtual_overseer, mut action_rx } = test_harness; + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; let peer_a = PeerId::random(); let peer_b = PeerId::random(); - network_handle.connect_peer(peer_a.clone(), ObservedRole::Full); - network_handle.connect_peer(peer_b.clone(), ObservedRole::Full); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { - assert_eq!(p, peer_a); - } - ); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { - assert_eq!(p, peer_b); - } - ); + network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await; + network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await; let hash_a = Hash::from([1; 32]); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))); + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await; let actions = network_handle.next_network_actions(2).await; let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); @@ -749,9 +695,8 @@ mod tests { fn peer_view_updates_sent_via_overseer() { test_harness(|test_harness| async move { let TestHarness { - network_handle, + mut network_handle, mut virtual_overseer, - mut action_rx, } = test_harness; let peer = PeerId::random(); @@ -759,13 +704,7 @@ mod tests { let proto_statement = *b"abcd"; let proto_bitfield = *b"wxyz"; - network_handle.connect_peer(peer.clone(), ObservedRole::Full); - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { - assert_eq!(p, peer); - } - ); + network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; virtual_overseer.send(FromOverseer::Communication { msg: NetworkBridgeMessage::RegisterEventProducer( @@ -774,7 +713,7 @@ mod tests { StatementDistributionMessage::NetworkBridgeUpdate(event) ) ), - }); + }).await; virtual_overseer.send(FromOverseer::Communication { msg: NetworkBridgeMessage::RegisterEventProducer( @@ -783,17 +722,7 @@ mod tests { BitfieldDistributionMessage::NetworkBridgeUpdate(event) ) ), - }); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::RegisterEventProducer(x) if x == proto_statement - ); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield - ); + }).await; let view = View(vec![Hash::from([1u8; 32])]); @@ -839,7 +768,7 @@ mod tests { network_handle.peer_message( peer.clone(), WireMessage::ViewUpdate(view.clone()).encode(), - ); + ).await; // statement distribution message comes first because handlers are ordered by // protocol ID. @@ -874,9 +803,8 @@ mod tests { fn peer_messages_sent_via_overseer() { test_harness(|test_harness| async move { let TestHarness { - network_handle, + mut network_handle, mut virtual_overseer, - mut action_rx, } = test_harness; let peer = PeerId::random(); @@ -884,13 +812,7 @@ mod tests { let proto_statement = *b"abcd"; let proto_bitfield = *b"wxyz"; - network_handle.connect_peer(peer.clone(), ObservedRole::Full); - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::PeerConnected(p, ObservedRole::Full) => { - assert_eq!(p, peer); - } - ); + network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; virtual_overseer.send(FromOverseer::Communication { msg: NetworkBridgeMessage::RegisterEventProducer( @@ -899,7 +821,7 @@ mod tests { StatementDistributionMessage::NetworkBridgeUpdate(event) ) ), - }); + }).await; virtual_overseer.send(FromOverseer::Communication { msg: NetworkBridgeMessage::RegisterEventProducer( @@ -908,17 +830,7 @@ mod tests { BitfieldDistributionMessage::NetworkBridgeUpdate(event) ) ), - }); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::RegisterEventProducer(x) if x == proto_statement - ); - - assert_matches!( - action_rx.next().await.unwrap(), - InternalActionRecord::RegisterEventProducer(x) if x == proto_bitfield - ); + }).await; // bridge will inform about all previously-connected peers. { @@ -964,9 +876,9 @@ mod tests { network_handle.peer_message( peer.clone(), WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(), - ); + ).await; - network_handle.disconnect_peer(peer.clone()); + network_handle.disconnect_peer(peer.clone()).await; // statement distribution message comes first because handlers are ordered by // protocol ID, and then a disconnection event comes - indicating that the message diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index ca8a91dc97d5..31d094907f51 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -88,6 +88,12 @@ impl From for SubsystemError { } } +impl From for SubsystemError { + fn from(e: std::convert::Infallible) -> Self { + match e {} + } +} + /// An asynchronous subsystem task.. /// /// In essence it's just a newtype wrapping a `BoxFuture`. From 7d44a628190e4cb11ef0c0cb42ee61ecdbe6b77f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 21:27:05 -0400 Subject: [PATCH 30/33] remove superfluous let binding --- node/network/bridge/src/lib.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 49cc5dee031c..e6e5d4daf8ca 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -390,9 +390,7 @@ async fn run_network( net.action_sink().send_all(&mut message_producer).await?; } Action::ReportPeer(peer, rep) => { - // let binding required here to avoid unnecessary sync bound. - let report_fut = net.report_peer(peer, rep); - report_fut.await? + net.report_peer(peer, rep).await; } Action::StartWork(relay_parent) => { live_heads.push(relay_parent); @@ -468,12 +466,11 @@ async fn run_network( match message { WireMessage::ViewUpdate(new_view) => { if new_view.0.len() > MAX_VIEW_HEADS { - let report_fut = net.report_peer( + net.report_peer( peer.clone(), MALFORMED_VIEW_COST, - ); + ).await; - report_fut.await?; continue } @@ -495,12 +492,11 @@ async fn run_network( NetworkBridgeEvent::PeerMessage(peer.clone(), message) )), None => { - let report_fut = net.report_peer( + net.report_peer( peer.clone(), UNKNOWN_PROTO_COST, - ); + ).await; - report_fut.await?; None } }; From 989e8b50523fa0cbc292f177cd357502e094f956 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 26 Jun 2020 21:27:54 -0400 Subject: [PATCH 31/33] fix warnings --- node/network/bridge/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index e6e5d4daf8ca..725389589ead 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -390,7 +390,7 @@ async fn run_network( net.action_sink().send_all(&mut message_producer).await?; } Action::ReportPeer(peer, rep) => { - net.report_peer(peer, rep).await; + net.report_peer(peer, rep).await?; } Action::StartWork(relay_parent) => { live_heads.push(relay_parent); @@ -469,7 +469,7 @@ async fn run_network( net.report_peer( peer.clone(), MALFORMED_VIEW_COST, - ).await; + ).await?; continue } @@ -495,7 +495,7 @@ async fn run_network( net.report_peer( peer.clone(), UNKNOWN_PROTO_COST, - ).await; + ).await?; None } From 64792f7fcc56eb6e970aed8f416caa8c2fe8fb62 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 29 Jun 2020 13:58:09 -0400 Subject: [PATCH 32/33] Update node/network/bridge/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus --- node/network/bridge/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 725389589ead..a22616d280cd 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -173,8 +173,10 @@ impl NetworkBridge { } } -impl Subsystem for NetworkBridge - where C: SubsystemContext +impl Subsystem for NetworkBridge + where + Network: Network, + Context: SubsystemContext, { fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(match self.0.take() { From 50b51dd04baabe02f160a79fca245ba4ba771a15 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 30 Jun 2020 10:52:42 -0400 Subject: [PATCH 33/33] fix compilation --- node/network/bridge/src/lib.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index a22616d280cd..aef1632a9439 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -40,6 +40,9 @@ use std::collections::hash_map::{HashMap, Entry as HEntry}; use std::pin::Pin; use std::sync::Arc; +/// The maximum amount of heads a peer is allowed to have in their view at any time. +/// +/// We use the same limit to compute the view sent to peers locally. const MAX_VIEW_HEADS: usize = 5; /// The engine ID of the polkadot network protocol. @@ -173,12 +176,12 @@ impl NetworkBridge { } } -impl Subsystem for NetworkBridge - where - Network: Network, +impl Subsystem for NetworkBridge + where + Net: Network, Context: SubsystemContext, { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem(match self.0.take() { None => async move { for _ in ctx.recv().await { } }.boxed(), Some(net) => {