Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use std::{

pub use crate::request_responses::{
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId,
IfDisconnected
};

/// General behaviour of the network. Combines all protocols together.
Expand Down Expand Up @@ -248,8 +249,9 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
protocol: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
self.request_responses.send_request(target, protocol, request, pending_response)
self.request_responses.send_request(target, protocol, request, pending_response, connect)
}

/// Returns a shared reference to the user protocol.
Expand Down Expand Up @@ -317,7 +319,7 @@ Behaviour<B, H> {
}

self.request_responses.send_request(
&target, &self.block_request_protocol_name, buf, pending_response,
&target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocol, roles, notifications_sink } => {
Expand Down Expand Up @@ -454,11 +456,22 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
use light_client_requests::sender::OutEvent;
while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) {
while let Poll::Ready(Some(event)) =
self.light_client_request_sender.poll_next_unpin(cx)
{
match event {
OutEvent::SendRequest { target, request, pending_response, protocol_name } => {
self.request_responses.send_request(&target, &protocol_name, request, pending_response)
}
OutEvent::SendRequest {
target,
request,
pending_response,
protocol_name,
} => self.request_responses.send_request(
&target,
&protocol_name,
request,
pending_response,
IfDisconnected::ImmediateError,
),
}
}

Expand Down
32 changes: 28 additions & 4 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ impl From<(Cow<'static, str>, RequestId)> for ProtocolRequestId {
}
}

/// When sending a request, what to do on a disconnected recipient.
pub enum IfDisconnected {
/// Try to connect to the peer.
TryConnect,
/// Just fail if the destination is not yet connected.
ImmediateError,
}

/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
/// Shall we connect to a disconnected peer?
pub fn should_connect(self) -> bool {
match self {
Self::TryConnect => true,
Self::ImmediateError => false,
}
}
}

/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
/// The multiple sub-protocols, by name.
Expand Down Expand Up @@ -269,17 +288,19 @@ impl RequestResponsesBehaviour {

/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer or if the protocol doesn't
/// match one that has been registered.
/// If there is no established connection to the target peer, the behavior is determined by the choice of `connect`.
///
/// An error is returned if the protocol doesn't match one that has been registered.
pub fn send_request(
&mut self,
target: &PeerId,
protocol_name: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
if protocol.is_connected(target) {
if protocol.is_connected(target) || connect.should_connect() {
let request_id = protocol.send_request(target, request);
let prev_req_id = self.pending_requests.insert(
(protocol_name.to_string().into(), request_id).into(),
Expand Down Expand Up @@ -489,7 +510,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address })
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
Expand Down Expand Up @@ -949,6 +969,7 @@ mod tests {
protocol_name,
b"this is a request".to_vec(),
sender,
IfDisconnected::ImmediateError,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
Expand Down Expand Up @@ -1037,6 +1058,7 @@ mod tests {
protocol_name,
b"this is a request".to_vec(),
sender,
IfDisconnected::ImmediateError,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
Expand Down Expand Up @@ -1179,12 +1201,14 @@ mod tests {
protocol_name_1,
b"this is a request".to_vec(),
sender_1,
IfDisconnected::ImmediateError,
);
swarm_1.send_request(
&peer_id,
protocol_name_2,
b"this is a request".to_vec(),
sender_2,
IfDisconnected::ImmediateError,
);
assert!(response_receivers.is_none());
response_receivers = Some((receiver_1, receiver_2));
Expand Down
51 changes: 38 additions & 13 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ use std::{
task::Poll,
};

pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, IfDisconnected};

mod metrics;
mod out_events;
Expand Down Expand Up @@ -812,9 +812,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// notifications should remain the default ways of communicating information. For example, a
/// peer can announce something through a notification, after which the recipient can obtain
/// more information by performing a request.
/// As such, this function is meant to be called only with peers we are already connected to.
/// Calling this method with a `target` we are not connected to will *not* attempt to connect
/// to said peer.
/// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way you
/// will get an error immediately for disconnected peers, instead of waiting for a potentially very
/// long connection attempt, which would suggest that something is wrong anyway, as you are
/// supposed to be connected because of the notification protocol.
///
/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
/// Such restrictions, if desired, need to be enforced at the call site(s).
Expand All @@ -826,15 +827,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>
request: Vec<u8>,
connect: IfDisconnected,
) -> Result<Vec<u8>, RequestFailure> {
let (tx, rx) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx
});

self.start_request(target, protocol, request, tx, connect);

match rx.await {
Ok(v) => v,
Expand All @@ -845,6 +843,32 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}
}

/// Variation of `request` which starts a request whose response is delivered on a provided channel.
///
/// Instead of blocking and waiting for a reply, this function returns immediately, sending
/// responses via the passed in sender. This alternative API exists to make it easier to
/// integrate with message passing APIs.
///
/// Keep in mind that the connected receiver might receive a `Canceled` event in case of a
/// closing connection. This is expected behaviour. With `request` you would get a
/// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case.
pub fn start_request(
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>,
tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx,
connect,
});
}

/// You may call this when new transactons are imported by the transaction pool.
///
/// All transactions will be fetched from the `TransactionPool` that was passed at
Expand Down Expand Up @@ -1262,6 +1286,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
protocol: Cow<'static, str>,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
},
DisconnectPeer(PeerId, Cow<'static, str>),
NewBestBlockImported(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -1385,8 +1410,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
this.network_service.send_request(&target, &protocol, request, pending_response);
ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => {
this.network_service.send_request(&target, &protocol, request, pending_response, connect);
},
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) =>
this.network_service.user_protocol_mut().disconnect_peer(&who, &protocol_name),
Expand Down