From 91078acd6e825b2aa124a5b21e463ff3d56290fe Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Thu, 4 Sep 2025 18:26:32 +0000 Subject: [PATCH 01/15] fix: do not return `RecvError` on unseen values in `changed` and `has_changed` --- tokio/src/sync/watch.rs | 19 ++++++++----- tokio/tests/sync_watch.rs | 56 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 884efffa30c..c18d823293c 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -643,7 +643,8 @@ impl Receiver { /// messages for equality, so this call will return true even if the new /// message is equal to the old message. /// - /// Returns an error if the channel has been closed. + /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ + /// the latest value is seen. /// # Examples /// /// ``` @@ -669,13 +670,16 @@ impl Receiver { pub fn has_changed(&self) -> Result { // Load the version from the state let state = self.shared.state.load(); - if state.is_closed() { - // The sender has dropped. - return Err(error::RecvError(())); - } let new_version = state.version(); - Ok(self.version != new_version) + let last_value_is_seen = self.version == new_version; + let sender_has_dropped = state.is_closed(); + + if sender_has_dropped && last_value_is_seen { + Err(error::RecvError(())) + } else { + Ok(!last_value_is_seen) + } } /// Marks the state as changed. @@ -709,7 +713,8 @@ impl Receiver { /// method sleeps until a new message is sent by the [`Sender`] connected to /// this `Receiver`, or until the [`Sender`] is dropped. /// - /// This method returns an error if and only if the [`Sender`] is dropped. + /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ + /// the latest value is seen. /// /// For more information, see /// [*Change notifications*](self#change-notifications) in the module-level documentation. diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 4418f88e57b..eee347d5a8c 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -450,3 +450,59 @@ async fn sender_closed_is_cooperative() { _ = tokio::task::yield_now() => {}, } } + +#[tokio::test] +async fn changed_does_not_error_on_closed_channel_with_unseen_value() { + let (tx, mut rx) = watch::channel("A"); + tx.send("B").unwrap(); + + drop(tx); + + rx.changed() + .await + .expect("`changed` call does not return an error if the last value is not seen."); +} + +#[tokio::test] +async fn has_changed_does_not_error_on_closed_channel_with_unseen_value() { + let (tx, rx) = watch::channel("A"); + tx.send("B").unwrap(); + + drop(tx); + + let has_changed = rx + .has_changed() + .expect("`has_changed` call does not return an error if the last value is not seen."); + + assert!(has_changed, "Latest value is not seen"); +} + +#[tokio::test] +async fn changed_errors_on_closed_channel_with_seen_value() { + let (tx, mut rx) = watch::channel("A"); + drop(tx); + + rx.changed() + .await + .expect_err("`changed` call returns an error if the last value is seen."); +} + +#[tokio::test] +async fn has_changed_errors_on_closed_channel_with_seen_value() { + let (tx, rx) = watch::channel("A"); + drop(tx); + + rx.has_changed() + .expect_err("`has_changed` call returns an error if the last value is seen."); +} + +#[tokio::test] +async fn wait_for_errors_on_closed_channel_true_predicate() { + let (tx, mut rx) = watch::channel("A"); + tx.send("B").unwrap(); + drop(tx); + + rx.wait_for(|_| true).await.expect( + "`wait_for` call does not return error even if channel is closed when predicate is true for last value.", + ); +} From d589fab982e433f63293cda21730a6d284debcdc Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Thu, 4 Sep 2025 19:09:58 +0000 Subject: [PATCH 02/15] docs: improve documentation and consistency of terms --- tokio/src/sync/watch.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index c18d823293c..981c408445b 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -29,8 +29,10 @@ //! The [`Receiver`] half provides an asynchronous [`changed`] method. This //! method is ready when a new, *unseen* value is sent via the [`Sender`] half. //! -//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or -//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped. +//! * [`Receiver::changed()`] returns: +//! * `Ok(())` on receiving a new value. +//! * `Err(`[`RecvError`](error::RecvError)`)` if the +//! channel has been closed __AND__ the current value is *seen*. //! * If the current value is *unseen* when calling [`changed`], then //! [`changed`] will return immediately. If the current value is *seen*, then //! it will sleep until either a new message is sent via the [`Sender`] half, @@ -637,14 +639,17 @@ impl Receiver { } /// Checks if this channel contains a message that this receiver has not yet - /// seen. The new value is not marked as seen. + /// seen. The current values will not be marked as seen. /// - /// Although this method is called `has_changed`, it does not check new - /// messages for equality, so this call will return true even if the new - /// message is equal to the old message. + /// Although this method is called `has_changed`, it does not check + /// messages for equality, so this call will return true even if the current + /// message is equal to the previous message. /// + /// # Errors + /// /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ - /// the latest value is seen. + /// the current value is seen. + /// /// # Examples /// /// ``` @@ -705,20 +710,23 @@ impl Receiver { self.version = current_version; } - /// Waits for a change notification, then marks the newest value as seen. + /// Waits for a change notification, then marks the current value as seen. /// - /// If the newest value in the channel has not yet been marked seen when + /// If the current value in the channel has not yet been marked seen when /// this method is called, the method marks that value seen and returns - /// immediately. If the newest value has already been marked seen, then the + /// immediately. If the current value has already been marked seen, then the /// method sleeps until a new message is sent by the [`Sender`] connected to /// this `Receiver`, or until the [`Sender`] is dropped. /// - /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ - /// the latest value is seen. /// /// For more information, see /// [*Change notifications*](self#change-notifications) in the module-level documentation. - /// + /// + /// # Errors + /// + /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ + /// the current value is seen. + /// /// # Cancel safety /// /// This method is cancel safe. If you use it as the event in a From 0815cc3551b7fbc70625833327457236ab70eac7 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Thu, 4 Sep 2025 19:14:59 +0000 Subject: [PATCH 03/15] clippy: fix indentation in doc --- tokio/src/sync/watch.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 981c408445b..9847d4f2097 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -32,7 +32,7 @@ //! * [`Receiver::changed()`] returns: //! * `Ok(())` on receiving a new value. //! * `Err(`[`RecvError`](error::RecvError)`)` if the -//! channel has been closed __AND__ the current value is *seen*. +//! channel has been closed __AND__ the current value is *seen*. //! * If the current value is *unseen* when calling [`changed`], then //! [`changed`] will return immediately. If the current value is *seen*, then //! it will sleep until either a new message is sent via the [`Sender`] half, @@ -646,10 +646,10 @@ impl Receiver { /// message is equal to the previous message. /// /// # Errors - /// + /// /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ /// the current value is seen. - /// + /// /// # Examples /// /// ``` @@ -721,12 +721,12 @@ impl Receiver { /// /// For more information, see /// [*Change notifications*](self#change-notifications) in the module-level documentation. - /// + /// /// # Errors - /// + /// /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ /// the current value is seen. - /// + /// /// # Cancel safety /// /// This method is cancel safe. If you use it as the event in a From cd569bf6922fff74be55530d12abe813ca30f122 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Thu, 4 Sep 2025 23:50:55 +0000 Subject: [PATCH 04/15] fix typo --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 9847d4f2097..af6fa16acf3 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -639,7 +639,7 @@ impl Receiver { } /// Checks if this channel contains a message that this receiver has not yet - /// seen. The current values will not be marked as seen. + /// seen. The current value will not be marked as seen. /// /// Although this method is called `has_changed`, it does not check /// messages for equality, so this call will return true even if the current From e6939dd32ef8e58c04912a4133e8911f4395004d Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 5 Sep 2025 00:10:14 +0000 Subject: [PATCH 05/15] Add example to `has_changed` for error case --- tokio/src/sync/watch.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index af6fa16acf3..c1d79066c6f 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -652,6 +652,7 @@ impl Receiver { /// /// # Examples /// + /// ## Basic usage /// ``` /// use tokio::sync::watch; /// @@ -666,9 +667,26 @@ impl Receiver { /// /// // The value has been marked as seen /// assert!(!rx.has_changed().unwrap()); + /// } + /// ``` + /// + /// ## Closed channel example + /// ``` + /// use tokio::sync::watch; /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = watch::channel("hello"); + /// tx.send("goodbye").unwrap(); /// drop(tx); - /// // The `tx` handle has been dropped + /// + /// // `has_changed` returns Ok(true) as the current value is not seen. + /// assert!(rx.has_changed().unwrap()); + /// + /// // Marks the current value as seen. + /// assert_eq!(*rx.borrow_and_update(), "goodbye"); + /// + /// // The `tx` handle has been dropped __AND__ the current value is seen. /// assert!(rx.has_changed().is_err()); /// } /// ``` From 552e952a621595d2f6e2c3219c9ae34fb6be099f Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 5 Sep 2025 16:26:05 +0000 Subject: [PATCH 06/15] update docs for `Ref` --- tokio/src/sync/watch.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index c1d79066c6f..805fc0752cf 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -200,7 +200,7 @@ impl<'a, T> Ref<'a, T> { /// Indicates if the borrowed value is considered as _changed_ since the last /// time it has been marked as seen. /// - /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed. + /// Unlike [`Receiver::has_changed()`], this method is not fallible. /// /// When borrowed from the [`Sender`] this function will always return `false`. /// @@ -220,10 +220,10 @@ impl<'a, T> Ref<'a, T> { /// // Drop the sender immediately, just for testing purposes. /// drop(tx); /// - /// // Even if the sender has already been dropped... - /// assert!(rx.has_changed().is_err()); - /// // ...the modified value is still readable and detected as changed. + /// // The modified value is still readable and detected as changed + /// // even if the sender has already been dropped. /// assert_eq!(*rx.borrow(), "goodbye"); + /// assert!(rx.has_changed().unwrap()); /// assert!(rx.borrow().has_changed()); /// /// // Read the changed value and mark it as seen. From 806b08f66c4f3cf028bda5a6d29fd993e39b2e52 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 5 Sep 2025 21:25:10 +0000 Subject: [PATCH 07/15] rename variable to `current_version` --- tokio/src/sync/watch.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 805fc0752cf..e2c3e1a07ac 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -693,15 +693,15 @@ impl Receiver { pub fn has_changed(&self) -> Result { // Load the version from the state let state = self.shared.state.load(); - let new_version = state.version(); + let current_version = state.version(); - let last_value_is_seen = self.version == new_version; + let current_value_is_seen = self.version == current_version; let sender_has_dropped = state.is_closed(); - if sender_has_dropped && last_value_is_seen { + if sender_has_dropped && current_value_is_seen { Err(error::RecvError(())) } else { - Ok(!last_value_is_seen) + Ok(!current_value_is_seen) } } From 0c3059748bc1313688f8b1386d54af7ae6839a6b Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 19 Sep 2025 20:53:40 +0000 Subject: [PATCH 08/15] add section on differences and revert `has_changed` change --- tokio/src/sync/watch.rs | 91 ++++++++++++++++++++++++++++----------- tokio/tests/sync_watch.rs | 7 +-- 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 97117d41e8e..cb90c50f6c0 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -44,7 +44,27 @@ //! The current value at the time the [`Receiver`] is created is considered //! *seen*. //! -//! ## `borrow_and_update` versus `borrow` +//! ## [`changed`] versus [`has_changed`] +//! The [`Receiver`] half provides two methods for checking for changes +//! in the channel, [`has_changed`] and [`changed`]. +//! +//! * [`has_changed`] is a *synchronous* method that checks whether the current +//! value is seen or not and returns a boolean. This method does __not__ mark the +//! value as seen. +//! +//! * [`changed`] is an *asynchronous* method that will return once an unseen +//! value is in the channel. This method does mark the value as seen. +//! +//! Note there are two behavioral differences on when these two methods return +//! an error. +//! +//! - [`has_changed`] errors if and only if the channel is closed. +//! - [`changed`] errors if the channel has been closed __AND__ +//! the current value is seen. +//! +//! See the example below that shows how these methods have different fallibility. +//! +//! ## [`borrow_and_update`] versus [`borrow`] //! //! If the receiver intends to await notifications from [`changed`] in a loop, //! [`Receiver::borrow_and_update()`] should be preferred over @@ -86,6 +106,31 @@ //! # } //! ``` //! +//! Difference on fallibility of [`changed`] versus [`has_changed`]. +//! ``` +//! use tokio::sync::watch; +//! +//! #[tokio::main] +//! async fn main() { +//! let (tx, mut rx) = watch::channel("hello"); +//! tx.send("goodbye").unwrap(); +//! drop(tx); +//! +//! // `has_changed` does not mark the value as seen and errors +//! // since the channel is closed. +//! assert!(rx.has_changed().is_err()); +//! +//! // `changed` returns Ok since the value is not already marked as seen +//! // even if the channel is closed. +//! assert!(rx.changed().await.is_ok()); +//! +//! // The `changed` call above marks the value as seen. +//! // The next `changed` call now returns an error as the channel is closed +//! // AND the current value is seen. +//! assert!(rx.changed().await.is_err()); +//! } +//! ``` +//! //! # Closing //! //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect @@ -104,6 +149,9 @@ //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver //! [`changed`]: crate::sync::watch::Receiver::changed +//! [`has_changed`]: crate::sync::watch::Receiver::has_changed +//! [`borrow`]: crate::sync::watch::Receiver::borrow +//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow //! [`Receiver::borrow_and_update()`]: @@ -200,7 +248,7 @@ impl<'a, T> Ref<'a, T> { /// Indicates if the borrowed value is considered as _changed_ since the last /// time it has been marked as seen. /// - /// Unlike [`Receiver::has_changed()`], this method is not fallible. + /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed. /// /// When borrowed from the [`Sender`] this function will always return `false`. /// @@ -220,10 +268,10 @@ impl<'a, T> Ref<'a, T> { /// // Drop the sender immediately, just for testing purposes. /// drop(tx); /// - /// // The modified value is still readable and detected as changed - /// // even if the sender has already been dropped. + /// // Even if the sender has already been dropped... + /// assert!(rx.has_changed().is_err()); + /// // ...the modified value is still readable and detected as changed. /// assert_eq!(*rx.borrow(), "goodbye"); - /// assert!(rx.has_changed().unwrap()); /// assert!(rx.borrow().has_changed()); /// /// // Read the changed value and mark it as seen. @@ -647,8 +695,7 @@ impl Receiver { /// /// # Errors /// - /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ - /// the current value is seen. + /// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed. /// /// # Examples /// @@ -676,33 +723,25 @@ impl Receiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = watch::channel("hello"); + /// let (tx, rx) = watch::channel("hello"); /// tx.send("goodbye").unwrap(); - /// drop(tx); /// - /// // `has_changed` returns Ok(true) as the current value is not seen. - /// assert!(rx.has_changed().unwrap()); - /// - /// // Marks the current value as seen. - /// assert_eq!(*rx.borrow_and_update(), "goodbye"); + /// drop(tx); /// - /// // The `tx` handle has been dropped __AND__ the current value is seen. + /// // The channel is closed /// assert!(rx.has_changed().is_err()); /// } /// ``` pub fn has_changed(&self) -> Result { // Load the version from the state let state = self.shared.state.load(); - let current_version = state.version(); - - let current_value_is_seen = self.version == current_version; - let sender_has_dropped = state.is_closed(); - - if sender_has_dropped && current_value_is_seen { - Err(error::RecvError(())) - } else { - Ok(!current_value_is_seen) + if state.is_closed() { + // All senders have dropped. + return Err(error::RecvError(())); } + let new_version = state.version(); + + Ok(self.version != new_version) } /// Marks the state as changed. @@ -741,8 +780,8 @@ impl Receiver { /// /// # Errors /// - /// This method returns a [`RecvError`](error::RecvError) if and only if all [`Sender`]s - /// are dropped. + /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__ + /// the current value is seen. /// /// # Cancel safety /// diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index eee347d5a8c..63efd5d986c 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -470,11 +470,8 @@ async fn has_changed_does_not_error_on_closed_channel_with_unseen_value() { drop(tx); - let has_changed = rx - .has_changed() - .expect("`has_changed` call does not return an error if the last value is not seen."); - - assert!(has_changed, "Latest value is not seen"); + rx.has_changed() + .expect_err("`has_changed` call returns an error even if the last value is not seen."); } #[tokio::test] From 6ccdff30cee00972075cf6f3daa907bb341baf3b Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 19 Sep 2025 21:01:50 +0000 Subject: [PATCH 09/15] remove trailing whitespace --- tokio/src/sync/watch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index cb90c50f6c0..cb064d49ea7 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -115,15 +115,15 @@ //! let (tx, mut rx) = watch::channel("hello"); //! tx.send("goodbye").unwrap(); //! drop(tx); -//! +//! //! // `has_changed` does not mark the value as seen and errors //! // since the channel is closed. //! assert!(rx.has_changed().is_err()); -//! +//! //! // `changed` returns Ok since the value is not already marked as seen //! // even if the channel is closed. //! assert!(rx.changed().await.is_ok()); -//! +//! //! // The `changed` call above marks the value as seen. //! // The next `changed` call now returns an error as the channel is closed //! // AND the current value is seen. From ff423e983282fa6be7d9c43958927207d561714e Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 19 Sep 2025 22:02:21 +0000 Subject: [PATCH 10/15] update error message for tests --- tokio/tests/sync_watch.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 63efd5d986c..fe4b7cd1207 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -452,7 +452,7 @@ async fn sender_closed_is_cooperative() { } #[tokio::test] -async fn changed_does_not_error_on_closed_channel_with_unseen_value() { +async fn changed_succeeds_on_closed_channel_with_unseen_value() { let (tx, mut rx) = watch::channel("A"); tx.send("B").unwrap(); @@ -464,24 +464,24 @@ async fn changed_does_not_error_on_closed_channel_with_unseen_value() { } #[tokio::test] -async fn has_changed_does_not_error_on_closed_channel_with_unseen_value() { - let (tx, rx) = watch::channel("A"); - tx.send("B").unwrap(); - +async fn changed_errors_on_closed_channel_with_seen_value() { + let (tx, mut rx) = watch::channel("A"); drop(tx); - rx.has_changed() - .expect_err("`has_changed` call returns an error even if the last value is not seen."); + rx.changed() + .await + .expect_err("`has_changed` iff channel is closed."); } #[tokio::test] -async fn changed_errors_on_closed_channel_with_seen_value() { - let (tx, mut rx) = watch::channel("A"); +async fn has_changed_errors_on_closed_channel_with_unseen_value() { + let (tx, rx) = watch::channel("A"); + tx.send("B").unwrap(); + drop(tx); - rx.changed() - .await - .expect_err("`changed` call returns an error if the last value is seen."); + rx.has_changed() + .expect_err("`has_changed` iff channel is closed."); } #[tokio::test] From 2e020e2eb5d252e9345b3acf90d28eb5a211468c Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Fri, 19 Sep 2025 22:03:05 +0000 Subject: [PATCH 11/15] use sync test --- tokio/tests/sync_watch.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index fe4b7cd1207..687b97897a4 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -473,8 +473,8 @@ async fn changed_errors_on_closed_channel_with_seen_value() { .expect_err("`has_changed` iff channel is closed."); } -#[tokio::test] -async fn has_changed_errors_on_closed_channel_with_unseen_value() { +#[test] +fn has_changed_errors_on_closed_channel_with_unseen_value() { let (tx, rx) = watch::channel("A"); tx.send("B").unwrap(); @@ -484,8 +484,8 @@ async fn has_changed_errors_on_closed_channel_with_unseen_value() { .expect_err("`has_changed` iff channel is closed."); } -#[tokio::test] -async fn has_changed_errors_on_closed_channel_with_seen_value() { +#[test] +fn has_changed_errors_on_closed_channel_with_seen_value() { let (tx, rx) = watch::channel("A"); drop(tx); From a1b84b013c30671aff0ed0599fed877387119539 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi <40335219+DSharifi@users.noreply.github.com> Date: Sat, 20 Sep 2025 12:58:12 +0000 Subject: [PATCH 12/15] Apply suggestions from code review Add empty between section title and text Co-authored-by: Qi --- tokio/src/sync/watch.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index cb064d49ea7..2b016d96fca 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -45,6 +45,7 @@ //! *seen*. //! //! ## [`changed`] versus [`has_changed`] + //! The [`Receiver`] half provides two methods for checking for changes //! in the channel, [`has_changed`] and [`changed`]. //! @@ -700,6 +701,7 @@ impl Receiver { /// # Examples /// /// ## Basic usage + /// ``` /// use tokio::sync::watch; /// @@ -718,6 +720,7 @@ impl Receiver { /// ``` /// /// ## Closed channel example + /// ``` /// use tokio::sync::watch; /// From 9628a65467324d116638f3c4f297ce3d208443b0 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Sat, 20 Sep 2025 13:02:32 +0000 Subject: [PATCH 13/15] add missing backslash from suggestion --- tokio/src/sync/watch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 2b016d96fca..6ffa53c75df 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -45,7 +45,7 @@ //! *seen*. //! //! ## [`changed`] versus [`has_changed`] - +//! //! The [`Receiver`] half provides two methods for checking for changes //! in the channel, [`has_changed`] and [`changed`]. //! @@ -701,7 +701,7 @@ impl Receiver { /// # Examples /// /// ## Basic usage - + /// /// ``` /// use tokio::sync::watch; /// @@ -720,7 +720,7 @@ impl Receiver { /// ``` /// /// ## Closed channel example - + /// /// ``` /// use tokio::sync::watch; /// From fa6e2801ae4d8aa8fb15eb3796ed490a5fbc0a81 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi <40335219+DSharifi@users.noreply.github.com> Date: Sat, 20 Sep 2025 13:04:19 +0000 Subject: [PATCH 14/15] Apply `expec_err` message suggestions for `changed` Co-authored-by: Qi --- tokio/tests/sync_watch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 687b97897a4..212443598e7 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -460,7 +460,7 @@ async fn changed_succeeds_on_closed_channel_with_unseen_value() { rx.changed() .await - .expect("`changed` call does not return an error if the last value is not seen."); + .expect("should not return error as long as the current value is not seen"); } #[tokio::test] @@ -470,7 +470,7 @@ async fn changed_errors_on_closed_channel_with_seen_value() { rx.changed() .await - .expect_err("`has_changed` iff channel is closed."); + .expect_err("should return error if the tx is closed and the current value is seen"); } #[test] From 8b8e4780dd0dbe497cc414c99c032301cdbb7d35 Mon Sep 17 00:00:00 2001 From: Daniel Sharifi Date: Sat, 20 Sep 2025 13:07:09 +0000 Subject: [PATCH 15/15] update `expec_err` messages for has_changed tests --- tokio/tests/sync_watch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 212443598e7..48e4106841f 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -481,7 +481,7 @@ fn has_changed_errors_on_closed_channel_with_unseen_value() { drop(tx); rx.has_changed() - .expect_err("`has_changed` iff channel is closed."); + .expect_err("`has_changed` returns an error if and only if channel is closed. Even if the current value is not seen."); } #[test] @@ -490,7 +490,7 @@ fn has_changed_errors_on_closed_channel_with_seen_value() { drop(tx); rx.has_changed() - .expect_err("`has_changed` call returns an error if the last value is seen."); + .expect_err("`has_changed` returns an error if and only if channel is closed."); } #[tokio::test]