From aa28e19745f18c4f2fb5f929d2a9ef6e06bd7bba Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 10:57:23 -0800
Subject: [PATCH 01/16] move protocol.rs into subfolder

---
 network/src/{protocol.rs => protocol/mod.rs} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename network/src/{protocol.rs => protocol/mod.rs} (100%)

diff --git a/network/src/protocol.rs b/network/src/protocol/mod.rs
similarity index 100%
rename from network/src/protocol.rs
rename to network/src/protocol/mod.rs

From 21da512c2d823418afa8828e54cb4ad4d02c320e Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 12:49:00 -0800
Subject: [PATCH 02/16] add trait for mocking network behavior

---
 network/src/protocol/mod.rs   | 99 +++++++++++++++++++++++++----------
 network/src/protocol/tests.rs | 32 +++++++++++
 2 files changed, 104 insertions(+), 27 deletions(-)
 create mode 100644 network/src/protocol/tests.rs

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 380c62a7d5ec..cdf13093a197 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -42,6 +42,7 @@ use polkadot_validation::{
 };
 use sc_network::{config::Roles, Event, PeerId};
 use sp_api::ProvideRuntimeApi;
+use sp_runtime::ConsensusEngineId;
 
 use std::collections::{hash_map::{Entry, HashMap}, HashSet};
 use std::pin::Pin;
@@ -58,12 +59,15 @@ pub const VERSION: u32 = 1;
 pub const MIN_SUPPORTED_VERSION: u32 = 1;
 
 /// The engine ID of the polkadot network protocol.
-pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot2";
+pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
 /// The protocol name.
 pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/1";
 
 pub use crate::legacy::gossip::ChainContext;
 
+#[cfg(test)]
+mod tests;
+
 // Messages from the service API or network adapter.
 enum ServiceToWorkerMsg {
 	// basic peer messages.
@@ -113,13 +117,48 @@ enum ServiceToWorkerMsg {
 		Hash, // relay-parent,
 		oneshot::Sender<Pin<Box<dyn Stream<Item = SignedStatement> + Send>>>,
 	),
+
+	/// Used in tests to ensure that all other messages sent from the same
+	/// thread have been flushed. Also executes arbitrary logic with the protocol
+	/// handler.
+	#[cfg(test)]
+	Synchronize(Box<dyn FnOnce(&mut ProtocolHandler) + Send>),
+}
+
+/// Operations that a handle to an underlying network service should provide.
+pub trait NetworkServiceOps: Send + Sync {
+	/// Report the peer as having a particular positive or negative value.
+	fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
+
+	/// Write a notification to a given peer.
+	fn write_notification(
+		&self,
+		peer: PeerId,
+		engine_id: ConsensusEngineId,
+		notification: Vec<u8>,
+	);
+}
+
+impl NetworkServiceOps for PolkadotNetworkService {
+	fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
+		PolkadotNetworkService::report_peer(self, peer, value);
+	}
+
+	fn write_notification(
+		&self,
+		peer: PeerId,
+		engine_id: ConsensusEngineId,
+		notification: Vec<u8>,
+	) {
+		PolkadotNetworkService::write_notification(self, peer, engine_id, notification);
+	}
 }
 
 /// An async handle to the network service.
 #[derive(Clone)]
 pub struct Service {
 	sender: mpsc::Sender<ServiceToWorkerMsg>,
-	network_service: Arc<PolkadotNetworkService>,
+	network_service: Arc<dyn NetworkServiceOps>,
 }
 
 /// Registers the protocol.
@@ -356,7 +395,7 @@ impl RecentValidatorIds {
 }
 
 struct ProtocolHandler {
-	service: Arc<PolkadotNetworkService>,
+	service: Arc<dyn NetworkServiceOps>,
 	peers: HashMap<PeerId, PeerData>,
 	// reverse mapping from validator-ID to PeerID. Multiple peers can represent
 	// the same validator because of sentry nodes.
@@ -368,7 +407,7 @@ struct ProtocolHandler {
 
 impl ProtocolHandler {
 	fn new(
-		service: Arc<PolkadotNetworkService>,
+		service: Arc<dyn NetworkServiceOps>,
 		config: Config,
 	) -> Self {
 		ProtocolHandler {
@@ -661,7 +700,7 @@ impl ProtocolHandler {
 }
 
 fn send_peer_collations(
-	service: &PolkadotNetworkService,
+	service: &dyn NetworkServiceOps,
 	remote: PeerId,
 	collations: impl IntoIterator<Item = Collation>,
 ) {
@@ -676,7 +715,7 @@ fn send_peer_collations(
 
 async fn worker_loop<Api, Sp>(
 	config: Config,
-	service: Arc<PolkadotNetworkService>,
+	service: Arc<dyn NetworkServiceOps>,
 	gossip_handle: RegisteredMessageValidator,
 	sender: mpsc::Sender<ServiceToWorkerMsg>,
 	api: Arc<Api>,
@@ -848,6 +887,10 @@ async fn worker_loop<Api, Sp>(
 				let _ = sender.send(checked_messages);
 			}
+			#[cfg(test)]
+			ServiceToWorkerMsg::Synchronize(callback) => {
+				(callback)(&mut protocol_handler)
+			}
 		}
 	}
 }
@@ -1141,6 +1184,29 @@ impl Service {
 			.flatten_stream()
 			.boxed()
 	}
+
+	/// Synchronizes the worker thread. Executes the callback in the worker's
+	/// context, and returns a future that resolves once it's been executed.
+	#[cfg(test)]
+	fn synchronize<T: Send + 'static>(
+		&self,
+		callback: impl FnOnce(&mut ProtocolHandler) -> T + Send + 'static,
+	) -> impl Future<Output = T> {
+		let (tx, rx) = oneshot::channel();
+		let mut sender = self.sender.clone();
+
+		async move {
+			let msg = ServiceToWorkerMsg::Synchronize(Box::new(move |proto| {
+				let res = callback(proto);
+				if let Err(_) = tx.send(res) {
+					log::warn!(target: "p_net", "Failed to send synchronization result");
+				}
+			}));
+
+			sender.send(msg).await.expect("Worker thread unexpectedly hung up");
+			rx.await.expect("Worker thread failed to send back result")
+		}
+	}
 }
 
 impl ParachainNetwork for Service {
@@ -1268,24 +1334,3 @@ impl TableRouter for Router {
 		})
 	}
 }
-
-#[cfg(test)]
-mod tests {
-	use super::*;
-
-	#[test]
-	fn router_inner_drop_sends_worker_message() {
-		let parent = [1; 32].into();
-
-		let (sender, mut receiver) = mpsc::channel(0);
-		drop(RouterInner {
-			relay_parent: parent,
-			sender,
-		});
-
-		match receiver.try_next() {
-			Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
-			_ => panic!("message not sent"),
-		}
-	}
-}
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
new file mode 100644
index 000000000000..799a384f5a60
--- /dev/null
+++ b/network/src/protocol/tests.rs
@@ -0,0 +1,32 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+//! Tests for the protocol.
+
+use super::*;
+
+#[test]
+fn router_inner_drop_sends_worker_message() {
+	let parent = [1; 32].into();
+
+	let (sender, mut receiver) = mpsc::channel(0);
+	drop(RouterInner {
+		relay_parent: parent,
+		sender,
+	});
+
+	match receiver.try_next() {
+		Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
+		_ => panic!("message not sent"),
+	}
+}

From aafc9031556267e0a8c3bfd6c9b163efabc6946c Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 13:57:38 -0800
Subject: [PATCH 03/16] add a mock version of network ops

---
 network/src/protocol/tests.rs | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index 799a384f5a60..8a542c24d782 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -14,6 +14,36 @@
 //! Tests for the protocol.
 
 use super::*;
+use parking_lot::Mutex;
+
+struct MockNetworkOps {
+	recorded: Arc<Mutex<Recorded>>,
+}
+
+struct Recorded {
+	peer_reputations: HashMap<PeerId, i32>,
+	notifications: Vec<(PeerId, Message)>,
+}
+
+impl NetworkServiceOps for MockNetworkOps {
+	fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
+		let mut recorded = self.recorded.lock();
+		let total_rep = recorded.peer_reputations.entry(peer).or_insert(0);
+
+		*total_rep = total_rep.saturating_add(value.value);
+	}
+
+	fn write_notification(
+		&self,
+		peer: PeerId,
+		engine_id: ConsensusEngineId,
+		notification: Vec<u8>,
+	) {
+		assert_eq!(engine_id, POLKADOT_ENGINE_ID);
+		let message = Message::decode(&mut &notification[..]).expect("invalid notification");
+		self.recorded.lock().notifications.push((peer, message));
+	}
+}
 
 #[test]
 fn router_inner_drop_sends_worker_message() {

From 1615486e85d0ff013e16b585a66f0c3b5ce9b142 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 16:12:16 -0800
Subject: [PATCH 04/16] remove some redundant parameters from service messages

---
 network/src/protocol/mod.rs | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index cdf13093a197..2591ca9ad502 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -79,13 +79,11 @@ enum ServiceToWorkerMsg {
 	BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>, oneshot::Sender<Router>),
 	DropConsensusNetworking(Hash),
 	SubmitValidatedCollation(
-		Hash, // relay-parent
 		AbridgedCandidateReceipt,
 		PoVBlock,
 		(ValidatorIndex, Vec<ErasureChunk>),
 	),
 	FetchPoVBlock(
-		Hash, // relay-parent
 		AbridgedCandidateReceipt,
 		oneshot::Sender<PoVBlock>,
 	),
@@ -806,7 +804,8 @@ async fn worker_loop<Api, Sp>(
 			ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
 				consensus_instances.remove(&relay_parent);
 			}
-			ServiceToWorkerMsg::SubmitValidatedCollation(relay_parent, receipt, pov_block, chunks) => {
+			ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
+				let relay_parent = receipt.relay_parent;
 				let instance = match consensus_instances.get(&relay_parent) {
 					None => continue,
 					Some(instance) => instance,
@@ -820,7 +819,7 @@ async fn worker_loop<Api, Sp>(
 					&gossip_handle,
 				);
 			}
-			ServiceToWorkerMsg::FetchPoVBlock(_relay_parent, _candidate, _sender) => {
+			ServiceToWorkerMsg::FetchPoVBlock(_candidate, _sender) => {
 				// TODO https://github.com/paritytech/polkadot/issues/742:
 				// create a filter on gossip for it and send to sender.
 			}
@@ -1294,6 +1293,8 @@ pub enum RouterError {
 	Canceled(oneshot::Canceled),
 	#[display(fmt = "Could not reach worker with request: {}", _0)]
 	SendError(mpsc::SendError),
+	#[display(fmt = "Provided candidate receipt does not have expected relay parent {}", _0)]
+	IncorrectRelayParent(Hash),
 }
 
 impl TableRouter for Router {
@@ -1307,8 +1308,13 @@ impl TableRouter for Router {
 		pov_block: PoVBlock,
 		chunks: (ValidatorIndex, &[ErasureChunk]),
 	) -> Self::SendLocalCollation {
+		if receipt.relay_parent != self.inner.relay_parent {
+			return Box::pin(
+				future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent)))
+			);
+		}
+
 		let message = ServiceToWorkerMsg::SubmitValidatedCollation(
-			self.inner.relay_parent.clone(),
 			receipt,
 			pov_block,
 			(chunks.0, chunks.1.to_vec()),
@@ -1320,9 +1326,14 @@ impl TableRouter for Router {
 	}
 
 	fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
+		if candidate.relay_parent != self.inner.relay_parent {
+			return Box::pin(
+				future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent)))
+			);
+		}
+
 		let (tx, rx) = oneshot::channel();
 		let message = ServiceToWorkerMsg::FetchPoVBlock(
-			self.inner.relay_parent.clone(),
 			candidate.clone(),
 			tx,
 		);

From 9ce3766470cd3ddbfc37831fde29714cfb2ae111 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 16:52:02 -0800
Subject: [PATCH 05/16] ensure fetching erasure chunks automatically cancels

---
 network/src/protocol/mod.rs | 32 +++++++++++++++++++++++++++++---
 1 file changed, 29 insertions(+), 3 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 2591ca9ad502..99ee2eecf7e4 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -342,6 +342,30 @@ struct ConsensusNetworkingInstance {
 	_drop_signal: exit_future::Signal,
 }
 
+type RegisteredMessageValidator = crate::legacy::gossip::RegisteredMessageValidator<PolkadotNetworkService>;
+
+/// A utility future that resolves when the receiving end of a channel has hung up.
+///
+/// This is an `.await`-friendly interface around `poll_canceled`.
+// TODO: remove in favor of https://github.com/rust-lang/futures-rs/pull/2092/
+// once published.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct AwaitCanceled<'a, T> {
+	inner: &'a mut oneshot::Sender<T>,
+}
+
+impl<T> Future for AwaitCanceled<'_, T> {
+	type Output = ();
+
+	fn poll(
+		mut self: Pin<&mut Self>,
+		cx: &mut futures::task::Context<'_>,
+	) -> futures::task::Poll<()> {
+		self.inner.poll_canceled(cx)
+	}
+}
+
 /// Protocol configuration.
 #[derive(Default)]
 pub struct Config {
@@ -823,7 +847,7 @@ async fn worker_loop<Api, Sp>(
 				// TODO https://github.com/paritytech/polkadot/issues/742:
 				// create a filter on gossip for it and send to sender.
 			}
-			ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, sender) => {
+			ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
 				let topic = crate::erasure_coding_topic(&candidate_hash);
 
 				// for every erasure-root, relay-parent pair, there should only be one
@@ -848,8 +872,10 @@ async fn worker_loop<Api, Sp>(
 				));
 
 				let _ = executor.spawn(async move {
-					let chunk = get_msg.await;
-					let _ = sender.send(chunk);
+					let res = future::select(get_msg, AwaitCanceled { inner: &mut sender}).await;
+					if let Either::Left((chunk, _)) = res {
+						let _ = sender.send(chunk);
+					}
 				});
 			}
 			ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {

From bf43e3dc63ef20a66e92e6ee790878db6e46a610 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 3 Mar 2020 17:02:20 -0800
Subject: [PATCH 06/16] introduce dummy ProvideRuntimeApi

---
 network/src/protocol/tests.rs | 195 ++++++++++++++++++++++++++++++++++
 1 file changed, 195 insertions(+)

diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index 8a542c24d782..aa9cade866d5 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -16,6 +16,18 @@
 use super::*;
 use parking_lot::Mutex;
 
+use polkadot_primitives::{Block, Header, BlockId};
+use polkadot_primitives::parachain::{
+	Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
+	Retriable, CollatorId, AbridgedCandidateReceipt,
+	GlobalValidationSchedule, LocalValidationData,
+};
+use sp_blockchain::Result as ClientResult;
+use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
+use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
+use sp_state_machine::ChangesTrieState;
+use sp_core::{NativeOrEncoded, ExecutionContext};
+
 struct MockNetworkOps {
 	recorded: Arc<Mutex<Recorded>>,
 }
@@ -57,6 +69,174 @@ impl NetworkServiceOps for MockNetworkOps {
 	}
 }
 
+#[derive(Default)]
+struct ApiData {
+	validators: Vec<ValidatorId>,
+	duties: Vec<Chain>,
+	active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>,
+}
+
+#[derive(Default, Clone)]
+struct TestApi {
+	data: Arc<Mutex<ApiData>>,
+}
+
+struct RuntimeApi {
+	data: Arc<Mutex<ApiData>>,
+}
+
+impl ProvideRuntimeApi<Block> for TestApi {
+	type Api = RuntimeApi;
+
+	fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
+		RuntimeApi { data: self.data.clone() }.into()
+	}
+}
+
+impl Core<Block> for RuntimeApi {
+	fn Core_version_runtime_api_impl(
+		&self,
+		_: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<()>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<RuntimeVersion>> {
+		unimplemented!("Not required for testing!")
+	}
+
+	fn Core_execute_block_runtime_api_impl(
+		&self,
+		_: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<Block>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<()>> {
+		unimplemented!("Not required for testing!")
+	}
+
+	fn Core_initialize_block_runtime_api_impl(
+		&self,
+		_: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<&Header>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<()>> {
+		unimplemented!("Not required for testing!")
+	}
+}
+
+impl ApiErrorExt for RuntimeApi {
+	type Error = sp_blockchain::Error;
+}
+
+impl ApiExt<Block> for RuntimeApi {
+	type StateBackend = sp_state_machine::InMemoryBackend<HasherFor<Block>>;
+
+	fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
+		&self,
+		_: F
+	) -> Result<R, E> {
+		unimplemented!("Not required for testing!")
+	}
+
+	fn runtime_version_at(&self, _: &BlockId<Block>) -> ClientResult<RuntimeVersion> {
+		unimplemented!("Not required for testing!")
+	}
+
+	fn record_proof(&mut self) { }
+
+	fn extract_proof(&mut self) -> Option<StorageProof> {
+		None
+	}
+
+	fn into_storage_changes(
+		&self,
+		_: &Self::StateBackend,
+		_: Option<&ChangesTrieState<HasherFor<Block>, NumberFor<Block>>>,
+		_: <Block as BlockT>::Hash,
+	) -> std::result::Result<StorageChanges<Self::StateBackend, Block>, String>
+		where Self: Sized
+	{
+		unimplemented!("Not required for testing!")
+	}
+}
+
+impl ParachainHost<Block> for RuntimeApi {
+	fn ParachainHost_validators_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<()>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<Vec<ValidatorId>>> {
+		Ok(NativeOrEncoded::Native(self.data.lock().validators.clone()))
+	}
+
+	fn ParachainHost_duty_roster_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<()>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<DutyRoster>> {
+
+		Ok(NativeOrEncoded::Native(DutyRoster {
+			validator_duty: self.data.lock().duties.clone(),
+		}))
+	}
+
+	fn ParachainHost_active_parachains_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<()>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<Vec<(ParaId, Option<(CollatorId, Retriable)>)>>> {
+		Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone()))
+	}
+
+	fn ParachainHost_parachain_code_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<ParaId>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<Option<Vec<u8>>>> {
+		Ok(NativeOrEncoded::Native(Some(Vec::new())))
+	}
+
+	fn ParachainHost_global_validation_schedule_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<()>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<GlobalValidationSchedule>> {
+		Ok(NativeOrEncoded::Native(Default::default()))
+	}
+
+	fn ParachainHost_local_validation_data_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_: Option<ParaId>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<Option<LocalValidationData>>> {
+		Ok(NativeOrEncoded::Native(Some(Default::default())))
+	}
+
+	fn ParachainHost_get_heads_runtime_api_impl(
+		&self,
+		_at: &BlockId<Block>,
+		_: ExecutionContext,
+		_extrinsics: Option<Vec<<Block as BlockT>::Extrinsic>>,
+		_: Vec<u8>,
+	) -> ClientResult<NativeOrEncoded<Option<Vec<AbridgedCandidateReceipt>>>> {
+		Ok(NativeOrEncoded::Native(Some(Vec::new())))
+	}
+}
+
 
 #[test]
 fn router_inner_drop_sends_worker_message() {
@@ -60,3 +240,18 @@ fn router_inner_drop_sends_worker_message() {
 		_ => panic!("message not sent"),
 	}
 }
+
+#[test]
+fn erasure_chunk_receiver_drop_cancels_gossip_listen() {
+	let mut pool = futures::executor::LocalPool::new();
+
+	// worker_loop(
+	// 	config: Config,
+	// 	service: Arc<dyn NetworkServiceOps>,
+	// 	gossip_handle: RegisteredMessageValidator,
+	// 	sender: mpsc::Sender<ServiceToWorkerMsg>,
+	// 	api: Arc<Api>,
+	// 	mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
+	// 	executor: Sp,
+	// )
+}

From 9d863a8cb70e978c6b4cf70a0f49fc1431e93107 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 4 Mar 2020 17:54:18 -0800
Subject: [PATCH 07/16] abstract over gossip somewhat

---
 network/src/protocol/mod.rs | 41 +++++++++++++++++++++++++++++++++----
 1 file changed, 37 insertions(+), 4 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 99ee2eecf7e4..94fa804887a7 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -124,7 +124,7 @@ enum ServiceToWorkerMsg {
 }
 
 /// Operations that a handle to an underlying network service should provide.
-pub trait NetworkServiceOps: Send + Sync {
+trait NetworkServiceOps: Send + Sync {
 	/// Report the peer as having a particular positive or negative value.
 	fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
 
@@ -152,6 +152,39 @@ impl NetworkServiceOps for PolkadotNetworkService {
 	}
 }
 
+/// Operations that a handle to a gossip network should provide.
+trait GossipOps: Clone + crate::legacy::GossipService {
+	fn new_local_leaf(
+		&self,
+		relay_parent: Hash,
+		validation_data: crate::legacy::gossip::MessageValidationData,
+	) -> crate::legacy::gossip::NewLeafActions;
+
+	/// Register an availability store in the gossip service to evaluate incoming
+	/// messages with.
+	fn register_availability_store(
+		&self,
+		store: av_store::Store,
+	);
+}
+
+impl GossipOps for RegisteredMessageValidator {
+	fn new_local_leaf(
+		&self,
+		relay_parent: Hash,
+		validation_data: crate::legacy::gossip::MessageValidationData,
+	) -> crate::legacy::gossip::NewLeafActions {
+		RegisteredMessageValidator::new_local_leaf(self, relay_parent, validation_data)
+	}
+
+	fn register_availability_store(
+		&self,
+		store: av_store::Store,
+	) {
+		RegisteredMessageValidator::register_availability_store(self, store);
+	}
+}
+
 /// An async handle to the network service.
 #[derive(Clone)]
 pub struct Service {
@@ -738,7 +771,7 @@ fn send_peer_collations(
 async fn worker_loop<Api, Sp>(
 	config: Config,
 	service: Arc<dyn NetworkServiceOps>,
-	gossip_handle: RegisteredMessageValidator,
+	gossip_handle: impl GossipOps,
 	sender: mpsc::Sender<ServiceToWorkerMsg>,
 	api: Arc<Api>,
 	mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
@@ -986,7 +1019,7 @@ async fn statement_import_loop<Api>(
 	table: Arc<SharedTable>,
 	api: Arc<Api>,
 	weak_router: Weak<RouterInner>,
-	gossip_handle: RegisteredMessageValidator,
+	gossip_handle: impl GossipOps,
 	mut exit: exit_future::Exit,
 	executor: impl Spawn,
 ) where
@@ -1098,7 +1131,7 @@ fn distribute_validated_collation(
 	receipt: AbridgedCandidateReceipt,
 	pov_block: PoVBlock,
 	chunks: (ValidatorIndex, Vec<ErasureChunk>),
-	gossip_handle: &RegisteredMessageValidator,
+	gossip_handle: &impl GossipOps,
 ) {
 	// produce a signed statement.
 	let hash = receipt.hash();

From 2fdd235912442f6b3a028d428c58fc09593b9b75 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 4 Mar 2020 18:31:56 -0800
Subject: [PATCH 08/16] add mock gossip handler

---
 network/src/legacy/gossip/mod.rs |  5 +++
 network/src/protocol/tests.rs    | 67 +++++++++++++++++++++++++-------
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs
index 5141f11c9969..a77dd3f54c87 100644
--- a/network/src/legacy/gossip/mod.rs
+++ b/network/src/legacy/gossip/mod.rs
@@ -327,6 +327,11 @@ pub struct NewLeafActions {
 }
 
 impl NewLeafActions {
+	#[cfg(test)]
+	pub fn new() -> Self {
+		NewLeafActions { actions: Vec::new() }
+	}
+
 	/// Perform the queued actions, feeding into gossip.
 	pub fn perform(
 		self,
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index aa9cade866d5..6a670828d8ca 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -22,21 +22,37 @@ use polkadot_primitives::parachain::{
 	Retriable, CollatorId, AbridgedCandidateReceipt,
 	GlobalValidationSchedule, LocalValidationData,
 };
+use sc_network_gossip::TopicNotification;
 use sp_blockchain::Result as ClientResult;
 use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
 use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
 use sp_state_machine::ChangesTrieState;
 use sp_core::{NativeOrEncoded, ExecutionContext};
 
+#[derive(Default)]
 struct MockNetworkOps {
-	recorded: Arc<Mutex<Recorded>>,
+	recorded: Mutex<Recorded>,
 }
 
+#[derive(Default)]
 struct Recorded {
 	peer_reputations: HashMap<PeerId, i32>,
 	notifications: Vec<(PeerId, Message)>,
 }
 
+#[derive(Default, Clone)]
+struct MockGossip {
+	inner: Arc<Mutex<HashMap<Hash, mpsc::UnboundedReceiver<TopicNotification>>>>,
+}
+
+impl MockGossip {
+	fn add_gossip_stream(&self, topic: Hash) -> mpsc::UnboundedSender<TopicNotification> {
+		let (tx, rx) = mpsc::unbounded();
+		self.inner.lock().insert(topic, rx);
+		tx
+	}
+}
+
 impl NetworkServiceOps for MockNetworkOps {
 	fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
 		let mut recorded = self.recorded.lock();
 		let total_rep = recorded.peer_reputations.entry(peer).or_insert(0);
 
 		*total_rep = total_rep.saturating_add(value.value);
 	}
@@ -45,6 +61,40 @@ impl NetworkServiceOps for MockNetworkOps {
 	}
 }
 
+impl crate::legacy::GossipService for MockGossip {
+	fn gossip_messages_for(&self, topic: Hash) -> crate::legacy::GossipMessageStream {
+		crate::legacy::GossipMessageStream::new(match self.inner.lock().remove(&topic) {
+			None => Box::pin(stream::empty()),
+			Some(rx) => Box::pin(rx),
+		})
+	}
+
+	fn gossip_message(&self, _topic: Hash, _message: GossipMessage) {
+
+	}
+
+	fn send_message(&self, _who: PeerId, _message: GossipMessage) {
+
+	}
+}
+
+impl GossipOps for MockGossip {
+	fn new_local_leaf(
+		&self,
+		_relay_parent: Hash,
+		_validation_data: crate::legacy::gossip::MessageValidationData,
+	) -> crate::legacy::gossip::NewLeafActions {
+		crate::legacy::gossip::NewLeafActions::new()
+	}
+
+	fn register_availability_store(
+		&self,
+		_store: av_store::Store,
+	) {
+
+	}
+}
+
 #[derive(Default)]
 struct ApiData {
 	validators: Vec<ValidatorId>,
@@ -69,6 +119,7 @@ struct TestApi {
 	data: Arc<Mutex<ApiData>>,
 }
 
+#[derive(Default)]
 struct RuntimeApi {
 	data: Arc<Mutex<ApiData>>,
 }
@@ -241,17 +292,3 @@ fn router_inner_drop_sends_worker_message() {
 	}
 }
 
-#[test]
-fn erasure_chunk_receiver_drop_cancels_gossip_listen() {
-	let mut pool = futures::executor::LocalPool::new();
-
-	// worker_loop(
-	// 	config: Config,
-	// 	service: Arc<dyn NetworkServiceOps>,
-	// 	gossip_handle: RegisteredMessageValidator,
-	// 	sender: mpsc::Sender<ServiceToWorkerMsg>,
-	// 	api: Arc<Api>,
-	// 	mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
-	// 	executor: Sp,
-	// )
-}

From c982aa22e4956b1a1de5e0b95016aabba8890e74 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 4 Mar 2020 18:48:20 -0800
Subject: [PATCH 09/16] skeleton test

---
 Cargo.lock                    |  1 +
 network/Cargo.toml            |  1 +
 network/src/protocol/tests.rs | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 34 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index eca3d455bb76..c12117ca8eae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3832,6 +3832,7 @@ dependencies = [
 "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
 "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/network/Cargo.toml b/network/Cargo.toml index 4e397a04db1b..5405741170df 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -31,3 +31,4 @@ wasm-timer = "0.2.4" [dev-dependencies] sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } +lazy_static = "1.0" diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index 6a670828d8ca..cba69d958fb6 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -275,6 +275,12 @@ impl ParachainHost for RuntimeApi { } } +lazy_static::lazy_static! { + static ref EXECUTOR: futures::executor::ThreadPool = futures::executor::ThreadPool::builder() + .pool_size(1) + .create() + .unwrap(); +} #[test] fn router_inner_drop_sends_worker_message() { @@ -292,3 +298,29 @@ fn router_inner_drop_sends_worker_message() { } } +#[test] +fn spawn_worker_task() { + let pool = EXECUTOR.clone(); + + let network_ops = Arc::new(MockNetworkOps::default()); + let mock_gossip = MockGossip::default(); + let (worker_tx, worker_rx) = mpsc::channel(0); + let api = Arc::new(TestApi::default()); + + let worker_task = worker_loop( + Config { collating_for: None }, + network_ops.clone(), + mock_gossip.clone(), + worker_tx.clone(), + api.clone(), + worker_rx, + pool.clone(), + ); + + let _service = Service { + sender: worker_tx, + network_service: network_ops, + }; + + pool.spawn(worker_task).unwrap(); +} From d4b0cc9eb1923426fb4f1aa649814ce440f88cd0 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 5 Mar 2020 16:07:10 -0800 Subject: [PATCH 10/16] remove dependence of shared table on router --- validation/src/block_production.rs | 4 +- validation/src/shared_table/mod.rs | 76 ++++++++++++++++-------------- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/validation/src/block_production.rs b/validation/src/block_production.rs index ad8ffb86cc6a..5fb3dd8a62ec 100644 --- a/validation/src/block_production.rs +++ b/validation/src/block_production.rs @@ -197,7 +197,6 @@ impl consensus::Proposer for Proposer consensus::Proposer for Proposer { parent_id: BlockId, client: Arc, transaction_pool: Arc, - table: Arc, inherent_data: Option, inherent_digests: DigestFor, deadline: Instant, diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index eb38dae2b871..2b6a8e62271b 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -34,7 +34,7 @@ use futures::channel::oneshot; use log::{warn, debug}; use bitvec::bitvec; -use super::{GroupInfo, TableRouter}; +use super::GroupInfo; use self::includable::IncludabilitySender; use primitives::Pair; use sp_api::ProvideRuntimeApi; @@ -135,14 +135,14 @@ impl SharedTableInner { // // the statement producer, if any, will produce only statements concerning the same candidate // as the one just imported - fn import_remote_statement( + fn import_remote_statement( &mut self, context: &TableContext, - router: &R, + fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch, statement: table::SignedStatement, max_block_data_size: Option, ) -> Option> { let summary = 
 		let summary = self.table.import_statement(context, statement)?;
 
 		self.update_trackers(&summary.candidate, context);
@@ -175,7 +175,7 @@ impl SharedTableInner {
 				None
 			}
 			Some(candidate) => {
-				let fetch = router.fetch_pov_block(candidate);
+				let fetch = fetch_pov_block(candidate);
 
 				Some(Work {
 					candidate_receipt: candidate.clone(),
@@ -446,14 +446,19 @@ impl SharedTable {
 	///
 	/// The ParachainWork, if any, will produce only statements concerning the same candidate
 	/// as the one just imported
-	pub fn import_remote_statement<R: TableRouter>(
+	pub fn import_remote_statement<Fetch>(
 		&self,
-		router: &R,
+		fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch,
 		statement: table::SignedStatement,
-	) -> Option<ParachainWork<R::FetchValidationProof>> {
-		self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size)
+	) -> Option<ParachainWork<Fetch>> {
+		self.inner.lock().import_remote_statement(
+			&*self.context,
+			fetch_pov_block,
+			statement,
+			self.max_block_data_size,
+		)
 	}
 
 	/// Import many statements at once.
@@ -464,18 +469,26 @@ impl SharedTable {
 	///
 	/// The ParachainWork, if any, will produce only statements concerning the same candidate
 	/// as the one just imported
-	pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
+	pub fn import_remote_statements<Fetch, I, U>(
+		&self,
+		fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch,
+		iterable: I,
+	) -> U
 		where
-			R: TableRouter,
 			I: IntoIterator<Item=table::SignedStatement>,
			U: ::std::iter::FromIterator<Option<ParachainWork<Fetch>>>,
 	{
 		let mut inner = self.inner.lock();
 
 		iterable.into_iter().map(move |statement| {
-			inner.import_remote_statement(&*self.context, router, statement, self.max_block_data_size)
+			inner.import_remote_statement(
+				&*self.context,
+				&fetch_pov_block,
+				statement,
+				self.max_block_data_size,
+			)
 		}).collect()
 	}
@@ -562,7 +575,7 @@ impl SharedTable {
 		self.inner.lock().table.get_misbehavior().clone()
 	}
 
-	/// Track includability of a given set of candidate hashes.
+	/// Track includability of a given set of candidate hashes.
 	pub fn track_includability<I>(&self, iterable: I) -> oneshot::Receiver<()>
 		where I: IntoIterator<Item=Hash>
 	{
@@ -626,23 +639,14 @@ mod tests {
 		) {}
 	}
 
-	#[derive(Clone)]
-	struct DummyRouter;
-	impl TableRouter for DummyRouter {
-		type Error = ::std::io::Error;
-		type SendLocalCollation = future::Ready<Result<(), Self::Error>>;
-		type FetchValidationProof = future::Ready<Result<PoVBlock, Self::Error>>;
-
-		fn local_collation(
-			&self,
-			_candidate: AbridgedCandidateReceipt,
-			_pov_block: PoVBlock,
-			_chunks: (ValidatorIndex, &[ErasureChunk])
-		) -> Self::SendLocalCollation { future::ready(Ok(())) }
-
-		fn fetch_pov_block(&self, _candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
-			future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5]))
-		}
+	fn lazy_fetch_pov()
+		-> Box<
+			dyn Fn(&AbridgedCandidateReceipt) -> future::Ready<
+				Result<PoVBlock, ::std::io::Error>
+			>
+		>
+	{
+		Box::new(|_| future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5])))
 	}
 
 	#[test]
@@ -688,7 +692,7 @@ mod tests {
 		};
 
 		shared_table.import_remote_statement(
-			&DummyRouter,
+			lazy_fetch_pov(),
 			signed_statement,
 		).expect("candidate and local validity group are same");
 	}
@@ -736,7 +740,7 @@ mod tests {
 		};
 
 		shared_table.import_remote_statement(
-			&DummyRouter,
+			lazy_fetch_pov(),
 			signed_statement,
 		).expect("should produce work");
 	}
@@ -909,7 +913,7 @@ mod tests {
 		};
 
 		let _a = shared_table.import_remote_statement(
-			&DummyRouter,
+			lazy_fetch_pov(),
 			signed_statement.clone(),
 		).expect("should produce work");
 
@@ -917,7 +921,7 @@ mod tests {
 			.expect("validation has started").is_in_progress());
 
 		let b = shared_table.import_remote_statement(
-			&DummyRouter,
+			lazy_fetch_pov(),
 			signed_statement.clone(),
 		);
 
@@ -967,7 +971,7 @@ mod tests {
 			.expect("validation has started").is_done());
 
 		let a = shared_table.import_remote_statement(
-			&DummyRouter,
+			lazy_fetch_pov(),
 			signed_statement,
 		);

From 736e788ff9781904ab68b0f6072fc5c3daf418fc Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Thu, 5 Mar 2020 16:31:39 -0800
Subject: [PATCH 11/16] remove worker dependence on its own sender

---
 network/src/protocol/mod.rs   | 47 ++++++++++++++++-------------------
 network/src/protocol/tests.rs |  1 -
 2 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 94fa804887a7..cb9a5eddd9ec 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -46,7 +46,7 @@ use sp_runtime::ConsensusEngineId;
 
 use std::collections::{hash_map::{Entry, HashMap}, HashSet};
 use std::pin::Pin;
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
 use std::time::Duration;
 
 use super::{cost, benefit, PolkadotNetworkService};
@@ -76,7 +76,7 @@ enum ServiceToWorkerMsg {
 	PeerDisconnected(PeerId),
 
 	// service messages.
-	BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>, oneshot::Sender<Router>),
+	BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>),
 	DropConsensusNetworking(Hash),
 	SubmitValidatedCollation(
 		AbridgedCandidateReceipt,
@@ -772,8 +771,7 @@ async fn worker_loop<Api, Sp>(
 	config: Config,
 	service: Arc<dyn NetworkServiceOps>,
 	gossip_handle: impl GossipOps,
-	sender: mpsc::Sender<ServiceToWorkerMsg>,
 	api: Arc<Api>,
 	mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
 	executor: Sp,
@@ -812,15 +810,11 @@ async fn worker_loop<Api, Sp>(
 				protocol_handler.on_raw_messages(remote, messages)
 			}
-			ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, router_sender) => {
+			ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => {
 				// glue: let gossip know about our new local leaf.
 				let relay_parent = table.consensus_parent_hash().clone();
 				let (signal, exit) = exit_future::signal();
 
-				let router = Router {
-					inner: Arc::new(RouterInner { relay_parent, sender: sender.clone() }),
-				};
-
 				let key = table.session_key();
 				if let Some(key) = key {
 					if let InsertedRecentKey::New(_) = local_keys.insert(key.clone()) {
@@ -842,21 +836,16 @@ async fn worker_loop<Api, Sp>(
 					_drop_signal: signal,
 				});
 
-				let weak_router = Arc::downgrade(&router.inner);
-
 				// glue the incoming messages, shared table, and validation
 				// work together.
 				let _ = executor.spawn(statement_import_loop(
 					relay_parent,
 					table,
 					api.clone(),
-					weak_router,
 					gossip_handle.clone(),
 					exit,
 					executor.clone(),
 				));
-
-				let _ = router_sender.send(router);
 			}
 			ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
 				consensus_instances.remove(&relay_parent);
@@ -1018,7 +1007,6 @@ async fn statement_import_loop<Api>(
 	relay_parent: Hash,
 	table: Arc<SharedTable>,
 	api: Arc<Api>,
-	weak_router: Weak<RouterInner>,
 	gossip_handle: impl GossipOps,
 	mut exit: exit_future::Exit,
 	executor: impl Spawn,
@@ -1073,14 +1061,16 @@ async fn statement_import_loop<Api>(
 		statements.insert(0, statement);
 
 		let producers: Vec<_> = {
-			// create a temporary router handle for importing all of these statements
-			let temp_router = match weak_router.upgrade() {
-				None => break,
-				Some(inner) => Router { inner },
-			};
+			// TODO: fetch these from gossip.
+			// https://github.com/paritytech/polkadot/issues/742
+			fn ignore_pov_fetch_requests(_: &AbridgedCandidateReceipt)
+				-> future::Pending<Result<PoVBlock, io::Error>>
+			{
+				future::pending()
+			}
 
 			table.import_remote_statements(
-				&temp_router,
+				&ignore_pov_fetch_requests,
 				statements.iter().cloned(),
 			)
 		};
@@ -1268,7 +1258,7 @@ impl Service {
 }
 
 impl ParachainNetwork for Service {
-	type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
+	type Error = mpsc::SendError;
 	type TableRouter = Router;
 	type BuildTableRouter = Pin<Box<dyn Future<Output = Result<Router, Self::Error>> + Send>>;
 
@@ -1279,14 +1269,19 @@ impl ParachainNetwork for Service {
 	) -> Self::BuildTableRouter {
 		let authorities = authorities.to_vec();
 		let mut sender = self.sender.clone();
+		let relay_parent = table.consensus_parent_hash().clone();
 
-		let (tx, rx) = oneshot::channel();
 		Box::pin(async move {
 			sender.send(
-				ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, tx)
-			).map_err(future::Either::Left).await?;
+				ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities)
+			).await?;
 
-			rx.map_err(future::Either::Right).await
+			Ok(Router {
+				inner: Arc::new(RouterInner {
+					relay_parent,
+					sender,
+				})
+			})
 		})
 	}
 }
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index cba69d958fb6..d57fe694aace 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -311,7 +311,6 @@ fn spawn_worker_task() {
 		Config { collating_for: None },
 		network_ops.clone(),
 		mock_gossip.clone(),
-		worker_tx.clone(),
 		api.clone(),
 		worker_rx,
 		pool.clone(),

From 4b1f39663f36e61aa7157b8d25f384086e982167 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Thu, 5 Mar 2020 16:33:34 -0800
Subject: [PATCH 12/16] test shutdown

---
 network/src/protocol/tests.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index d57fe694aace..cd2546ca5391 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -299,7 +299,7 @@ fn router_inner_drop_sends_worker_message() {
 }
 
 #[test]
-fn spawn_worker_task() {
+fn worker_task_shuts_down_when_sender_dropped() {
 	let pool = EXECUTOR.clone();
 
 	let network_ops = Arc::new(MockNetworkOps::default());
@@ -316,10 +316,11 @@ fn spawn_worker_task() {
 		pool.clone(),
 	);
 
-	let _service = Service {
+	let service = Service {
 		sender: worker_tx,
 		network_service: network_ops,
 	};
 
-	pool.spawn(worker_task).unwrap();
+	drop(service);
+	let _ = futures::executor::block_on(worker_task);
 }

From 8444134bf4196e263c1463fffc60b7c76f09e42a Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 11 Mar 2020 13:50:33 -0700
Subject: [PATCH 13/16] add tests

---
 network/src/protocol/mod.rs   |  46 +++-----
 network/src/protocol/tests.rs | 216 ++++++++++++++++++++++++++++++----
 2 files changed, 210 insertions(+), 52 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index cb9a5eddd9ec..058f67eae804 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -454,6 +454,7 @@ struct ProtocolHandler {
 	// reverse mapping from validator-ID to PeerID. Multiple peers can represent
 	// the same validator because of sentry nodes.
 	connected_validators: HashMap<ValidatorId, HashSet<PeerId>>,
+	consensus_instances: HashMap<Hash, ConsensusNetworkingInstance>,
 	collators: crate::legacy::collator_pool::CollatorPool,
 	local_collations: crate::legacy::local_collations::LocalCollations<Collation>,
 	config: Config,
@@ -468,6 +469,7 @@ impl ProtocolHandler {
 			service,
 			peers: HashMap::new(),
 			connected_validators: HashMap::new(),
+			consensus_instances: HashMap::new(),
 			collators: Default::default(),
 			local_collations: Default::default(),
 			config,
@@ -475,7 +477,7 @@ impl ProtocolHandler {
 	}
 
 	fn on_connect(&mut self, peer: PeerId, roles: Roles) {
-		let claimed_validator = roles.contains(sc_network::config::Roles::AUTHORITY);
+		let claimed_validator = roles.contains(Roles::AUTHORITY);
 
 		self.peers.insert(peer.clone(), PeerData {
 			claimed_validator,
@@ -679,6 +681,7 @@ impl ProtocolHandler {
 		if let Some(invalidated) = invalidated_key {
 			self.validator_representative_removed(invalidated, &remote);
 		}
+		self.connected_validators.entry(key).or_insert_with(HashSet::new).insert(remote.clone());
 
 		send_peer_collations(&*self.service, remote, collations_to_send);
 	}
@@ -782,7 +785,6 @@ async fn worker_loop<Api, Sp>(
 	const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29);
 
 	let mut protocol_handler = ProtocolHandler::new(service, config);
-	let mut consensus_instances = HashMap::new();
 	let mut local_keys = RecentValidatorIds::default();
 
 	let mut collect_garbage = stream::unfold((), move |_| {
@@ -829,12 +831,15 @@ async fn worker_loop<Api, Sp>(
 
 				new_leaf_actions.perform(&gossip_handle);
 
-				consensus_instances.insert(relay_parent, ConsensusNetworkingInstance {
-					statement_table: table.clone(),
+				protocol_handler.consensus_instances.insert(
 					relay_parent,
-					attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent),
-					_drop_signal: signal,
-				});
+					ConsensusNetworkingInstance {
+						statement_table: table.clone(),
+						relay_parent,
+						attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent),
+						_drop_signal: signal,
+					},
+				);
 
 				// glue the incoming messages, shared table, and validation
 				// work together.
@@ -848,11 +853,11 @@ async fn worker_loop<Api, Sp>(
 				));
 			}
 			ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
-				consensus_instances.remove(&relay_parent);
+				protocol_handler.consensus_instances.remove(&relay_parent);
 			}
 			ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
 				let relay_parent = receipt.relay_parent;
-				let instance = match consensus_instances.get(&relay_parent) {
+				let instance = match protocol_handler.consensus_instances.get(&relay_parent) {
 					None => continue,
 					Some(instance) => instance,
 				};
@@ -1232,29 +1237,6 @@ impl Service {
 			.flatten_stream()
 			.boxed()
 	}
-
-	/// Synchronizes the worker thread. Executes the callback in the worker's
-	/// context, and returns a future that resolves once it's been executed.
-	#[cfg(test)]
-	fn synchronize<T: Send + 'static>(
-		&self,
-		callback: impl FnOnce(&mut ProtocolHandler) -> T + Send + 'static,
-	) -> impl Future<Output = T> {
-		let (tx, rx) = oneshot::channel();
-		let mut sender = self.sender.clone();
-
-		async move {
-			let msg = ServiceToWorkerMsg::Synchronize(Box::new(move |proto| {
-				let res = callback(proto);
-				if let Err(_) = tx.send(res) {
-					log::warn!(target: "p_net", "Failed to send synchronization result");
-				}
-			}));
-
-			sender.send(msg).await.expect("Worker thread unexpectedly hung up");
-			rx.await.expect("Worker thread failed to send back result")
-		}
-	}
 }
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index cd2546ca5391..b379c380c82d 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -22,12 +22,16 @@ use polkadot_primitives::parachain::{
 	Retriable, CollatorId, AbridgedCandidateReceipt,
 	GlobalValidationSchedule, LocalValidationData,
 };
+use polkadot_validation::SharedTable;
+
+use av_store::Store as AvailabilityStore;
 use sc_network_gossip::TopicNotification;
 use sp_blockchain::Result as ClientResult;
 use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
 use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
 use sp_state_machine::ChangesTrieState;
-use sp_core::{NativeOrEncoded, ExecutionContext};
+use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext};
+use sp_keyring::Sr25519Keyring;
@@ -50,6 +54,7 @@ struct MockGossip {
 }
 
 impl MockGossip {
+	#[allow(unused)]
 	fn add_gossip_stream(&self, topic: Hash) -> mpsc::UnboundedSender<TopicNotification> {
 		let (tx, rx) = mpsc::unbounded();
 		self.inner.lock().insert(topic, rx);
@@ -280,6 +285,39 @@ impl ParachainHost<Block> for RuntimeApi {
 	}
 }
 
+impl super::Service {
+	async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
+		self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
+	}
+
+	async fn peer_message(&mut self, peer: PeerId, message: Message) {
+		let bytes = message.encode().into();
+
+		self.sender.send(ServiceToWorkerMsg::PeerMessage(peer, vec![bytes])).await.unwrap();
+	}
+
+	async fn disconnect_peer(&mut self, peer: PeerId) {
+		self.sender.send(ServiceToWorkerMsg::PeerDisconnected(peer)).await.unwrap();
+	}
+
+	async fn synchronize<T: Send + 'static>(
+		&mut self,
+		callback: impl FnOnce(&mut ProtocolHandler) -> T + Send + 'static,
+	) -> T {
+		let (tx, rx) = oneshot::channel();
+
+		let msg = ServiceToWorkerMsg::Synchronize(Box::new(move |proto| {
+			let res = callback(proto);
+			if let Err(_) = tx.send(res) {
+				log::warn!(target: "p_net", "Failed to send synchronization result");
+			}
+		}));
+
+		self.sender.send(msg).await.expect("Worker thread unexpectedly hung up");
thread failed to send back result") + } +} + lazy_static::lazy_static! { static ref EXECUTOR: futures::executor::ThreadPool = futures::executor::ThreadPool::builder() .pool_size(1) @@ -282,24 +320,11 @@ lazy_static::lazy_static! { .unwrap(); } -#[test] -fn router_inner_drop_sends_worker_message() { - let parent = [1; 32].into(); - - let (sender, mut receiver) = mpsc::channel(0); - drop(RouterInner { - relay_parent: parent, - sender, - }); - - match receiver.try_next() { - Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), - _ => panic!("message not sent"), - } -} - -#[test] -fn worker_task_shuts_down_when_sender_dropped() { +fn test_setup(config: Config) -> ( + Service, + MockGossip, + impl Future + Send + 'static, +) { let pool = EXECUTOR.clone(); let network_ops = Arc::new(MockNetworkOps::default()); @@ -308,7 +333,7 @@ fn worker_task_shuts_down_when_sender_dropped() { let api = Arc::new(TestApi::default()); let worker_task = worker_loop( - Config { collating_for: None }, + config, network_ops.clone(), mock_gossip.clone(), api.clone(), @@ -321,6 +346,157 @@ fn worker_task_shuts_down_when_sender_dropped() { network_service: network_ops, }; + (service, mock_gossip, worker_task) +} + +#[test] +fn router_inner_drop_sends_worker_message() { + let parent = [1; 32].into(); + + let (sender, mut receiver) = mpsc::channel(0); + drop(RouterInner { + relay_parent: parent, + sender, + }); + + match receiver.try_next() { + Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), + _ => panic!("message not sent"), + } +} + +#[test] +fn worker_task_shuts_down_when_sender_dropped() { + let (service, _gossip, worker_task) = test_setup(Config { collating_for: None }); + drop(service); let _ = futures::executor::block_on(worker_task); } + +#[test] +fn consensus_instances_cleaned_up() { + let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None }); + let relay_parent = [0; 32].into(); + let authorities = Vec::new(); + + let table = Arc::new(SharedTable::new( + Vec::new(), + HashMap::new(), + None, + relay_parent, + AvailabilityStore::new_in_memory(service.clone()), + None, + )); + + let executor = EXECUTOR.clone(); + executor.spawn(worker_task).unwrap(); + + let router = futures::executor::block_on( + service.build_table_router(table, &authorities) + ).unwrap(); + + drop(router); + + assert!(futures::executor::block_on(service.synchronize(move |proto| { + !proto.consensus_instances.contains_key(&relay_parent) + }))); +} + +#[test] +fn validator_peer_cleaned_up() { + let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None }); + + let peer = PeerId::random(); + let validator_key = Sr25519Keyring::Alice.pair(); + let validator_id = ValidatorId::from(validator_key.public()); + + EXECUTOR.clone().spawn(worker_task).unwrap(); + futures::executor::block_on(async move { + service.connect_peer(peer.clone(), Roles::AUTHORITY).await; + service.peer_message(peer.clone(), Message::Status(Status { + version: VERSION, + collating_for: None, + })).await; + service.peer_message(peer.clone(), Message::ValidatorId(validator_id.clone())).await; + + let p = peer.clone(); + let v = validator_id.clone(); + let (peer_has_key, reverse_lookup) = service.synchronize(move |proto| { + let peer_has_key = proto.peers.get(&p).map_or( + false, + |p_data| p_data.session_keys.as_slice().contains(&v), + ); + + let reverse_lookup = proto.connected_validators.get(&v).map_or( + false, + |reps| reps.contains(&p), + ); + + (peer_has_key, 
+			(peer_has_key, reverse_lookup)
+		}).await;
+
+		assert!(peer_has_key);
+		assert!(reverse_lookup);
+
+		service.disconnect_peer(peer.clone()).await;
+
+		let p = peer.clone();
+		let v = validator_id.clone();
+		let (peer_removed, rev_removed) = service.synchronize(move |proto| {
+			let peer_removed = !proto.peers.contains_key(&p);
+			let reverse_mapping_removed = !proto.connected_validators.contains_key(&v);
+
+			(peer_removed, reverse_mapping_removed)
+		}).await;
+
+		assert!(peer_removed);
+		assert!(rev_removed);
+	});
+}
+
+#[test]
+fn validator_key_spillover_cleaned() {
+	let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None });
+
+	let peer = PeerId::random();
+	let make_validator_id = |ring: Sr25519Keyring| ValidatorId::from(ring.public());
+
+	// We will push 1 extra beyond what is normally kept.
+	assert_eq!(RECENT_SESSIONS, 3);
+	let key_a = make_validator_id(Sr25519Keyring::Alice);
+	let key_b = make_validator_id(Sr25519Keyring::Bob);
+	let key_c = make_validator_id(Sr25519Keyring::Charlie);
+	let key_d = make_validator_id(Sr25519Keyring::Dave);
+
+	let keys = vec![key_a, key_b, key_c, key_d];
+
+	EXECUTOR.clone().spawn(worker_task).unwrap();
+	futures::executor::block_on(async move {
+		service.connect_peer(peer.clone(), Roles::AUTHORITY).await;
+		service.peer_message(peer.clone(), Message::Status(Status {
+			version: VERSION,
+			collating_for: None,
+		})).await;
+
+		for key in &keys {
+			service.peer_message(peer.clone(), Message::ValidatorId(key.clone())).await;
+		}
+
+		let p = peer.clone();
+		let active_keys = keys[1..].to_vec();
+		let discarded_key = keys[0].clone();
+		assert!(service.synchronize(move |proto| {
+			let active_correct = proto.peers.get(&p).map_or(false, |p_data| {
+				p_data.session_keys.as_slice() == &active_keys[..]
+			});
+
+			let active_lookup = active_keys.iter().all(|k| {
+				proto.connected_validators.get(&k).map_or(false, |m| m.contains(&p))
+			});
+
+			let discarded = !proto.connected_validators.contains_key(&discarded_key);
+
+			active_correct && active_lookup && discarded
+		}).await);
+	});
+}

From c35dae86374c89dd63ec7e331c40989135a0e6f0 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 11 Mar 2020 21:40:57 -0700
Subject: [PATCH 14/16] test that gossip streams are cleaned up correctly

---
 network/src/protocol/mod.rs   |  4 +-
 network/src/protocol/tests.rs | 92 ++++++++++++++++++++++++++++++-----
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 058f67eae804..243e22d44c08 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -153,7 +153,7 @@ impl NetworkServiceOps for PolkadotNetworkService {
 }
 
 /// Operations that a handle to a gossip network should provide.
-trait GossipOps: Clone + crate::legacy::GossipService {
+trait GossipOps: Clone + Send + crate::legacy::GossipService + 'static {
 	fn new_local_leaf(
 		&self,
 		relay_parent: Hash,
@@ -374,8 +374,6 @@ struct ConsensusNetworkingInstance {
 	_drop_signal: exit_future::Signal,
 }
 
-type RegisteredMessageValidator = crate::legacy::gossip::RegisteredMessageValidator<PolkadotNetworkService>;
-
 /// A utility future that resolves when the receiving end of a channel has hung up.
 ///
 /// This is an `.await`-friendly interface around `poll_canceled`.
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index b379c380c82d..ac9cdbb02273 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -20,15 +20,15 @@ use polkadot_primitives::{Block, Header, BlockId};
 use polkadot_primitives::parachain::{
 	Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
 	Retriable, CollatorId, AbridgedCandidateReceipt,
-	GlobalValidationSchedule, LocalValidationData,
+	GlobalValidationSchedule, LocalValidationData, ErasureChunk,
 };
 use polkadot_validation::SharedTable;
 
-use av_store::Store as AvailabilityStore;
+use av_store::{Store as AvailabilityStore, ErasureNetworking};
 use sc_network_gossip::TopicNotification;
 use sp_blockchain::Result as ClientResult;
 use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
-use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
+use sp_runtime::traits::{Block as BlockT, HashFor, NumberFor};
 use sp_state_machine::ChangesTrieState;
 use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext};
 use sp_keyring::Sr25519Keyring;
@@ -44,17 +44,27 @@ struct Recorded {
 	notifications: Vec<(PeerId, Message)>,
 }
 
+// Test setup registers receivers of gossip messages as well as signals that
+// fire when they are taken.
+type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::Sender<()>);
+
 #[derive(Default, Clone)]
 struct MockGossip {
-	inner: Arc<Mutex<HashMap<Hash, mpsc::UnboundedReceiver<TopicNotification>>>>,
+	inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
 }
 
 impl MockGossip {
-	#[allow(unused)]
-	fn add_gossip_stream(&self, topic: Hash) -> mpsc::UnboundedSender<TopicNotification> {
+	fn add_gossip_stream(&self, topic: Hash)
+		-> (mpsc::UnboundedSender<TopicNotification>, oneshot::Receiver<()>)
+	{
 		let (tx, rx) = mpsc::unbounded();
-		self.inner.lock().insert(topic, rx);
-		tx
+		let (o_tx, o_rx) = oneshot::channel();
+		self.inner.lock().insert(topic, (rx, o_tx));
+		(tx, o_rx)
+	}
+
+	fn contains_listener(&self, topic: &Hash) -> bool {
+		self.inner.lock().contains_key(topic)
 	}
 }
 
@@ -92,7 +102,10 @@ impl crate::legacy::GossipService for MockGossip {
 	fn gossip_messages_for(&self, topic: Hash) -> crate::legacy::GossipMessageStream {
 		crate::legacy::GossipMessageStream::new(match self.inner.lock().remove(&topic) {
 			None => Box::pin(stream::empty()),
-			Some(rx) => Box::pin(rx),
+			Some((rx, o_rx)) => {
+				let _ = o_rx.send(());
+				Box::pin(rx)
+			}
 		})
 	}
 
@@ -187,7 +200,7 @@ impl ApiErrorExt for RuntimeApi {
 }
 
 impl ApiExt<Block> for RuntimeApi {
-	type StateBackend = sp_state_machine::InMemoryBackend<HasherFor<Block>>;
+	type StateBackend = sp_state_machine::InMemoryBackend<HashFor<Block>>;
 
 	fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
 		&self,
@@ -209,7 +222,7 @@ impl ApiExt<Block> for RuntimeApi {
 	fn into_storage_changes(
 		&self,
 		_: &Self::StateBackend,
-		_: Option<&ChangesTrieState<HasherFor<Block>, NumberFor<Block>>>,
+		_: Option<&ChangesTrieState<HashFor<Block>, NumberFor<Block>>>,
 		_: <Block as BlockT>::Hash,
 	) -> std::result::Result<StorageChanges<Self::StateBackend, Block>, String>
 		where Self: Sized
@@ -500,3 +513,60 @@ fn validator_key_spillover_cleaned() {
 	});
 }
+
+#[test]
+fn erasure_fetch_drop_also_drops_gossip_sender() {
+	let (mut service, gossip, worker_task) = test_setup(Config { collating_for: None });
+	let candidate_hash = [1; 32].into();
+
+	let expected_index = 1;
+
+	let executor = EXECUTOR.clone();
+	executor.spawn(worker_task).unwrap();
+
+	let topic = crate::erasure_coding_topic(&candidate_hash);
+	let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic);
+
+	futures::executor::block_on(async move {
+		let chunk_listener = service.fetch_erasure_chunk(
+			&candidate_hash,
+			expected_index,
+		);
+
+		// spawn an abortable handle to the chunk listener future.
+		// we will wait until this future has proceeded enough to start grabbing
+		// messages from gossip, and then we will abort the future.
+		let (chunk_listener, abort_handle) = future::abortable(chunk_listener);
+		executor.spawn(chunk_listener.map(|_| ())).unwrap();
+		gossip_taken_rx.await.unwrap();
+
+		// gossip listener was taken. and is active.
+		assert!(!gossip.contains_listener(&topic));
+		assert!(!gossip_tx.is_closed());
+
+		abort_handle.abort();
+
+		loop {
+			// if dropping the sender leads to the gossip listener
+			// being cleaned up, we will eventually be unable to send a message
+			// on the sender.
+			if gossip_tx.is_closed() { break }
+
+			let fake_chunk = GossipMessage::ErasureChunk(
+				crate::legacy::gossip::ErasureChunkMessage {
+					chunk: ErasureChunk {
+						chunk: vec![],
+						index: expected_index + 1,
+						proof: vec![],
+					},
+					candidate_hash,
+				}
+			).encode();
+
+			match gossip_tx.send(TopicNotification { message: fake_chunk, sender: None }).await {
+				Err(e) => { assert!(e.is_disconnected()); break },
+				Ok(_) => continue,
+			}
+		}
+	});
+}

From 56311545a67abae15b3ea1a25e43c84901e98e62 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Sat, 14 Mar 2020 18:04:20 -0400
Subject: [PATCH 15/16] refactor worker out into its own struct and reduce
 bound on executor

---
 network/src/protocol/mod.rs | 233 ++++++++++++++++++++++++------------
 1 file changed, 154 insertions(+), 79 deletions(-)

diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 243e22d44c08..f78d84c39289 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -123,6 +123,12 @@ enum ServiceToWorkerMsg {
 	Synchronize(Box<dyn FnOnce(&mut ProtocolHandler) + Send>),
 }
 
+/// Messages from a background task to the main worker task.
+enum BackgroundToWorkerMsg {
+	// Spawn a given future.
+	Spawn(future::BoxFuture<'static, ()>),
+}
+
 /// Operations that a handle to an underlying network service should provide.
 trait NetworkServiceOps: Send + Sync {
 	/// Report the peer as having a particular positive or negative value.
@@ -456,6 +462,7 @@ struct ProtocolHandler {
 	collators: crate::legacy::collator_pool::CollatorPool,
 	local_collations: crate::legacy::local_collations::LocalCollations<Collation>,
 	config: Config,
+	local_keys: RecentValidatorIds,
 }
 
 impl ProtocolHandler {
@@ -470,6 +477,7 @@ impl ProtocolHandler {
 			consensus_instances: HashMap::new(),
 			collators: Default::default(),
 			local_collations: Default::default(),
+			local_keys: Default::default(),
 			config,
 		}
 	}
@@ -752,6 +760,11 @@ impl ProtocolHandler {
 			}
 		}
 	}
+
+	fn drop_consensus_networking(&mut self, relay_parent: &Hash) {
+		// this triggers an abort of the background task.
+		self.consensus_instances.remove(relay_parent);
+	}
 }
 
 fn send_peer_collations(
@@ -768,95 +781,90 @@ fn send_peer_collations(
 	}
 }
 
-async fn worker_loop<Api, Sp>(
-	config: Config,
-	service: Arc<dyn NetworkServiceOps>,
-	gossip_handle: impl GossipOps,
+struct Worker<Api, Sp, Gossip> {
+	protocol_handler: ProtocolHandler,
 	api: Arc<Api>,
-	mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
 	executor: Sp,
-) where
+	gossip_handle: Gossip,
+	background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
+	background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
+	service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
+}
+
+impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
 	Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
 	Api::Api: ParachainHost<Block>,
-	Sp: Spawn + Clone + Send + 'static,
+	Sp: Spawn + Clone,
+	Gossip: GossipOps,
 {
+	// spawns a background task to spawn consensus networking.
+	fn build_consensus_networking(
+		&mut self,
+		table: Arc<SharedTable>,
+		authorities: Vec<ValidatorId>,
+	) {
+		// glue: let gossip know about our new local leaf.
+ let relay_parent = table.consensus_parent_hash().clone(); + let (signal, exit) = exit_future::signal(); + + let key = table.session_key(); + if let Some(key) = key { + if let InsertedRecentKey::New(_) = self.protocol_handler.local_keys.insert(key.clone()) { + self.protocol_handler.distribute_new_session_key(key); + } + } - let mut protocol_handler = ProtocolHandler::new(service, config); - let mut local_keys = RecentValidatorIds::default(); + let new_leaf_actions = self.gossip_handle.new_local_leaf( + relay_parent, + crate::legacy::gossip::MessageValidationData { authorities }, + ); - let mut collect_garbage = stream::unfold((), move |_| { - futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ()))) - }).map(drop); + new_leaf_actions.perform(&self.gossip_handle); - loop { - let message = match future::select(receiver.next(), collect_garbage.next()).await { - Either::Left((None, _)) | Either::Right((None, _)) => break, - Either::Left((Some(message), _)) => message, - Either::Right(_) => { - protocol_handler.collect_garbage(); - continue - } - }; + self.protocol_handler.consensus_instances.insert( + relay_parent, + ConsensusNetworkingInstance { + statement_table: table.clone(), + relay_parent, + attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent), + _drop_signal: signal, + }, + ); + + // glue the incoming messages, shared table, and validation + // work together. + let _ = self.executor.spawn(statement_import_loop( + relay_parent, + table, + self.api.clone(), + self.gossip_handle.clone(), + self.background_to_main_sender.clone(), + exit, + )); + } + fn handle_service_message(&mut self, message: ServiceToWorkerMsg) { match message { ServiceToWorkerMsg::PeerConnected(remote, roles) => { - protocol_handler.on_connect(remote, roles); + self.protocol_handler.on_connect(remote, roles); } ServiceToWorkerMsg::PeerDisconnected(remote) => { - protocol_handler.on_disconnect(remote); + self.protocol_handler.on_disconnect(remote); } ServiceToWorkerMsg::PeerMessage(remote, messages) => { - protocol_handler.on_raw_messages(remote, messages) + self.protocol_handler.on_raw_messages(remote, messages) } ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => { - // glue: let gossip know about our new local leaf. - let relay_parent = table.consensus_parent_hash().clone(); - let (signal, exit) = exit_future::signal(); - - let key = table.session_key(); - if let Some(key) = key { - if let InsertedRecentKey::New(_) = local_keys.insert(key.clone()) { - protocol_handler.distribute_new_session_key(key); - } - } - - let new_leaf_actions = gossip_handle.new_local_leaf( - relay_parent, - crate::legacy::gossip::MessageValidationData { authorities }, - ); - - new_leaf_actions.perform(&gossip_handle); - - protocol_handler.consensus_instances.insert( - relay_parent, - ConsensusNetworkingInstance { - statement_table: table.clone(), - relay_parent, - attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent), - _drop_signal: signal, - }, - ); - - // glue the incoming messages, shared table, and validation - // work together. 
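+		// the import loop runs until `exit` fires, i.e. until the instance
+		// registered above is dropped again.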
- let _ = executor.spawn(statement_import_loop( - relay_parent, - table, - api.clone(), - gossip_handle.clone(), - exit, - executor.clone(), - )); + self.build_consensus_networking(table, authorities); } ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { - protocol_handler.consensus_instances.remove(&relay_parent); + self.protocol_handler.drop_consensus_networking(&relay_parent); } ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => { let relay_parent = receipt.relay_parent; - let instance = match protocol_handler.consensus_instances.get(&relay_parent) { - None => continue, + let instance = match self.protocol_handler.consensus_instances.get(&relay_parent) { + None => return, Some(instance) => instance, }; @@ -865,7 +873,7 @@ async fn worker_loop( receipt, pov_block, chunks, - &gossip_handle, + &self.gossip_handle, ); } ServiceToWorkerMsg::FetchPoVBlock(_candidate, _sender) => { @@ -879,7 +887,7 @@ async fn worker_loop( // valid chunk with the given index. // // so we only care about the first item of the filtered stream. - let get_msg = gossip_handle.gossip_messages_for(topic) + let get_msg = self.gossip_handle.gossip_messages_for(topic) .filter_map(move |(msg, _)| { future::ready(match msg { GossipMessage::ErasureChunk(chunk) => @@ -896,8 +904,8 @@ async fn worker_loop( "gossip message streams do not conclude early; qed" )); - let _ = executor.spawn(async move { - let res = future::select(get_msg, AwaitCanceled { inner: &mut sender}).await; + let _ = self.executor.spawn(async move { + let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; if let Either::Left((chunk, _)) = res { let _ = sender.send(chunk); } @@ -905,7 +913,7 @@ async fn worker_loop( } ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => { let topic = crate::erasure_coding_topic(&candidate_hash); - gossip_handle.gossip_message( + self.gossip_handle.gossip_message( topic, GossipMessage::ErasureChunk(ErasureChunkMessage { chunk: erasure_chunk, @@ -915,20 +923,20 @@ async fn worker_loop( } ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); - protocol_handler.await_collation(relay_parent, para_id, sender) + self.protocol_handler.await_collation(relay_parent, para_id, sender) } ServiceToWorkerMsg::NoteBadCollator(collator) => { - protocol_handler.note_bad_collator(collator); + self.protocol_handler.note_bad_collator(collator); } ServiceToWorkerMsg::RegisterAvailabilityStore(store) => { - gossip_handle.register_availability_store(store); + self.gossip_handle.register_availability_store(store); } ServiceToWorkerMsg::OurCollation(targets, collation) => { - protocol_handler.distribute_our_collation(targets, collation); + self.protocol_handler.distribute_our_collation(targets, collation); } ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => { let topic = crate::legacy::gossip::attestation_topic(relay_parent); - let checked_messages = gossip_handle.gossip_messages_for(topic) + let checked_messages = self.gossip_handle.gossip_messages_for(topic) .filter_map(|msg| match msg.0 { GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), _ => future::ready(None), @@ -939,12 +947,72 @@ async fn worker_loop( } #[cfg(test)] ServiceToWorkerMsg::Synchronize(callback) => { - (callback)(&mut protocol_handler) + (callback)(&mut self.protocol_handler) + } + } + } + + fn 
handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
+		match message {
+			BackgroundToWorkerMsg::Spawn(task) => {
+				let _ = self.executor.spawn(task);
+			}
+		}
+	}
+
+	async fn main_loop(&mut self) {
+		const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29);
+
+		let mut collect_garbage = stream::unfold((), move |_| {
+			futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ())))
+		}).map(drop);
+
+		loop {
+			futures::select! {
+				_do_collect = collect_garbage.next() => {
+					self.protocol_handler.collect_garbage();
+				}
+				service_msg = self.service_receiver.next() => match service_msg {
+					Some(msg) => self.handle_service_message(msg),
+					None => return,
+				},
+				background_msg = self.background_receiver.next() => match background_msg {
+					Some(msg) => self.handle_background_message(msg),
+					None => return,
+				},
+			}
+		}
+	}
+}
 
+async fn worker_loop<Api, Sp>(
+	config: Config,
+	service: Arc<dyn NetworkServiceOps>,
+	gossip_handle: impl GossipOps,
+	api: Arc<Api>,
+	receiver: mpsc::Receiver<ServiceToWorkerMsg>,
+	executor: Sp,
+) where
+	Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
+	Api::Api: ParachainHost<Block>,
+	Sp: Spawn + Clone,
+{
+	const BACKGROUND_TO_MAIN_BUF: usize = 16;
+
+	let (background_tx, background_rx) = mpsc::channel(BACKGROUND_TO_MAIN_BUF);
+	let mut worker = Worker {
+		protocol_handler: ProtocolHandler::new(service, config),
+		api,
+		executor,
+		gossip_handle,
+		background_to_main_sender: background_tx,
+		background_receiver: background_rx,
+		service_receiver: receiver,
+	};
+
+	worker.main_loop().await
+}
+
 // A unique trace for valid statements issued by a validator.
 #[derive(Hash, PartialEq, Eq, Clone, Debug)]
 pub(crate) enum StatementTrace {
@@ -1011,8 +1079,8 @@ async fn statement_import_loop(
 	table: Arc<SharedTable>,
 	api: Arc<Api>,
 	gossip_handle: impl GossipOps,
+	mut to_worker: mpsc::Sender<BackgroundToWorkerMsg>,
 	mut exit: exit_future::Exit,
-	executor: impl Spawn,
 ) where
 	Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
 	Api::Api: ParachainHost<Block>,
@@ -1109,7 +1177,14 @@ async fn statement_import_loop(
 	});
 
 	let work = future::select(work.boxed(), exit.clone()).map(drop);
-	let _ = executor.spawn(work);
+	if let Err(_) = to_worker.send(
+		BackgroundToWorkerMsg::Spawn(work.boxed())
+	).await {
+		// can fail only if remote has hung up - worker is dead,
+		// we should die too. this is defensive, since the exit future
+		// would fire shortly anyway.
+				return
+			}
 		}
 	}
 }

From f60f3fadb21f3bfb4f573d49f6418460783be2f8 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Sat, 14 Mar 2020 18:11:16 -0400
Subject: [PATCH 16/16] remove reliance of tests on global thread pool

---
 Cargo.lock                    |  1 -
 network/Cargo.toml            |  1 -
 network/src/protocol/tests.rs | 61 ++++++++++++++++++-----------------
 3 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c12117ca8eae..eca3d455bb76 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3832,7 +3832,6 @@ dependencies = [
  "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "parity-scale-codec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/network/Cargo.toml b/network/Cargo.toml
index 5405741170df..4e397a04db1b 100644
--- a/network/Cargo.toml
+++ b/network/Cargo.toml
@@ -31,4 +31,3 @@ wasm-timer = "0.2.4"
 [dev-dependencies]
 sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
 sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
-lazy_static = "1.0"
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index ac9cdbb02273..652f1cff01a6 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -33,6 +33,9 @@ use sp_state_machine::ChangesTrieState;
 use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext};
 use sp_keyring::Sr25519Keyring;
 
+use futures::executor::LocalPool;
+use futures::task::LocalSpawnExt;
+
 #[derive(Default)]
 struct MockNetworkOps {
 	recorded: Mutex<Recorded>,
@@ -326,19 +329,13 @@ impl super::Service {
 	}
 }
 
-lazy_static::lazy_static! {
-	static ref EXECUTOR: futures::executor::ThreadPool = futures::executor::ThreadPool::builder()
-		.pool_size(1)
-		.create()
-		.unwrap();
-}
-
 fn test_setup(config: Config) -> (
 	Service,
 	MockGossip,
-	impl Future<Output = ()> + Send + 'static,
+	LocalPool,
+	impl Future<Output = ()> + 'static,
 ) {
-	let pool = EXECUTOR.clone();
+	let pool = LocalPool::new();
 	let network_ops = Arc::new(MockNetworkOps::default());
 	let mock_gossip = MockGossip::default();
@@ -351,7 +348,7 @@ fn test_setup(config: Config) -> (
 		mock_gossip.clone(),
 		api.clone(),
 		worker_rx,
-		pool.clone(),
+		pool.spawner(),
 	);
 
 	let service = Service {
@@ -359,7 +356,7 @@ fn test_setup(config: Config) -> (
 		network_service: network_ops,
 	};
 
-	(service, mock_gossip, worker_task)
+	(service, mock_gossip, pool, worker_task)
 }
 
 #[test]
@@ -380,15 +377,15 @@ fn router_inner_drop_sends_worker_message() {
 
 #[test]
 fn worker_task_shuts_down_when_sender_dropped() {
-	let (service, _gossip, worker_task) = test_setup(Config { collating_for: None });
+	let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
 	drop(service);
-	let _ = futures::executor::block_on(worker_task);
+	let _ = pool.run_until(worker_task);
 }
 
 #[test]
 fn consensus_instances_cleaned_up() {
-	let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None });
+	let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
 	let relay_parent = [0; 32].into();
 	let authorities = Vec::new();
@@ -401,30 +398,29 @@ fn consensus_instances_cleaned_up() {
 		None,
 	));
 
-	let executor = EXECUTOR.clone();
-	executor.spawn(worker_task).unwrap();
+	pool.spawner().spawn_local(worker_task).unwrap();
 
-	let router = futures::executor::block_on(
+	let router = pool.run_until(
 		service.build_table_router(table, &authorities)
 	).unwrap();
 
 	drop(router);
 
-	assert!(futures::executor::block_on(service.synchronize(move |proto| {
+	assert!(pool.run_until(service.synchronize(move |proto| {
 		!proto.consensus_instances.contains_key(&relay_parent)
 	})));
 }
 
 #[test]
 fn validator_peer_cleaned_up() {
-	let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None });
+	let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
 
 	let peer = PeerId::random();
 	let validator_key = Sr25519Keyring::Alice.pair();
 	let validator_id = ValidatorId::from(validator_key.public());
 
-	EXECUTOR.clone().spawn(worker_task).unwrap();
-	futures::executor::block_on(async move {
+	pool.spawner().spawn_local(worker_task).unwrap();
+	pool.run_until(async move {
 		service.connect_peer(peer.clone(), Roles::AUTHORITY).await;
 		service.peer_message(peer.clone(), Message::Status(Status {
 			version: VERSION,
@@ -469,7 +465,7 @@ fn validator_peer_cleaned_up() {
 
 #[test]
 fn validator_key_spillover_cleaned() {
-	let (mut service, _gossip, worker_task) = test_setup(Config { collating_for: None });
+	let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
 
 	let peer = PeerId::random();
 	let make_validator_id = |ring: Sr25519Keyring| ValidatorId::from(ring.public());
@@ -483,8 +479,8 @@ fn validator_key_spillover_cleaned() {
 
 	let keys = vec![key_a, key_b, key_c, key_d];
 
-	EXECUTOR.clone().spawn(worker_task).unwrap();
-	futures::executor::block_on(async move {
+	pool.spawner().spawn_local(worker_task).unwrap();
+	pool.run_until(async move {
 		service.connect_peer(peer.clone(), Roles::AUTHORITY).await;
 		service.peer_message(peer.clone(), Message::Status(Status {
 			version: VERSION,
@@ -516,18 +512,18 @@ fn validator_key_spillover_cleaned() {
 
 #[test]
 fn erasure_fetch_drop_also_drops_gossip_sender() {
-	let (mut service, gossip, worker_task) = test_setup(Config { collating_for: None });
+	let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
 	let candidate_hash = [1; 32].into();
 
 	let expected_index = 1;
 
-	let executor = EXECUTOR.clone();
-	executor.spawn(worker_task).unwrap();
+	let spawner = pool.spawner();
+	spawner.spawn_local(worker_task).unwrap();
 
 	let topic = crate::erasure_coding_topic(&candidate_hash);
 	let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic);
 
-	futures::executor::block_on(async move {
+	let test_work = async move {
 		let chunk_listener = service.fetch_erasure_chunk(
 			&candidate_hash,
 			expected_index,
 		);
@@ -537,7 +533,7 @@ fn erasure_fetch_drop_also_drops_gossip_sender() {
 		// we will wait until this future has proceeded enough to start grabbing
 		// messages from gossip, and then we will abort the future.
 		let (chunk_listener, abort_handle) = future::abortable(chunk_listener);
-		executor.spawn(chunk_listener.map(|_| ())).unwrap();
+		let handle = spawner.spawn_with_handle(chunk_listener).unwrap();
 		gossip_taken_rx.await.unwrap();
 
 		// gossip listener was taken and is active.
@@ -546,6 +542,9 @@ fn erasure_fetch_drop_also_drops_gossip_sender() {
 
 		abort_handle.abort();
 
+		// we must `await` this, otherwise context may never transfer over
+		// to the spawned `Abortable` future.
+		assert!(handle.await.is_err());
 		loop {
 			// if dropping the sender leads to the gossip listener
 			// being cleaned up, we will eventually be unable to send a message
@@ -568,5 +567,7 @@ fn erasure_fetch_drop_also_drops_gossip_sender() {
 				Ok(_) => continue,
 			}
 		}
-	});
+	};
+
+	pool.run_until(test_work);
 }
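
The final test leans on two properties of futures 0.3 that are easy to miss: aborting an `Abortable` future only takes effect the next time the executor polls it, and the `RemoteHandle` returned by `spawn_with_handle` resolves (with `Err(Aborted)`) only once the spawned task has actually run and been dropped, releasing everything it owned. Below is a minimal, self-contained sketch of that interaction, independent of the Polkadot code and assuming only the `futures` 0.3 crate; all names in it are illustrative, not part of the patches.

use futures::channel::mpsc;
use futures::executor::LocalPool;
use futures::future;
use futures::task::LocalSpawnExt;
use futures::{SinkExt, StreamExt};

fn main() {
	let mut pool = LocalPool::new();
	let spawner = pool.spawner();

	// The background task owns `rx` until it is aborted.
	let (mut tx, mut rx) = mpsc::channel::<u32>(0);
	let listener = async move {
		while let Some(_item) = rx.next().await {}
	};

	let (listener, abort_handle) = future::abortable(listener);
	let handle = spawner.spawn_local_with_handle(listener).unwrap();

	// Flags the task as aborted; nothing is dropped until the pool polls it.
	abort_handle.abort();

	pool.run_until(async move {
		// The handle resolves only after the aborted task has run and been
		// dropped by the pool, which releases `rx`.
		assert!(handle.await.is_err());

		// With `rx` gone, the channel reports disconnection to the sender.
		let err = tx.send(1).await.unwrap_err();
		assert!(err.is_disconnected());
	});
}

Because a single-threaded `LocalPool` interleaves the spawned task and the test body deterministically, the teardown needs no timeouts or polling loops, which is what lets PATCH 16 drop the shared lazy-static `ThreadPool` entirely.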