diff --git a/Cargo.lock b/Cargo.lock index 227fd8968080..092806bab3bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4895,6 +4895,7 @@ dependencies = [ "futures-timer 3.0.2", "kv-log-macro", "log 0.4.11", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index da19a26e0a92..e21cb9363146 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -17,6 +17,7 @@ async-trait = "0.1" [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-network-protocol = { path = "../network/protocol" } futures = { version = "0.3.5", features = ["thread-pool"] } futures-timer = "3.0.2" femme = "2.0.1" diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index d4931baa3306..185a0f5614ba 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -717,6 +717,10 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -733,7 +737,7 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } - if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + if let Some(ref mut s) = self.availability_store_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -817,7 +821,7 @@ where async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { let mut update = ActiveLeavesUpdate::default(); - if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) { + if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) { update.deactivated.push(parent.0); } @@ -879,6 +883,10 @@ where s.tx.send(FromOverseer::Signal(signal.clone())).await?; } + if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + if let Some(ref mut s) = self.provisioner_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } @@ -1021,10 +1029,15 @@ fn spawn( #[cfg(test)] mod tests { + use std::sync::atomic; use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use polkadot_primitives::v1::{BlockData, PoV}; use polkadot_subsystem::DummySubsystem; + use polkadot_subsystem::messages::RuntimeApiRequest; + + use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent}; + use super::*; @@ -1492,4 +1505,206 @@ mod tests { } }); } + + #[derive(Clone)] + struct CounterSubsystem { + stop_signals_received: Arc, + signals_received: Arc, + msgs_received: Arc, + } + + impl CounterSubsystem { + fn new( + stop_signals_received: Arc, + signals_received: Arc, + msgs_received: Arc, + ) -> Self { + Self { + stop_signals_received, + signals_received, + msgs_received, + } + } + } + + impl Subsystem for CounterSubsystem + where + C: SubsystemContext, + M: Send, + { + fn start(self, mut ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "counter-subsystem", + future: Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => { + self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst); + break; + }, + Ok(Some(FromOverseer::Signal(_))) => { + self.signals_received.fetch_add(1, atomic::Ordering::SeqCst); + continue; + }, + Ok(Some(FromOverseer::Communication { .. })) => { + self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst); + continue; + }, + Err(_) => (), + _ => (), + } + pending!(); + } + }), + } + } + } + + fn test_candidate_validation_msg() -> CandidateValidationMessage { + let (sender, _) = oneshot::channel(); + let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) }); + CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender) + } + + fn test_candidate_backing_msg() -> CandidateBackingMessage { + let (sender, _) = oneshot::channel(); + CandidateBackingMessage::GetBackedCandidates(Default::default(), sender) + } + + fn test_candidate_selection_msg() -> CandidateSelectionMessage { + CandidateSelectionMessage::default() + } + + fn test_chain_api_msg() -> ChainApiMessage { + let (sender, _) = oneshot::channel(); + ChainApiMessage::FinalizedBlockNumber(sender) + } + + fn test_collator_protocol_msg() -> CollatorProtocolMessage { + CollatorProtocolMessage::CollateOn(Default::default()) + } + + fn test_network_bridge_event() -> NetworkBridgeEvent { + NetworkBridgeEvent::PeerDisconnected(PeerId::random()) + } + + fn test_statement_distribution_msg() -> StatementDistributionMessage { + StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + } + + fn test_availability_distribution_msg() -> AvailabilityDistributionMessage { + AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + } + + fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage { + BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + } + + fn test_provisioner_msg() -> ProvisionerMessage { + let (sender, _) = oneshot::channel(); + ProvisionerMessage::RequestInherentData(Default::default(), sender) + } + + fn test_pov_distribution_msg() -> PoVDistributionMessage { + PoVDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event()) + } + + fn test_runtime_api_msg() -> RuntimeApiMessage { + let (sender, _) = oneshot::channel(); + RuntimeApiMessage::Request(Default::default(), RuntimeApiRequest::Validators(sender)) + } + + fn test_availability_store_msg() -> AvailabilityStoreMessage { + let (sender, _) = oneshot::channel(); + AvailabilityStoreMessage::QueryAvailableData(Default::default(), sender) + } + + fn test_network_bridge_msg() -> NetworkBridgeMessage { + NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, "")) + } + + // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. + #[test] + fn overseer_all_subsystems_receive_signals_and_messages() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0)); + let signals_received = Arc::new(atomic::AtomicUsize::new(0)); + let msgs_received = Arc::new(atomic::AtomicUsize::new(0)); + + let subsystem = CounterSubsystem::new( + stop_signals_received.clone(), + signals_received.clone(), + msgs_received.clone(), + ); + + let all_subsystems = AllSubsystems { + candidate_validation: subsystem.clone(), + candidate_backing: subsystem.clone(), + candidate_selection: subsystem.clone(), + collator_protocol: subsystem.clone(), + statement_distribution: subsystem.clone(), + availability_distribution: subsystem.clone(), + bitfield_signing: subsystem.clone(), + bitfield_distribution: subsystem.clone(), + provisioner: subsystem.clone(), + pov_distribution: subsystem.clone(), + runtime_api: subsystem.clone(), + availability_store: subsystem.clone(), + network_bridge: subsystem.clone(), + chain_api: subsystem.clone(), + }; + let (overseer, mut handler) = Overseer::new( + vec![], + all_subsystems, + spawner, + ).unwrap(); + let overseer_fut = overseer.run().fuse(); + + pin_mut!(overseer_fut); + + // send a signal to each subsystem + handler.block_imported(BlockInfo { + hash: Default::default(), + parent_hash: Default::default(), + number: Default::default(), + }).await.unwrap(); + + // send a msg to each subsystem + // except for BitfieldSigning as the message is not instantiable + handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap(); + handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap(); + handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap(); + handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap(); + handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap(); + handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap(); + // handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap(); + handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap(); + handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap(); + handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap(); + handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap(); + handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap(); + handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap(); + handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap(); + + // send a stop signal to each subsystems + handler.stop().await.unwrap(); + + select! { + res = overseer_fut => { + const NUM_SUBSYSTEMS: usize = 14; + + assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + // x2 because of broadcast_signal on startup + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS); + // -1 for BitfieldSigning + assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1); + + assert!(res.is_ok()); + }, + complete => (), + } + }); + } }