diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 5967613b98e46..f7c26f6fa3d27 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -17,10 +17,11 @@ use crate::{ config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, - protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol}, - Event, ObservedRole, DhtEvent, ExHashT, + protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol}, + ObservedRole, DhtEvent, ExHashT, }; +use bytes::Bytes; use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; @@ -98,11 +99,53 @@ pub enum BehaviourOut { request_duration: Duration, }, - /// Any event represented by the [`Event`] enum. + /// Opened a substream with the given node with the given notifications protocol. /// - /// > **Note**: The [`Event`] enum contains the events that are available through the public - /// > API of the library. - Event(Event), + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + /// Object that permits sending notifications to the peer. + notifications_sink: NotificationsSink, + /// Role of the remote. + role: ObservedRole, + }, + + /// The [`NotificationsSink`] object used to send notifications with the given peer must be + /// replaced with a new one. + /// + /// This event is typically emitted when a transport-level connection is closed and we fall + /// back to a secondary connection. + NotificationStreamReplaced { + /// Id of the peer we are connected to. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + /// Replacement for the previous [`NotificationsSink`]. + notifications_sink: NotificationsSink, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec<(ConsensusEngineId, Bytes)>, + }, + + /// Event generated by a DHT. 
+ Dht(DhtEvent), } impl Behaviour { @@ -184,12 +227,12 @@ impl Behaviour { let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message); for (remote, roles) in list { let role = reported_roles_to_observed_role(&self.role, remote, roles); - let ev = Event::NotificationStreamOpened { + self.events.push_back(BehaviourOut::NotificationStreamOpened { remote: remote.clone(), engine_id, role, - }; - self.events.push_back(BehaviourOut::Event(ev)); + notifications_sink: todo!(), // TODO: uuuuggghhh + }); } } @@ -278,26 +321,34 @@ Behaviour { CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => { self.finality_proof_requests.send_request(&target, block_hash, request); }, - CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => { + CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => { let role = reported_roles_to_observed_role(&self.role, &remote, roles); for engine_id in protocols { - self.events.push_back(BehaviourOut::Event(Event::NotificationStreamOpened { + self.events.push_back(BehaviourOut::NotificationStreamOpened { remote: remote.clone(), engine_id, role: role.clone(), - })); + notifications_sink: notifications_sink.clone(), + }); } }, + CustomMessageOutcome::NotificationStreamReplaced { remote, protocols, notifications_sink } => + for engine_id in protocols { + self.events.push_back(BehaviourOut::NotificationStreamReplaced { + remote: remote.clone(), + engine_id, + notifications_sink: notifications_sink.clone(), + }); + }, CustomMessageOutcome::NotificationStreamClosed { remote, protocols } => for engine_id in protocols { - self.events.push_back(BehaviourOut::Event(Event::NotificationStreamClosed { + self.events.push_back(BehaviourOut::NotificationStreamClosed { remote: remote.clone(), engine_id, - })); + }); }, CustomMessageOutcome::NotificationsReceived { remote, messages } => { - let ev = Event::NotificationsReceived { remote, messages }; - self.events.push_back(BehaviourOut::Event(ev)); + self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages }); }, CustomMessageOutcome::PeerNewBest(peer_id, number) => { self.light_client_handler.update_best_block(&peer_id, number); @@ -393,16 +444,16 @@ impl NetworkBehaviourEventProcess self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results))); } DiscoveryOut::ValueNotFound(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key))); } DiscoveryOut::ValuePut(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key))); } DiscoveryOut::ValuePutFailed(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key))); } DiscoveryOut::RandomKademliaStarted(protocols) => { for protocol in protocols { diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 626cb04389236..3b47dfd09cc1c 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -67,7 +67,7 @@ pub mod message; pub mod event; pub mod sync; -pub use 
generic_proto::LegacyConnectionKillError; +pub use generic_proto::{NotificationsSink, Ready, LegacyConnectionKillError}; const REQUEST_TIMEOUT_SEC: u64 = 40; /// Interval at which we perform time based maintenance @@ -939,7 +939,12 @@ impl Protocol { } /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. - pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { + pub fn on_peer_connected( + &mut self, + who: PeerId, + status: message::Status, + notifications_sink: NotificationsSink, + ) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); let _protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -1051,10 +1056,12 @@ impl Protocol { remote: who, protocols: self.protocol_name_by_engine.keys().cloned().collect(), roles: info.roles, + notifications_sink, } } - /// Send a notification to the given peer we're connected to. + // TODO: remove + /*/// Send a notification to the given peer we're connected to. /// /// Doesn't do anything if we don't have a notifications substream for that protocol with that /// peer. @@ -1078,7 +1085,7 @@ impl Protocol { engine_id ); } - } + }*/ /// Registers a new notifications protocol. /// @@ -1829,7 +1836,18 @@ pub enum CustomMessageOutcome { JustificationImport(Origin, B::Hash, NumberFor, Justification), FinalityProofImport(Origin, B::Hash, NumberFor, Vec), /// Notification protocols have been opened with a remote. - NotificationStreamOpened { remote: PeerId, protocols: Vec, roles: Roles }, + NotificationStreamOpened { + remote: PeerId, + protocols: Vec, + roles: Roles, + notifications_sink: NotificationsSink + }, + /// The [`NotificationsSink`] of some notification protocols need an update. + NotificationStreamReplaced { + remote: PeerId, + protocols: Vec, + notifications_sink: NotificationsSink, + }, /// Notification protocols have been closed with a remote. NotificationStreamClosed { remote: PeerId, protocols: Vec }, /// Messages have been received on one or more notifications protocols. @@ -1994,9 +2012,10 @@ impl NetworkBehaviour for Protocol { }; let outcome = match event { - GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => { + GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => { match as Decode>::decode(&mut &received_handshake[..]) { - Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake), + Ok(GenericMessage::Status(handshake)) => + self.on_peer_connected(peer_id, handshake, notifications_sink), Ok(msg) => { debug!( target: "sync", @@ -2020,6 +2039,13 @@ impl NetworkBehaviour for Protocol { } } } + GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, .. } => { + CustomMessageOutcome::NotificationStreamReplaced { + remote: peer_id, + protocols: self.protocol_name_by_engine.keys().cloned().collect(), + notifications_sink, + } + }, GenericProtoOut::CustomProtocolClosed { peer_id, .. } => { self.on_peer_disconnected(peer_id) }, diff --git a/client/network/src/protocol/generic_proto.rs b/client/network/src/protocol/generic_proto.rs index cf8434d8bceff..18130c455f30e 100644 --- a/client/network/src/protocol/generic_proto.rs +++ b/client/network/src/protocol/generic_proto.rs @@ -21,7 +21,7 @@ //! network, then performs the Substrate protocol handling on top. 
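The behaviour-level events above mirror one another: `CustomMessageOutcome` (protocol) and `BehaviourOut` (behaviour) both now carry the `NotificationsSink` for the Opened/Replaced cases, and the worker is expected to keep a per-(peer, protocol) map of sinks in sync with them. A minimal sketch of that bookkeeping, using stand-in types (`PeerId`, `ConsensusEngineId` and `NotificationsSink` here are simplified placeholders, not the real `sc-network` types):

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real sc-network types.
type PeerId = String;
type ConsensusEngineId = [u8; 4];
#[derive(Clone, Debug)]
struct NotificationsSink;

enum BehaviourOut {
    NotificationStreamOpened { remote: PeerId, engine_id: ConsensusEngineId, notifications_sink: NotificationsSink },
    NotificationStreamReplaced { remote: PeerId, engine_id: ConsensusEngineId, notifications_sink: NotificationsSink },
    NotificationStreamClosed { remote: PeerId, engine_id: ConsensusEngineId },
}

/// Keeps the (peer, protocol) -> sink map in sync with the behaviour's events.
fn on_behaviour_event(
    sinks: &mut HashMap<(PeerId, ConsensusEngineId), NotificationsSink>,
    event: BehaviourOut,
) {
    match event {
        BehaviourOut::NotificationStreamOpened { remote, engine_id, notifications_sink } => {
            // One sink per (peer, protocol) pair, inserted when the substream opens.
            sinks.insert((remote, engine_id), notifications_sink);
        }
        BehaviourOut::NotificationStreamReplaced { remote, engine_id, notifications_sink } => {
            // The transport-level connection changed; the old sink points at a
            // dead connection and must be swapped for the replacement.
            if let Some(sink) = sinks.get_mut(&(remote, engine_id)) {
                *sink = notifications_sink;
            }
        }
        BehaviourOut::NotificationStreamClosed { remote, engine_id } => {
            // Matches a previous `NotificationStreamOpened`; drop the sink.
            sinks.remove(&(remote, engine_id));
        }
    }
}
```

The `Replaced` arm is the important one: every holder of the old sink must swap it out rather than keep queueing into a channel nobody drains any more.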
pub use self::behaviour::{GenericProto, GenericProtoOut}; -pub use self::handler::LegacyConnectionKillError; +pub use self::handler::{NotificationsSink, Ready, LegacyConnectionKillError}; mod behaviour; mod handler; diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 0e56b03b7ad29..69f5a28e760e6 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -15,8 +15,10 @@ // along with Substrate. If not, see . use crate::config::ProtocolId; -use crate::protocol::generic_proto::handler::{NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}; -use crate::protocol::generic_proto::upgrade::RegisteredProtocol; +use crate::protocol::generic_proto::{ + handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}, + upgrade::RegisteredProtocol +}; use bytes::BytesMut; use fnv::FnvHashMap; @@ -189,7 +191,7 @@ enum PeerState { /// We may still have ongoing traffic with that peer, but it should cease shortly. Disabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, /// If `Some`, any dial attempts to this peer are delayed until the given `Instant`. banned_until: Option, }, @@ -199,7 +201,7 @@ enum PeerState { /// but should get disconnected in a few seconds. DisabledPendingEnable { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, /// When to enable this remote. References an entry in `delays`. timer: DelayId, /// When the `timer` will trigger. @@ -210,7 +212,7 @@ enum PeerState { /// enabled state. Enabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, }, /// We received an incoming connection from this peer and forwarded that @@ -227,15 +229,15 @@ impl PeerState { self.get_open().is_some() } - /// Returns the connection ID of the first established connection + /// Returns the [`NotificationsSink`] of the first established connection /// that is open for custom protocol traffic. - fn get_open(&self) -> Option { + fn get_open(&self) -> Option<&NotificationsSink> { match self { PeerState::Disabled { open, .. } | PeerState::DisabledPendingEnable { open, .. } | PeerState::Enabled { open, .. } => if !open.is_empty() { - Some(open[0]) + Some(&open[0].1) } else { None } @@ -284,9 +286,24 @@ pub enum GenericProtoOut { /// Handshake that was sent to us. /// This is normally a "Status" message, but this is out of the concern of this code. received_handshake: Vec, + /// Object that permits sending notifications to the peer. + notifications_sink: NotificationsSink, + }, + + /// The [`NotificationsSink`] object used to send notifications with the given peer must be + /// replaced with a new one. + /// + /// This event is typically emitted when a transport-level connection is closed and we fall + /// back to a secondary connection. + CustomProtocolReplaced { + /// Id of the peer we are connected to. + peer_id: PeerId, + /// Replacement for the previous [`NotificationsSink`]. 
+		notifications_sink: NotificationsSink,
+	},
 
-	/// Closed a custom protocol with the remote.
+	/// Closed a custom protocol with the remote. The existing [`NotificationsSink`] should
+	/// be dropped.
 	CustomProtocolClosed {
 		/// Id of the peer we were connected to.
 		peer_id: PeerId,
@@ -538,14 +555,14 @@ impl GenericProto {
 		message: impl Into<Vec<u8>>,
 		encoded_fallback_message: Vec<u8>,
 	) {
-		let conn = match self.peers.get(target).and_then(|p| p.get_open()) {
+		let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
 			None => {
 				debug!(target: "sub-libp2p",
 					"Tried to send notification to {:?} without an open channel.",
 					target);
 				return
 			},
-			Some(conn) => conn
+			Some(sink) => sink
 		};
 
 		trace!(
@@ -555,16 +572,11 @@ impl GenericProto {
 			str::from_utf8(&protocol_name)
 		);
 		trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
-
-		self.events.push_back(NetworkBehaviourAction::NotifyHandler {
-			peer_id: target.clone(),
-			handler: NotifyHandler::One(conn),
-			event: NotifsHandlerIn::SendNotification {
-				message: message.into(),
-				encoded_fallback_message,
-				protocol_name,
-			},
-		});
+		notifs_sink.send_sync_notification(
+			&protocol_name,
+			encoded_fallback_message,
+			message
+		);
 	}
 
 	/// Sends a message to a peer.
@@ -574,25 +586,19 @@ impl GenericProto {
 	/// Also note that even if we have a valid open substream, it may in fact be already closed
 	/// without us knowing, in which case the packet will not be received.
 	pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
-		let conn = match self.peers.get(target).and_then(|p| p.get_open()) {
+		let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
 			None => {
 				debug!(target: "sub-libp2p",
 					"Tried to send packet to {:?} without an open channel.",
 					target);
 				return
 			}
-			Some(conn) => conn
+			Some(sink) => sink
 		};
 
 		trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
 		trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
-		self.events.push_back(NetworkBehaviourAction::NotifyHandler {
-			peer_id: target.clone(),
-			handler: NotifyHandler::One(conn),
-			event: NotifsHandlerIn::SendLegacy {
-				message,
-			}
-		});
+		notifs_sink.send_legacy(message);
 	}
 
 	/// Returns the state of the peerset manager, for debugging purposes.
@@ -985,7 +991,7 @@ impl NetworkBehaviour for GenericProto {
 						// i.e. there is no connection that is open for custom protocols,
 						// in which case `CustomProtocolClosed` was already emitted.
 						let closed = open.is_empty();
-						open.retain(|c| c != conn);
+						open.retain(|(c, _)| c != conn);
 						if open.is_empty() && !closed {
 							debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id);
 							let event = GenericProtoOut::CustomProtocolClosed {
@@ -995,6 +1001,7 @@ impl NetworkBehaviour for GenericProto {
 							self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
 						}
+						// TODO: generate CustomProtocolReplaced event here
 					}
 					_ => {}
 				}
@@ -1142,7 +1149,7 @@ impl NetworkBehaviour for GenericProto {
 		let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) {
 			PeerState::Enabled { mut open } => {
-				if let Some(pos) = open.iter().position(|c| c == &connection) {
+				if let Some(pos) = open.iter().position(|(c, _)| c == &connection) {
 					open.remove(pos);
 				} else {
 					debug_assert!(false);
@@ -1176,7 +1183,7 @@ impl NetworkBehaviour for GenericProto {
 				last
 			},
 			PeerState::Disabled { mut open, banned_until } => {
-				if let Some(pos) = open.iter().position(|c| c == &connection) {
+				if let Some(pos) = open.iter().position(|(c, _)| c == &connection) {
 					open.remove(pos);
 				} else {
 					debug_assert!(false);
@@ -1199,7 +1206,7 @@ impl NetworkBehaviour for GenericProto {
 				timer,
 				timer_deadline
 			} => {
-				if let Some(pos) = open.iter().position(|c| c == &connection) {
+				if let Some(pos) = open.iter().position(|(c, _)| c == &connection) {
 					open.remove(pos);
 				} else {
 					debug_assert!(false);
@@ -1234,11 +1241,12 @@ impl NetworkBehaviour for GenericProto {
 					};
 					self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
 				} else {
+					// TODO: emit CustomProtocolReplaced event here
 					debug!(target: "sub-libp2p", "Secondary connection closed custom protocol.");
 				}
 			}
 
-			NotifsHandlerOut::Open { endpoint, received_handshake } => {
+			NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } => {
 				debug!(target: "sub-libp2p",
 					"Handler({:?}) => Endpoint {:?} open for custom protocols.",
 					source, endpoint);
@@ -1248,8 +1256,8 @@ impl NetworkBehaviour for GenericProto {
 					Some(PeerState::DisabledPendingEnable { ref mut open, .. }) |
 					Some(PeerState::Disabled { ref mut open, .. }) => {
 						let first = open.is_empty();
-						if !open.iter().any(|c| *c == connection) {
-							open.push(connection);
+						if !open.iter().any(|(c, _)| *c == connection) {
+							open.push((connection, notifications_sink.clone()));
 						} else {
 							error!(
 								target: "sub-libp2p",
@@ -1269,7 +1277,11 @@ impl NetworkBehaviour for GenericProto {
 
 				if first {
 					debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
-					let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
+					let event = GenericProtoOut::CustomProtocolOpen {
+						peer_id: source,
+						received_handshake,
+						notifications_sink
+					};
 					self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
 				} else {
diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs
index 3b4469a872598..818cf87545af2 100644
--- a/client/network/src/protocol/generic_proto/handler.rs
+++ b/client/network/src/protocol/generic_proto/handler.rs
@@ -15,7 +15,10 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
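The `open` list semantics above (first element = the connection actually used for notifications) are what determine when `CustomProtocolClosed`, and the `CustomProtocolReplaced` event the two TODOs call for, should fire. A stand-alone sketch of that selection logic, with a plain `Vec` instead of `SmallVec` and stand-in types:

```rust
type ConnectionId = u64;
#[derive(Clone, Debug)]
struct NotificationsSink(u64); // stand-in; the real type is a channel handle

enum Transition {
    /// The last open connection went away.
    Closed,
    /// A secondary connection took over; its sink replaces the previous one.
    Replaced(NotificationsSink),
    /// A non-active connection closed; nothing externally observable happens.
    None,
}

/// Removes `conn` from the list of open connections and reports what the
/// external API should be told, mirroring the `open.retain(...)` logic above.
fn on_connection_closed(
    open: &mut Vec<(ConnectionId, NotificationsSink)>,
    conn: ConnectionId,
) -> Transition {
    let was_first = open.first().map_or(false, |(c, _)| *c == conn);
    open.retain(|(c, _)| *c != conn);
    match (open.first(), was_first) {
        (None, _) => Transition::Closed,
        // The active connection closed but a fallback exists: this is where
        // the `CustomProtocolReplaced` event would be generated.
        (Some((_, sink)), true) => Transition::Replaced(sink.clone()),
        (Some(_), false) => Transition::None,
    }
}
```

This also shows why the two TODOs sit where they do: a replacement event is only warranted when the *first* entry changes while the list stays non-empty.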
-pub use self::group::{NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut}; + +pub use self::group::{ + NotificationsSink, Ready, NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut +}; pub use self::legacy::ConnectionKillError as LegacyConnectionKillError; mod group; diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 3403f7dd823d2..726307037b294 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -63,10 +63,20 @@ use libp2p::swarm::{ SubstreamProtocol, NegotiatedSubstream, }; +use futures::{ + channel::mpsc, + lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard}, + prelude::* +}; use log::{debug, error}; -use parking_lot::RwLock; -use prometheus_endpoint::HistogramVec; -use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}}; +use parking_lot::{Mutex, RwLock}; +use std::{borrow::Cow, collections::HashSet, error, io, str, sync::Arc, task::{Context, Poll}}; + +/// Number of pending notifications in asynchronous contexts. +/// See [`NotificationsSink::reserve_notification`] for context. +const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8; +/// Number of pending notifications in synchronous contexts. +const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 512; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -107,6 +117,16 @@ pub struct NotifsHandler { /// we push the corresponding index here and process them when the handler /// gets enabled/disabled. pending_in: Vec, + + /// If `Some`, contains the two `Receiver`s connected to the [`NotificationsSink`] that has + /// been sent out. The notifications to send out can be pulled from this receivers. + /// We use two different channels in order to have two different channel sizes, but from the + /// receiving point of view, the two channels are the same. + /// The receivers are fused in case the user drops the [`NotificationsSink`] entirely. + notifications_sink_rx: Option<( + stream::Fuse>, + stream::Fuse> + )>, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -140,6 +160,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { legacy: self.legacy.into_handler(remote_peer_id, connected_point), enabled: EnabledState::Initial, pending_in: Vec::new(), + notifications_sink_rx: None, } } } @@ -152,32 +173,6 @@ pub enum NotifsHandlerIn { /// The node should stop using custom protocols. Disable, - - /// Sends a message through the custom protocol substream. - /// - /// > **Note**: This must **not** be a `ConsensusMessage`, `Transactions`, or - /// > `BlockAnnounce` message. - SendLegacy { - /// The message to send. - message: Vec, - }, - - /// Sends a notifications message. - SendNotification { - /// Name of the protocol for the message. - /// - /// Must match one of the registered protocols. For backwards-compatibility reasons, if - /// the remote doesn't support this protocol, we use the legacy substream. - protocol_name: Cow<'static, [u8]>, - - /// Message to send on the legacy substream if the protocol isn't available. - /// - /// This corresponds to what you would have sent with `SendLegacy`. - encoded_fallback_message: Vec, - - /// The message to send. - message: Vec, - }, } /// Event that can be emitted by a `NotifsHandler`. @@ -190,6 +185,8 @@ pub enum NotifsHandlerOut { /// Handshake that was sent to us. /// This is normally a "Status" message, but this out of the concern of this code. 
received_handshake: Vec, + /// How notifications can be sent to this node. + notifications_sink: NotificationsSink, }, /// The connection is closed for custom protocols. @@ -227,19 +224,148 @@ pub enum NotifsHandlerOut { }, } +/// Sink connected directly to the node background task. Allows sending notifications to the peer. +/// +/// Can be cloned in order to obtain multiple references to the same peer. +#[derive(Debug, Clone)] +pub struct NotificationsSink { + inner: Arc, +} + +#[derive(Debug)] +struct NotificationsSinkInner { + /// Sender to use in asynchronous contexts. Uses an asynchronous mutex. + async_channel: FuturesMutex>, + /// Sender to use in synchronous contexts. Uses a synchronous mutex. + /// This channel has a large capacity and is meant to be used in contexts where + /// back-pressure cannot be properly exerted. + /// It will be removed in a future version. + sync_channel: Mutex>, + /// Notifications protocol names that have been registered. + valid_protocol_names: HashSet>, +} + +/// Message emitted through the [`NotificationsSink`] and processed by the background task +/// dedicated to the peer. +#[derive(Debug)] +enum NotificationsSinkMessage { + /// Message emitted by [`NotificationsSink::send_legacy`]. + Legacy { + message: Vec, + }, + + /// Message emitted by [`NotificationsSink::reserve_notification`] and + /// [`NotificationsSink::write_notification_now`]. + Notification { + protocol_name: Vec, + encoded_fallback_message: Vec, + message: Vec, + }, + + /// Must close the connection. + ForceClose, +} + +impl NotificationsSink { + /// Sends a message to the peer using the legacy substream. + /// + /// If too many messages are already buffered, the message is silently discarded and the + /// connection to the peer will be closed shortly after. + /// + /// This method will be removed in a future version. + pub fn send_legacy<'a>(&'a self, message: impl Into>) { + let mut lock = self.inner.sync_channel.lock(); + let result = lock.try_send(NotificationsSinkMessage::Legacy { + message: message.into() + }); + + if result.is_err() { + // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the + // buffer, and therefore that `try_send` will succeed. + let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); + debug_assert!(_result2.is_ok()); + } + } + + /// Sends a notification to the peer. + /// + /// If too many messages are already buffered, the notification is silently discarded and the + /// connection to the peer will be closed shortly after. + /// + /// This method will be removed in a future version. + pub fn send_sync_notification<'a>( + &'a self, + protocol_name: &[u8], + encoded_fallback_message: impl Into>, + message: impl Into> + ) { + if !self.inner.valid_protocol_names.contains(protocol_name) { + // TODO: report this? + return; + } + + let mut lock = self.inner.sync_channel.lock(); + let result = lock.try_send(NotificationsSinkMessage::Notification { + protocol_name: protocol_name.to_owned(), + encoded_fallback_message: encoded_fallback_message.into(), + message: message.into() + }); + + if result.is_err() { + // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the + // buffer, and therefore that `try_send` will succeed. + let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); + debug_assert!(_result2.is_ok()); + } + } + + /// Wait until the remote is ready to accept a notification. 
+ pub async fn reserve_notification<'a>(&'a self, protocol_name: &[u8]) -> Ready<'a> { + // TODO: + /*if !self.inner.valid_protocol_names.contains(protocol_name) { + // TODO: report this? + return; + }*/ + + let mut lock = self.inner.async_channel.lock().await; + // TODO: check result + let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await; + // TODO: check protocol name validity + Ready { protocol_name: protocol_name.to_owned(), lock } + } +} + +/// Notification slot is reserved and the notification can actually be sent. +#[must_use] +#[derive(Debug)] +pub struct Ready<'a> { + /// Guarded channel. The channel inside is guaranteed to not be full. + lock: FuturesMutexGuard<'a, mpsc::Sender>, + /// Name of the protocol. Should match one of the protocols passed at initialization. + protocol_name: Vec, +} + +impl<'a> Ready<'a> { + /// Consumes this slots reservation and actually queues the notification. + pub fn send(mut self, notification: impl Into>) { + // TODO: check result + self.lock.start_send(NotificationsSinkMessage::Notification { + protocol_name: self.protocol_name, + encoded_fallback_message: todo!(), // TODO: + message: notification.into(), + }); + } +} + impl NotifsHandlerProto { /// Builds a new handler. /// /// `list` is a list of notification protocols names, and the message to send as part of the /// handshake. At the moment, the message is always the same whether we open a substream /// ourselves or respond to handshake from the remote. - /// - /// The `queue_size_report` is an optional Prometheus metric that can report the size of the - /// messages queue. If passed, it must have one label for the protocol name. pub fn new( legacy: RegisteredProtocol, list: impl Into, Arc>>)>>, - queue_size_report: Option ) -> Self { let list = list.into(); @@ -247,16 +373,7 @@ impl NotifsHandlerProto { .clone() .into_iter() .map(|(proto_name, initial_message)| { - let queue_size_report = queue_size_report.as_ref().and_then(|qs| { - if let Ok(utf8) = str::from_utf8(&proto_name) { - Some(qs.with_label_values(&[utf8])) - } else { - log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", proto_name); - None - } - }); - - (NotifsOutHandlerProto::new(proto_name, queue_size_report), initial_message) + (NotifsOutHandlerProto::new(proto_name), initial_message) }).collect(); let in_handlers = list.clone() @@ -363,24 +480,6 @@ impl ProtocolsHandler for NotifsHandler { self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse); } }, - NotifsHandlerIn::SendLegacy { message } => - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), - NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => { - for (handler, _) in &mut self.out_handlers { - if handler.protocol_name() != &protocol_name[..] { - continue; - } - - if handler.is_open() { - handler.inject_event(NotifsOutHandlerIn::Send(message)); - return; - } - } - - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message: encoded_fallback_message, - }); - }, } } @@ -461,6 +560,61 @@ impl ProtocolsHandler for NotifsHandler { ) -> Poll< ProtocolsHandlerEvent > { + if let Some(notifications_sink_rx) = &mut self.notifications_sink_rx { + 'poll_notifs_sink: loop { + // Before we poll the notifications sink receiver, check that all the notification + // channels are ready to send a message. 
+ for (out_handler, _) in &self.out_handlers { + match out_handler.poll_ready(cx) { + Poll::Ready(_) => {}, + Poll::Pending => break, + } + } + + let message = match notifications_sink_rx.0.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) | Poll::Pending => { + match notifications_sink_rx.0.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) | Poll::Pending => break, + } + }, + }; + + match message { + NotificationsSinkMessage::Legacy { message } => { + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { + message + }); + } + NotificationsSinkMessage::Notification { + protocol_name, + encoded_fallback_message, + message + } => { + for (handler, _) in &mut self.out_handlers { + if handler.protocol_name() != &protocol_name[..] { + continue; + } + + if handler.is_open() { + handler.inject_event(NotifsOutHandlerIn::Send(message)); + continue 'poll_notifs_sink; + } + } + + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { + message: encoded_fallback_message, + }); + } + NotificationsSinkMessage::ForceClose => { + //return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))) + todo!() + } + } + } + } + if let Poll::Ready(ev) = self.legacy.poll(cx) { return match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => @@ -468,14 +622,39 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { + endpoint, + received_handshake, + .. + }) => { + let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); + let notifications_sink = NotificationsSink { + inner: Arc::new(NotificationsSinkInner { + async_channel: FuturesMutex::new(async_tx), + sync_channel: Mutex::new(sync_tx), + valid_protocol_names: self.out_handlers + .iter().map(|(h, _)| h.protocol_name().to_owned()).collect(), + }), + }; + + debug_assert!(self.notifications_sink_rx.is_none()); + self.notifications_sink_rx = Some((async_rx.fuse(), sync_rx.fuse())); + Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint, received_handshake } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => + NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } + )) + }, + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => { + // We consciously drop the receivers despite notifications being potentially + // still buffered up. 
+ debug_assert!(self.notifications_sink_rx.is_some()); + self.notifications_sink_rx = None; + Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::Closed { endpoint, reason } - )), + )) + }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CustomMessage { message } diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 71d6175f06674..a0f49f0a558df 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -206,6 +206,9 @@ pub enum LegacyProtoHandlerIn { Disable, /// Sends a message through a custom protocol substream. + /// + /// Closes the substream if the buffer is full. + // TODO: actually do it ^ SendCustomMessage { /// The message to send. message: Vec, @@ -565,7 +568,6 @@ impl ProtocolsHandler for LegacyProtoHandler { } } - #[inline] fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr) { let is_severe = match err { ProtocolsHandlerUpgrErr::Upgrade(_) => true, diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index 6b97ad67e34c6..3e218ec9df5ee 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -34,7 +34,6 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{debug, warn, error}; -use prometheus_endpoint::Histogram; use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; @@ -56,17 +55,14 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); pub struct NotifsOutHandlerProto { /// Name of the protocol to negotiate. protocol_name: Cow<'static, [u8]>, - /// Optional Prometheus histogram to report message queue size variations. - queue_size_report: Option, } impl NotifsOutHandlerProto { /// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the /// notifications substream. - pub fn new(protocol_name: impl Into>, queue_size_report: Option) -> Self { + pub fn new(protocol_name: impl Into>) -> Self { NotifsOutHandlerProto { protocol_name: protocol_name.into(), - queue_size_report, } } } @@ -82,7 +78,6 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { NotifsOutHandler { protocol_name: self.protocol_name, when_connection_open: Instant::now(), - queue_size_report: self.queue_size_report, state: State::Disabled, events_queue: VecDeque::new(), peer_id: peer_id.clone(), @@ -108,9 +103,6 @@ pub struct NotifsOutHandler { /// When the connection with the remote has been successfully established. when_connection_open: Instant, - /// Optional prometheus histogram to report message queue sizes variations. - queue_size_report: Option, - /// Queue of events to send to the outside. /// /// This queue must only ever be modified to insert elements at the back, or remove the first @@ -173,11 +165,6 @@ pub enum NotifsOutHandlerIn { /// Disables the notifications substream for this node. This is the default state. Disable, - - /// Sends a message on the notifications substream. Ignored if the substream isn't open. - /// - /// It is only valid to send this if the notifications substream has been enabled. - Send(Vec), } /// Event that can be emitted by a `NotifsOutHandler`. 
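The handler changes above replace the `SendLegacy`/`SendNotification` handler messages with a `NotificationsSink` built around two bounded `futures::channel::mpsc` queues: a small one drained under backpressure (`reserve_notification`) and a large one for synchronous callers, where overflow enqueues a `ForceClose` instead of blocking. A condensed, self-contained sketch of that pattern (the buffer sizes, the `SinkMessage` type and the method names are illustrative):

```rust
use futures::channel::mpsc;
use futures::prelude::*;
use parking_lot::Mutex;

#[derive(Debug)]
enum SinkMessage {
    /// A notification to deliver to the peer.
    Notification(Vec<u8>),
    /// The synchronous queue overflowed: the background task must close the
    /// connection, since delivery guarantees no longer hold.
    ForceClose,
}

/// Two bounded queues feeding one background task per connection.
struct Sink {
    /// Small queue; senders wait on it, so it exerts backpressure.
    async_tx: futures::lock::Mutex<mpsc::Sender<SinkMessage>>,
    /// Large queue for contexts that cannot `await`.
    sync_tx: Mutex<mpsc::Sender<SinkMessage>>,
}

impl Sink {
    fn new() -> (Self, mpsc::Receiver<SinkMessage>, mpsc::Receiver<SinkMessage>) {
        let (async_tx, async_rx) = mpsc::channel(8);
        let (sync_tx, sync_rx) = mpsc::channel(512);
        let sink = Sink {
            async_tx: futures::lock::Mutex::new(async_tx),
            sync_tx: Mutex::new(sync_tx),
        };
        (sink, async_rx, sync_rx)
    }

    /// Fire-and-forget path: on overflow, drop the message and force a close.
    fn send_sync(&self, message: Vec<u8>) {
        let mut tx = self.sync_tx.lock();
        if tx.try_send(SinkMessage::Notification(message)).is_err() {
            // Cloning an `mpsc::Sender` reserves one extra slot in the
            // channel, so this `try_send` cannot fail.
            let _ = tx.clone().try_send(SinkMessage::ForceClose);
        }
    }

    /// Backpressured path: waits until the queue has a free slot.
    async fn send_async(&self, message: Vec<u8>) {
        let mut tx = self.async_tx.lock().await;
        if future::poll_fn(|cx| tx.poll_ready(cx)).await.is_ok() {
            let _ = tx.start_send(SinkMessage::Notification(message));
        }
    }
}
```

Splitting the asynchronous path into a reserve step (`poll_ready` under the async mutex) and a commit step (`start_send`) is what makes the `Ready` guard in the diff sound: once the guard exists, the channel is guaranteed to accept the message.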
@@ -216,6 +203,16 @@ impl NotifsOutHandler { pub fn protocol_name(&self) -> &[u8] { &self.protocol_name } + + // TODO: docs + pub fn poll_ready(&self, cx: &mut Context) -> Poll { + if let State::Open { substream, .. } = &self.state { + substream.poll_ready_unpin(cx).map(|()| true) + } else { + Poll::Ready(false) + } + } + } impl ProtocolsHandler for NotifsOutHandler { @@ -314,27 +311,6 @@ impl ProtocolsHandler for NotifsOutHandler { State::Poisoned => error!("☎️ Notifications handler in a poisoned state"), } } - - NotifsOutHandlerIn::Send(msg) => - if let State::Open { substream, .. } = &mut self.state { - if substream.push_message(msg).is_err() { - warn!( - target: "sub-libp2p", - "📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})", - self.peer_id, - self.protocol_name, - ); - } - if let Some(metric) = &self.queue_size_report { - metric.observe(substream.queue_len() as f64); - } - } else { - // This is an API misuse. - warn!( - target: "sub-libp2p", - "📞 Tried to send a notification on a disabled handler" - ); - }, } } diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index efcd0a4c8fb7d..399b2108fecda 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -43,8 +43,6 @@ use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. const MAX_HANDSHAKE_SIZE: usize = 1024; -/// Maximum number of buffered messages before we refuse to accept more. -const MAX_PENDING_MESSAGES: usize = 512; /// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional /// stream of messages. @@ -93,10 +91,6 @@ pub struct NotificationsOutSubstream { /// Substream where to send messages. #[pin] socket: Framed>>>, - /// Queue of messages waiting to be sent. - messages_queue: VecDeque>, - /// If true, we need to flush `socket`. - need_flush: bool, } impl NotificationsIn { @@ -272,80 +266,34 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, Ok((handshake, NotificationsOutSubstream { socket: Framed::new(socket, UviBytes::default()), - messages_queue: VecDeque::with_capacity(MAX_PENDING_MESSAGES), - need_flush: false, })) }) } } -impl NotificationsOutSubstream { - /// Returns the number of items in the queue, capped to `u32::max_value()`. - pub fn queue_len(&self) -> u32 { - u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value()) - } - - /// Push a message to the queue of messages. - /// - /// This has the same effect as the `Sink::start_send` implementation. 
- pub fn push_message(&mut self, item: Vec) -> Result<(), NotificationsOutError> { - if self.messages_queue.len() >= MAX_PENDING_MESSAGES { - return Err(NotificationsOutError::Clogged); - } - - self.messages_queue.push_back(item); - Ok(()) - } -} - impl Sink> for NotificationsOutSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { type Error = NotificationsOutError; fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) + let mut this = self.project(); + Sink::poll_ready(this.socket.as_mut(), cx) } fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - self.push_message(item) + let mut this = self.project(); + Sink::start_send(this.socket.as_mut(), cx) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); - - while !this.messages_queue.is_empty() { - match Sink::poll_ready(this.socket.as_mut(), cx) { - Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), - Poll::Ready(Ok(())) => { - let msg = this.messages_queue.pop_front() - .expect("checked for !is_empty above; qed"); - Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg))?; - *this.need_flush = true; - }, - Poll::Pending => return Poll::Pending, - } - } - - if *this.need_flush { - match Sink::poll_flush(this.socket.as_mut(), cx) { - Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), - Poll::Ready(Ok(())) => *this.need_flush = false, - Poll::Pending => return Poll::Pending, - } - } - - Poll::Ready(Ok(())) + Sink::poll_flush(this.socket.as_mut(), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - ready!(Sink::poll_flush(self.as_mut(), cx))?; - let this = self.project(); - match Sink::poll_close(this.socket, cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(From::from(err))), - Poll::Pending => Poll::Pending, - } + let mut this = self.project(); + Sink::poll_close(this.socket.as_mut(), cx) } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 7d4135de6b9ed..52fd519cf6122 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -38,7 +38,7 @@ use crate::{ }, on_demand_layer::AlwaysBadChecker, light_client_handler, block_requests, finality_requests, - protocol::{self, event::Event, LegacyConnectionKillError, sync::SyncState, PeerInfo, Protocol}, + protocol::{self, event::Event, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol}, transport, ReputationChange, }; use futures::prelude::*; @@ -50,7 +50,8 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handle use log::{error, info, trace, warn}; use parking_lot::Mutex; use prometheus_endpoint::{ - register, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, + register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, + PrometheusError, Registry, U64, }; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; @@ -61,7 +62,7 @@ use sp_runtime::{ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{ borrow::{Borrow, Cow}, - collections::HashSet, + collections::{HashMap, HashSet}, fs, marker::PhantomData, num:: NonZeroUsize, @@ -95,6 +96,12 @@ pub struct NetworkService { peerset: PeersetHandle, /// Channel that sends messages to the actual worker. 
to_worker: TracingUnboundedSender>, + /// For each peer and protocol combination, an object that allows sending notifications to + /// that peer. Updated by the [`NetworkWorker`]. + peers_notifications_sinks: Arc>>, + /// Field extracted from the [`Metrics`] struct and necessary to report the + /// notifications-related metrics. + notifications_sizes_metric: Option, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, @@ -342,6 +349,7 @@ impl NetworkWorker { } let external_addresses = Arc::new(Mutex::new(Vec::new())); + let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new())); let service = Arc::new(NetworkService { bandwidth, @@ -351,6 +359,9 @@ impl NetworkWorker { peerset: peerset_handle, local_peer_id, to_worker, + peers_notifications_sinks: peers_notifications_sinks.clone(), + notifications_sizes_metric: + metrics.as_ref().map(|metrics| metrics.notifications_sizes.clone()), _marker: PhantomData, }); @@ -364,6 +375,7 @@ impl NetworkWorker { from_worker, light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, + peers_notifications_sinks, metrics, boot_node_ids, }) @@ -542,8 +554,15 @@ impl NetworkService { &self.local_peer_id } - /// Writes a message on an open notifications channel. Has no effect if the notifications - /// channel with this protocol name is closed. + /// Appends a notification to the buffer of pending outgoing notifications with the given peer. + /// Has no effect if the notifications channel with this protocol name is not open. + /// + /// If the buffer of pending outgoing notifications with that peer is full, the notification + /// is silently dropped. This happens if you call this method at a higher rate than the rate + /// at which the peer processes these notifications, or if the available network bandwidth is + /// too low. + /// For this reason, this method is considered soft-deprecated. You are encouraged to use + /// [`NetworkService::prepare_notification`] instead. /// /// > **Note**: The reason why this is a no-op in the situation where we have no channel is /// > that we don't guarantee message delivery anyway. Networking issues can cause @@ -551,14 +570,99 @@ impl NetworkService { /// > between the remote voluntarily closing a substream or a network error /// > preventing the message from being delivered. /// - /// The protocol must have been registered with `register_notifications_protocol`. + /// The protocol must have been registered with `register_notifications_protocol` or + /// `NetworkConfiguration::notifications_protocols`. /// pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification { - target, - engine_id, - message, - }); + if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() { + notifications_sizes_metric + .with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) + .observe(message.len() as f64); + } + + // TODO: + todo!(); + } + + /// Waits until one or more slots are available in the buffer of pending outgoing notifications + /// with the given peer. + /// + /// The returned `Future` finishes after the peer is ready to accept more notifications, or + /// after the substream has been closed. Use the returned [`NotificationsBufferSlot`] to + /// actually send the notifications. 
+ /// + /// An error is returned if there exists no open notifications substream with that combination + /// of peer and protocol, or if the remote has asked to close the notifications substream. + /// + /// If the remote requests to close the notifications substream, all notifications successfully + /// enqueued with this method will finish being sent out before the substream actually gets + /// closed, but attempting to enqueue more notifications will now return an error. It is also + /// possible for the entire connection to be abruptly closed, in which case enqueued + /// notifications will be lost. + /// + /// The protocol must have been registered with `register_notifications_protocol` or + /// `NetworkConfiguration::notifications_protocols`. + /// + /// # Usage + /// + /// This method waits until there is space available in the buffer of messages towards the + /// given peer. If the peer processes notifications at a slower rate than we send them, this + /// buffer will quickly fill up. + /// + /// As such, you should never do something like this: + /// + /// ```ignore + /// // Do NOT do this + /// for peer in peers { + /// if let Ok(n) = network.prepare_notification(peer, ...).await { + /// n.send(...); + /// } + /// } + /// ``` + /// + /// Doing so would slow down all peers to the rate of the slowest one. A malicious or + /// malfunctioning peer could intentionally process notifications at a very slow rate. + /// + /// Instead, you are encouraged to maintain your own buffer of notifications on top of the one + /// maintained by `sc-network`, and use `prepare_notification` to progressively send out + /// elements from your buffer. If this additional buffer is full (which will happen at some + /// point if the peer is too slow to process notifications), appropriate measures can be taken, + /// such as removing non-critical notifications from the buffer or disconnecting the peer + /// using [`NetworkService::disconnect_peer`]. + /// + /// + /// Notifications Per-peer buffer + /// broadcast +-------> of notifications +--> `prepare_notification` +--> Internet + /// ^ (not covered by + /// | sc-network) + /// + + /// Notifications should be dropped + /// if buffer is full + /// + pub async fn prepare_notification<'a>( + &'a self, + target: PeerId, + engine_id: ConsensusEngineId, + ) -> Result, SendNotificationsError> { + // We clone the `NotificationsSink` in order to be able to unlock the network-wide + // `peers_notifications_sinks` mutex as soon as possible. + let sink = { + let peers_notifications_sinks = self.peers_notifications_sinks.lock(); + if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) { + sink.clone() + } else { + return Err(SendNotificationsError::NoSubstream); + } + }; + + // TODO: + let ready = sink.reserve_notification(todo!()).await; + Ok(NotificationsBufferSlot { + ready, + notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| { + histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) + }), + }) } /// Returns a stream containing the events that happen on the network. @@ -813,6 +917,36 @@ impl NetworkStateInfo for NetworkService } } +/// Reserved slot in the notifications buffer, ready to accept data. +#[must_use] +pub struct NotificationsBufferSlot<'a> { + ready: Ready<'a>, + + /// Field extracted from the [`Metrics`] struct and necessary to report the + /// notifications-related metrics. 
+ notification_size_metric: Option, +} + +impl<'a> NotificationsBufferSlot<'a> { + /// Consumes this slots reservation and actually queues the notification. + pub fn send(self, notification: impl Into>) { + let notification = notification.into(); + + if let Some(notification_size_metric) = &self.notification_size_metric { + notification_size_metric.observe(notification.len() as f64); + } + + self.ready.send(notification) + } +} + +/// Error returned by [`NetworkService::send_notification`]. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum SendNotificationsError { + /// No open notifications substream exists with the provided combination of peer and protocol. + NoSubstream, +} + /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. @@ -826,11 +960,6 @@ enum ServiceToWorkerMsg { AddKnownAddress(PeerId, Multiaddr), SyncFork(Vec, B::Hash, NumberFor), EventStream(out_events::Sender), - WriteNotification { - message: Vec, - engine_id: ConsensusEngineId, - target: PeerId, - }, RegisterNotifProtocol { engine_id: ConsensusEngineId, protocol_name: Cow<'static, [u8]>, @@ -867,6 +996,9 @@ pub struct NetworkWorker { metrics: Option, /// The `PeerId`'s of all boot nodes. boot_node_ids: Arc>, + /// For each peer and protocol combination, an object that allows sending notifications to + /// that peer. Shared with the [`NetworkService`]. + peers_notifications_sinks: Arc>>, } struct Metrics { @@ -1162,14 +1294,6 @@ impl Future for NetworkWorker { this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), - ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.notifications_sizes - .with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) - .observe(message.len() as f64); - } - this.network_service.user_protocol_mut().write_notification(target, engine_id, message) - }, ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { this.network_service .register_notifications_protocol(engine_id, protocol_name); @@ -1237,11 +1361,61 @@ impl Future for NetworkWorker { .inc(); } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { remote, engine_id, notifications_sink, role })) => { if let Some(metrics) = this.metrics.as_ref() { - metrics.update_with_network_event(&ev); + metrics.notifications_streams_opened_total + .with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id)]).inc(); } - this.event_streams.send(ev); + { + let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); + peers_notifications_sinks.insert((remote.clone(), engine_id), notifications_sink); + } + this.event_streams.send(Event::NotificationStreamOpened { + remote, + engine_id, + role, + }); + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { remote, engine_id, notifications_sink })) => { + let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); + if let Some(s) = peers_notifications_sinks.get_mut(&(remote, engine_id)) { + *s = notifications_sink; + } else { + log::error!( + target: "sub-libp2p", + "NotificationStreamReplaced for non-existing substream" + ); + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, engine_id })) => { + if let 
Some(metrics) = this.metrics.as_ref() { + metrics.notifications_streams_closed_total + .with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id[..])]).inc(); + } + this.event_streams.send(Event::NotificationStreamClosed { + remote: remote.clone(), + engine_id, + }); + { + let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); + peers_notifications_sinks.remove(&(remote.clone(), engine_id)); + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages })) => { + if let Some(metrics) = this.metrics.as_ref() { + for (engine_id, message) in &messages { + metrics.notifications_sizes + .with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)]) + .observe(message.len() as f64); + } + } + this.event_streams.send(Event::NotificationsReceived { + remote, + messages, + }); + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(ev))) => { + this.event_streams.send(Event::Dht(ev)); }, Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => { trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
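The doc comment on `prepare_notification` describes the intended usage: one pump per peer, draining a per-peer buffer only as fast as the remote hands out slots. A sketch of that shape, kept generic so it stands alone (`peer_notifications_pump` and its parameters are illustrative; `reserve` plays the role of `NetworkService::prepare_notification` and the returned closure the role of `NotificationsBufferSlot::send`):

```rust
use futures::channel::mpsc;
use futures::prelude::*;

/// One of these runs per peer: it owns the peer's bounded notification buffer
/// (the "per-peer buffer of notifications" from the diagram above) and
/// forwards from it only when the network layer hands out a send slot.
async fn peer_notifications_pump<R, Fut, Slot>(
    mut buffer: mpsc::Receiver<Vec<u8>>,
    mut reserve: R,
) where
    R: FnMut() -> Fut,
    Fut: Future<Output = Result<Slot, ()>>,
    Slot: FnOnce(Vec<u8>),
{
    while let Some(notification) = buffer.next().await {
        match reserve().await {
            // A slot is free in the outgoing queue: commit the notification.
            Ok(send_slot) => send_slot(notification),
            // Substream closed or peer gone: stop pumping for this peer.
            Err(()) => return,
        }
    }
    // `buffer` yielding `None` means the producer side dropped this peer,
    // e.g. because its buffer overflowed and it was disconnected.
}
```

Because each pump awaits only its own peer's slot, a stalled peer fills only its own buffer; the sender side can then drop non-critical notifications or disconnect that peer, exactly as the `prepare_notification` documentation recommends.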