Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working with AsyncStd
  • Loading branch information
benashford committed Dec 27, 2020
commit c1eaf62663728e81c5e3b58a7db15a0cabacd5c2
11 changes: 11 additions & 0 deletions examples/psubscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,19 @@ use futures::StreamExt;

use redis_async::{client, protocol::FromResp};

#[cfg(feature = "with_tokio")]
#[tokio::main]
async fn main() {
do_main().await;
}

#[cfg(feature = "with_async_std")]
#[async_std::main]
async fn main() {
do_main().await;
}

async fn do_main() {
env_logger::init();
let topic = env::args().nth(1).unwrap_or_else(|| "test.*".to_string());
let addr = env::args()
Expand Down
12 changes: 11 additions & 1 deletion examples/realistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,19 @@ use futures_util::future;

use redis_async::{client, resp_array};

/// An artificial "realistic" non-trivial example to demonstrate usage
#[cfg(feature = "with_tokio")]
#[tokio::main]
async fn main() {
do_main().await;
}

#[cfg(feature = "with_async_std")]
#[async_std::main]
async fn main() {
do_main().await;
}

async fn do_main() {
// Create some completely arbitrary "test data"
let test_data_size = 10;
let test_data: Vec<_> = (0..test_data_size).map(|x| (x, x.to_string())).collect();
Expand Down
11 changes: 11 additions & 0 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,19 @@ use futures::StreamExt;

use redis_async::{client, protocol::FromResp};

#[cfg(feature = "with_tokio")]
#[tokio::main]
async fn main() {
do_main().await;
}

#[cfg(feature = "with_async_std")]
#[async_std::main]
async fn main() {
do_main().await;
}

async fn do_main() {
env_logger::init();
let topic = env::args()
.nth(1)
Expand Down
122 changes: 114 additions & 8 deletions src/client/connect/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,151 @@ use std::task::{Context, Poll};

use async_net::TcpStream;

use bytes::{Buf, BytesMut};

use futures_sink::Sink;
use futures_util::stream::Stream;
use futures_util::{
io::{AsyncRead, AsyncWrite},
stream::Stream,
};

use crate::{
error::Error,
protocol::{
codec::{decode, encode},
resp::RespValue,
},
};

use crate::{error::Error, protocol::resp::RespValue};
const TCP_PACKET_SIZE: usize = 1500;
const DEFAULT_BUF_LEN: usize = TCP_PACKET_SIZE;
const MAX_PACKETS: usize = 100;
const MAX_BUF_LEN: usize = TCP_PACKET_SIZE * MAX_PACKETS;
const BUF_INC_STEP: usize = TCP_PACKET_SIZE * 4;

pub struct RespTcpStream {
tcp_stream: TcpStream,
out_buf: BytesMut,
in_buf: BytesMut,
}

impl RespTcpStream {
pub(crate) fn new(tcp_stream: TcpStream) -> Self {
RespTcpStream { tcp_stream }
RespTcpStream {
tcp_stream,
out_buf: BytesMut::with_capacity(DEFAULT_BUF_LEN),
in_buf: BytesMut::with_capacity(DEFAULT_BUF_LEN),
}
}
}

impl RespTcpStream {
fn attempt_push(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
match Pin::new(&mut self.tcp_stream).poll_write(cx, &self.out_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(0)) => return Poll::Ready(Ok(())),
Poll::Ready(Ok(bytes_written)) => {
self.out_buf.advance(bytes_written);
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
}
}

fn pull_into_buffer(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
self.in_buf.reserve(BUF_INC_STEP);
let mut old_len = self.in_buf.len();
let new_len = old_len + BUF_INC_STEP;
unsafe {
self.in_buf.set_len(new_len);
}
let result = match Pin::new(&mut self.tcp_stream)
.poll_read(cx, &mut self.in_buf[old_len..new_len])
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(bytes_read)) => {
old_len += bytes_read;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
};
unsafe {
self.in_buf.set_len(old_len);
}
result
}
}

impl Sink<RespValue> for RespTcpStream {
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
let mut_self = self.get_mut();

if mut_self.out_buf.len() == 0 {
return Poll::Ready(Ok(()));
}

if let Poll::Ready(Err(e)) = mut_self.attempt_push(cx) {
return Poll::Ready(Err(e));
}

if mut_self.out_buf.len() >= MAX_BUF_LEN {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}

fn start_send(self: Pin<&mut Self>, item: RespValue) -> Result<(), Self::Error> {
todo!()
let mut_self = self.get_mut();
encode(item, &mut mut_self.out_buf);

Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
self.get_mut().attempt_push(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
let mut_self = self.get_mut();
while mut_self.out_buf.len() > 0 {
match mut_self.attempt_push(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(())) => (),
}
}

match Pin::new(&mut mut_self.tcp_stream).poll_close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
}
}
}

impl Stream for RespTcpStream {
type Item = Result<RespValue, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
let mut_self = self.get_mut();
loop {
// Result<Option<(usize, RespValue)>, Error>
match decode(&mut mut_self.in_buf, 0) {
Ok(Some((pos, thing))) => {
mut_self.in_buf.advance(pos);
return Poll::Ready(Some(Ok(thing)));
}
Ok(None) => match mut_self.pull_into_buffer(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
},
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
}
}
6 changes: 6 additions & 0 deletions src/protocol/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ mod encode;

#[cfg(feature = "tokio_codec")]
pub(crate) mod tokio;

#[cfg(feature = "with_async_std")]
pub(crate) use encode::encode;

#[cfg(feature = "with_async_std")]
pub(crate) use decode::decode;
2 changes: 1 addition & 1 deletion src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#[macro_use]
pub(crate) mod resp;

mod codec;
pub(crate) mod codec;

#[cfg(feature = "tokio_codec")]
pub(crate) use codec::tokio::RespCodec;
Expand Down