Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions client/rpc/src/system/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use sc_network::{self, PeerId};
use sc_network::config::Role;
use substrate_test_runtime_client::runtime::Block;
use assert_matches::assert_matches;
use futures::{prelude::*, channel::mpsc};
use futures::prelude::*;
use sp_utils::mpsc::tracing_unbounded;
use std::thread;

struct Status {
Expand All @@ -46,7 +47,7 @@ impl Default for Status {
fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> {
let status = sync.into().unwrap_or_default();
let should_have_peers = !status.is_dev;
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("rpc_system_tests");
thread::spawn(move || {
futures::executor::block_on(rx.for_each(move |request| {
match request {
Expand Down
5 changes: 2 additions & 3 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,14 @@ use sc_client_api::{
KeyIterator, CallExecutor, ExecutorProvider, ProofProvider,
cht, UsageProvider
};
use sp_utils::mpsc::tracing_unbounded;
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use sp_blockchain::Error;
use prometheus_endpoint::Registry;
use super::{
genesis,
light::{call_executor::prove_execution, fetcher::ChangesProof},
block_rules::{BlockRules, LookupResult as BlockLookupResult},
};
use futures::channel::mpsc;
use rand::Rng;

#[cfg(feature="test-helpers")]
Expand All @@ -99,7 +98,7 @@ use {
super::call_executor::LocalCallExecutor,
};

type NotificationSinks<T> = Mutex<Vec<mpsc::UnboundedSender<T>>>;
type NotificationSinks<T> = Mutex<Vec<TracingUnboundedSender<T>>>;

/// Substrate Client
pub struct Client<B, E, Block, RA> where Block: BlockT {
Expand Down
4 changes: 2 additions & 2 deletions client/service/src/status_sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<T> futures::Future for YieldAfter<T> {
mod tests {
use super::StatusSinks;
use futures::prelude::*;
use futures::channel::mpsc;
use sp_utils::mpsc::tracing_unbounded;
use std::time::Duration;
use std::task::Poll;

Expand All @@ -120,7 +120,7 @@ mod tests {

let mut status_sinks = StatusSinks::new();

let (tx, rx) = mpsc::unbounded();
let (tx, rx) = tracing_unbounded("status_sink_test");
status_sinks.push(Duration::from_millis(100), tx);

let mut val_order = 5;
Expand Down
10 changes: 5 additions & 5 deletions primitives/utils/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use prometheus::{
core::{ AtomicU64, GenericGauge, GenericCounter },
};

#[cfg(features = "metered")]
use prometheus::{core::GenericGaugeVec, Opts};
#[cfg(feature = "metered")]
use prometheus::{core::GenericCounterVec, Opts};


lazy_static! {
Expand All @@ -37,9 +37,9 @@ lazy_static! {
).expect("Creating of statics doesn't fail. qed");
}

#[cfg(features = "metered")]
#[cfg(feature = "metered")]
lazy_static! {
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
Opts::new("unbounded_channel_len", "Items in each mpsc::unbounded instance"),
&["entity", "action"] // 'name of channel, send|received|dropped
).expect("Creating of statics doesn't fail. qed");
Expand All @@ -52,7 +52,7 @@ pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;

#[cfg(features = "metered")]
#[cfg(feature = "metered")]
registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;

Ok(())
Expand Down
40 changes: 29 additions & 11 deletions primitives/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Features to meter unbounded channels

#[cfg(not(features = "metered"))]
#[cfg(not(feature = "metered"))]
mod inner {
// just aliased, non performance implications
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
Expand All @@ -31,22 +31,29 @@ mod inner {
}


#[cfg(features = "metered")]
#[cfg(feature = "metered")]
mod inner {
//tracing implementation
use futures::channel::mpsc::{self,
UnboundedReceiver, UnboundedSender,
TryRecvError, TrySendError, SendError
};
use futures::{sink::Sink, task::{Poll, Context}, stream::Stream};
use futures::{sink::Sink, task::{Poll, Context}, stream::{Stream, FusedStream}};
use std::pin::Pin;
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;

/// Wrapper Type around `UnboundedSender` that increases the global
/// measure when a message is added
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct TracingUnboundedSender<T>(&'static str, UnboundedSender<T>);

// Strangely, deriving `Clone` requires that `T` is also `Clone`.
impl<T> Clone for TracingUnboundedSender<T> {
fn clone(&self) -> Self {
Self(self.0, self.1.clone())
}
}

/// Wrapper Type around `UnboundedReceiver` that decreases the global
/// measure when a message is polled
#[derive(Debug)]
Expand Down Expand Up @@ -88,7 +95,7 @@ mod inner {
/// Proxy function to mpsc::UnboundedSender
pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.1.unbounded_send(msg).map(|s|{
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).incr();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).inc();
s
})
}
Expand All @@ -104,13 +111,19 @@ mod inner {
fn consume(&mut self) {
// consume all items, make sure to reflect the updated count
let mut count = 0;
while let Ok(Some(..)) = self.try_next() {
count += 1;
}
loop {
if self.1.is_terminated() {
break;
}

match self.try_next() {
Ok(Some(..)) => count += 1,
_ => break
}
}
// and discount the messages
if count > 0 {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).incr_by(count);
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).inc_by(count);
}

}
Expand All @@ -127,7 +140,7 @@ mod inner {
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
self.1.try_next().map(|s| {
if s.is_some() {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).incr();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).inc();
}
s
})
Expand All @@ -153,7 +166,7 @@ mod inner {
match Pin::new(&mut s.1).poll_next(cx) {
Poll::Ready(msg) => {
if msg.is_some() {
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).incr();
UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc();
}
Poll::Ready(msg)
}
Expand All @@ -164,6 +177,11 @@ mod inner {
}
}

impl<T> FusedStream for TracingUnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
self.1.is_terminated()
}
}

impl<T> Sink<T> for TracingUnboundedSender<T> {
type Error = SendError;
Expand Down