Skip to content

Commit 0dc4769

Browse files
author
Kai Jewson
authored
sync: add OwnedRwLockReadGuard and OwnedRwLockWriteGuard (#3340)
1 parent 6f896d8 commit 0dc4769

File tree

9 files changed

+814
-10
lines changed

9 files changed

+814
-10
lines changed

tokio/src/sync/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,9 @@ cfg_sync! {
451451

452452
mod rwlock;
453453
pub use rwlock::RwLock;
454+
pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
455+
pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
456+
pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
454457
pub use rwlock::read_guard::RwLockReadGuard;
455458
pub use rwlock::write_guard::RwLockWriteGuard;
456459
pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;

tokio/src/sync/rwlock.rs

Lines changed: 252 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
22
use crate::sync::mutex::TryLockError;
33
use std::cell::UnsafeCell;
44
use std::marker;
5+
use std::marker::PhantomData;
6+
use std::mem::ManuallyDrop;
7+
use std::sync::Arc;
58

9+
pub(crate) mod owned_read_guard;
10+
pub(crate) mod owned_write_guard;
11+
pub(crate) mod owned_write_guard_mapped;
612
pub(crate) mod read_guard;
713
pub(crate) mod write_guard;
814
pub(crate) mod write_guard_mapped;
15+
pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
16+
pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
17+
pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
918
pub(crate) use read_guard::RwLockReadGuard;
1019
pub(crate) use write_guard::RwLockWriteGuard;
1120
pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
@@ -101,13 +110,31 @@ fn bounds() {
101110
check_sync::<RwLockReadGuard<'_, u32>>();
102111
check_unpin::<RwLockReadGuard<'_, u32>>();
103112

113+
check_send::<OwnedRwLockReadGuard<u32, i32>>();
114+
check_sync::<OwnedRwLockReadGuard<u32, i32>>();
115+
check_unpin::<OwnedRwLockReadGuard<u32, i32>>();
116+
104117
check_send::<RwLockWriteGuard<'_, u32>>();
105118
check_sync::<RwLockWriteGuard<'_, u32>>();
106119
check_unpin::<RwLockWriteGuard<'_, u32>>();
107120

108-
let rwlock = RwLock::new(0);
121+
check_send::<RwLockMappedWriteGuard<'_, u32>>();
122+
check_sync::<RwLockMappedWriteGuard<'_, u32>>();
123+
check_unpin::<RwLockMappedWriteGuard<'_, u32>>();
124+
125+
check_send::<OwnedRwLockWriteGuard<u32>>();
126+
check_sync::<OwnedRwLockWriteGuard<u32>>();
127+
check_unpin::<OwnedRwLockWriteGuard<u32>>();
128+
129+
check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
130+
check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
131+
check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();
132+
133+
let rwlock = Arc::new(RwLock::new(0));
109134
check_send_sync_val(rwlock.read());
135+
check_send_sync_val(Arc::clone(&rwlock).read_owned());
110136
check_send_sync_val(rwlock.write());
137+
check_send_sync_val(Arc::clone(&rwlock).write_owned());
111138
}
112139

113140
// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
@@ -120,14 +147,42 @@ unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
120147
// `T` is `Send`.
121148
unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
122149
unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
150+
// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
151+
// the RwLock, unlike RwLockReadGuard.
152+
unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
153+
where
154+
T: ?Sized + Send + Sync,
155+
U: ?Sized + Sync,
156+
{
157+
}
158+
unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
159+
where
160+
T: ?Sized + Send + Sync,
161+
U: ?Sized + Send + Sync,
162+
{
163+
}
123164
unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
165+
unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
124166
unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
167+
unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
168+
where
169+
T: ?Sized + Send + Sync,
170+
U: ?Sized + Send + Sync,
171+
{
172+
}
125173
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
126174
// `T` is `Send` - but since this is also provides mutable access, we need to
127175
// make sure that `T` is `Send` since its value can be sent across thread
128176
// boundaries.
129177
unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
178+
unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
130179
unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
180+
unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
181+
where
182+
T: ?Sized + Send + Sync,
183+
U: ?Sized + Send + Sync,
184+
{
185+
}
131186

132187
impl<T: ?Sized> RwLock<T> {
133188
/// Creates a new instance of an `RwLock<T>` which is unlocked.
@@ -222,6 +277,64 @@ impl<T: ?Sized> RwLock<T> {
222277
}
223278
}
224279

280+
/// Locks this `RwLock` with shared read access, causing the current task
281+
/// to yield until the lock has been acquired.
282+
///
283+
/// The calling task will yield until there are no writers which hold the
284+
/// lock. There may be other readers inside the lock when the task resumes.
285+
///
286+
/// This method is identical to [`RwLock::read`], except that the returned
287+
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
288+
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
289+
/// method, and the guard will live for the `'static` lifetime, as it keeps
290+
/// the `RwLock` alive by holding an `Arc`.
291+
///
292+
/// Note that under the priority policy of [`RwLock`], read locks are not
293+
/// granted until prior write locks, to prevent starvation. Therefore
294+
/// deadlock may occur if a read lock is held by the current task, a write
295+
/// lock attempt is made, and then a subsequent read lock attempt is made
296+
/// by the current task.
297+
///
298+
/// Returns an RAII guard which will drop this read access of the `RwLock`
299+
/// when dropped.
300+
///
301+
/// # Examples
302+
///
303+
/// ```
304+
/// use std::sync::Arc;
305+
/// use tokio::sync::RwLock;
306+
///
307+
/// #[tokio::main]
308+
/// async fn main() {
309+
/// let lock = Arc::new(RwLock::new(1));
310+
/// let c_lock = lock.clone();
311+
///
312+
/// let n = lock.read_owned().await;
313+
/// assert_eq!(*n, 1);
314+
///
315+
/// tokio::spawn(async move {
316+
/// // While main has an active read lock, we acquire one too.
317+
/// let r = c_lock.read_owned().await;
318+
/// assert_eq!(*r, 1);
319+
/// }).await.expect("The spawned task has panicked");
320+
///
321+
/// // Drop the guard after the spawned task finishes.
322+
/// drop(n);
323+
///}
324+
/// ```
325+
pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
326+
self.s.acquire(1).await.unwrap_or_else(|_| {
327+
// The semaphore was closed. but, we never explicitly close it, and we have a
328+
// handle to it through the Arc, which means that this can never happen.
329+
unreachable!()
330+
});
331+
OwnedRwLockReadGuard {
332+
data: self.c.get(),
333+
lock: ManuallyDrop::new(self),
334+
_p: PhantomData,
335+
}
336+
}
337+
225338
/// Attempts to acquire this `RwLock` with shared read access.
226339
///
227340
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
@@ -268,6 +381,58 @@ impl<T: ?Sized> RwLock<T> {
268381
})
269382
}
270383

384+
/// Attempts to acquire this `RwLock` with shared read access.
385+
///
386+
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
387+
/// Otherwise, an RAII guard is returned which will release read access
388+
/// when dropped.
389+
///
390+
/// This method is identical to [`RwLock::try_read`], except that the
391+
/// returned guard references the `RwLock` with an [`Arc`] rather than by
392+
/// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
393+
/// call this method, and the guard will live for the `'static` lifetime,
394+
/// as it keeps the `RwLock` alive by holding an `Arc`.
395+
///
396+
/// [`TryLockError`]: TryLockError
397+
///
398+
/// # Examples
399+
///
400+
/// ```
401+
/// use std::sync::Arc;
402+
/// use tokio::sync::RwLock;
403+
///
404+
/// #[tokio::main]
405+
/// async fn main() {
406+
/// let lock = Arc::new(RwLock::new(1));
407+
/// let c_lock = lock.clone();
408+
///
409+
/// let v = lock.try_read_owned().unwrap();
410+
/// assert_eq!(*v, 1);
411+
///
412+
/// tokio::spawn(async move {
413+
/// // While main has an active read lock, we acquire one too.
414+
/// let n = c_lock.read_owned().await;
415+
/// assert_eq!(*n, 1);
416+
/// }).await.expect("The spawned task has panicked");
417+
///
418+
/// // Drop the guard when spawned task finishes.
419+
/// drop(v);
420+
/// }
421+
/// ```
422+
pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
423+
match self.s.try_acquire(1) {
424+
Ok(permit) => permit,
425+
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
426+
Err(TryAcquireError::Closed) => unreachable!(),
427+
}
428+
429+
Ok(OwnedRwLockReadGuard {
430+
data: self.c.get(),
431+
lock: ManuallyDrop::new(self),
432+
_p: PhantomData,
433+
})
434+
}
435+
271436
/// Locks this `RwLock` with exclusive write access, causing the current
272437
/// task to yield until the lock has been acquired.
273438
///
@@ -303,6 +468,48 @@ impl<T: ?Sized> RwLock<T> {
303468
}
304469
}
305470

471+
/// Locks this `RwLock` with exclusive write access, causing the current
472+
/// task to yield until the lock has been acquired.
473+
///
474+
/// The calling task will yield while other writers or readers currently
475+
/// have access to the lock.
476+
///
477+
/// This method is identical to [`RwLock::write`], except that the returned
478+
/// guard references the `RwLock` with an [`Arc`] rather than by borrowing
479+
/// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
480+
/// method, and the guard will live for the `'static` lifetime, as it keeps
481+
/// the `RwLock` alive by holding an `Arc`.
482+
///
483+
/// Returns an RAII guard which will drop the write access of this `RwLock`
484+
/// when dropped.
485+
///
486+
/// # Examples
487+
///
488+
/// ```
489+
/// use std::sync::Arc;
490+
/// use tokio::sync::RwLock;
491+
///
492+
/// #[tokio::main]
493+
/// async fn main() {
494+
/// let lock = Arc::new(RwLock::new(1));
495+
///
496+
/// let mut n = lock.write_owned().await;
497+
/// *n = 2;
498+
///}
499+
/// ```
500+
pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
501+
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
502+
// The semaphore was closed. but, we never explicitly close it, and we have a
503+
// handle to it through the Arc, which means that this can never happen.
504+
unreachable!()
505+
});
506+
OwnedRwLockWriteGuard {
507+
data: self.c.get(),
508+
lock: ManuallyDrop::new(self),
509+
_p: PhantomData,
510+
}
511+
}
512+
306513
/// Attempts to acquire this `RwLock` with exclusive write access.
307514
///
308515
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
@@ -340,6 +547,50 @@ impl<T: ?Sized> RwLock<T> {
340547
})
341548
}
342549

550+
/// Attempts to acquire this `RwLock` with exclusive write access.
551+
///
552+
/// If the access couldn't be acquired immediately, returns [`TryLockError`].
553+
/// Otherwise, an RAII guard is returned which will release write access
554+
/// when dropped.
555+
///
556+
/// This method is identical to [`RwLock::try_write`], except that the
557+
/// returned guard references the `RwLock` with an [`Arc`] rather than by
558+
/// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
559+
/// call this method, and the guard will live for the `'static` lifetime,
560+
/// as it keeps the `RwLock` alive by holding an `Arc`.
561+
///
562+
/// [`TryLockError`]: TryLockError
563+
///
564+
/// # Examples
565+
///
566+
/// ```
567+
/// use std::sync::Arc;
568+
/// use tokio::sync::RwLock;
569+
///
570+
/// #[tokio::main]
571+
/// async fn main() {
572+
/// let rw = Arc::new(RwLock::new(1));
573+
///
574+
/// let v = Arc::clone(&rw).read_owned().await;
575+
/// assert_eq!(*v, 1);
576+
///
577+
/// assert!(rw.try_write_owned().is_err());
578+
/// }
579+
/// ```
580+
pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
581+
match self.s.try_acquire(MAX_READS as u32) {
582+
Ok(permit) => permit,
583+
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
584+
Err(TryAcquireError::Closed) => unreachable!(),
585+
}
586+
587+
Ok(OwnedRwLockWriteGuard {
588+
data: self.c.get(),
589+
lock: ManuallyDrop::new(self),
590+
_p: PhantomData,
591+
})
592+
}
593+
343594
/// Returns a mutable reference to the underlying data.
344595
///
345596
/// Since this call borrows the `RwLock` mutably, no actual locking needs to

0 commit comments

Comments
 (0)