Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ where
I: Iterator<Item = ConnectionId>
{
/// Obtains the next connection, if any.
pub fn next<'b>(&'b mut self) -> Option<EstablishedConnection<'b, TInEvent>>
pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) { // (*)
Expand Down
12 changes: 6 additions & 6 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ where

/// Obtains a dialing attempt to the peer by connection ID of
/// the current connection attempt.
pub fn attempt<'b>(&'b mut self, id: ConnectionId)
-> Option<DialingAttempt<'b, TInEvent>>
pub fn attempt(&mut self, id: ConnectionId)
-> Option<DialingAttempt<'_, TInEvent>>
{
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) {
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
Expand All @@ -446,8 +446,8 @@ where
}

/// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer.
pub fn attempts<'b>(&'b mut self)
-> DialingAttemptIter<'b,
pub fn attempts(&mut self)
-> DialingAttemptIter<'_,
TInEvent,
TOutEvent,
THandler,
Expand All @@ -460,8 +460,8 @@ where
/// Obtains some dialing connection to the peer.
///
/// At least one dialing connection is guaranteed to exist on a `DialingPeer`.
pub fn some_attempt<'b>(&'b mut self)
-> DialingAttempt<'b, TInEvent>
pub fn some_attempt(&mut self)
-> DialingAttempt<'_, TInEvent>
{
self.attempts()
.into_first()
Expand Down
27 changes: 14 additions & 13 deletions misc/multiaddr/src/from_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ fn from_url_inner_http_ws(url: url::Url, lossy: bool) -> std::result::Result<Mul
return Err(FromUrlErr::BadUrl);
};

if !lossy {
if !url.username().is_empty() || url.password().is_some() ||
(lost_path && url.path() != "/" && !url.path().is_empty()) ||
url.query().is_some() || url.fragment().is_some()
{
return Err(FromUrlErr::InformationLoss);
}
if !lossy && (
!url.username().is_empty() ||
url.password().is_some() ||
(lost_path && url.path() != "/" && !url.path().is_empty()) ||
url.query().is_some() || url.fragment().is_some()
) {
return Err(FromUrlErr::InformationLoss);
}

Ok(iter::once(ip)
Expand All @@ -104,12 +104,13 @@ fn from_url_inner_path(url: url::Url, lossy: bool) -> std::result::Result<Multia
_ => unreachable!("We only call this function for one of the given schemes; qed")
};

if !lossy {
if !url.username().is_empty() || url.password().is_some() ||
url.query().is_some() || url.fragment().is_some()
{
return Err(FromUrlErr::InformationLoss);
}
if !lossy && (
!url.username().is_empty() ||
url.password().is_some() ||
url.query().is_some() ||
url.fragment().is_some()
) {
return Err(FromUrlErr::InformationLoss);
}

Ok(Multiaddr::from(protocol))
Expand Down
26 changes: 20 additions & 6 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,29 @@ pub enum RequestResponseHandlerEvent<TCodec>
where
TCodec: RequestResponseCodec
{
/// An inbound request.
/// A request has been received.
Request {
request_id: RequestId,
request: TCodec::Request,
sender: oneshot::Sender<TCodec::Response>
},
/// An inbound response.
/// A response has been received.
Response {
request_id: RequestId,
response: TCodec::Response
},
/// An outbound upgrade (i.e. request) timed out.
/// A response to an inbound request has been sent.
ResponseSent(RequestId),
/// A response to an inbound request was omitted as a result
/// of dropping the response `sender` of an inbound `Request`.
ResponseOmission(RequestId),
/// An outbound request timed out while sending the request
/// or waiting for the response.
OutboundTimeout(RequestId),
/// An outbound request failed to negotiate a mutually supported protocol.
OutboundUnsupportedProtocols(RequestId),
/// An inbound request timed out.
/// An inbound request timed out while waiting for the request
/// or sending the response.
InboundTimeout(RequestId),
/// An inbound request failed to negotiate a mutually supported protocol.
InboundUnsupportedProtocols(RequestId),
Expand Down Expand Up @@ -187,9 +194,16 @@ where

fn inject_fully_negotiated_inbound(
&mut self,
(): (),
_: RequestId
sent: bool,
request_id: RequestId
) {
if sent {
self.pending_events.push_back(
RequestResponseHandlerEvent::ResponseSent(request_id))
} else {
self.pending_events.push_back(
RequestResponseHandlerEvent::ResponseOmission(request_id))
}
}

fn inject_fully_negotiated_outbound(
Expand Down
6 changes: 4 additions & 2 deletions protocols/request-response/src/handler/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
{
type Output = ();
type Output = bool;
type Error = io::Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

Expand All @@ -105,10 +105,12 @@ where
if let Ok(response) = self.response_receiver.await {
let write = self.codec.write_response(&protocol, &mut io, response);
write.await?;
} else {
return Ok(false)
}
}
io.close().await?;
Ok(())
Ok(true)
}.boxed()
}
}
Expand Down
80 changes: 53 additions & 27 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@
//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
//! instantiated with a sum type.
//!
//! ## One-Way Protocols
//!
//! The implementation supports one-way protocols that do not
//! have responses. In these cases the [`RequestResponseCodec::Response`] can
//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as
//! [`RequestResponseCodec::write_response`] given the obvious implementations.
//! Note that `RequestResponseMessage::Response` will still be emitted,
//! immediately after the request has been sent, since `RequestResponseCodec::read_response`
//! will not actually read anything from the given I/O stream.
//! [`RequestResponse::send_response`] need not be called for one-way protocols,
//! i.e. the [`ResponseChannel`] may just be dropped.
//!
//! ## Limited Protocol Support
//!
//! It is possible to only support inbound or outbound requests for
Expand Down Expand Up @@ -115,9 +103,11 @@ pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TRespons
request_id: RequestId,
/// The request message.
request: TRequest,
/// The sender of the request who is awaiting a response.
/// The channel waiting for the response.
///
/// See [`RequestResponse::send_response`].
/// If this channel is dropped instead of being used to send a response
/// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
/// with [`InboundFailure::ResponseOmission`] is emitted.
channel: ResponseChannel<TChannelResponse>,
},
/// A response message.
Expand Down Expand Up @@ -151,6 +141,14 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
error: OutboundFailure,
},
/// An inbound request failed.
///
/// > **Note**: The case whereby a connection on which a response is sent
/// > closes after [`RequestResponse::send_response`] already succeeded
/// > but before the response could be sent on the connection is reflected
/// > by there being no [`RequestResponseEvent::ResponseSent`] event.
/// > Code interested in ensuring a response has been successfully
/// > handed to the transport layer, e.g. before continuing with the next
/// > step in a multi-step protocol, should listen to these events.
InboundFailure {
/// The peer from whom the request was received.
peer: PeerId,
Expand All @@ -159,6 +157,16 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
/// The error that occurred.
error: InboundFailure,
},
/// A response to an inbound request has been sent.
///
/// When this event is received, the response has been flushed on
/// the underlying transport connection.
ResponseSent {
/// The peer to whom the response was sent.
peer: PeerId,
/// The ID of the inbound request whose response was sent.
request_id: RequestId,
},
}

/// Possible failures occurring in the context of sending
Expand Down Expand Up @@ -186,14 +194,17 @@ pub enum OutboundFailure {
#[derive(Debug)]
pub enum InboundFailure {
/// The inbound request timed out, either while reading the
/// incoming request or before a response is sent, i.e. if
/// incoming request or before a response is sent, e.g. if
/// [`RequestResponse::send_response`] is not called in a
/// timely manner.
Timeout,
/// The local peer supports none of the requested protocols.
/// The local peer supports none of the protocols requested
/// by the remote.
UnsupportedProtocols,
/// The connection closed before a response was delivered.
ConnectionClosed,
/// The local peer failed to respond to an inbound request
/// due to the [`ResponseChannel`] being dropped instead of
/// being passed to [`RequestResponse::send_response`].
ResponseOmission,
}

/// A channel for sending a response to an inbound request.
Expand Down Expand Up @@ -379,17 +390,18 @@ where

/// Initiates sending a response to an inbound request.
///
/// If the `ResponseChannel` is already closed due to a timeout,
/// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`]
/// is emitted by `RequestResponse::poll`.
/// If the `ResponseChannel` is already closed due to a timeout or
/// the connection being closed, the response is returned as an `Err`
/// for further handling. Once the response has been successfully sent
/// on the corresponding connection, [`RequestResponseEvent::ResponseSent`]
/// is emitted.
///
/// The provided `ResponseChannel` is obtained from a
/// The provided `ResponseChannel` is obtained from an inbound
/// [`RequestResponseMessage::Request`].
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response) {
// Fails only if the inbound upgrade timed out waiting for the response,
// in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout`
// which in turn results in `RequestResponseEvent::InboundFailure`.
let _ = ch.sender.send(rs);
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
-> Result<(), TCodec::Response>
{
ch.sender.send(rs)
}

/// Adds a known address for a peer that can be used for
Expand Down Expand Up @@ -577,6 +589,20 @@ where
RequestResponseEvent::Message { peer, message }
));
}
RequestResponseHandlerEvent::ResponseSent(request_id) => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::ResponseSent { peer, request_id }));
}
RequestResponseHandlerEvent::ResponseOmission(request_id) => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer,
request_id,
error: InboundFailure::ResponseOmission
}));
}
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
self.pending_events.push_back(
Expand Down
Loading