Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
More work before weekend
  • Loading branch information
tomaka committed Jul 17, 2020
commit c90fe319a229b3397fdd8218ece8cd6d786ed90e
18 changes: 1 addition & 17 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use futures::{
};
use log::{debug, error};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::HistogramVec;
use std::{borrow::Cow, collections::HashSet, error, io, str, sync::Arc, task::{Context, Poll}};

/// Number of pending notifications in asynchronous contexts.
Expand Down Expand Up @@ -364,30 +363,17 @@ impl NotifsHandlerProto {
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
queue_size_report: Option<HistogramVec>
) -> Self {
let list = list.into();

let out_handlers = list
.clone()
.into_iter()
.map(|(proto_name, initial_message)| {
let queue_size_report = queue_size_report.as_ref().and_then(|qs| {
if let Ok(utf8) = str::from_utf8(&proto_name) {
Some(qs.with_label_values(&[utf8]))
} else {
log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", proto_name);
None
}
});

(NotifsOutHandlerProto::new(proto_name, queue_size_report), initial_message)
(NotifsOutHandlerProto::new(proto_name), initial_message)
}).collect();

let in_handlers = list.clone()
Expand Down Expand Up @@ -585,8 +571,6 @@ impl ProtocolsHandler for NotifsHandler {
}
}

// TODO: check if legacy's ready as well

let message = match notifications_sink_rx.0.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => {
Expand Down
36 changes: 1 addition & 35 deletions client/network/src/protocol/generic_proto/handler/notif_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, warn, error};
use prometheus_endpoint::Histogram;
use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;

Expand All @@ -56,17 +55,14 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>,
/// Optional Prometheus histogram to report message queue size variations.
queue_size_report: Option<Histogram>,
}

impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, queue_size_report: Option<Histogram>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self {
NotifsOutHandlerProto {
protocol_name: protocol_name.into(),
queue_size_report,
}
}
}
Expand All @@ -82,7 +78,6 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
NotifsOutHandler {
protocol_name: self.protocol_name,
when_connection_open: Instant::now(),
queue_size_report: self.queue_size_report,
state: State::Disabled,
events_queue: VecDeque::new(),
peer_id: peer_id.clone(),
Expand All @@ -108,9 +103,6 @@ pub struct NotifsOutHandler {
/// When the connection with the remote has been successfully established.
when_connection_open: Instant,

/// Optional prometheus histogram to report message queue sizes variations.
queue_size_report: Option<Histogram>,

/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
Expand Down Expand Up @@ -173,11 +165,6 @@ pub enum NotifsOutHandlerIn {

/// Disables the notifications substream for this node. This is the default state.
Disable,

/// Sends a message on the notifications substream. Ignored if the substream isn't open.
///
/// It is only valid to send this if the notifications substream has been enabled.
Send(Vec<u8>),
}

/// Event that can be emitted by a `NotifsOutHandler`.
Expand Down Expand Up @@ -324,27 +311,6 @@ impl ProtocolsHandler for NotifsOutHandler {
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
}
}

NotifsOutHandlerIn::Send(msg) =>
if let State::Open { substream, .. } = &mut self.state {
if substream.push_message(msg).is_err() {
warn!(
target: "sub-libp2p",
"📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})",
self.peer_id,
self.protocol_name,
);
}
if let Some(metric) = &self.queue_size_report {
metric.observe(substream.queue_len() as f64);
}
} else {
// This is an API misuse.
warn!(
target: "sub-libp2p",
"📞 Tried to send a notification on a disabled handler"
);
},
}
}

Expand Down
1 change: 1 addition & 0 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}
};

// TODO:
let ready = sink.reserve_notification(todo!()).await;
Ok(NotificationsBufferSlot {
ready,
Expand Down