Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
client/network/request_response.rs: Clean up silently failing responses
  • Loading branch information
mxinden committed Dec 8, 2020
commit b792d6117913e25e8070239fb617abee9d8543a0
352 changes: 184 additions & 168 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ enum RequestProcessingOutcome {
response: Vec<u8>,
},
Busy {
request_id: RequestId,
peer: PeerId,
protocol: Cow<'static, str>,
},
Expand Down Expand Up @@ -352,192 +351,209 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
Self::OutEvent,
>,
> {
'poll_all: loop {
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) {
match result {
RequestProcessingOutcome::Response {
request_id, peer, protocol: protocol_name, inner_channel, response
} => {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
self.pending_responses_arrival_time.remove(&request_id);
log::warn!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?}. Dropping \
response",
request_id, protocol_name,
);
let out = Event::InboundRequest {
peer,
protocol: protocol_name,
result: Err(ResponseFailure::TimeoutOrClosed),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) {
match result {
RequestProcessingOutcome::Response {
request_id, peer, protocol: protocol_name, inner_channel, response
} => {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
self.pending_responses_arrival_time.remove(&request_id);
log::warn!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?}. Dropping \
response",
request_id, protocol_name,
);
let out = Event::InboundRequest {
peer,
protocol: protocol_name,
result: Err(ResponseFailure::TimeoutOrClosed),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
}
RequestProcessingOutcome::Busy { request_id, peer, protocol } => {
// Note: Request is removed from self.pending_responses_arrival_time when
// handling [`InboundFailure::ResponseOmission`].
let out = Event::InboundRequest {
peer,
protocol,
result: Err(ResponseFailure::Busy),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
}
RequestProcessingOutcome::Busy { peer, protocol } => {
// Note: Request is removed from self.pending_responses_arrival_time when
// handling [`InboundFailure::ResponseOmission`].
let out = Event::InboundRequest {
peer,
protocol,
result: Err(ResponseFailure::Busy),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
}
}

// Poll request-responses protocols.
for (protocol, (behaviour, resp_builder)) in &mut self.protocols {
while let Poll::Ready(ev) = behaviour.poll(cx, params) {
let ev = match ev {
// Main events we are interested in.
NetworkBehaviourAction::GenerateEvent(ev) => ev,

// Other events generated by the underlying behaviour are transparently
// passed through.
NetworkBehaviourAction::DialAddress { address } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialAddress { address })
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
})
}
NetworkBehaviourAction::NotifyHandler {
// Poll request-responses protocols.
for (protocol, (behaviour, resp_builder)) in &mut self.protocols {
while let Poll::Ready(ev) = behaviour.poll(cx, params) {
let ev = match ev {
// Main events we are interested in.
NetworkBehaviourAction::GenerateEvent(ev) => ev,

// Other events generated by the underlying behaviour are transparently
// passed through.
NetworkBehaviourAction::DialAddress { address } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialAddress { address })
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
})
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: ((*protocol).to_string(), event),
})
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address, score,
})
}
};

match ev {
// Received a request from a remote.
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request_id, request, channel, .. },
} => {
self.pending_responses_arrival_time.insert(
request_id.clone(),
Instant::now(),
);
event: ((*protocol).to_string(), event),
})
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address, score,
})
}
};

let (tx, rx) = oneshot::channel();

// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`.
// This will be reported as a `Busy` error.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
pending_response: tx,
});
}
match ev {
// Received a request from a remote.
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request_id, request, channel, .. },
} => {
self.pending_responses_arrival_time.insert(
request_id.clone(),
Instant::now(),
);

let (tx, rx) = oneshot::channel();

// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`.
// This will be reported as a `Busy` error.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
pending_response: tx,
});
}

let protocol = protocol.clone();
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a "Busy" error.
if let Ok(response) = rx.await {
RequestProcessingOutcome::Response {
request_id, peer, protocol, inner_channel: channel, response
}
} else {
RequestProcessingOutcome::Busy { request_id, peer, protocol }
let protocol = protocol.clone();
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a "Busy" error.
if let Ok(response) = rx.await {
RequestProcessingOutcome::Response {
request_id, peer, protocol, inner_channel: channel, response
}
}));
} else {
RequestProcessingOutcome::Busy { peer, protocol }
}
}));

// This `continue` makes sure that `pending_responses` gets polled
// after we have added the new element.
continue 'poll_all;
}
// This `wake_by_ref` makes sure that `pending_responses` gets polled
// after we have added the new element.
cx.waker().wake_by_ref();
}

// Received a response from a remote to one of our requests.
RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
request_id,
response,
},
..
} => {
let out = Event::RequestFinished {
request_id,
result: response.map_err(|()| RequestFailure::Refused),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
// Received a response from a remote to one of our requests.
RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
request_id,
response,
},
..
} => {
let out = Event::RequestFinished {
request_id,
result: response.map_err(|()| RequestFailure::Refused),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}

// One of our requests has failed.
RequestResponseEvent::OutboundFailure {
// One of our requests has failed.
RequestResponseEvent::OutboundFailure {
request_id,
error,
..
} => {
let out = Event::RequestFinished {
request_id,
error,
..
} => {
let out = Event::RequestFinished {
request_id,
result: Err(RequestFailure::Network(error)),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
result: Err(RequestFailure::Network(error)),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}

// Remote has tried to send a request but failed.
RequestResponseEvent::InboundFailure { request_id, peer, error, .. } => {
if self.pending_responses_arrival_time.remove(&request_id).is_none() {
debug_assert!(
false,
"Expected to find start time for failed response.",
)
}
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Err(ResponseFailure::Network(error)),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
// Remote has tried to send a request but failed.
RequestResponseEvent::InboundFailure { request_id, peer, error, .. } => {
if self.pending_responses_arrival_time.remove(&request_id).is_none() {
debug_assert!(
false,
"Expected to find start time for failed response.",
)
}
RequestResponseEvent::ResponseSent { request_id, peer } => {
match self.pending_responses_arrival_time.remove(&request_id) {
Some(arrival) => {
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Ok(arrival.elapsed()),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
},
None => debug_assert!(
false,
"Expected to find start time for sent response.",
),
}

let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Err(ResponseFailure::Network(error)),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
RequestResponseEvent::ResponseSent { request_id, peer } => {
match self.pending_responses_arrival_time.remove(&request_id) {
Some(arrival) => {
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Ok(arrival.elapsed()),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
},
None => debug_assert!(
false,
"Expected to find start time for sent response.",
),
}
};
}
}

break Poll::Pending;
}
};
}
}

// Sending a response can fail silently if the connection to the peer closed after calling
// [`RequestResponse::send_response`] but before the response could be sent on the
// connection. Clean up arrival times for responses that have silently failed.
let now = Instant::now();
self.pending_responses_arrival_time.retain(|request_id, arrival_time| {
if now.duration_since(*arrival_time) > Duration::from_secs(10 * 60) {
log::warn!(
target: "sub-libp2p",
"Connection to peer closed after calling `send_response` but before response \
could be send on the connection. Response {:?} has been dropped.",
request_id,
);
false
} else {
true
}
});


Poll::Pending
}
}

Expand Down