diff --git a/Cargo.lock b/Cargo.lock index a4f39046a5f5..ccf1ac310d18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1156,6 +1156,21 @@ dependencies = [ "libc", ] +[[package]] +name = "femme" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b6b21baebbed15551f2170010ca4101b9ed3fdc05822791c8bd4631840eab81" +dependencies = [ + "cfg-if", + "js-sys", + "log 0.4.8", + "serde", + "serde_derive", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "file-per-thread-logger" version = "0.1.3" @@ -2309,9 +2324,9 @@ dependencies = [ [[package]] name = "kv-log-macro" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a2d3beed37e5483887d81eb39de6de03a8346531410e1306ca48a9a89bd3a51" +checksum = "4ff57d6d215f7ca7eb35a9a64d656ba4d9d2bef114d741dc08048e75e2f5d418" dependencies = [ "log 0.4.8", ] @@ -3270,6 +3285,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" +[[package]] +name = "overseer" +version = "0.1.0" +dependencies = [ + "femme", + "futures 0.3.5", + "futures-timer 3.0.2", + "kv-log-macro", + "log 0.4.8", + "streamunordered", +] + [[package]] name = "owning_ref" version = "0.4.1" @@ -7155,6 +7182,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "streamunordered" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca" +dependencies = [ + "futures-core", + "futures-sink", + "futures-util", + "slab", +] + [[package]] name = "string" version = "0.2.1" @@ -8297,6 +8336,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c7d40d09cdbf0f4895ae58cf57d92e1e57a9dd8ed2e8390514b54a47cc5551" dependencies = [ "cfg-if", + "serde", + "serde_json", "wasm-bindgen-macro", ] diff --git a/Cargo.toml b/Cargo.toml 
index 5baf461b565e..51dbcbac7d6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,7 @@ members = [
 	"erasure-coding",
 	"network",
 	"network/test",
+	"overseer",
 	"primitives",
 	"runtime/common",
 	"runtime/polkadot",
diff --git a/overseer/Cargo.toml b/overseer/Cargo.toml
new file mode 100644
index 000000000000..bcd0a8e9e529
--- /dev/null
+++ b/overseer/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "overseer"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+futures = "0.3.5"
+log = "0.4.8"
+futures-timer = "3.0.2"
+streamunordered = "0.5.1"
+
+[dev-dependencies]
+futures = { version = "0.3.5", features = ["thread-pool"] }
+futures-timer = "3.0.2"
+femme = "2.0.1"
+log = "0.4.8"
+kv-log-macro = "1.0.6"
diff --git a/overseer/examples/minimal-example.rs b/overseer/examples/minimal-example.rs
new file mode 100644
index 000000000000..4dd37dbafea1
--- /dev/null
+++ b/overseer/examples/minimal-example.rs
@@ -0,0 +1,134 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Shows a basic usage of the `Overseer`:
+//!   * Spawning subsystems and subsystem child jobs
+//!   
* Establishing message passing
+
+use std::time::Duration;
+use futures::{
+	pending, pin_mut, executor, select, stream,
+	FutureExt, StreamExt,
+};
+use futures_timer::Delay;
+use kv_log_macro as log;
+
+use overseer::{
+	AllMessages, CandidateBackingSubsystemMessage, FromOverseer,
+	Overseer, Subsystem, SubsystemContext, SpawnedSubsystem, ValidationSubsystemMessage,
+};
+
+struct Subsystem1;
+
+impl Subsystem1 {
+	async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
+		loop {
+			match ctx.try_recv().await {
+				Ok(Some(msg)) => {
+					if let FromOverseer::Communication { msg } = msg {
+						log::info!("msg {:?}", msg);
+					}
+					continue;
+				}
+				Ok(None) => (),
+				Err(_) => {
+					log::info!("exiting");
+					return;
+				}
+			}
+
+			Delay::new(Duration::from_secs(1)).await;
+			ctx.send_msg(AllMessages::Validation(
+				ValidationSubsystemMessage::ValidityAttestation
+			)).await.unwrap();
+		}
+	}
+}
+
+impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
+	fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
+		SpawnedSubsystem(Box::pin(async move {
+			Self::run(ctx).await;
+		}))
+	}
+}
+
+struct Subsystem2;
+
+impl Subsystem2 {
+	async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
+		ctx.spawn(Box::pin(async {
+			loop {
+				log::info!("Job tick");
+				Delay::new(Duration::from_secs(1)).await;
+			}
+		})).await.unwrap();
+
+		loop {
+			match ctx.try_recv().await {
+				Ok(Some(msg)) => {
+					log::info!("Subsystem2 received message {:?}", msg);
+					continue;
+				}
+				Ok(None) => { pending!(); }
+				Err(_) => {
+					log::info!("exiting");
+					return;
+				},
+			}
+		}
+	}
+}
+
+impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
+	fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
+		SpawnedSubsystem(Box::pin(async move {
+			Self::run(ctx).await;
+		}))
+	}
+}
+
+fn main() {
+	femme::with_level(femme::LevelFilter::Trace);
+	let spawner = executor::ThreadPool::new().unwrap();
+
+	futures::executor::block_on(async {
+		let timer_stream = stream::repeat(()).then(|_| async {
+			Delay::new(Duration::from_secs(1)).await;
+		});
+
+		let (overseer, _handler) = Overseer::new(
+			Box::new(Subsystem2),
+			Box::new(Subsystem1),
+			spawner,
+		).unwrap();
+		let overseer_fut = overseer.run().fuse();
+		let timer_stream = timer_stream;
+
+		pin_mut!(timer_stream);
+		pin_mut!(overseer_fut);
+
+		loop {
+			select! {
+				_ = overseer_fut => break,
+				_ = timer_stream.next() => {
+					log::info!("tick");
+				}
+				complete => break,
+			}
+		}
+	});
+}
diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs
new file mode 100644
index 000000000000..f7ac6cac5079
--- /dev/null
+++ b/overseer/src/lib.rs
@@ -0,0 +1,768 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! # Overseer
+//!
+//! `overseer` implements the Overseer architecture described in the
+//! [implementors-guide](https://github.com/paritytech/polkadot/blob/master/roadmap/implementors-guide/guide.md).
+//! For the motivations behind implementing the overseer itself you should
+//! check out that guide, documentation in this crate will be mostly discussing
+//! technical stuff.
+//!
+//! An `Overseer` is something that allows spawning/stopping and overseeing
+//! asynchronous tasks as well as establishing a well-defined and easy to use
+//! protocol that the tasks can use to communicate with each other. It is desired
+//! that this protocol is the only way tasks communicate with each other, however
+//! 
at this moment there are no foolproof guards against other ways of communication.
+//!
+//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
+//! share the same behavior from `Overseer`'s point of view.
+//!
+//! ```text
+//!                              +-----------------------------+
+//!                              |         Overseer            |
+//!                              +-----------------------------+
+//!
+//!             ................|  Overseer "holds" these and uses |..............
+//!             .                  them to (re)start things                      .
+//!             .                                                                .
+//!             .  +-------------------+                +---------------------+  .
+//!             .  |   Subsystem1      |                |   Subsystem2        |  .
+//!             .  +-------------------+                +---------------------+  .
+//!             .           |                                       |            .
+//!             ..................................................................
+//!                         |                                       |
+//!                       start()                                 start()
+//!                         V                                       V
+//!             ..................| Overseer "runs" these |.......................
+//!             .  +--------------------+               +---------------------+  .
+//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
+//!             .  +--------------------+               +---------------------+  .
+//!             ..................................................................
+//! ```
+
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::task::Poll;
+use std::time::Duration;
+
+use futures::channel::{mpsc, oneshot};
+use futures::{
+	pending, poll, select,
+	future::{BoxFuture, RemoteHandle},
+	stream::FuturesUnordered,
+	task::{Spawn, SpawnError, SpawnExt},
+	Future, FutureExt, SinkExt, StreamExt,
+};
+use futures_timer::Delay;
+use streamunordered::{StreamYield, StreamUnordered};
+
+/// An error type that describes faults that may happen
+///
+/// These are:
+///   * Channels being closed
+///   * Subsystems dying when they are not expected to
+///   * Subsystems not dying when they are told to die
+///   * etc.
+#[derive(Debug)]
+pub struct SubsystemError;
+
+impl From<mpsc::SendError> for SubsystemError {
+	fn from(_: mpsc::SendError) -> Self {
+		Self
+	}
+}
+
+impl From<oneshot::Canceled> for SubsystemError {
+	fn from(_: oneshot::Canceled) -> Self {
+		Self
+	}
+}
+
+impl From<SpawnError> for SubsystemError {
+	fn from(_: SpawnError) -> Self {
+		Self
+	}
+}
+
+/// A `Result` type that wraps [`SubsystemError`].
+///
+/// [`SubsystemError`]: struct.SubsystemError.html
+pub type SubsystemResult<T> = Result<T, SubsystemError>;
+
+/// An asynchronous subsystem task that runs inside and is being overseen by the [`Overseer`].
+///
+/// In essence it's just a newtype wrapping a `BoxFuture`.
+///
+/// [`Overseer`]: struct.Overseer.html
+pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
+
+// The capacity of the bounded channels inside the overseer.
+const CHANNEL_CAPACITY: usize = 1024;
+// A graceful `Overseer` teardown time delay, in seconds.
+const STOP_DELAY: u64 = 1;
+
+/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
+///
+/// It wraps a system-wide [`AllMessages`] type that represents all possible
+/// messages in the system.
+///
+/// [`AllMessages`]: enum.AllMessages.html
+/// [`Subsystem`]: trait.Subsystem.html
+/// [`Overseer`]: struct.Overseer.html
+enum ToOverseer {
+	/// This is a message sent by a `Subsystem`.
+	SubsystemMessage(AllMessages),
+
+	/// A message that wraps something the `Subsystem` is desiring to
+	/// spawn on the overseer and a `oneshot::Sender` to signal the result
+	/// of the spawn.
+	SpawnJob {
+		s: BoxFuture<'static, ()>,
+		res: oneshot::Sender<SubsystemResult<()>>,
+	},
+}
+
+/// Some event from outer world.
+enum Event {
+	BlockImport,
+	BlockFinalized,
+	MsgToSubsystem(AllMessages),
+	Stop,
+}
+
+/// Some message that is sent from one of the `Subsystem`s to the outside world.
+pub enum OutboundMessage {
+	SubsystemMessage {
+		msg: AllMessages,
+	}
+}
+
+/// A handler used to communicate with the [`Overseer`].
+///
+/// [`Overseer`]: struct.Overseer.html
+pub struct OverseerHandler {
+	events_tx: mpsc::Sender<Event>,
+}
+
+impl OverseerHandler {
+	/// Inform the `Overseer` that some block was imported.
+	pub async fn block_imported(&mut self) -> SubsystemResult<()> {
+		self.events_tx.send(Event::BlockImport).await?;
+
+		Ok(())
+	}
+
+	/// Send some message to one of the `Subsystem`s.
+	pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
+		self.events_tx.send(Event::MsgToSubsystem(msg)).await?;
+
+		Ok(())
+	}
+
+	/// Inform the `Overseer` that some block was finalized.
+	pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
+		self.events_tx.send(Event::BlockFinalized).await?;
+
+		Ok(())
+	}
+
+	/// Tell `Overseer` to shutdown.
+	pub async fn stop(&mut self) -> SubsystemResult<()> {
+		self.events_tx.send(Event::Stop).await?;
+
+		Ok(())
+	}
+}
+
+impl Debug for ToOverseer {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		match self {
+			ToOverseer::SubsystemMessage(msg) => {
+				write!(f, "ToOverseer::SubsystemMessage({:?})", msg)
+			}
+			ToOverseer::SpawnJob { .. } => write!(f, "ToOverseer::SpawnJob(..)")
+		}
+	}
+}
+
+/// A running instance of some [`Subsystem`].
+///
+/// [`Subsystem`]: trait.Subsystem.html
+struct SubsystemInstance<M: Debug> {
+	tx: mpsc::Sender<FromOverseer<M>>,
+}
+
+/// A context type that is given to the [`Subsystem`] upon spawning.
+/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
+/// or to spawn its [`SubsystemJob`]s.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+/// [`SubsystemJob`]: trait.SubsystemJob.html
+pub struct SubsystemContext<M: Debug>{
+	rx: mpsc::Receiver<FromOverseer<M>>,
+	tx: mpsc::Sender<ToOverseer>,
+}
+
+/// A signal used by [`Overseer`] to communicate with the [`Subsystem`]s.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+#[derive(Debug)]
+pub enum OverseerSignal {
+	/// `Subsystem` should start working.
+	StartWork,
+	/// `Subsystem` should stop working.
+	StopWork,
+	/// Conclude the work of the `Overseer` and all `Subsystem`s.
+	Conclude,
+}
+
+/// A message type used by the Validation [`Subsystem`].
+///
+/// [`Subsystem`]: trait.Subsystem.html
+#[derive(Debug)]
+pub enum ValidationSubsystemMessage {
+	ValidityAttestation,
+}
+
+/// A message type used by the CandidateBacking [`Subsystem`].
+///
+/// [`Subsystem`]: trait.Subsystem.html
+#[derive(Debug)]
+pub enum CandidateBackingSubsystemMessage {
+	RegisterBackingWatcher,
+	Second,
+}
+
+/// A message type tying together all message types that are used across [`Subsystem`]s.
+///
+/// [`Subsystem`]: trait.Subsystem.html
+#[derive(Debug)]
+pub enum AllMessages {
+	Validation(ValidationSubsystemMessage),
+	CandidateBacking(CandidateBackingSubsystemMessage),
+}
+
+/// A message type that a [`Subsystem`] receives from the [`Overseer`].
+/// It wraps signals from the [`Overseer`] and messages that are circulating
+/// between subsystems.
+///
+/// It is generic over the message type `M` that a particular `Subsystem` may use.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+#[derive(Debug)]
+pub enum FromOverseer<M: Debug> {
+	/// Signal from the `Overseer`.
+	Signal(OverseerSignal),
+
+	/// Some other `Subsystem`'s message.
+	Communication {
+		msg: M,
+	},
+}
+
+impl<M: Debug> SubsystemContext<M> {
+	/// Try to asynchronously receive a message without waiting for one.
+	///
+	/// Returns `Ok(None)` when nothing is ready and `Err(())` once the channel is closed.
+	/// If you loop over this without the `pending!()` macro you end up with a busy loop!
+	pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
+		match poll!(self.rx.next()) {
+			Poll::Ready(Some(msg)) => Ok(Some(msg)),
+			Poll::Ready(None) => Err(()),
+			Poll::Pending => Ok(None),
+		}
+	}
+
+	/// Receive a message, failing with `SubsystemError` once the channel is closed.
+	pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
+		self.rx.next().await.ok_or(SubsystemError)
+	}
+
+	/// Spawn a child task on the executor.
+	pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
+		let (tx, rx) = oneshot::channel();
+		self.tx.send(ToOverseer::SpawnJob {
+			s,
+			res: tx,
+		}).await?;
+
+		rx.await?
+	}
+
+	/// Send a direct message to some other `Subsystem`, routed based on message type.
+	pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
+		self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
+
+		Ok(())
+	}
+
+	fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
+		Self {
+			rx,
+			tx,
+		}
+	}
+}
+
+/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
+///
+/// It is generic over the message type circulating in the system.
+/// The idea is that we want some type containing persistent state that
+/// can spawn actually running subsystems when asked to.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+pub trait Subsystem<M: Debug> {
+	/// Start this `Subsystem` and return `SpawnedSubsystem`.
+	fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
+}
+
+/// A subsystem that we oversee.
+///
+/// Ties together the [`Subsystem`] itself and its running instance
+/// (which may be missing if the [`Subsystem`] is not running at the moment
+/// for whatever reason).
+///
+/// [`Subsystem`]: trait.Subsystem.html
+#[allow(dead_code)]
+struct OverseenSubsystem<M: Debug> {
+	subsystem: Box<dyn Subsystem<M> + Send>,
+	instance: Option<SubsystemInstance<M>>,
+}
+
+/// The `Overseer` itself.
+pub struct Overseer<S: Spawn> {
+	/// A validation subsystem
+	validation_subsystem: OverseenSubsystem<ValidationSubsystemMessage>,
+
+	/// A candidate backing subsystem
+	candidate_backing_subsystem: OverseenSubsystem<CandidateBackingSubsystemMessage>,
+
+	/// Spawner to spawn tasks to.
+	s: S,
+
+	/// Here we keep handles to spawned subsystems to be notified when they terminate.
+	running_subsystems: FuturesUnordered<RemoteHandle<()>>,
+
+	/// Gather running subsystems' outbound streams into one.
+	running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,
+
+	/// Events that are sent to the overseer from the outside world
+	events_rx: mpsc::Receiver<Event>,
+}
+
+impl<S> Overseer<S>
+where
+	S: Spawn,
+{
+	/// Create a new instance of the `Overseer` with a fixed set of [`Subsystem`]s.
+	///
+	/// Each [`Subsystem`] is passed to this function as an explicit parameter
+	/// and is supposed to implement some interface that is generic over message type
+	/// that is specific to this [`Subsystem`]. At the moment there are only two
+	/// subsystems:
+	///   * Validation
+	///   * CandidateBacking
+	///
+	/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
+	/// mocking in the test code:
+	///
+	/// ```text
+	///                  +------------------------------------+
+	///                  |            Overseer                |
+	///                  +------------------------------------+
+	///                    /            |             |      \
+	///      ................. subsystems...................................
+	///      . +-----------+    +-----------+   +----------+   +---------+ .
+	///      . |           |    |           |   |          |   |         | .
+	///      . +-----------+    +-----------+   +----------+   +---------+ .
+	///      ...............................................................
+	///                              |
+	///                        probably `spawn`
+	///                            a `job`
+	///                              |
+	///                              V
+	///                         +-----------+
+	///                         |           |
+	///                         +-----------+
+	///
+	/// ```
+	///
+	/// [`Subsystem`]: trait.Subsystem.html
+	///
+	/// # Example
+	///
+	/// The [`Subsystems`] may be any type as long as they implement an expected interface.
+	/// Here, we create two mock subsystems and start the `Overseer` with them. For the sake
+	/// of simplicity the termination of the example is done with a timeout.
+	/// ```
+	/// # use std::time::Duration;
+	/// # use futures::{executor, pin_mut, select, FutureExt};
+	/// # use futures_timer::Delay;
+	/// # use overseer::{
+	/// #     Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
+	/// #     ValidationSubsystemMessage, CandidateBackingSubsystemMessage,
+	/// # };
+	///
+	/// struct ValidationSubsystem;
+	/// impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
+	///     fn start(
+	///         &mut self,
+	///         mut ctx: SubsystemContext<ValidationSubsystemMessage>,
+	///     ) -> SpawnedSubsystem {
+	///         SpawnedSubsystem(Box::pin(async move {
+	///             loop {
+	///                 Delay::new(Duration::from_secs(1)).await;
+	///             }
+	///         }))
+	///     }
+	/// }
+	///
+	/// struct CandidateBackingSubsystem;
+	/// impl Subsystem<CandidateBackingSubsystemMessage> for CandidateBackingSubsystem {
+	///     fn start(
+	///         &mut self,
+	///         mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>,
+	///     ) -> SpawnedSubsystem {
+	///         SpawnedSubsystem(Box::pin(async move {
+	///             loop {
+	///                 Delay::new(Duration::from_secs(1)).await;
+	///             }
+	///         }))
+	///     }
+	/// }
+	///
+	/// # fn main() { executor::block_on(async move {
+	/// let spawner = executor::ThreadPool::new().unwrap();
+	/// let (overseer, _handler) = Overseer::new(
+	///     Box::new(ValidationSubsystem),
+	///     Box::new(CandidateBackingSubsystem),
+	///     spawner,
+	/// ).unwrap();
+	///
+	/// let timer = Delay::new(Duration::from_millis(50)).fuse();
+	///
+	/// let overseer_fut = overseer.run().fuse();
+	/// pin_mut!(timer);
+	/// pin_mut!(overseer_fut);
+	///
+	/// select! {
+	///     _ = overseer_fut => (),
+	///     _ = timer => (),
+	/// }
+	/// #
+	/// # }); }
+	/// ```
+	pub fn new(
+		validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
+		candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
+		mut s: S,
+	) -> SubsystemResult<(Self, OverseerHandler)> {
+		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
+
+		let handler = OverseerHandler {
+			events_tx,
+		};
+
+		let mut running_subsystems_rx = StreamUnordered::new();
+		let mut running_subsystems = FuturesUnordered::new();
+
+		let validation_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			validation,
+		)?;
+
+		let candidate_backing_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			candidate_backing,
+		)?;
+
+		let this = Self {
+			validation_subsystem,
+			candidate_backing_subsystem,
+			s,
+			running_subsystems,
+			running_subsystems_rx,
+			events_rx,
+		};
+
+		Ok((this, handler))
+	}
+
+	// Stop the overseer: signal `Conclude` to every subsystem, then wait for them to finish or for `STOP_DELAY` to pass.
+	async fn stop(mut self) {
+		if let Some(ref mut s) = self.validation_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
+		if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
+		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
+
+		loop {
+			select! {
+				_ = self.running_subsystems.next() => {
+					if self.running_subsystems.is_empty() {
+						break;
+					}
+				},
+				_ = stop_delay => break,
+				complete => break,
+			}
+		}
+	}
+
+	/// Run the `Overseer`.
+	pub async fn run(mut self) -> SubsystemResult<()> {
+		loop {
+			while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
+				match msg {
+					Event::MsgToSubsystem(msg) => {
+						self.route_message(msg).await;
+					}
+					Event::Stop => {
+						self.stop().await;
+						return Ok(());
+					}
+					Event::BlockImport | Event::BlockFinalized => () // Not acted upon yet.
+				}
+			}
+
+			while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!(
+				&mut self.running_subsystems_rx.next()
+			) {
+				match msg {
+					ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
+					ToOverseer::SpawnJob { s, res } => {
+						let s = self.spawn_job(s);
+
+						let _ = res.send(s);
+					}
+				}
+			}
+
+			// Some subsystem exited? Stop everything else and report an error.
+			if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) {
+				log::error!("Subsystem finished unexpectedly {:?}", finished);
+				self.stop().await;
+				return Err(SubsystemError);
+			}
+
+			// Looks like nothing is left to be polled, let's take a break.
+			pending!();
+		}
+	}
+
+	async fn route_message(&mut self, msg: AllMessages) {
+		match msg {
+			AllMessages::Validation(msg) => {
+				if let Some(ref mut s) = self.validation_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
+			AllMessages::CandidateBacking(msg) => {
+				if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
+		}
+	}
+
+
+	fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
+		self.s.spawn(j).map_err(SubsystemError::from)
+	}
+}
+
+fn spawn<S: Spawn, M: Debug>(
+	spawner: &mut S,
+	futures: &mut FuturesUnordered<RemoteHandle<()>>,
+	streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
+	mut s: Box<dyn Subsystem<M> + Send>,
+) -> SubsystemResult<OverseenSubsystem<M>> {
+	let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
+	let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
+	let ctx = SubsystemContext::new(to_rx, from_tx);
+	let f = s.start(ctx);
+
+	let handle = spawner.spawn_with_handle(f.0)?;
+
+	streams.push(from_rx);
+	futures.push(handle);
+
+	let instance = Some(SubsystemInstance {
+		tx: to_tx,
+	});
+
+	Ok(OverseenSubsystem {
+		subsystem: s,
+		instance,
+	})
+}
+
+#[cfg(test)]
+mod tests {
+	use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
+	use super::*;
+
+	struct TestSubsystem1(mpsc::Sender<usize>);
+
+	impl Subsystem<ValidationSubsystemMessage> for TestSubsystem1 {
+		fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
+			let mut sender = self.0.clone();
+			SpawnedSubsystem(Box::pin(async move {
+				let mut i = 0;
+				loop {
+					match ctx.recv().await {
+						Ok(FromOverseer::Communication { .. }) => {
+							let _ = sender.send(i).await;
+							i += 1;
+							continue;
+						}
+						Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
+						Err(_) => return,
+						_ => (),
+					}
+				}
+			}))
+		}
+	}
+
+	struct TestSubsystem2(mpsc::Sender<usize>);
+
+	impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem2 {
+		fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
+			SpawnedSubsystem(Box::pin(async move {
+				let mut c: usize = 0;
+				loop {
+					if c < 10 {
+						ctx.send_msg(
+							AllMessages::Validation(
+								ValidationSubsystemMessage::ValidityAttestation
+							)
+						).await.unwrap();
+						c += 1;
+						continue;
+					}
+					match ctx.try_recv().await {
+						Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
+							break;
+						}
+						Ok(Some(_)) => {
+							continue;
+						}
+						Err(_) => return,
+						_ => (),
+					}
+					pending!();
+				}
+			}))
+		}
+	}
+
+	struct TestSubsystem4;
+
+	impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem4 {
+		fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
+			SpawnedSubsystem(Box::pin(async move {
+				// Do nothing and exit.
+			}))
+		}
+	}
+
+	// Checks that a minimal configuration of two jobs can run and exchange messages.
+	#[test]
+	fn overseer_works() {
+		let spawner = executor::ThreadPool::new().unwrap();
+
+		executor::block_on(async move {
+			let (s1_tx, mut s1_rx) = mpsc::channel(64);
+			let (s2_tx, mut s2_rx) = mpsc::channel(64);
+
+			let (overseer, mut handler) = Overseer::new(
+				Box::new(TestSubsystem1(s1_tx)),
+				Box::new(TestSubsystem2(s2_tx)),
+				spawner,
+			).unwrap();
+			let overseer_fut = overseer.run().fuse();
+
+			pin_mut!(overseer_fut);
+
+			let mut s1_results = Vec::new();
+			let mut s2_results = Vec::new();
+
+			loop {
+				select! {
+					_ = overseer_fut => break,
+					s1_next = s1_rx.next() => {
+						match s1_next {
+							Some(msg) => {
+								s1_results.push(msg);
+								if s1_results.len() == 10 {
+									handler.stop().await.unwrap();
+								}
+							}
+							None => break,
+						}
+					},
+					s2_next = s2_rx.next() => {
+						match s2_next {
+							Some(msg) => s2_results.push(msg),
+							None => break,
+						}
+					},
+					complete => break,
+				}
+			}
+
+			assert_eq!(s1_results, (0..10).collect::<Vec<_>>());
+		});
+	}
+
+	// Spawn a subsystem that immediately exits.
+	//
+	// Should immediately conclude the overseer itself with an error.
+	#[test]
+	fn overseer_panics_on_subsystem_exit() {
+		let spawner = executor::ThreadPool::new().unwrap();
+
+		executor::block_on(async move {
+			let (s1_tx, _) = mpsc::channel(64);
+			let (overseer, _handle) = Overseer::new(
+				Box::new(TestSubsystem1(s1_tx)),
+				Box::new(TestSubsystem4),
+				spawner,
+			).unwrap();
+			let overseer_fut = overseer.run().fuse();
+			pin_mut!(overseer_fut);
+
+			select! {
+				res = overseer_fut => assert!(res.is_err()),
+				complete => (),
+			}
+		})
+	}
+}