Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address review
  • Loading branch information
b-naber committed Mar 26, 2021
commit fdadf94457761ef1befcfac4084fe68390c68c0e
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ cfg_sync! {
pub(crate) use task::AtomicWaker;

mod once_cell;
pub use self::once_cell::{OnceCell, NotInitializedError, AlreadyInitializedError};
pub use self::once_cell::{OnceCell, NotInitializedError, SetError};

pub mod watch;
}
Expand Down
191 changes: 118 additions & 73 deletions tokio/src/sync/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use std::error::Error;
use std::fmt;
use std::future::Future;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};

/// A thread-safe cell which can be written to only once.
///
/// Provides the functionality to either set the value, in case `OnceCell`
/// is uninitialized, or get the already initialized value by using an async
/// function via [`OnceCell::get_or_init_with`] or by using a Future via
/// [`OnceCell::get_or_init`] directly via [`OnceCell::get_or_init`].
/// function via [`OnceCell::get_or_init_with`].
///
/// [`OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with
/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init
///
/// # Examples
/// ```
Expand All @@ -24,7 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
/// 1 + 1
/// }
///
/// static ONCE: OnceCell<u32> = OnceCell::new();
/// static ONCE: OnceCell<u32> = OnceCell::const_new();
///
/// #[tokio::main]
/// async fn main() {
Expand Down Expand Up @@ -76,10 +75,19 @@ impl<T: PartialEq> PartialEq for OnceCell<T> {
impl<T: Eq> Eq for OnceCell<T> {}

impl<T> OnceCell<T> {
/// Creates a new uninitialized OnceCell instance.
pub fn new() -> Self {
OnceCell {
value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
semaphore: Semaphore::new(1),
}
}

/// Creates a new uninitialized OnceCell instance.
#[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn new() -> Self {
pub const fn const_new() -> Self {
OnceCell {
value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
Expand All @@ -97,6 +105,12 @@ impl<T> OnceCell<T> {
&*self.value.with(|ptr| (*ptr).as_ptr())
}

// SAFETY: safe to call only once self.initialized() is true and since
// only one mutable reference can call this method
unsafe fn get_unchecked_mut(&mut self) -> &mut T {
&mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr())
}

// SAFETY: safe to call only once a permit on the semaphore has been
// acquired
unsafe fn set_value(&self, value: T) {
Expand All @@ -115,27 +129,38 @@ impl<T> OnceCell<T> {
if self.initialized() {
Ok(unsafe { self.get_unchecked() })
} else {
Err(NotInitializedError)
Err(NotInitializedError(()))
}
}

/// If the cell is initialized, this method returns a mutable reference to its value,
/// otherwise returns [`NotInitializedError`].
///
/// [`NotInitializedError`]: crate::sync::NotInitializederror
pub fn get_mut(&mut self) -> Result<&mut T, NotInitializedError> {
if self.initialized() {
Ok(unsafe { self.get_unchecked_mut() })
} else {
Err(NotInitializedError(()))
}
}

/// Sets the value of the OnceCell to the argument value.
///
/// If the value of the OnceCell was already set prior to this call
/// or some other set is currently initializing the cell, then
/// [`AlreadyInitializedError`] is returned. In order to wait
/// for an ongoing initialization to finish, call [`OnceCell::get_or_init`]
/// or [`OnceCell::get_or_init_with`] instead.
/// then [`AlreadyInitializedError`] is returned. If another thread
/// is initializing the cell while this method is called,
/// ['InitializingError`] is returned. In order to wait
/// for an ongoing initialization to finish, call
/// [`OnceCell::get_or_init_with`] instead.
///
/// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError
/// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init
/// [`InitializingError`]: crate::sync::InitializingError
/// ['OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with
pub fn set(&self, value: T) -> Result<(), AlreadyInitializedError> {
pub fn set(&self, value: T) -> Result<(), SetError> {
if !self.initialized() {
// After acquire().await we have either acquired a permit while self.value
// is still uninitialized, or another thread has intialized the value and
// closed the semaphore, in which case self.initialized is true and we
// don't set the value
// Another thread might be initializing the cell, in which case `try_acquire` will
// return an error
match self.semaphore.try_acquire() {
Ok(_permit) => {
if !self.initialized() {
Expand All @@ -151,16 +176,30 @@ impl<T> OnceCell<T> {
}
}
_ => {
// Couldn't acquire the permit, look if initializing process is already completed
if !self.initialized() {
panic!(
"couldn't acquire a permit even though OnceCell value is uninitialized."
);
return Err(SetError::InitializingError(()));
}
}
}
}

Err(AlreadyInitializedError)
Err(SetError::AlreadyInitializedError(()))
}

/// Tries to set the value of the cell, overwriting the previously set value, in case one is
/// available. If no value was previously set, this method has the same functionality has
/// [`OnceCell::set`].
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
pub fn set_mut(&mut self, value: T) -> Result<(), SetError> {
if self.initialized() {
// SAFETY: Setting this value is safe because the mutable reference guarantees exclusivity
unsafe { self.set_value(value) };
Ok(())
} else {
self.set(value)
}
}

/// Tries to initialize the value of the OnceCell using the async function `f`.
Expand Down Expand Up @@ -217,56 +256,25 @@ impl<T> OnceCell<T> {
}
}

/// Tries to initialize the value of the `OnceCell` using the the Future `f`.
/// If the value of the `OnceCell` was already initialized prior to this call,
/// a reference to that initialized value is returned. If some other thread
/// initiated the initialization prior to this call and the initialization
/// hasn't completed, this call waits until the initialization is finished.
///
/// This will deadlock if `f` internally tries to initialize the cell itself.
pub async fn get_or_init<F>(&self, f: F) -> &T
where
F: Future<Output = T>,
{
/// Moves the value out of the cell and drops the cell afterwards.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Moves the value out of the cell and drops the cell afterwards.
/// Moves the value out of the cell, destroying the cell in the process.

pub fn into_inner(self) -> Result<T, NotInitializedError> {
if self.initialized() {
// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
Ok(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) })
} else {
// After acquire().await we have either acquired a permit while self.value
// is still uninitialized, or current thread is awoken after another thread
// has intialized the value and closed the semaphore, in which case self.initialized
// is true and we don't set the value here
match self.semaphore.acquire().await {
Ok(_permit) => {
if !self.initialized() {
// If `f` panics or `select!` is called, this `get_or_init` call
// is aborted and the semaphore permit is dropped.
let value = f.await;

// SAFETY: There is only one permit on the semaphore, hence only one
// mutable reference is created
unsafe { self.set_value(value) };
Err(NotInitializedError(()))
}
}

// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
} else {
unreachable!("acquired semaphore after value was already initialized.");
}
}
Err(_) => {
if self.initialized() {
// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
} else {
unreachable!(
"Semaphore closed, but the OnceCell has not been initialized."
);
}
}
}
/// Takes ownership of the current value, leaving the cell unitialized.
pub fn take(&mut self) -> Result<T, NotInitializedError> {
if self.initialized() {
// Note: ptr::read does not move the value out of `self.value`. We need to use
// `self.initialized` to prevent incorrect accesses to value
let value = unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) };
self.value_set.store(false, Ordering::Release);
Ok(value)
} else {
Err(NotInitializedError(()))
}
}
}
Expand All @@ -284,25 +292,62 @@ unsafe impl<T: Sync + Send> Sync for OnceCell<T> {}
// it's safe to send it to another thread
unsafe impl<T: Send> Send for OnceCell<T> {}

/// Error returned from the [`OnceCell::set`] method
/// Errors that can be returned from [`OnceCell::set`] and
/// [`OnceCell::set_mut`].
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
#[derive(Debug, PartialEq)]
pub struct AlreadyInitializedError;
pub enum SetError {
/// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls if
/// the cell was previously initialized.
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
AlreadyInitializedError(()),

impl fmt::Display for AlreadyInitializedError {
/// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls when
/// the cell is currently being inintialized during the calls to those methods.
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
InitializingError(()),
}

impl fmt::Display for SetError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AlreadyInitializedError")
match self {
SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"),
SetError::InitializingError(_) => write!(f, "InitializingError"),
}
}
}

impl Error for AlreadyInitializedError {}
impl Error for SetError {}

impl SetError {
/// Whether `SetError` is `SetError::AlreadyInitializEderror`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo here. Also, please put a period at the end (also for the other method).

pub fn is_already_init_err(&self) -> bool {
match self {
SetError::AlreadyInitializedError(_) => true,
SetError::InitializingError(_) => false,
}
}

/// Whether `SetError` is `SetError::InitializingError`
pub fn is_initializing_err(&self) -> bool {
match self {
SetError::AlreadyInitializedError(_) => false,
SetError::InitializingError(_) => true,
}
}
}

/// Error returned from the [`OnceCell::get`] method
///
/// [`OnceCell::get`]: crate::sync::OnceCell::get
#[derive(Debug, PartialEq)]
pub struct NotInitializedError;
pub struct NotInitializedError(());

impl fmt::Display for NotInitializedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
30 changes: 7 additions & 23 deletions tokio/tests/async_send_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#![cfg(feature = "full")]

use std::cell::Cell;
use std::future::Future;
use std::io::{Cursor, SeekFrom};
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
use tokio::net::TcpStream;
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -270,37 +272,19 @@ async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = u8>>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = u8>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): Send & Sync);
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): Send & Sync);
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8>>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
assert_value!(tokio::sync::OnceCell<u8>: Send & Sync);
assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync);
assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync);
Expand Down
Loading