Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Gather messages with StreamUnordered
  • Loading branch information
montekki committed Jun 1, 2020
commit d55fd6e6dab831c27d688dd89e50e3cf0c675e31
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"

[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
Expand Down
85 changes: 40 additions & 45 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use futures::{
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};

/// An error type that describes faults that may happen
///
Expand Down Expand Up @@ -207,7 +208,6 @@ impl Debug for ToOverseer {
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M: Debug> {
rx: mpsc::Receiver<ToOverseer>,
tx: mpsc::Sender<FromOverseer<M>>,
}

Expand Down Expand Up @@ -360,6 +360,9 @@ pub struct Overseer<S: Spawn> {
/// Here we keep handles to spawned subsystems to be notified when they terminate.
running_subsystems: FuturesUnordered<RemoteHandle<()>>,

/// Gather running subsystms' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,

/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,
}
Expand Down Expand Up @@ -404,16 +407,29 @@ where
events_tx: events_tx.clone(),
};

let (h_v, validation_subsystem) = Self::spawn(&mut s, validation).unwrap();
let (h_c, candidate_backing_subsystem) = Self::spawn(&mut s, candidate_backing).unwrap();
let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new();

let validation_subsystem = Self::spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
validation,
);

let running_subsystems = vec![h_v, h_c].into_iter().collect();
let candidate_backing_subsystem = Self::spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
candidate_backing,
);

let this = Self {
validation_subsystem,
candidate_backing_subsystem,
s,
running_subsystems,
running_subsystems_rx,
events_rx,
};

Expand Down Expand Up @@ -448,49 +464,29 @@ where
/// Run the `Overseer`.
pub async fn run(mut self) {
loop {
// Upon iteration of the loop we will be collecting all the messages
// that need dispatching (if any).
let mut msgs = Vec::new();
let mut to_spawn = Vec::new();

while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
match msg {
Event::MsgToSubsystem(msg) => {
msgs.push(msg);
self.route_message(msg).await;
}
Event::Stop => return self.stop().await,
_ => ()
}
}

if let Some(s) = &mut self.candidate_backing_subsystem.instance {
while let Poll::Ready(Some(msg)) = poll!(&mut s.rx.next()) {
match msg {
ToOverseer::SubsystemMessage(msg) => msgs.push(msg),
ToOverseer::SpawnJob { s, res } => to_spawn.push((s, res)),
}
}
}
while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!(
&mut self.running_subsystems_rx.next()
) {
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SpawnJob { s, res } => {
let s = self.spawn_job(s);

if let Some(s) = &mut self.validation_subsystem.instance {
while let Poll::Ready(Some(msg)) = poll!(&mut s.rx.next()) {
match msg {
ToOverseer::SubsystemMessage(msg) => msgs.push(msg),
ToOverseer::SpawnJob { s, res } => to_spawn.push((s, res)),
let _ = res.send(s);
}
}
}

for msg in msgs.into_iter() {
self.route_message(msg).await;
}

for msg in to_spawn.into_iter() {
let s = self.spawn_job(msg.0);

let _ = msg.1.send(s);
}

// Some subsystem exited? It's time to panic.
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) {
panic!("Subsystem finished unexpectedly {:?}", finished);
Expand All @@ -505,7 +501,7 @@ where
match msg {
AllMessages::Validation(msg) => {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
let _= s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::CandidateBacking(msg) => {
Expand All @@ -518,8 +514,10 @@ where

fn spawn<M: Debug>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
) -> SubsystemResult<(RemoteHandle<()>, OverseenSubsystem<M>)> {
) -> OverseenSubsystem<M> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
Expand All @@ -528,20 +526,17 @@ where
let handle = spawner.spawn_with_handle(f.0)
.expect("We need to be able to successfully spawn all subsystems");

streams.push(from_rx);
futures.push(handle);

let instance = Some(SubsystemInstance {
rx: from_rx,
tx: to_tx,
});

Ok(
(
handle,
OverseenSubsystem {
subsystem: s,
instance,
}
)
)
OverseenSubsystem {
subsystem: s,
instance,
}
}

fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
Expand Down