Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a245e96
Initial commit
cecton Jul 14, 2020
fe1c372
Move task_manager.rs to mod.rs
cecton Jul 14, 2020
163501a
Graceful shutdown for the task manager
cecton Jul 14, 2020
cf228c0
Await all background task JoinHandle at the same time
cecton Jul 15, 2020
551268b
Add tests
cecton Jul 15, 2020
b3c0648
Make future() wait also for exit signal + fix essential task failed
cecton Jul 15, 2020
2855ada
add comments for non-obvious code
cecton Jul 15, 2020
db34d96
Use clean_shutdown() in sc-cli
cecton Jul 15, 2020
2dcd0b3
Adapt code and upgrade tokio in sc-cli
cecton Jul 15, 2020
42333f4
cleanup spacing in doc
cecton Jul 15, 2020
1198324
Add license
cecton Jul 15, 2020
56c8139
I guess actually running the clean shutdown would be a good idea
cecton Jul 15, 2020
ef3a054
fix tests
cecton Jul 15, 2020
6f656cf
Merge remote-tracking branch 'origin/master' into cecton-async-gracef…
gnunicorn Jul 16, 2020
4e6b8d2
Update client/cli/src/runner.rs
cecton Jul 16, 2020
5dbe179
Improve error logging
cecton Jul 16, 2020
c133c59
disable other tests (can't reproduce on my machine)
cecton Jul 16, 2020
1fb6481
Revert "disable other tests (can't reproduce on my machine)"
cecton Jul 16, 2020
502aba4
It is possible that the tasks are ended first
cecton Jul 16, 2020
1b91a8c
Revert "It is possible that the tasks are ended first"
cecton Jul 16, 2020
ee5e13c
Use single threaded scheduler for more predictability
cecton Jul 16, 2020
4e15214
enable_time
cecton Jul 16, 2020
fbcefa4
Revert "enable_time"
cecton Jul 16, 2020
23b642a
Revert "Use single threaded scheduler for more predictability"
cecton Jul 16, 2020
5b5becb
Revert "Revert "It is possible that the tasks are ended first""
cecton Jul 16, 2020
bc431b6
This cannot be verified either with a threaded pool
cecton Jul 16, 2020
aaf038c
Merge commit 1be02953d4eb521ac1d40e55c71b44e2031ac105 (no conflict)
cecton Jul 17, 2020
2a1a2ac
Merge commit 47be8d939148b0cb0d98d9ba132f082829c12e04 (no conflict)
cecton Jul 17, 2020
c079e31
Merge commit 5c43b2bebb331ebeaac5b6e21778203b1c73aa83 (no conflict)
cecton Jul 21, 2020
fbcb752
Merge commit cd67889e08b8f79af00c159b35f126c1cb106dda (no conflict)
cecton Jul 21, 2020
e4ad84a
Merge commit 86f85949329f0b27b56c41cd02ae30413e62fa5e (conflicts)
cecton Jul 21, 2020
b5e5fe2
Apply suggestions from code review
cecton Jul 21, 2020
5fd98a6
Merge commit e3bb2cea3187fca77fd892becd35294a6cd7daeb (no conflict)
cecton Jul 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add tests
  • Loading branch information
cecton committed Jul 15, 2020
commit 551268b387a8d6c802e538697c4d2b2fe0cee4d4
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc4", path = "../../test-util
sp-consensus-babe = { version = "0.8.0-rc4", path = "../../primitives/consensus/babe" }
grandpa = { version = "0.8.0-rc4", package = "sc-finality-grandpa", path = "../finality-grandpa" }
grandpa-primitives = { version = "2.0.0-rc4", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
tokio = { version = "0.2", default-features = false }
async-std = { version = "1.6", default-features = false }
27 changes: 8 additions & 19 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,34 +277,22 @@ pub(crate) type JoinFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod tokio { pub mod runtime {
/// # #[derive(Clone)]
/// # pub struct Runtime;
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>)
/// # -> std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>> { Box::pin(async {}) }
/// # }
/// # } }
/// use futures::future::FutureExt;
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future)
/// handle.spawn(future).map(|_| ())
/// }).into();
/// ```
///
/// ## Using async-std
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>)
/// # -> std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>> { Box::pin(async {}) }
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result
/// async_std::task::spawn(future)
/// }).into();
/// ```
Expand All @@ -317,12 +305,13 @@ impl std::fmt::Debug for TaskExecutor {
}
}

impl<F> std::convert::From<F> for TaskExecutor
impl<F, FUT> std::convert::From<F> for TaskExecutor
where
F: Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync + 'static,
F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
fn from(func: F) -> Self {
Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt))))
}
}

Expand Down
15 changes: 9 additions & 6 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_u
use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error};

mod prometheus_future;
#[cfg(test)]
mod tests;

/// An handle for spawning tasks in the service.
#[derive(Clone)]
Expand Down Expand Up @@ -119,7 +121,8 @@ impl SpawnTaskHandle {

let join_handle = self.executor.spawn(Box::pin(future), task_type);
let mut task_notifier = self.task_notifier.clone();
self.executor.spawn(Box::pin(async move {
self.executor.spawn(
Box::pin(async move {
if let Err(err) = task_notifier.send(join_handle).await {
error!("Could not send spawned task handle to queue: {}", err);
}
Expand Down Expand Up @@ -256,9 +259,10 @@ impl TaskManager {
let metrics = prometheus_registry.map(Metrics::register).transpose()?;

let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks");
let completion_future = executor.spawn(Box::pin(
background_tasks.for_each_concurrent(None, |x| x)
), TaskType::Async);
let completion_future = executor.spawn(
Box::pin(background_tasks.for_each_concurrent(None, |x| x)),
TaskType::Async,
);

Ok(Self {
on_exit,
Expand All @@ -273,7 +277,6 @@ impl TaskManager {
})
}


/// Get a handle for spawning tasks.
pub fn spawn_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
Expand All @@ -290,7 +293,7 @@ impl TaskManager {
}

/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to finished.
/// tasks to be finished and drop the object. You can consider this as an async drop.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let keep_alive = self.keep_alive;
Expand Down
148 changes: 148 additions & 0 deletions client/service/src/task_manager/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::config::TaskExecutor;
use crate::task_manager::TaskManager;
use futures::future::FutureExt;
use parking_lot::Mutex;
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
struct DropTester(Arc<Mutex<usize>>);

struct DropTesterRef(DropTester);

impl DropTester {
fn new() -> DropTester {
DropTester(Arc::new(Mutex::new(0)))
}

fn new_ref(&self) -> DropTesterRef {
*self.0.lock() += 1;
DropTesterRef(self.clone())
}

fn assert_eq(&self, n: usize) {
assert_eq!(*self.0.lock(), n, "unexpected value for drop tester");
}
}

impl Drop for DropTesterRef {
fn drop(&mut self) {
*(self.0).0.lock() -= 1;
}
}

#[test]
fn ensure_drop_tester_working() {
let drop_tester = DropTester::new();
drop_tester.assert_eq(0);
let drop_tester_ref_1 = drop_tester.new_ref();
drop_tester.assert_eq(1);
let drop_tester_ref_2 = drop_tester.new_ref();
drop_tester.assert_eq(2);
drop(drop_tester_ref_1);
drop_tester.assert_eq(1);
drop(drop_tester_ref_2);
drop_tester.assert_eq(0);
}

async fn run_background_task(_keep_alive: impl Any) {
loop {
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}

async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) {
loop {
// block for X sec (not interruptible)
std::thread::sleep(duration);
// await for 1 sec (interruptible)
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}

#[test]
fn ensure_futures_are_awaited_on_shutdown() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let task_manager = TaskManager::new(task_executor, None).unwrap();
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
drop_tester.assert_eq(2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
drop_tester.assert_eq(2);
runtime.block_on(task_manager.clean_shutdown());
drop_tester.assert_eq(0);
}

#[test]
fn ensure_keep_alive_during_shutdown() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let mut task_manager = TaskManager::new(task_executor, None).unwrap();
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
task_manager.keep_alive(drop_tester.new_ref());
spawn_handle.spawn("task1", run_background_task(()));
drop_tester.assert_eq(1);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
drop_tester.assert_eq(1);
runtime.block_on(task_manager.clean_shutdown());
drop_tester.assert_eq(0);
}

#[test]
fn ensure_blocking_futures_are_awaited_on_shutdown() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let task_manager = TaskManager::new(task_executor, None).unwrap();
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn(
"task1",
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
spawn_handle.spawn(
"task2",
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
drop_tester.assert_eq(2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
drop_tester.assert_eq(2);
runtime.block_on(task_manager.clean_shutdown());
drop_tester.assert_eq(0);
}

#[test]
fn ensure_no_task_can_be_spawn_after_terminate() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into();

let mut task_manager = TaskManager::new(task_executor, None).unwrap();
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref()));
drop_tester.assert_eq(2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await });
drop_tester.assert_eq(2);
task_manager.terminate();
spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref()));
// NOTE: task3 will not increase the count because it has been ignored
drop_tester.assert_eq(2);
runtime.block_on(task_manager.clean_shutdown());
drop_tester.assert_eq(0);
}