Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Allow communication between subsystems and outside world
  • Loading branch information
montekki committed May 30, 2020
commit 564964c1a73cc854cd8372b3d228b63f48cdfa6d
24 changes: 21 additions & 3 deletions overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! * Establishing message passing

use std::time::Duration;
use futures::{pending, executor};
use futures::{pending, pin_mut, executor, select, stream, FutureExt, StreamExt};
use futures_timer::Delay;
use kv_log_macro as log;

Expand Down Expand Up @@ -132,7 +132,25 @@ fn main() {
(SubsystemId::Subsystem2, Box::new(Subsystem2::new())),
];

let overseer = Overseer::new(subsystems, spawner);
overseer.run().await;
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
});

let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

pin_mut!(timer_stream);
pin_mut!(overseer_fut);

loop {
select! {
_ = overseer_fut => break,
_ = timer_stream.next() => {
handler.send_to_subsystem(SubsystemId::Subsystem1, 42usize).await.unwrap();
}
complete => break,
}
}
});
}
114 changes: 82 additions & 32 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ enum ToOverseer<M: Debug, I> {
/// If that `to` is present the message will be targetedly sent to the intended
/// receiver. The most obvious use case of this is communicating with children.
SubsystemMessage {
to: I,
to: Option<I>,
msg: M,
},
/// A message that wraps something the `Subsystem` is desiring to
Expand All @@ -143,28 +143,51 @@ enum ToOverseer<M: Debug, I> {
},
}

enum Event {
/// Some event from outer world.
enum Event<M, I> {
BlockImport,
BlockFinalized,
MsgToSubsystem {
msg: M,
to: I,
},
Stop,
}

/// Some message that is sent from one of the `Subsystem`s to the outside world.
pub enum OutboundMessage<M, I> {
SubsystemMessage {
msg: M,
from: I,
}
}

/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
pub struct OverseerHandler<M, I> {
events_tx: mpsc::Sender<Event<M, I>>,
outside_rx: mpsc::Receiver<OutboundMessage<M, I>>,
}

impl OverseerHandler {
impl<M, I> OverseerHandler<M, I> {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport).await?;

Ok(())
}

/// Send some message to one of the `Subsystem`s.
pub async fn send_to_subsystem(&mut self, to: I, msg: M) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem {
msg,
to,
}).await?;

Ok(())
}

/// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized).await?;
Expand All @@ -178,6 +201,10 @@ impl OverseerHandler {

Ok(())
}

pub async fn recv_msg(&mut self) -> Option<OutboundMessage<M, I>> {
self.outside_rx.next().await
}
}

impl<M: Debug, I: Debug> Debug for ToOverseer<M, I> {
Expand Down Expand Up @@ -273,7 +300,17 @@ impl<M: Debug, I> SubsystemContext<M, I> {
/// Send a direct message to some other `Subsystem` you know the `I`d of.
pub async fn send_msg(&mut self, to: I, msg: M) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage{
to,
to: Some(to),
msg,
}).await?;

Ok(())
}

/// Send a message to some entity that resides outside of the `Overseer`.
pub async fn send_msg_outside(&mut self, msg: M) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage {
to: None,
msg,
}).await?;

Expand Down Expand Up @@ -333,10 +370,10 @@ pub struct Overseer<M: Debug, S: Spawn, I> {
channel_capacity: usize,

/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
events_rx: mpsc::Receiver<Event<M, I>>,

/// A sender for the `events_rx`, used to return `OverseerHandler` to the user.
events_tx: mpsc::Sender<Event>,
/// A sender to send things to the outside
outside_tx: mpsc::Sender<OutboundMessage<M, I>>,
}

impl<M, S, I> Overseer<M, S, I>
Expand Down Expand Up @@ -370,33 +407,31 @@ where
/// ```
///
/// [`Subsystem`]: trait.Subsystem.html
pub fn new<T: IntoIterator<Item = (I, Box<dyn Subsystem<M, I> + Send>)>>(subsystems: T, s: S) -> Self {
pub fn new<T>(subsystems: T, s: S) -> (Self, OverseerHandler<M, I>)
where
T: IntoIterator<Item = (I, Box<dyn Subsystem<M, I> + Send>)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (outside_tx, outside_rx) = mpsc::channel(CHANNEL_CAPACITY);

let handler = OverseerHandler {
events_tx: events_tx.clone(),
outside_rx,
};

let mut this = Self {
subsystems: HashMap::new(),
s,
running_subsystems: FuturesUnordered::new(),
channel_capacity: CHANNEL_CAPACITY,
events_rx,
events_tx,
outside_tx,
};

for s in subsystems.into_iter() {
let _ = this.spawn(s);
}

this
}

/// Get the [`OverseerHandler`] to communicate with the overseer.
///
/// [`OverseerHandler`]: struct.OverseerHandler.html
pub fn handler(&mut self) -> OverseerHandler {
let events_tx = self.events_tx.clone();

OverseerHandler {
events_tx,
}
(this, handler)
}

// Stop the overseer.
Expand Down Expand Up @@ -430,8 +465,19 @@ where
let mut msgs = Vec::default();

while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
if let Event::Stop = msg {
return self.stop().await;
match msg {
Event::MsgToSubsystem { msg, to } => {
if let Some(subsystem) = self.subsystems.get_mut(&to) {
if let Some(ref mut i) = subsystem.instance {
let _ = i.tx.send(FromOverseer::Communication {
msg,
from: None,
}).await;
}
}
}
Event::Stop => return self.stop().await,
_ => ()
}
}

Expand All @@ -447,7 +493,7 @@ where
// Do the message dispatching be it broadcasting or direct messages.
for msg in msgs.into_iter() {
match msg.1 {
ToOverseer::SubsystemMessage{ to, msg: m } => {
ToOverseer::SubsystemMessage { to: Some(to), msg: m } => {
if let Some(subsystem) = self.subsystems.get_mut(&to) {
if let Some(ref mut i) = subsystem.instance {
let _ = i.tx.send(FromOverseer::Communication {
Expand All @@ -457,6 +503,12 @@ where
}
}
}
ToOverseer::SubsystemMessage { msg: m, .. } => {
let _ = self.outside_tx.send(OutboundMessage::SubsystemMessage {
msg: m,
from: msg.0,
}).await;
}
ToOverseer::SpawnJob { s, res } => {
let s = self.spawn_job(s);

Expand Down Expand Up @@ -641,8 +693,7 @@ mod tests {
(SubsystemId::Subsystem1, Box::new(TestSubsystem1(s1_tx))),
(SubsystemId::Subsystem2, Box::new(TestSubsystem2(s2_tx))),
];
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);
Expand Down Expand Up @@ -688,8 +739,7 @@ mod tests {
let subsystems: Vec<(SubsystemId, Box<dyn Subsystem<usize, SubsystemId> + Send>)> = vec![
(SubsystemId::Subsystem3, Box::new(TestSubsystem3(Some(tx)))),
];
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let (overseer, mut handler) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();

let mut rx = rx.fuse();
Expand Down Expand Up @@ -722,7 +772,7 @@ mod tests {
(SubsystemId::Subsystem4, Box::new(TestSubsystem4)),
];

let overseer = Overseer::new(subsystems, spawner);
let (overseer, _) = Overseer::new(subsystems, spawner);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);

Expand Down