Skip to content
87 changes: 85 additions & 2 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,

/// To run before each worker thread is parked.
pub(super) before_park: Option<Callback>,

/// To run after each thread is unparked.
pub(super) after_unpark: Option<Callback>,

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}
Expand Down Expand Up @@ -135,6 +141,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,
before_park: None,
after_unpark: None,

keep_alive: None,
}
Expand Down Expand Up @@ -374,6 +382,79 @@ impl Builder {
self
}

/// Executes function `f` just before a thread is parked (goes idle).
/// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
/// can be called, and may result in this thread being unparked immediately.
///
/// This can be used to start work only when the executor is idle, or for bookkeeping
/// and monitoring purposes.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # use std::sync::atomic::{AtomicBool, Ordering};
///
/// # pub fn main() {
///
/// let once = AtomicBool::new(true);
///
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_park(move || {
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async { println!("thread went idle"); });
/// }
/// })
/// .build();
///
/// runtime.unwrap().block_on(async {
/// tokio::task::yield_now().await;
/// println!("Hello from Tokio!");
/// })
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self
}

/// Executes function `f` just after a thread unparks (starts executing tasks).
///
/// This is intended for bookkeeping and monitoring use cases; note that work
/// in this callback will increase latencies when the application has allowed one or
/// more runtime threads to go idle.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_unpark(|| {
/// println!("thread unparking");
/// })
/// .build();
///
/// runtime.unwrap().block_on(async {
/// tokio::task::yield_now().await;
/// println!("Hello from Tokio!");
/// })
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self
}

/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
Expand Down Expand Up @@ -546,7 +627,7 @@ cfg_rt_multi_thread! {

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
Expand Down Expand Up @@ -587,7 +668,9 @@ impl fmt::Debug for Builder {
)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
.field("before_park", &self.before_park.as_ref().map(|_| "..."))
.field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
.finish()
}
}
11 changes: 8 additions & 3 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::JoinHandle;
use crate::runtime::Parker;
use crate::runtime::{Callback, Parker};

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -43,8 +43,13 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====

impl ThreadPool {
pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker);
pub(crate) fn new(
size: usize,
parker: Parker,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

Expand Down
22 changes: 20 additions & 2 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::runtime::{queue, task, Callback};
use crate::util::FastRand;

use std::cell::RefCell;
Expand Down Expand Up @@ -137,6 +137,11 @@ pub(super) struct Shared {
/// stolen by a thread that was spawned as part of `block_in_place`.
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,

/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
}

/// Used to communicate with a worker from other threads.
Expand Down Expand Up @@ -174,7 +179,12 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
pub(super) fn create(
size: usize,
park: Parker,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];

Expand Down Expand Up @@ -204,6 +214,8 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
before_park,
after_unpark,
});

let mut launch = Launch(vec![]);
Expand Down Expand Up @@ -466,11 +478,17 @@ impl Context {
*self.core.borrow_mut() = Some(core);

// Park thread
if let Some(f) = &self.worker.shared.before_park {
f()
}
if let Some(timeout) = duration {
park.park_timeout(timeout).expect("park failed");
} else {
park.park().expect("park failed");
}
if let Some(f) = &self.worker.shared.after_unpark {
f()
}

// Remove `core` from context
core = self.core.borrow_mut().take().expect("core missing");
Expand Down