Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactoring checkpoint
  • Loading branch information
benashford committed Dec 25, 2020
commit 2f3380c6469fc06716733c8a4712bbb53d90d402
12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ keywords = ["redis", "tokio"]
edition = "2018"

[dependencies]
async-global-executor = { version = "1.4", optional = true }
async-net = { version = "1.5", optional = true }
bytes_05 = { package = "bytes", version = "0.5", optional = true }
bytes_10 = { package = "bytes", version = "1.0", optional = true }
log = "^0.4.11"
Expand All @@ -25,13 +27,17 @@ tokio-util_06 = { package = "tokio-util", version = "0.6", features = ["codec"],
[dev-dependencies]
env_logger = "^0.8.1"
futures = "^0.3.7"
async-std = "1.8"
tokio_02 = { package = "tokio", version = "0.2", features = ["full"] }
tokio_10 = { package = "tokio", version = "1.0", features = ["full"] }

[features]
default = ["tokio10"]

tokio02 = ["bytes_05", "tokio_02", "tokio-util_03", "tokio_codec"]
tokio10 = ["bytes_10", "tokio_10", "tokio-util_06", "tokio_codec"]
async-std18 = ["bytes_10", "async-net", "async-global-executor", "with_async_std"]
tokio02 = ["bytes_05", "tokio_02", "tokio-util_03", "tokio_codec", "with_tokio"]
tokio10 = ["bytes_10", "tokio_10", "tokio-util_06", "tokio_codec", "with_tokio"]

tokio_codec = []
tokio_codec = []
with_async_std = []
with_tokio = []
57 changes: 57 additions & 0 deletions src/client/connect/async_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2020 Ben Ashford
*
* Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
* http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
* <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
* option. This file may not be copied, modified, or distributed
* except according to those terms.
*/

use std::pin::Pin;
use std::task::{Context, Poll};

use async_net::TcpStream;

use futures_sink::Sink;
use futures_util::stream::Stream;

use crate::{error::Error, protocol::resp::RespValue};

pub(crate) struct RespTcpStream {
tcp_stream: TcpStream,
}

impl RespTcpStream {
pub(crate) fn new(tcp_stream: TcpStream) -> Self {
RespTcpStream { tcp_stream }
}
}

impl Sink<RespValue> for RespTcpStream {
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn start_send(self: Pin<&mut Self>, item: RespValue) -> Result<(), Self::Error> {
todo!()
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}
}

impl Stream for RespTcpStream {
type Item = Result<RespValue, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}
25 changes: 21 additions & 4 deletions src/client/connect.rs → src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@
* except according to those terms.
*/

#[cfg(feature = "with_async_std")]
mod async_std;

use std::net::SocketAddr;

#[cfg(feature = "with_async_std")]
use async_net::TcpStream;

use futures_util::{SinkExt, StreamExt};

#[cfg(feature = "with_tokio")]
use tokio::net::TcpStream;
#[cfg(feature = "with_tokio")]
use tokio_util::codec::{Decoder, Framed};

use crate::{
error,
protocol::{FromResp, RespCodec},
};
#[cfg(feature = "tokio_codec")]
use crate::protocol::RespCodec;
use crate::{error, protocol::FromResp};

#[cfg(feature = "with_tokio")]
pub type RespConnection = Framed<TcpStream, RespCodec>;
#[cfg(feature = "with_async_std")]
pub type RespConnection = async_std::RespTcpStream;

/// Connect to a Redis server and return a Future that resolves to a
/// `RespConnection` for reading and writing asynchronously.
Expand All @@ -37,11 +47,18 @@ pub type RespConnection = Framed<TcpStream, RespCodec>;
///
/// But since most Redis usages involve issue commands that result in one
/// single result, this library also implements `paired_connect`.
#[cfg(feature = "with_tokio")]
pub async fn connect(addr: &SocketAddr) -> Result<RespConnection, error::Error> {
let tcp_stream = TcpStream::connect(addr).await?;
Ok(RespCodec.framed(tcp_stream))
}

#[cfg(feature = "with_async_std")]
pub async fn connect(addr: &SocketAddr) -> Result<RespConnection, error::Error> {
let tcp_stream = TcpStream::connect(addr).await?;
Ok(RespConnection::new(tcp_stream))
}

pub async fn connect_with_auth(
addr: &SocketAddr,
username: Option<&str>,
Expand Down
6 changes: 4 additions & 2 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
error,
protocol::resp,
reconnect::{reconnect, Reconnect},
task::spawn,
};

/// The state of sending messages to a Redis server
Expand Down Expand Up @@ -209,7 +210,8 @@ async fn inner_conn_fn(
let connection = connect_with_auth(&addr, username, password).await?;
let (out_tx, out_rx) = mpsc::unbounded();
let paired_connection_inner = PairedConnectionInner::new(connection, out_rx);
tokio::spawn(paired_connection_inner);
spawn(paired_connection_inner);

Ok(out_tx)
}

Expand Down Expand Up @@ -297,7 +299,7 @@ impl PairedConnection {
log::error!("Error in send_and_forget: {}", e);
}
};
tokio::spawn(forget_f);
spawn(forget_f);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
error::{self, ConnectionReason},
protocol::resp::{self, FromResp},
reconnect::{reconnect, Reconnect},
task::spawn,
};

#[derive(Debug)]
Expand Down Expand Up @@ -339,7 +340,7 @@ async fn inner_conn_fn(

let connection = connect_with_auth(&addr, username, password).await?;
let (out_tx, out_rx) = mpsc::unbounded();
tokio::spawn(async {
spawn(async {
match PubsubConnectionInner::new(connection, out_rx).await {
Ok(_) => (),
Err(e) => log::error!("Pub/Sub error: {:?}", e),
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ extern crate tokio_02 as tokio;
#[cfg(feature = "tokio02")]
extern crate tokio_util_03 as tokio_util;

#[cfg(feature = "tokio10")]
#[cfg(feature = "bytes_10")]
extern crate bytes_10 as bytes;
#[cfg(feature = "tokio10")]
extern crate tokio_10 as tokio;
Expand All @@ -71,7 +71,6 @@ extern crate tokio_util_06 as tokio_util;
pub mod protocol;

pub mod client;

pub mod error;

pub(crate) mod reconnect;
mod task;
1 change: 1 addition & 0 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) mod resp;

mod codec;

#[cfg(feature = "tokio_codec")]
pub(crate) use codec::tokio::RespCodec;

pub use resp::{FromResp, RespValue};
13 changes: 7 additions & 6 deletions src/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use futures_util::{
TryFutureExt,
};

use tokio::time::timeout;

use crate::error::{self, ConnectionReason};
use crate::{
error::{self, ConnectionReason},
task::{spawn, timeout},
};

type WorkFn<T, A> = dyn Fn(&T, A) -> Result<(), error::Error> + Send + Sync;
type ConnFn<T> =
Expand Down Expand Up @@ -183,8 +184,8 @@ where

let connection_f = async move {
let connection = match timeout(CONNECTION_TIMEOUT, (reconnect.0.conn_fn)()).await {
Ok(con_r) => con_r,
Err(_) => Err(error::internal(format!(
Some(con_r) => con_r,
None => Err(error::internal(format!(
"Connection timed-out after {} seconds",
CONNECTION_TIMEOUT_SECONDS
))),
Expand Down Expand Up @@ -220,6 +221,6 @@ where
.reconnect(state)
.map_err(|e| log::error!("Error asynchronously reconnecting: {}", e));

tokio::spawn(reconnect_f);
spawn(reconnect_f);
}
}
48 changes: 48 additions & 0 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Ben Ashford
*
* Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
* http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
* <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
* option. This file may not be copied, modified, or distributed
* except according to those terms.
*/

use std::future::Future;
use std::time::Duration;

#[cfg(feature = "with_tokio")]
pub(crate) fn spawn<F>(f: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send,
{
tokio::spawn(f)
}

#[cfg(feature = "with_async_std")]
pub(crate) fn spawn<F>(f: F) -> () {
async_global_executor::spawn(f)
}

#[cfg(feature = "with_tokio")]
pub(crate) fn timeout<T>(duration: Duration, future: T) -> impl Future<Output = Option<T::Output>>
where
T: Future,
{
let timeout = tokio::time::timeout(duration, future);
async {
match timeout.await {
Ok(t) => Some(t),
Err(_) => None,
}
}
}

#[cfg(feature = "with_async_std")]
pub(crate) fn timeout<T>(duration: Duration, future: T) -> impl Future<Output = Option<T::Output>>
where
T: Future,
{
todo!()
}