Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixes from review
Move function from impl block.
Do not panic but resolve with errors if spawner fails or subsystem
resolves.
  • Loading branch information
montekki committed Jun 1, 2020
commit 7462573afec84d1a727a36d97e6376a6988cf9da
2 changes: 1 addition & 1 deletion overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn main() {
Box::new(Subsystem2),
Box::new(Subsystem1),
spawner,
);
).unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

Expand Down
109 changes: 58 additions & 51 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnExt},
task::{Spawn, SpawnError, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
Expand Down Expand Up @@ -92,6 +92,12 @@ impl From<oneshot::Canceled> for SubsystemError {
}
}

impl From<SpawnError> for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}

/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
Expand Down Expand Up @@ -222,6 +228,8 @@ pub enum OverseerSignal {
StartWork,
/// `Subsystem` should stop working.
StopWork,
/// Conclude the work of the `Overseer` and all `Subsystem`s.
Conclude,
}

#[derive(Debug)]
Expand Down Expand Up @@ -447,7 +455,7 @@ where
/// Box::new(ValidationSubsystem),
/// Box::new(CandidateBackingSubsystem),
/// spawner,
/// );
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_secs(1)).fuse();
///
Expand All @@ -466,7 +474,7 @@ where
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
mut s: S,
) -> (Self, OverseerHandler) {
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);

let handler = OverseerHandler {
Expand All @@ -476,19 +484,19 @@ where
let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new();

let validation_subsystem = Self::spawn(
let validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
validation,
);
)?;

let candidate_backing_subsystem = Self::spawn(
let candidate_backing_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
candidate_backing,
);
)?;

let this = Self {
validation_subsystem,
Expand All @@ -499,17 +507,17 @@ where
events_rx,
};

(this, handler)
Ok((this, handler))
}

// Stop the overseer.
async fn stop(mut self) {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::StopWork)).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::StopWork)).await;
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
Expand All @@ -528,14 +536,17 @@ where
}

/// Run the `Overseer`.
pub async fn run(mut self) {
pub async fn run(mut self) -> SubsystemResult<()> {
loop {
while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg).await;
}
Event::Stop => return self.stop().await,
Event::Stop => {
self.stop().await;
return Ok(());
}
_ => ()
}
}
Expand All @@ -555,7 +566,9 @@ where

// Some subsystem exited? It's time to panic.
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) {
panic!("Subsystem finished unexpectedly {:?}", finished);
log::error!("Subsystem finished unexpectedly {:?}", finished);
self.stop().await;
return Err(SubsystemError);
}

// Looks like nothing is left to be polled, let's take a break.
Expand All @@ -578,38 +591,37 @@ where
}
}

fn spawn<M: Debug>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
) -> OverseenSubsystem<M> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
let f = s.start(ctx);

let handle = spawner.spawn_with_handle(f.0)
.expect("We need to be able to successfully spawn all subsystems");

streams.push(from_rx);
futures.push(handle);

let instance = Some(SubsystemInstance {
tx: to_tx,
});

OverseenSubsystem {
subsystem: s,
instance,
}
}

fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
self.s.spawn(j).map_err(|_| SubsystemError)
}
}

fn spawn<S: Spawn, M: Debug>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
let f = s.start(ctx);

let handle = spawner.spawn_with_handle(f.0)?;

streams.push(from_rx);
futures.push(handle);

let instance = Some(SubsystemInstance {
tx: to_tx,
});

Ok(OverseenSubsystem {
subsystem: s,
instance,
})
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -694,7 +706,7 @@ mod tests {
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem2(s2_tx)),
spawner,
);
).unwrap();
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);
Expand Down Expand Up @@ -730,13 +742,10 @@ mod tests {
});
}

// Spawn a subsystem that immediately exits. This should panic:
// Spawn a subsystem that immediately exits.
//
// Subsystems are long-lived worker tasks that are in charge of performing
// some particular kind of work. All subsystems can communicate with each
// other via a well-defined protocol.
// Should immediately conclude the overseer itself with an error.
#[test]
#[should_panic]
fn overseer_panics_on_sybsystem_exit() {
let spawner = executor::ThreadPool::new().unwrap();

Expand All @@ -746,15 +755,13 @@ mod tests {
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem4),
spawner,
);
).unwrap();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);

loop {
select! {
a = overseer_fut => break,
complete => break,
}
select! {
res = overseer_fut => assert!(res.is_err()),
complete => (),
}
})
}
Expand Down