Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 30cb4d1

Browse files
Revert "network: Detect early that NotificationOutSubstream was closed by the remote (#13396)" (#13409)
This reverts commit dfa654d.
1 parent a330912 commit 30cb4d1

File tree

2 files changed

+12
-127
lines changed

2 files changed

+12
-127
lines changed

client/network/src/protocol/notifications/handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -782,9 +782,6 @@ impl ConnectionHandler for NotifsHandler {
782782
// performed before the code paths that can produce `Ready` (with some rare exceptions).
783783
// Importantly, however, the flush is performed *after* notifications are queued with
784784
// `Sink::start_send`.
785-
// Note that we must call `poll_flush` on all substreams and not only on those we
786-
// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
787-
// also reports the substream termination (even if no data was written into it).
788785
for protocol_index in 0..self.protocols.len() {
789786
match &mut self.protocols[protocol_index].state {
790787
State::Open { out_substream: out_substream @ Some(_), .. } => {
@@ -827,7 +824,7 @@ impl ConnectionHandler for NotifsHandler {
827824
State::OpenDesiredByRemote { in_substream, pending_opening } =>
828825
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
829826
Poll::Pending => {},
830-
Poll::Ready(Ok(())) => {},
827+
Poll::Ready(Ok(void)) => match void {},
831828
Poll::Ready(Err(_)) => {
832829
self.protocols[protocol_index].state =
833830
State::Closed { pending_opening: *pending_opening };
@@ -843,7 +840,7 @@ impl ConnectionHandler for NotifsHandler {
843840
cx,
844841
) {
845842
Poll::Pending => {},
846-
Poll::Ready(Ok(())) => {},
843+
Poll::Ready(Ok(void)) => match void {},
847844
Poll::Ready(Err(_)) => *in_substream = None,
848845
},
849846
}

client/network/src/protocol/notifications/upgrade/notifications.rs

Lines changed: 10 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
4141
use log::{error, warn};
4242
use sc_network_common::protocol::ProtocolName;
4343
use std::{
44+
convert::Infallible,
4445
io, mem,
4546
pin::Pin,
4647
task::{Context, Poll},
@@ -220,7 +221,10 @@ where
220221

221222
/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
222223
/// guaranteed to not generate any notification.
223-
pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
224+
pub fn poll_process(
225+
self: Pin<&mut Self>,
226+
cx: &mut Context,
227+
) -> Poll<Result<Infallible, io::Error>> {
224228
let mut this = self.project();
225229

226230
loop {
@@ -242,10 +246,8 @@ where
242246
},
243247
NotificationsInSubstreamHandshake::Flush => {
244248
match Sink::poll_flush(this.socket.as_mut(), cx)? {
245-
Poll::Ready(()) => {
246-
*this.handshake = NotificationsInSubstreamHandshake::Sent;
247-
return Poll::Ready(Ok(()))
248-
},
249+
Poll::Ready(()) =>
250+
*this.handshake = NotificationsInSubstreamHandshake::Sent,
249251
Poll::Pending => {
250252
*this.handshake = NotificationsInSubstreamHandshake::Flush;
251253
return Poll::Pending
@@ -258,7 +260,7 @@ where
258260
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
259261
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
260262
*this.handshake = st;
261-
return Poll::Ready(Ok(()))
263+
return Poll::Pending
262264
},
263265
}
264266
}
@@ -441,21 +443,6 @@ where
441443

442444
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
443445
let mut this = self.project();
444-
445-
// `Sink::poll_flush` does not expose stream closed error until we write something into
446-
// the stream, so the code below makes sure we detect that the substream was closed
447-
// even if we don't write anything into it.
448-
match Stream::poll_next(this.socket.as_mut(), cx) {
449-
Poll::Pending => {},
450-
Poll::Ready(Some(_)) => {
451-
error!(
452-
target: "sub-libp2p",
453-
"Unexpected incoming data in `NotificationsOutSubstream`",
454-
);
455-
},
456-
Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
457-
}
458-
459446
Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
460447
}
461448

@@ -505,21 +492,13 @@ pub enum NotificationsOutError {
505492
/// I/O error on the substream.
506493
#[error(transparent)]
507494
Io(#[from] io::Error),
508-
509-
/// End of incoming data detected on out substream.
510-
#[error("substream was closed/reset")]
511-
Terminated,
512495
}
513496

514497
#[cfg(test)]
515498
mod tests {
516-
use super::{
517-
NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutError,
518-
NotificationsOutOpen,
519-
};
520-
use futures::{channel::oneshot, future, prelude::*};
499+
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
500+
use futures::{channel::oneshot, prelude::*};
521501
use libp2p::core::upgrade;
522-
use std::{pin::Pin, task::Poll};
523502
use tokio::net::{TcpListener, TcpStream};
524503
use tokio_util::compat::TokioAsyncReadCompatExt;
525504

@@ -712,95 +691,4 @@ mod tests {
712691

713692
client.await.unwrap();
714693
}
715-
716-
#[tokio::test]
717-
async fn send_handshake_without_polling_for_incoming_data() {
718-
const PROTO_NAME: &str = "/test/proto/1";
719-
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
720-
721-
let client = tokio::spawn(async move {
722-
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
723-
let NotificationsOutOpen { handshake, .. } = upgrade::apply_outbound(
724-
socket.compat(),
725-
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
726-
upgrade::Version::V1,
727-
)
728-
.await
729-
.unwrap();
730-
731-
assert_eq!(handshake, b"hello world");
732-
});
733-
734-
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
735-
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
736-
737-
let (socket, _) = listener.accept().await.unwrap();
738-
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
739-
socket.compat(),
740-
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
741-
)
742-
.await
743-
.unwrap();
744-
745-
assert_eq!(handshake, b"initial message");
746-
substream.send_handshake(&b"hello world"[..]);
747-
748-
// Actually send the handshake.
749-
future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
750-
751-
client.await.unwrap();
752-
}
753-
754-
#[tokio::test]
755-
async fn can_detect_dropped_out_substream_without_writing_data() {
756-
const PROTO_NAME: &str = "/test/proto/1";
757-
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
758-
759-
let client = tokio::spawn(async move {
760-
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
761-
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
762-
socket.compat(),
763-
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
764-
upgrade::Version::V1,
765-
)
766-
.await
767-
.unwrap();
768-
769-
assert_eq!(handshake, b"hello world");
770-
771-
future::poll_fn(|cx| match Pin::new(&mut substream).poll_flush(cx) {
772-
Poll::Pending => Poll::Pending,
773-
Poll::Ready(Ok(())) => {
774-
cx.waker().wake_by_ref();
775-
Poll::Pending
776-
},
777-
Poll::Ready(Err(e)) => {
778-
assert!(matches!(e, NotificationsOutError::Terminated));
779-
Poll::Ready(())
780-
},
781-
})
782-
.await;
783-
});
784-
785-
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
786-
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
787-
788-
let (socket, _) = listener.accept().await.unwrap();
789-
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
790-
socket.compat(),
791-
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
792-
)
793-
.await
794-
.unwrap();
795-
796-
assert_eq!(handshake, b"initial message");
797-
798-
// Send the handhsake.
799-
substream.send_handshake(&b"hello world"[..]);
800-
future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
801-
802-
drop(substream);
803-
804-
client.await.unwrap();
805-
}
806694
}

0 commit comments

Comments
 (0)