Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
client/network/protocol: Track number of non-primary connections
Introduce the Prometheus metric "sub_libp2p_non_primary_connections_opened_total"
and increment it each time a new connection to a peer is opened while
another connection to that peer already exists (i.e. the new one is non-primary).
  • Loading branch information
mxinden committed Jun 18, 2020
commit 12a1d7bc15e99919cf1bd7b5a5f7fd23fe865799
7 changes: 3 additions & 4 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use sp_runtime::traits::{
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -379,7 +379,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
metrics_registry: Option<&Registry>,
boot_node_ids: Arc<HashSet<PeerId>>,
use_new_block_requests_protocol: bool,
queue_size_report: Option<HistogramVec>,
) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
let info = chain.info();
let sync = ChainSync::new(
Expand Down Expand Up @@ -408,8 +407,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report,
);
metrics_registry,
)?;

let mut legacy_equiv_by_name = HashMap::new();

Expand Down
65 changes: 53 additions & 12 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ use libp2p::swarm::{
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use prometheus_endpoint::{
Counter,
HistogramOpts,
HistogramVec,
Opts,
PrometheusError,
Registry,
U64,
register,
};
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -150,8 +159,8 @@ pub struct GenericProto {
/// Events to produce from `poll()`.
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>>,

/// If `Some`, report the message queue sizes on this `Histogram`.
queue_size_report: Option<HistogramVec>,
/// Prometheus metrics to report e.g. message queue size.
metrics: Option<Metrics>,
}

/// Identifier for a delay firing.
Expand Down Expand Up @@ -324,23 +333,52 @@ pub enum GenericProtoOut {
},
}

/// Prometheus metrics held by `GenericProto`; built by `Metrics::register`.
struct Metrics {
/// Histogram registered as `sub_libp2p_notifications_queues_size`
/// ("Total size of all the notification queues"), with one `protocol` label.
notifications_queue_size: HistogramVec,
/// Counter registered as `sub_libp2p_non_primary_connections_opened_total`
/// ("Total number of non-primary connections opened").
non_primary_connections_opened: Counter<U64>,
}

impl Metrics {
    /// Builds both metrics and registers each of them with `registry`.
    ///
    /// The histogram is created and registered first, then the counter.
    ///
    /// # Errors
    ///
    /// Returns the `PrometheusError` produced by the first metric that fails
    /// to be constructed or registered.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        // Histogram of notification queue sizes, labelled by protocol.
        let queue_size_opts = HistogramOpts {
            common_opts: Opts::new(
                "sub_libp2p_notifications_queues_size",
                "Total size of all the notification queues",
            ),
            buckets: vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 511.0, 512.0],
        };
        let queue_size_histogram = HistogramVec::new(queue_size_opts, &["protocol"])?;
        let notifications_queue_size = register(queue_size_histogram, registry)?;

        // Counter of connections opened to a peer that already had one.
        let non_primary_counter = Counter::new(
            "sub_libp2p_non_primary_connections_opened_total",
            "Total number of non-primary connections opened",
        )?;
        let non_primary_connections_opened = register(non_primary_counter, registry)?;

        Ok(Metrics {
            notifications_queue_size,
            non_primary_connections_opened,
        })
    }
}

impl GenericProto {
/// Creates a `CustomProtos`.
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
local_peer_id: PeerId,
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
metric_registry: Option<&Registry>,
) -> Result<Self, PrometheusError> {
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
let metrics = if let Some(registry) = metric_registry {
Some(Metrics::register(registry)?)
} else {
None
};

Ok(GenericProto {
local_peer_id,
legacy_protocol,
notif_protocols: Vec::new(),
Expand All @@ -351,8 +389,8 @@ impl GenericProto {
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: VecDeque::new(),
queue_size_report,
}
metrics,
})
}

/// Registers a new notifications protocol.
Expand Down Expand Up @@ -888,7 +926,7 @@ impl NetworkBehaviour for GenericProto {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
self.queue_size_report.clone()
self.metrics.as_ref().map(|m| m.notifications_queue_size.clone()),
)
}

Expand Down Expand Up @@ -1291,6 +1329,9 @@ impl NetworkBehaviour for GenericProto {
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
if let Some(metrics) = &self.metrics {
metrics.non_primary_connections_opened.inc();
}
Comment on lines +1332 to +1334
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would strongly prefer to report that in service.rs, with all the other metrics.
In my opinion, we should, whenever possible, avoid infesting the code with metrics and instead keep the metrics on the side of the main logic.

debug!(target: "sub-libp2p", "Secondary connection opened custom protocol.");
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});

let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None),
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None).unwrap(),
addrs: addrs
.iter()
.enumerate()
Expand Down
12 changes: 0 additions & 12 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.metrics_registry.as_ref(),
boot_node_ids.clone(),
params.network_config.use_new_block_requests_protocol,
metrics.as_ref().map(|m| m.notifications_queues_size.clone()),
)?;

// Build the swarm.
Expand Down Expand Up @@ -862,7 +861,6 @@ struct Metrics {
listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>,
network_per_sec_bytes: GaugeVec<U64>,
notifications_queues_size: HistogramVec,
notifications_sizes: HistogramVec,
notifications_streams_closed_total: CounterVec<U64>,
notifications_streams_opened_total: CounterVec<U64>,
Expand Down Expand Up @@ -967,16 +965,6 @@ impl Metrics {
),
&["direction"]
)?, registry)?,
notifications_queues_size: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_notifications_queues_size",
"Total size of all the notification queues"
),
buckets: vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 511.0, 512.0],
},
&["protocol"]
)?, registry)?,
notifications_sizes: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
Expand Down