Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
FromOverseer and ToOverseer msgs and stopping
  • Loading branch information
montekki committed May 29, 2020
commit 92989b403d5bd961fd0d3d8a457e2fe573c1448a
1 change: 1 addition & 0 deletions overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"

[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
Expand Down
4 changes: 2 additions & 2 deletions overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Subsystem1 {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem1 received message {}", msg);
log::info!("Subsystem1 received message {:?}", msg);
}
Ok(None) => (),
Err(_) => {}
Expand Down Expand Up @@ -77,7 +77,7 @@ impl Subsystem2 {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem2 received message {}", msg);
log::info!("Subsystem2 received message {:?}", msg);
continue;
}
Ok(None) => { pending!(); }
Expand Down
101 changes: 71 additions & 30 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ use std::fmt::Debug;
use std::pin::Pin;
use std::collections::HashMap;
use std::task::Poll;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll,
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnExt},
Future, SinkExt, StreamExt,
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;

/// An error type that describes faults that may happen
///
Expand Down Expand Up @@ -101,7 +103,7 @@ const CHANNEL_CAPACITY: usize = 1024;
/// It is also generic over `I` that is entended to be a type identifying
/// different subsystems, again most likely this is a one large `enum`
/// covering all possible subsystem kinds.
enum OverseerMessage<M: Debug, I> {
enum ToOverseer<M: Debug, I> {
/// This is a message generated by a `Subsystem`.
/// Wraps the messge itself and has an optional `to` of
/// someone who can receive this message.
Expand Down Expand Up @@ -150,38 +152,53 @@ impl OverseerHandler {
}
}

impl<M: Debug, I: Debug> Debug for OverseerMessage<M, I> {
impl<M: Debug, I: Debug> Debug for ToOverseer<M, I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OverseerMessage::SubsystemMessage { to, msg } => {
ToOverseer::SubsystemMessage { to, msg } => {
write!(f, "OverseerMessage::SubsystemMessage{{ to: {:?}, msg: {:?} }}", to, msg)
}
OverseerMessage::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)")
ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)")
}
}
}
} }

/// A running instance of some `Subsystem`.
struct SubsystemInstance<M: Debug, I> {
/// We talk to the `Overseer` over this channel.
rx: mpsc::Receiver<OverseerMessage<M, I>>,
rx: mpsc::Receiver<ToOverseer<M, I>>,
/// The `Overseer` talks to use over this channel.
tx: mpsc::Sender<M>,
tx: mpsc::Sender<FromOverseer<M>>,
}

/// A context type that is given to the `Subsystem` upon spawning
/// that can be used by `Subsystem` to communicate with the outside world.
pub struct SubsystemContext<M: Debug, I>{
rx: mpsc::Receiver<M>,
tx: mpsc::Sender<OverseerMessage<M, I>>,
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer<M, I>>,
}

/// A signal used by `Overseer` to communicate with the `Subsystems`.
#[derive(Debug)]
pub enum OverseerSignal {
StartWork,
StopWork,
}

/// A message type that a `Subsystem` receives from the `Overseer`.
/// It wraps siglans from the `Oveseer` and messages that are communicated
/// between subsystems.
#[derive(Debug)]
pub enum FromOverseer<M: Debug> {
Signal(OverseerSignal),
Communication(M),
}

impl<M: Debug, I> SubsystemContext<M, I> {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result<Option<M>, ()> {
pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
Expand All @@ -190,7 +207,7 @@ impl<M: Debug, I> SubsystemContext<M, I> {
}

/// Receive a message.
pub async fn recv(&mut self) -> SubsystemResult<M> {
pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}

Expand All @@ -199,7 +216,7 @@ impl<M: Debug, I> SubsystemContext<M, I> {
/// The message will be broadcasted to all other `Subsystem`s that can
/// receive it.
pub async fn broadcast_msg(&mut self, msg: M) {
let _ = self.tx.send(OverseerMessage::SubsystemMessage{
let _ = self.tx.send(ToOverseer::SubsystemMessage{
to: None,
msg,
}).await;
Expand All @@ -208,7 +225,7 @@ impl<M: Debug, I> SubsystemContext<M, I> {
/// Spawn a child `Subsystem` on the executor and get it's `I`d upon success.
pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(OverseerMessage::SpawnJob {
let _ = self.tx.send(ToOverseer::SpawnJob {
s,
res: tx,
}).await;
Expand All @@ -218,13 +235,13 @@ impl<M: Debug, I> SubsystemContext<M, I> {

/// Send a direct message to some other `Subsystem` you know the `I`d of.
pub async fn send_msg(&mut self, to: I, msg: M) {
let _ = self.tx.send(OverseerMessage::SubsystemMessage{
let _ = self.tx.send(ToOverseer::SubsystemMessage{
to: Some(to),
msg,
}).await;
}

fn new(rx: mpsc::Receiver<M>, tx: mpsc::Sender<OverseerMessage<M, I>>) -> Self {
fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer<M, I>>) -> Self {
Self {
rx,
tx,
Expand Down Expand Up @@ -334,6 +351,29 @@ where
}
}

// Stop the overseer.
async fn stop(mut self) {
for s in self.subsystems.iter_mut() {
if let Some(ref mut s) = s.1.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::StopWork)).await;
}
}

let mut stop_delay = Delay::new(Duration::from_secs(1)).fuse();

loop {
select! {
_ = self.running_subsystems.next() => {
if self.running_subsystems.is_empty() {
break;
}
},
_ = stop_delay => break,
complete => break,
}
}
}

/// Run the `Overseer`.
// TODO: we have to
// * Give out to the user some handler to communicate with the `Overseer`
Expand All @@ -347,10 +387,7 @@ where

while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
if let Event::Stop = msg {
// TODO: We should send stop messages to all subsystems, join them
// and wait for some timeout for them to gracefully shutdown and then
// just drop their handlers.
return
return self.stop().await;
}
}

Expand All @@ -368,12 +405,12 @@ where
// TODO: this desperately need refactoring.
Copy link
Contributor

@drahnr drahnr May 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels slightly misplaced to do all of this here, should not the subsystem take care of processing individual Messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec stresses out that the overseer itself does all the message routing to "provide a single source of heartbeat in the system" or I don't remember the exact words, but that piece of doc contains some motivation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heartbeat is about doing this rough thing

let new_chain_head = chain_events.next().await;
future::join_all(subsystems.iter().map(|s| s.send(StartWork(...)))).await;
// thus the `StartWork` message is buffered for all subsystems before we send any further message to subsystems

for msg in msgs.into_iter() {
match msg.1 {
OverseerMessage::SubsystemMessage{ to, msg: m } => {
ToOverseer::SubsystemMessage{ to, msg: m } => {
match to {
Some(to) => {
if let Some(subsystem) = self.subsystems.get_mut(&to) {
if let Some(ref mut i) = subsystem.instance {
let _ = i.tx.send(m).await;
let _ = i.tx.send(FromOverseer::Communication(m)).await;
}
}
}
Expand All @@ -386,14 +423,14 @@ where

if s.subsystem.can_recv_msg(&m) {
if let Some(ref mut i) = s.instance {
let _ = i.tx.send(m.clone()).await;
let _ = i.tx.send(FromOverseer::Communication(m.clone())).await;
}
}
}
}
}
}
OverseerMessage::SpawnJob { s, res } => {
ToOverseer::SpawnJob { s, res } => {
log::info!("Spawn message");

let s = self.spawn_job(s);
Expand Down Expand Up @@ -466,11 +503,13 @@ mod tests {
SpawnedSubsystem(Box::pin(async move {
loop {
match ctx.recv().await {
Ok(msg) => {
Ok(FromOverseer::Communication(msg)) => {
let _ = sender.send(msg).await;
continue;
}
Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
Err(_) => return,
_ => (),
}
}
}))
Expand All @@ -490,6 +529,9 @@ mod tests {
continue;
}
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
break;
}
Ok(Some(_)) => {
continue;
}
Expand Down Expand Up @@ -537,9 +579,8 @@ mod tests {
// just stay around for longer
loop {
match ctx.try_recv().await {
Ok(Some(_)) => {
continue;
}
Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => break,
Ok(Some(_)) => continue,
Err(_) => return,
_ => (),
}
Expand Down