-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Runtime diagnostics for leaked messages in unbounded channels #12971
Changes from 14 commits
793a5fd
2b661e2
3144bd5
49720f2
c640cbf
972b452
1bb9602
69f2727
cacf5b9
82e1b1e
75a3389
94b7254
0d527bc
709efba
2706277
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,7 +28,7 @@ | |||||||
| //! # use sp_test_primitives::Block; | ||||||||
| //! # struct DummyLink; impl Link<Block> for DummyLink {} | ||||||||
| //! # let mut my_link = DummyLink; | ||||||||
| //! let (mut tx, mut rx) = buffered_link::<Block>(); | ||||||||
| //! let (mut tx, mut rx) = buffered_link::<Block>(100_000); | ||||||||
| //! tx.blocks_processed(0, 0, vec![]); | ||||||||
| //! | ||||||||
| //! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled. | ||||||||
|
|
@@ -52,8 +52,10 @@ use super::BlockImportResult; | |||||||
| /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and | ||||||||
| /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer | ||||||||
| /// them to another link. | ||||||||
| pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) { | ||||||||
| let (tx, rx) = tracing_unbounded("mpsc_buffered_link"); | ||||||||
| pub fn buffered_link<B: BlockT>( | ||||||||
| queue_size_warning: i64, | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need signed integer here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's explained in the comment for a struct field: to avoid underflow if, due to the lack of ordering, the counter happens to go < 0.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Internally: yes. But public API doesn't need to be signed integer. This should have been
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also wondering how much of a performance difference it actually makes using Relaxed ordering here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not that relaxed ordering makes sense in terms of performance, it's about not having to bother about synchronization of increments/decrements, why signed integer is used. Relaxed ordering is just a consequence of this decision, because more strong guarantees are not needed if we use the unsigned integer.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've looked into the issue, and as far as I understand it's impossible to guarantee that the counter is never decremented before it's incremented not relying on internals of
In order for Please correct me if I'm missing something. CC @bkchr
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you use The compiler should add some barrier that ensures that reads/writes are not reordered.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW @nazar-pc why aren't you just use a channel with a size of 0 and using try_send?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because I didn't see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is a PR implementing exact queue size warning (#13117), but I'd like it to be reviewed by somebody with good understanding of concurrency, atomics, and memory order of operations. If you know who to invite for review, please invite them. |
||||||||
| ) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) { | ||||||||
| let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning); | ||||||||
| let tx = BufferedLinkSender { tx }; | ||||||||
| let rx = BufferedLinkReceiver { rx: rx.fuse() }; | ||||||||
| (tx, rx) | ||||||||
|
|
@@ -175,7 +177,7 @@ mod tests { | |||||||
|
|
||||||||
| #[test] | ||||||||
| fn is_closed() { | ||||||||
| let (tx, rx) = super::buffered_link::<Block>(); | ||||||||
| let (tx, rx) = super::buffered_link::<Block>(1); | ||||||||
| assert!(!tx.is_closed()); | ||||||||
| drop(rx); | ||||||||
| assert!(tx.is_closed()); | ||||||||
|
|
||||||||
Uh oh!
There was an error while loading. Please reload this page.