-
Notifications
You must be signed in to change notification settings - Fork 205
Closed
Description
We have noticed the API for SubscriptionSink is a bit messy to use if you want to await for stream to be sent via the SubscriptionSink if the connection has already been closed it might leak until the stream/fut "resolves" because the stream itself doesn't know anything of the connection state.
For example given that we have the following impl:
impl MyServerTrait for ()
{
fn subscribe_to_something(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
// assume this stream lives forever and produces items "rarely"...
let stream = create_stream();
let fut = async move {
stream
.take_while(|sc| future::ready(sink.send(sc).map_or(false, |_| true)))
.for_each(|_| future::ready(()))
.await
};
tokio::spawn(fut);
}
}If connection has already this case the tokio task will live until sink.send fails.
To be on the safe-side you would need to something like:
impl MyServerTrait for () {
fn subscribe_to_something(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
// assume this stream lives forever and produces items "rarely"...
let stream = create_stream();
let fut = async move {
loop {
let timeout = tokio::time::sleep(std::time::Duration::from_secs(60));
tokio::pin!(timeout);
tokio::select! {
Some(item) = stream.next() => {
if let Err(e) = sink.send(&item) {
break;
}
},
_ = &mut timeout => {
if sink.is_closed() {
break;
}
}
else => break,
};
}
};
tokio::spawn(fut);
}
}Metadata
Metadata
Assignees
Labels
No labels