Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit d68cfd7

Browse files
expensesbkchr
andauthored
Fix the metered unbounded sender/recievers (#6246)
* Fix the metered unbounded sender/recievers * Use a counter instead * Update client/rpc/src/system/tests.rs * Add an is_terminated check * Add FusedStream impl Co-authored-by: Bastian Köcher <[email protected]>
1 parent 37bbc55 commit d68cfd7

File tree

5 files changed

+41
-23
lines changed

5 files changed

+41
-23
lines changed

client/rpc/src/system/tests.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use sc_network::{self, PeerId};
2222
use sc_network::config::Role;
2323
use substrate_test_runtime_client::runtime::Block;
2424
use assert_matches::assert_matches;
25-
use futures::{prelude::*, channel::mpsc};
25+
use futures::prelude::*;
26+
use sp_utils::mpsc::tracing_unbounded;
2627
use std::thread;
2728

2829
struct Status {
@@ -46,7 +47,7 @@ impl Default for Status {
4647
fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
4748
let status = sync.into().unwrap_or_default();
4849
let should_have_peers = !status.is_dev;
49-
let (tx, rx) = mpsc::unbounded();
50+
let (tx, rx) = tracing_unbounded("rpc_system_tests");
5051
thread::spawn(move || {
5152
futures::executor::block_on(rx.for_each(move |request| {
5253
match request {

client/service/src/client/client.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,14 @@ use sc_client_api::{
8080
KeyIterator, CallExecutor, ExecutorProvider, ProofProvider,
8181
cht, UsageProvider
8282
};
83-
use sp_utils::mpsc::tracing_unbounded;
83+
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
8484
use sp_blockchain::Error;
8585
use prometheus_endpoint::Registry;
8686
use super::{
8787
genesis,
8888
light::{call_executor::prove_execution, fetcher::ChangesProof},
8989
block_rules::{BlockRules, LookupResult as BlockLookupResult},
9090
};
91-
use futures::channel::mpsc;
9291
use rand::Rng;
9392

9493
#[cfg(feature="test-helpers")]
@@ -99,7 +98,7 @@ use {
9998
super::call_executor::LocalCallExecutor,
10099
};
101100

102-
type NotificationSinks<T> = Mutex<Vec<mpsc::UnboundedSender<T>>>;
101+
type NotificationSinks<T> = Mutex<Vec<TracingUnboundedSender<T>>>;
103102

104103
/// Substrate Client
105104
pub struct Client<B, E, Block, RA> where Block: BlockT {

client/service/src/status_sinks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl<T> futures::Future for YieldAfter<T> {
109109
mod tests {
110110
use super::StatusSinks;
111111
use futures::prelude::*;
112-
use futures::channel::mpsc;
112+
use sp_utils::mpsc::tracing_unbounded;
113113
use std::time::Duration;
114114
use std::task::Poll;
115115

@@ -120,7 +120,7 @@ mod tests {
120120

121121
let mut status_sinks = StatusSinks::new();
122122

123-
let (tx, rx) = mpsc::unbounded();
123+
let (tx, rx) = tracing_unbounded("status_sink_test");
124124
status_sinks.push(Duration::from_millis(100), tx);
125125

126126
let mut val_order = 5;

primitives/utils/src/metrics.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use prometheus::{
2323
core::{ AtomicU64, GenericGauge, GenericCounter },
2424
};
2525

26-
#[cfg(features = "metered")]
27-
use prometheus::{core::GenericGaugeVec, Opts};
26+
#[cfg(feature = "metered")]
27+
use prometheus::{core::GenericCounterVec, Opts};
2828

2929

3030
lazy_static! {
@@ -37,9 +37,9 @@ lazy_static! {
3737
).expect("Creating of statics doesn't fail. qed");
3838
}
3939

40-
#[cfg(features = "metered")]
40+
#[cfg(feature = "metered")]
4141
lazy_static! {
42-
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
42+
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
4343
Opts::new("unbounded_channel_len", "Items in each mpsc::unbounded instance"),
4444
&["entity", "action"] // 'name of channel, send|received|dropped
4545
).expect("Creating of statics doesn't fail. qed");
@@ -52,7 +52,7 @@ pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
5252
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
5353
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
5454

55-
#[cfg(features = "metered")]
55+
#[cfg(feature = "metered")]
5656
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
5757

5858
Ok(())

primitives/utils/src/mpsc.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Features to meter unbounded channels
1919
20-
#[cfg(not(features = "metered"))]
20+
#[cfg(not(feature = "metered"))]
2121
mod inner {
2222
// just aliased, non performance implications
2323
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
@@ -31,22 +31,29 @@ mod inner {
3131
}
3232

3333

34-
#[cfg(features = "metered")]
34+
#[cfg(feature = "metered")]
3535
mod inner {
3636
//tracing implementation
3737
use futures::channel::mpsc::{self,
3838
UnboundedReceiver, UnboundedSender,
3939
TryRecvError, TrySendError, SendError
4040
};
41-
use futures::{sink::Sink, task::{Poll, Context}, stream::Stream};
41+
use futures::{sink::Sink, task::{Poll, Context}, stream::{Stream, FusedStream}};
4242
use std::pin::Pin;
4343
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
4444

4545
/// Wrapper Type around `UnboundedSender` that increases the global
4646
/// measure when a message is added
47-
#[derive(Debug, Clone)]
47+
#[derive(Debug)]
4848
pub struct TracingUnboundedSender<T>(&'static str, UnboundedSender<T>);
4949

50+
// Strangely, deriving `Clone` requires that `T` is also `Clone`.
51+
impl<T> Clone for TracingUnboundedSender<T> {
52+
fn clone(&self) -> Self {
53+
Self(self.0, self.1.clone())
54+
}
55+
}
56+
5057
/// Wrapper Type around `UnboundedReceiver` that decreases the global
5158
/// measure when a message is polled
5259
#[derive(Debug)]
@@ -88,7 +95,7 @@ mod inner {
8895
/// Proxy function to mpsc::UnboundedSender
8996
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
9097
self.1.unbounded_send(msg).map(|s|{
91-
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).incr();
98+
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).inc();
9299
s
93100
})
94101
}
@@ -104,13 +111,19 @@ mod inner {
104111
fn consume(&mut self) {
105112
// consume all items, make sure to reflect the updated count
106113
let mut count = 0;
107-
while let Ok(Some(..)) = self.try_next() {
108-
count += 1;
109-
}
114+
loop {
115+
if self.1.is_terminated() {
116+
break;
117+
}
110118

119+
match self.try_next() {
120+
Ok(Some(..)) => count += 1,
121+
_ => break
122+
}
123+
}
111124
// and discount the messages
112125
if count > 0 {
113-
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).incr_by(count);
126+
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).inc_by(count);
114127
}
115128

116129
}
@@ -127,7 +140,7 @@ mod inner {
127140
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
128141
self.1.try_next().map(|s| {
129142
if s.is_some() {
130-
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).incr();
143+
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).inc();
131144
}
132145
s
133146
})
@@ -153,7 +166,7 @@ mod inner {
153166
match Pin::new(&mut s.1).poll_next(cx) {
154167
Poll::Ready(msg) => {
155168
if msg.is_some() {
156-
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).incr();
169+
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc();
157170
}
158171
Poll::Ready(msg)
159172
}
@@ -164,6 +177,11 @@ mod inner {
164177
}
165178
}
166179

180+
impl<T> FusedStream for TracingUnboundedReceiver<T> {
181+
fn is_terminated(&self) -> bool {
182+
self.1.is_terminated()
183+
}
184+
}
167185

168186
impl<T> Sink<T> for TracingUnboundedSender<T> {
169187
type Error = SendError;

0 commit comments

Comments
 (0)