Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit d165deb

Browse files
authored
End multiplexer stream once one of its inputs end. (#2380)
* End multiplexer stream once one of its inputs end. Also add test, that we don't panic once a stream is exhausted. * Don't fuse already fused stream.
1 parent 4866f8e commit d165deb

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

node/network/bridge/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ where
169169
let action = {
170170
let subsystem_next = ctx.recv().fuse();
171171
let mut net_event_next = event_stream.next().fuse();
172-
let mut req_res_event_next = bridge.request_multiplexer.next().fuse();
172+
let mut req_res_event_next = bridge.request_multiplexer.next();
173173
futures::pin_mut!(subsystem_next);
174174

175175
futures::select! {

node/network/bridge/src/multiplexer.rs

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use std::pin::Pin;
1818

1919
use futures::channel::mpsc;
20-
use futures::stream::Stream;
20+
use futures::stream::{FusedStream, Stream};
2121
use futures::task::{Context, Poll};
2222
use strum::IntoEnumIterator;
2323

@@ -35,12 +35,15 @@ use polkadot_subsystem::messages::AllMessages;
3535
///
3636
/// This multiplexer consumes all request streams and makes them a `Stream` of a single message
3737
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
38+
///
39+
/// The resulting stream will end once any of its input ends.
3840
pub struct RequestMultiplexer {
3941
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
4042
next_poll: usize,
4143
}
4244

4345
/// Multiplexing can fail in case of invalid messages.
46+
#[derive(Debug, PartialEq, Eq)]
4447
pub struct RequestMultiplexError {
4548
/// The peer that sent the invalid message.
4649
pub peer: PeerId,
@@ -85,15 +88,17 @@ impl Stream for RequestMultiplexer {
8588
// % safe, because count initialized to len, loop would not be entered if 0, also
8689
// length of receivers is fixed.
8790
let (p, rx): &mut (_, _) = &mut self.receivers[i % len];
91+
// Avoid panic:
92+
if rx.is_terminated() {
93+
// Early return, we don't want to update next_poll.
94+
return Poll::Ready(None);
95+
}
8896
i += 1;
8997
count -= 1;
9098
match Pin::new(rx).poll_next(cx) {
91-
// If at least one stream is pending, then we are not done yet (No
92-
// Ready(None)).
9399
Poll::Pending => result = Poll::Pending,
94-
// Receiver is a fused stream, which allows for this simple handling of
95-
// exhausted ones.
96-
Poll::Ready(None) => {}
100+
// We are done, once a single receiver is done.
101+
Poll::Ready(None) => return Poll::Ready(None),
97102
Poll::Ready(Some(v)) => {
98103
result = Poll::Ready(Some(multiplex_single(*p, v)));
99104
break;
@@ -105,6 +110,17 @@ impl Stream for RequestMultiplexer {
105110
}
106111
}
107112

113+
impl FusedStream for RequestMultiplexer {
114+
fn is_terminated(&self) -> bool {
115+
let len = self.receivers.len();
116+
if len == 0 {
117+
return true;
118+
}
119+
let (_, rx) = &self.receivers[self.next_poll % len];
120+
rx.is_terminated()
121+
}
122+
}
123+
108124
/// Convert a single raw incoming request into a `MultiplexMessage`.
109125
fn multiplex_single(
110126
p: Protocol,
@@ -130,3 +146,29 @@ fn decode_with_peer<Req: Decode>(
130146
) -> Result<Req, RequestMultiplexError> {
131147
Req::decode(&mut payload.as_ref()).map_err(|error| RequestMultiplexError { peer, error })
132148
}
149+
150+
#[cfg(test)]
151+
mod tests {
152+
use futures::prelude::*;
153+
use futures::stream::FusedStream;
154+
155+
use super::RequestMultiplexer;
156+
#[test]
157+
fn check_exhaustion_safety() {
158+
// Create and end streams:
159+
fn drop_configs() -> RequestMultiplexer {
160+
let (multiplexer, _) = RequestMultiplexer::new();
161+
multiplexer
162+
}
163+
let multiplexer = drop_configs();
164+
futures::executor::block_on(async move {
165+
let mut f = multiplexer;
166+
assert!(f.next().await.is_none());
167+
assert!(f.is_terminated());
168+
assert!(f.next().await.is_none());
169+
assert!(f.is_terminated());
170+
assert!(f.next().await.is_none());
171+
assert!(f.is_terminated());
172+
});
173+
}
174+
}

0 commit comments

Comments
 (0)