@@ -158,26 +158,19 @@ pub struct RequestResponsesBehaviour {
158158 /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
159159 /// response to send back to the remote.
160160 pending_responses : stream:: FuturesUnordered <
161- Pin < Box < dyn Future < Output = RequestProcessingOutcome > + Send > >
161+ Pin < Box < dyn Future < Output = Option < RequestProcessingOutcome > > + Send > >
162162 > ,
163163
164164 /// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
165165 pending_responses_arrival_time : LruCache < RequestId , Instant > ,
166166}
167167
168168/// Generated by the response builder and waiting to be processed.
169- enum RequestProcessingOutcome {
170- Response {
171- request_id : RequestId ,
172- peer : PeerId ,
173- protocol : Cow < ' static , str > ,
174- inner_channel : ResponseChannel < Result < Vec < u8 > , ( ) > > ,
175- response : Vec < u8 > ,
176- } ,
177- Busy {
178- peer : PeerId ,
179- protocol : Cow < ' static , str > ,
180- } ,
169+ struct RequestProcessingOutcome {
170+ request_id : RequestId ,
171+ protocol : Cow < ' static , str > ,
172+ inner_channel : ResponseChannel < Result < Vec < u8 > , ( ) > > ,
173+ response : Vec < u8 > ,
181174}
182175
183176impl RequestResponsesBehaviour {
@@ -357,33 +350,31 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
357350 > {
358351 ' poll_all: loop {
359352 // Poll to see if any response is ready to be sent back.
360- while let Poll :: Ready ( Some ( result) ) = self . pending_responses . poll_next_unpin ( cx) {
361- match result {
362- RequestProcessingOutcome :: Response {
363- request_id, peer, protocol : protocol_name, inner_channel, response
364- } => {
365- if let Some ( ( protocol, _) ) = self . protocols . get_mut ( & * protocol_name) {
366- if let Err ( _) = protocol. send_response ( inner_channel, Ok ( response) ) {
367- self . pending_responses_arrival_time . pop ( & request_id) ;
368- log:: debug!(
369- target: "sub-libp2p" ,
370- "Failed to send response for {:?} on protocol {:?} due to a \
371- timeout or due to the connection to the peer being closed. \
372- Dropping response",
373- request_id, protocol_name,
374- ) ;
375- }
376- }
377- }
378- RequestProcessingOutcome :: Busy { peer, protocol } => {
379- // Note: Request is removed from self.pending_responses_arrival_time when
380- // handling [`InboundFailure::ResponseOmission`].
381- let out = Event :: InboundRequest {
382- peer,
383- protocol,
384- result : Err ( ResponseFailure :: Busy ) ,
385- } ;
386- return Poll :: Ready ( NetworkBehaviourAction :: GenerateEvent ( out) ) ;
353+ while let Poll :: Ready ( Some ( outcome) ) = self . pending_responses . poll_next_unpin ( cx) {
354+ let RequestProcessingOutcome {
355+ request_id,
356+ protocol : protocol_name,
357+ inner_channel,
358+ response
359+ } = match outcome {
360+ Some ( outcome) => outcome,
361+ // The response builder was too busy and thus the request was dropped. This is
362+ // later on reported as a `InboundFailure::Omission`.
363+ None => continue ,
364+ } ;
365+
366+ if let Some ( ( protocol, _) ) = self . protocols . get_mut ( & * protocol_name) {
367+ if let Err ( _) = protocol. send_response ( inner_channel, Ok ( response) ) {
368+ // Note: In case this happened due to a timeout, the corresponding
369+ // `RequestResponse` behaviour will emit an `InboundFailure::Timeout` event.
370+ self . pending_responses_arrival_time . pop ( & request_id) ;
371+ log:: debug!(
372+ target: "sub-libp2p" ,
373+ "Failed to send response for {:?} on protocol {:?} due to a \
374+ timeout or due to the connection to the peer being closed. \
375+ Dropping response",
376+ request_id, protocol_name,
377+ ) ;
387378 }
388379 }
389380 }
@@ -442,8 +433,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
442433 // Submit the request to the "response builder" passed by the user at
443434 // initialization.
444435 if let Some ( resp_builder) = resp_builder {
445- // If the response builder is too busy, silently drop `tx`.
446- // This will be reported as a `Busy` error.
436+ // If the response builder is too busy, silently drop `tx`. This
437+ // will be reported by the corresponding `RequestResponse` through
438+ // an `InboundFailure::Omission` event.
447439 let _ = resp_builder. try_send ( IncomingRequest {
448440 peer : peer. clone ( ) ,
449441 payload : request,
@@ -454,13 +446,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
454446 let protocol = protocol. clone ( ) ;
455447 self . pending_responses . push ( Box :: pin ( async move {
456448 // The `tx` created above can be dropped if we are not capable of
457- // processing this request, which is reflected as a "Busy" error.
449+ // processing this request, which is reflected as a
450+ // `InboundFailure::Omission` event.
458451 if let Ok ( response) = rx. await {
459- RequestProcessingOutcome :: Response {
460- request_id, peer , protocol, inner_channel : channel, response
461- }
452+ Some ( RequestProcessingOutcome {
453+ request_id, protocol, inner_channel : channel, response
454+ } )
462455 } else {
463- RequestProcessingOutcome :: Busy { peer , protocol }
456+ None
464457 }
465458 } ) ) ;
466459
@@ -565,8 +558,6 @@ pub enum RequestFailure {
565558/// Error when processing a request sent by a remote.
566559#[ derive( Debug , derive_more:: Display , derive_more:: Error ) ]
567560pub enum ResponseFailure {
568- /// Internal response builder is too busy to process this request.
569- Busy ,
570561 /// Problem on the network.
571562 #[ display( fmt = "Problem on the network" ) ]
572563 Network ( #[ error( ignore) ] InboundFailure ) ,
0 commit comments