-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Add NetworkService::send_notifications #6591
Changes from 3 commits
4c8181c
432ee3c
73bf377
8921896
c90159c
24e3d31
d5c53b4
dc82672
7cf8526
5a6236a
c90fe31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -541,16 +541,24 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { | |
| &self.local_peer_id | ||
| } | ||
|
|
||
| /// Writes a message on an open notifications channel. Has no effect if the notifications | ||
| /// channel with this protocol name is closed. | ||
| /// Appends a notification to the buffer of pending outgoing notifications with the given peer. | ||
| /// Has no effect if the notifications channel with this protocol name is not open. | ||
| /// | ||
| /// If the buffer of pending outgoing notifications with that peer is full, the notification | ||
| /// is silently dropped. This happens if you call this method at a higher rate than the rate | ||
| /// at which the peer processes these notifications, or if the available network bandwidth is | ||
| /// too low. | ||
| /// For this reason, this method is considered soft-deprecated. You are encouraged to use | ||
| /// [`NetworkService::send_notification`] instead. | ||
| /// | ||
| /// > **Note**: The reason why this is a no-op in the situation where we have no channel is | ||
| /// > that we don't guarantee message delivery anyway. Networking issues can cause | ||
| /// > connections to drop at any time, and higher-level logic shouldn't differentiate | ||
| /// > between the remote voluntarily closing a substream or a network error | ||
| /// > preventing the message from being delivered. | ||
| /// | ||
| /// The protocol must have been registered with `register_notifications_protocol`. | ||
| /// The protocol must have been registered with `register_notifications_protocol` or | ||
| /// `NetworkConfiguration::notifications_protocols`. | ||
| /// | ||
| pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) { | ||
| let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification { | ||
|
|
@@ -560,6 +568,71 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { | |
| }); | ||
| } | ||
|
|
||
| /// Appends one or more notifications to the buffer of pending outgoing notifications with the | ||
| /// given peer. | ||
| /// | ||
| /// The returned `Future` finishes only after all the notifications have been queued or the | ||
| /// substream has been closed. | ||
| /// | ||
| /// This operation is atomic in the sense that either all or none of the notifications have | ||
| /// been enqueued. In particular, it is guaranteed that no notifications have been transmitted | ||
| /// if the operation is interrupted by dropping the `Future`. | ||
| /// | ||
| /// An error is returned if there exists no open notifications substream with that combination | ||
| /// of peer and protocol, or if the remote has asked to close the notifications substream. | ||
| /// | ||
| /// If the remote requests to close the notifications substream, all notifications successfully | ||
| /// enqueued with this method will finish being sent out before the substream actually gets | ||
| /// closed, but attempting to enqueue more notifications will now return an error. It is also | ||
| /// possible for the entire connection to be abruptly closed, in which case enqueued | ||
| /// notifications will be lost. | ||
| /// | ||
| /// The protocol must have been registered with `register_notifications_protocol` or | ||
| /// `NetworkConfiguration::notifications_protocols`. | ||
| /// | ||
| /// # Usage | ||
| /// | ||
| /// This method waits until there space is available in the buffer of messages towards the | ||
| /// given peer. If the peer processes notifications at a slower rate than we send them, this | ||
| /// buffer will quickly fill up. | ||
| /// | ||
| /// As such, you should never do something like this: | ||
| /// | ||
| /// ```ignore | ||
| /// // Do NOT do this | ||
| /// for peer in peers { | ||
| /// network.send_notifications(peer, ..., notifications).await; | ||
|
||
| /// } | ||
| /// ``` | ||
| /// | ||
| /// Doing so would slow down all peers to the rate of the slowest one. A malicious or | ||
| /// malfunctioning peer could intentionally process notifications at a very slow rate. | ||
| /// | ||
| /// Instead, you are encouraged to maintain your own buffer of notifications on top of the one | ||
| /// maintained by `sc-network`, and use `send_notifications` to progressively send out | ||
| /// elements from your buffer. If this additional buffer is full (which will happen at some | ||
| /// point if the peer is too slow to process notifications), appropriate measures can be taken, | ||
| /// such as removing non-critical notifications from the buffer or disconnecting the peer | ||
| /// using [`NetworkService::disconnect_peer`]. | ||
| /// | ||
| /// | ||
| /// Notifications Per-peer buffer | ||
| /// broadcast +-------> of notifications +--> `send_notifications` +--> Internet | ||
| /// ^ (not covered by | ||
| /// | sc-network) | ||
| /// + | ||
| /// Notifications should be dropped | ||
| /// if buffer is full | ||
| /// | ||
| pub async fn send_notifications( | ||
|
||
| &self, | ||
| target: PeerId, | ||
| engine_id: ConsensusEngineId, | ||
| notifications: impl ExactSizeIterator<Item = impl Into<Vec<u8>>>, | ||
tomaka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) -> Result<(), SendNotificationError> { | ||
mxinden marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| todo!() | ||
| } | ||
|
|
||
| /// Returns a stream containing the events that happen on the network. | ||
| /// | ||
| /// If this method is called multiple times, the events are duplicated. | ||
|
|
@@ -812,6 +885,13 @@ impl<B, H> NetworkStateInfo for NetworkService<B, H> | |
| } | ||
| } | ||
|
|
||
| /// Error returned by [`NetworkService::send_notification`]. | ||
| #[derive(Debug, derive_more::Display, derive_more::Error)] | ||
| pub enum SendNotificationError { | ||
tomaka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| /// No open notifications substream exists with this combination of peer and protocol. | ||
tomaka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| NoSubstream, | ||
| } | ||
|
|
||
| /// Messages sent from the `NetworkService` to the `NetworkWorker`. | ||
| /// | ||
| /// Each entry corresponds to a method of `NetworkService`. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.