diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index 25ebd80953be6..7fe5cdc752a06 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -22,7 +22,8 @@ use sc_network::{self, PeerId}; use sc_network::config::Role; use substrate_test_runtime_client::runtime::Block; use assert_matches::assert_matches; -use futures::{prelude::*, channel::mpsc}; +use futures::prelude::*; +use sp_utils::mpsc::tracing_unbounded; use std::thread; struct Status { @@ -46,7 +47,7 @@ impl Default for Status { fn api>>(sync: T) -> System { let status = sync.into().unwrap_or_default(); let should_have_peers = !status.is_dev; - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = tracing_unbounded("rpc_system_tests"); thread::spawn(move || { futures::executor::block_on(rx.for_each(move |request| { match request { diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 77b3f065f43dd..a795c4377e292 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -80,7 +80,7 @@ use sc_client_api::{ KeyIterator, CallExecutor, ExecutorProvider, ProofProvider, cht, UsageProvider }; -use sp_utils::mpsc::tracing_unbounded; +use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; use sp_blockchain::Error; use prometheus_endpoint::Registry; use super::{ @@ -88,7 +88,6 @@ use super::{ light::{call_executor::prove_execution, fetcher::ChangesProof}, block_rules::{BlockRules, LookupResult as BlockLookupResult}, }; -use futures::channel::mpsc; use rand::Rng; #[cfg(feature="test-helpers")] @@ -99,7 +98,7 @@ use { super::call_executor::LocalCallExecutor, }; -type NotificationSinks = Mutex>>; +type NotificationSinks = Mutex>>; /// Substrate Client pub struct Client where Block: BlockT { diff --git a/client/service/src/status_sinks.rs b/client/service/src/status_sinks.rs index 4b1dce52f9a31..c3de468ab06fd 100644 --- a/client/service/src/status_sinks.rs +++ b/client/service/src/status_sinks.rs @@ -109,7 +109,7 @@ impl futures::Future for YieldAfter { mod tests { use super::StatusSinks; use futures::prelude::*; - use futures::channel::mpsc; + use sp_utils::mpsc::tracing_unbounded; use std::time::Duration; use std::task::Poll; @@ -120,7 +120,7 @@ mod tests { let mut status_sinks = StatusSinks::new(); - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = tracing_unbounded("status_sink_test"); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5; diff --git a/primitives/utils/src/metrics.rs b/primitives/utils/src/metrics.rs index b991ce016b115..a66589b5927fe 100644 --- a/primitives/utils/src/metrics.rs +++ b/primitives/utils/src/metrics.rs @@ -23,8 +23,8 @@ use prometheus::{ core::{ AtomicU64, GenericGauge, GenericCounter }, }; -#[cfg(features = "metered")] -use prometheus::{core::GenericGaugeVec, Opts}; +#[cfg(feature = "metered")] +use prometheus::{core::GenericCounterVec, Opts}; lazy_static! { @@ -37,9 +37,9 @@ lazy_static! { ).expect("Creating of statics doesn't fail. qed"); } -#[cfg(features = "metered")] +#[cfg(feature = "metered")] lazy_static! { - pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericGaugeVec = GenericGaugeVec::new( + pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec = GenericCounterVec::new( Opts::new("unbounded_channel_len", "Items in each mpsc::unbounded instance"), &["entity", "action"] // 'name of channel, send|received|dropped ).expect("Creating of statics doesn't fail. qed"); @@ -52,7 +52,7 @@ pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> { registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?; registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?; - #[cfg(features = "metered")] + #[cfg(feature = "metered")] registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?; Ok(()) diff --git a/primitives/utils/src/mpsc.rs b/primitives/utils/src/mpsc.rs index 827195388f933..70baa006bdcdc 100644 --- a/primitives/utils/src/mpsc.rs +++ b/primitives/utils/src/mpsc.rs @@ -17,7 +17,7 @@ //! Features to meter unbounded channels -#[cfg(not(features = "metered"))] +#[cfg(not(feature = "metered"))] mod inner { // just aliased, non performance implications use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; @@ -31,22 +31,29 @@ mod inner { } -#[cfg(features = "metered")] +#[cfg(feature = "metered")] mod inner { //tracing implementation use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender, TryRecvError, TrySendError, SendError }; - use futures::{sink::Sink, task::{Poll, Context}, stream::Stream}; + use futures::{sink::Sink, task::{Poll, Context}, stream::{Stream, FusedStream}}; use std::pin::Pin; use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; /// Wrapper Type around `UnboundedSender` that increases the global /// measure when a message is added - #[derive(Debug, Clone)] + #[derive(Debug)] pub struct TracingUnboundedSender(&'static str, UnboundedSender); + // Strangely, deriving `Clone` requires that `T` is also `Clone`. + impl Clone for TracingUnboundedSender { + fn clone(&self) -> Self { + Self(self.0, self.1.clone()) + } + } + /// Wrapper Type around `UnboundedReceiver` that decreases the global /// measure when a message is polled #[derive(Debug)] @@ -88,7 +95,7 @@ mod inner { /// Proxy function to mpsc::UnboundedSender pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.1.unbounded_send(msg).map(|s|{ - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).incr(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).inc(); s }) } @@ -104,13 +111,19 @@ mod inner { fn consume(&mut self) { // consume all items, make sure to reflect the updated count let mut count = 0; - while let Ok(Some(..)) = self.try_next() { - count += 1; - } + loop { + if self.1.is_terminated() { + break; + } + match self.try_next() { + Ok(Some(..)) => count += 1, + _ => break + } + } // and discount the messages if count > 0 { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).incr_by(count); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).inc_by(count); } } @@ -127,7 +140,7 @@ mod inner { pub fn try_next(&mut self) -> Result, TryRecvError> { self.1.try_next().map(|s| { if s.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).incr(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).inc(); } s }) @@ -153,7 +166,7 @@ mod inner { match Pin::new(&mut s.1).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).incr(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc(); } Poll::Ready(msg) } @@ -164,6 +177,11 @@ mod inner { } } + impl FusedStream for TracingUnboundedReceiver { + fn is_terminated(&self) -> bool { + self.1.is_terminated() + } + } impl Sink for TracingUnboundedSender { type Error = SendError;