Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
pin_mut!(f);

tokio_runtime.block_on(main(f))?;
tokio_runtime.block_on(task_manager.clean_shutdown());
drop(task_manager);

Ok(())
}
Expand Down Expand Up @@ -154,7 +154,6 @@ impl<C: SubstrateCli> Runner<C> {
self.print_node_infos();
let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
self.tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(res?)
}

Expand Down
83 changes: 12 additions & 71 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
use crate::{config::TaskType, Error};
use exit_future::Signal;
use futures::{
future::{join_all, pending, select, try_join_all, BoxFuture, Either},
future::{pending, select, try_join_all, BoxFuture, Either},
Future, FutureExt, StreamExt,
};
use log::debug;
use prometheus_endpoint::{
exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
Registry, U64,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{panic, pin::Pin, result::Result};
use tokio::{runtime::Handle, task::JoinHandle};
use tokio::runtime::Handle;
use tracing_futures::Instrument;

mod prometheus_future;
Expand Down Expand Up @@ -73,7 +72,6 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
tokio_handle: Handle,
metrics: Option<Metrics>,
task_notifier: TracingUnboundedSender<JoinHandle<()>>,
}

impl SpawnTaskHandle {
Expand Down Expand Up @@ -113,11 +111,6 @@ impl SpawnTaskHandle {
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
if self.task_notifier.is_closed() {
debug!("Attempt to spawn a new task has been prevented: {}", name);
return
}

let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();

Expand Down Expand Up @@ -169,17 +162,17 @@ impl SpawnTaskHandle {
}
.in_current_span();

let join_handle = match task_type {
TaskType::Async => self.tokio_handle.spawn(future),
match task_type {
TaskType::Async => {
self.tokio_handle.spawn(future);
},
TaskType::Blocking => {
let handle = self.tokio_handle.clone();
self.tokio_handle.spawn_blocking(move || {
handle.block_on(future);
})
});
},
};

let _ = self.task_notifier.unbounded_send(join_handle);
}
}
}

Expand Down Expand Up @@ -288,8 +281,8 @@ pub struct TaskManager {
/// A future that resolves when the service has exited, this is useful to
/// make sure any internally spawned futures stop when the service does.
on_exit: exit_future::Exit,
/// A signal that makes the exit future above resolve, fired on service drop.
signal: Option<Signal>,
/// A signal that makes the exit future above resolve, fired on drop.
_signal: Signal,
/// Tokio runtime handle that is used to spawn futures.
tokio_handle: Handle,
/// Prometheus metric where to report the polling times.
Expand All @@ -301,10 +294,6 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send>,
/// A sender to a stream of background tasks. This is used for the completion future.
task_notifier: TracingUnboundedSender<JoinHandle<()>>,
/// This future will complete when all the tasks are joined and the stream is closed.
completion_future: JoinHandle<()>,
/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
Expand All @@ -325,25 +314,14 @@ impl TaskManager {

let metrics = prometheus_registry.map(Metrics::register).transpose()?;

let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks");
// NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It
// is possible to limit this but it's actually better for the memory foot print to await
// them all to not accumulate anything on that stream.
let completion_future =
tokio_handle.spawn(background_tasks.for_each_concurrent(None, |x| async move {
let _ = x.await;
}));

Ok(Self {
on_exit,
signal: Some(signal),
_signal: signal,
tokio_handle,
metrics,
essential_failed_tx,
essential_failed_rx,
keep_alive: Box::new(()),
task_notifier,
completion_future,
children: Vec::new(),
})
}
Expand All @@ -354,7 +332,6 @@ impl TaskManager {
on_exit: self.on_exit.clone(),
tokio_handle: self.tokio_handle.clone(),
metrics: self.metrics.clone(),
task_notifier: self.task_notifier.clone(),
}
}

Expand All @@ -363,36 +340,12 @@ impl TaskManager {
SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
}

/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to be finished and drop the object. You can consider this as an async drop.
///
/// It's always better to call and await this function before exiting the process as background
/// tasks may be running in the background. If the process exit and the background tasks are not
/// cancelled, this will lead to objects not getting dropped properly.
///
/// This is an issue in some cases as some of our dependencies do require that we drop all the
/// objects properly otherwise it triggers a SIGABRT on exit.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown());
let keep_alive = self.keep_alive;
let completion_future = self.completion_future;

Box::pin(async move {
join_all(children_shutdowns).await;
let _ = completion_future.await;

let _ = keep_alive;
})
}

/// Return a future that will end with success if the signal to terminate was sent
/// (`self.terminate()`) or with an error if an essential task fails.
///
/// # Warning
///
/// This function will not wait until the end of the remaining task. You must call and await
/// `clean_shutdown()` after this.
/// This function will not wait until the end of the remaining task.
pub fn future<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
Expand All @@ -417,18 +370,6 @@ impl TaskManager {
})
}

/// Signal to terminate all the running tasks.
pub fn terminate(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
// NOTE: this will prevent new tasks to be spawned
self.task_notifier.close_channel();
for child in self.children.iter_mut() {
child.terminate();
}
}
}

/// Set what the task manager should keep alive, can be called multiple times.
pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
// allows this fn to safely called multiple times.
Expand Down
Loading