Skip to content

Commit a57bd41

Browse files
committed
Support split for **any** T: AsyncRead + AsyncWrite + Unpin + 'split
1 parent 41bb78c commit a57bd41

File tree

3 files changed

+90
-34
lines changed

3 files changed

+90
-34
lines changed

Cargo.toml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,18 @@ tokio = { optional = true, version = "1", features = ["net", "io-util", "syn
2020
smol = { optional = true, version = "2" }
2121
glommio = { optional = true, version = "0.9" }
2222

23-
futures-util = { optional = true, version = "0.3", default-features = false, features = ["io"] }
23+
futures-util = { version = "0.3", default-features = false, features = ["io"] }
2424

2525
sha1 = { version = "0.10", default-features = false }
2626
base64 = { version = "0.22" }
2727

2828
[features]
29-
rt_tokio = ["__splitref__", "dep:tokio","tokio/net","tokio/io-util","tokio/sync","tokio/time"]
30-
rt_smol = ["__clone__", "dep:smol"]
31-
rt_glommio = ["__splitref__", "dep:glommio", "dep:futures-util"]
29+
rt_tokio = ["__runtime__", "dep:tokio", "futures-util/unstable","futures-util/bilock"]
30+
rt_smol = ["__runtime__", "dep:smol"]
31+
rt_glommio = ["__runtime__", "dep:glommio"]
3232

3333
### internal ###
3434
__runtime__ = []
35-
__splitref__ = ["__runtime__"]
36-
__clone__ = ["__runtime__"]
3735

3836
#####################################
3937
DEBUG = ["tokio?/rt"] ###############

src/connection.rs

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -333,36 +333,76 @@ pub mod split {
333333
}
334334
}
335335

336-
#[cfg(feature="__splitref__")]
337-
const _: () = {
338-
#[cfg(feature="rt_tokio")]
339-
impl<'split> Splitable<'split> for tokio::net::TcpStream {
340-
type ReadHalf = tokio::net::tcp::ReadHalf <'split>;
341-
type WriteHalf = tokio::net::tcp::WriteHalf<'split>;
342-
fn split(&'split mut self) -> (Self::ReadHalf, Self::WriteHalf) {
343-
<tokio::net::TcpStream>::split(self)
344-
}
345-
}
346-
#[cfg(feature="rt_glommio")]
336+
#[cfg(any(feature="rt_smol", feature="rt_glommio"))]
337+
const _: (/* futures-io users */) = {
347338
impl<'split, T: AsyncRead + AsyncWrite + Unpin + 'split> Splitable<'split> for T {
348-
type ReadHalf = futures_util::io::ReadHalf <&'split mut T>;
339+
type ReadHalf = futures_util::io::ReadHalf<&'split mut T>;
349340
type WriteHalf = futures_util::io::WriteHalf<&'split mut T>;
350341
fn split(&'split mut self) -> (Self::ReadHalf, Self::WriteHalf) {
351342
AsyncRead::split(self)
352343
}
353344
}
354345
};
355-
#[cfg(feature="__clone__")]
356-
const _: () = {
357-
impl<'split, C: AsyncRead + AsyncWrite + Unpin + Sized + Clone + 'split> Splitable<'split> for C {
358-
type ReadHalf = Self;
359-
type WriteHalf = &'split mut Self;
346+
347+
#[cfg(any(feature="rt_tokio"))]
348+
const _: (/* tokio::io users */) = {
349+
impl<'split, T: AsyncRead + AsyncWrite + Unpin + 'split> Splitable<'split> for T {
350+
type ReadHalf = TokioIoReadHalf<'split, T>;
351+
type WriteHalf = TokioIoWriteHalf<'split, T>;
360352
fn split(&'split mut self) -> (Self::ReadHalf, Self::WriteHalf) {
361-
(self.clone(), self)
353+
let (r, w) = futures_util::lock::BiLock::new(self);
354+
(TokioIoReadHalf(r), TokioIoWriteHalf(w))
355+
}
356+
}
357+
358+
/*
359+
* based on https://github.com/rust-lang/futures-rs/blob/de9274e655b2fff8c9630a259a473b71a6b79dda/futures-util/src/io/split.rs
360+
*/
361+
pub struct TokioIoReadHalf<'split, T>(futures_util::lock::BiLock<&'split mut T>);
362+
pub struct TokioIoWriteHalf<'split, T>(futures_util::lock::BiLock<&'split mut T>);
363+
fn lock_and_then<T, U, E>(
364+
lock: &futures_util::lock::BiLock<T>,
365+
cx: &mut std::task::Context<'_>,
366+
f: impl FnOnce(std::pin::Pin<&mut T>, &mut std::task::Context<'_>) -> std::task::Poll<Result<U, E>>
367+
) -> std::task::Poll<Result<U, E>> {
368+
let mut l = futures_util::ready!(lock.poll_lock(cx));
369+
f(l.as_pin_mut(), cx)
370+
}
371+
impl<'split, T: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for TokioIoReadHalf<'split, T> {
372+
#[inline]
373+
fn poll_read(
374+
self: std::pin::Pin<&mut Self>,
375+
cx: &mut std::task::Context<'_>,
376+
buf: &mut tokio::io::ReadBuf<'_>
377+
) -> std::task::Poll<std::io::Result<()>> {
378+
lock_and_then(&self.0, cx, |l, cx| l.poll_read(cx, buf))
379+
}
380+
}
381+
impl<'split, T: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for TokioIoWriteHalf<'split, T> {
382+
#[inline]
383+
fn poll_write(
384+
self: std::pin::Pin<&mut Self>,
385+
cx: &mut std::task::Context<'_>,
386+
buf: &[u8]
387+
) -> std::task::Poll<std::io::Result<usize>> {
388+
lock_and_then(&self.0, cx, |l, cx| l.poll_write(cx, buf))
389+
}
390+
#[inline]
391+
fn poll_flush(
392+
self: std::pin::Pin<&mut Self>,
393+
cx: &mut std::task::Context<'_>
394+
) -> std::task::Poll<std::io::Result<()>> {
395+
lock_and_then(&self.0, cx, |l, cx| l.poll_flush(cx))
396+
}
397+
fn poll_shutdown(
398+
self: std::pin::Pin<&mut Self>,
399+
cx: &mut std::task::Context<'_>
400+
) -> std::task::Poll<std::io::Result<()>> {
401+
lock_and_then(&self.0, cx, |l, cx| l.poll_shutdown(cx))
362402
}
363403
}
364404
};
365-
405+
366406
pub struct ReadHalf<C: AsyncRead + Unpin> {
367407
__closed__: Arc<RwLock<bool>>,
368408
conn: C,
@@ -433,3 +473,15 @@ pub mod split {
433473
}
434474
}
435475
}
476+
477+
#[cfg(test)]
478+
mod tests {
479+
use super::*;
480+
481+
#[cfg(feature="__runtime__")]
482+
#[test]
483+
fn test_impl_splitable() {
484+
fn assert_impl_splitable<T: split::Splitable<'static>>() {}
485+
assert_impl_splitable::<crate::runtime::net::TcpStream>();
486+
}
487+
}

src/lib.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,34 +32,40 @@
3232
))]
3333
compile_error! {"More than one runtime feature flags can't be activated"}
3434

35-
#[cfg(feature="__runtime__")]
35+
#[cfg(feature="rt_tokio")]
3636
mod runtime {
37-
#[cfg(feature="rt_tokio")]
3837
pub use {
3938
tokio::io::AsyncReadExt as AsyncRead,
4039
tokio::io::AsyncWriteExt as AsyncWrite,
4140
tokio::sync::RwLock,
4241
tokio::time::sleep
4342
};
44-
45-
#[cfg(feature="rt_smol")]
43+
#[cfg(test)]
44+
pub use tokio::net;
45+
}
46+
#[cfg(feature="rt_smol")]
47+
mod runtime {
4648
pub use {
47-
smol::io::AsyncReadExt as AsyncRead,
48-
smol::io::AsyncWriteExt as AsyncWrite,
49+
futures_util::AsyncReadExt as AsyncRead,
50+
futures_util::AsyncWriteExt as AsyncWrite,
4951
smol::lock::RwLock,
5052
};
51-
#[cfg(feature="rt_smol")]
5253
pub async fn sleep(duration: std::time::Duration) {
5354
smol::Timer::after(duration).await;
5455
}
55-
56-
#[cfg(feature="rt_glommio")]
56+
#[cfg(test)]
57+
pub use smol::net;
58+
}
59+
#[cfg(feature="rt_glommio")]
60+
mod runtime {
5761
pub use {
5862
futures_util::AsyncReadExt as AsyncRead,
5963
futures_util::AsyncWriteExt as AsyncWrite,
6064
glommio::sync::RwLock,
6165
glommio::timer::sleep
6266
};
67+
#[cfg(test)]
68+
pub use glommio::net;
6369
}
6470

6571
pub mod message;

0 commit comments

Comments
 (0)