Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactoring checkpoint
  • Loading branch information
benashford committed Dec 26, 2020
commit 84dc585b4ee54133dacbe2f58869c71fb5636459
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-net = { version = "1.5", optional = true }
bytes_05 = { package = "bytes", version = "0.5", optional = true }
bytes_10 = { package = "bytes", version = "1.0", optional = true }
log = "^0.4.11"
lwactors = { path = "../lwactors", version = "0.2" }
futures-channel = "^0.3.7"
futures-sink = "^0.3.7"
futures-util = "^0.3.7"
Expand All @@ -34,10 +35,10 @@ tokio_10 = { package = "tokio", version = "1.0", features = ["full"] }
[features]
default = ["tokio10"]

async-std18 = ["bytes_10", "async-net", "async-global-executor", "with_async_std"]
tokio02 = ["bytes_05", "tokio_02", "tokio-util_03", "tokio_codec", "with_tokio"]
tokio10 = ["bytes_10", "tokio_10", "tokio-util_06", "tokio_codec", "with_tokio"]
async-std18 = ["bytes_10", "async-net", "async-global-executor", "lwactors/with_async_global_executor14", "with_async_std"]
tokio02 = ["bytes_05", "tokio_02", "tokio-util_03", "tokio_codec", "lwactors/with_tokio02", "with_tokio"]
tokio10 = ["bytes_10", "tokio_10", "tokio-util_06", "tokio_codec", "lwactors/with_tokio10", "with_tokio"]

tokio_codec = []
with_async_std = []
with_tokio = []
with_tokio = ["lwactors/tokio10"]
8 changes: 4 additions & 4 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
//! in one response.
//! * `pubsub_connect` is used for Redis's PUBSUB functionality.

pub mod connect;

mod builder;
pub mod paired;
pub mod pubsub;
pub mod connect;
pub(crate) mod paired;
pub(crate) mod pubsub;
mod reconnect;

pub use self::{
builder::ConnectionBuilder,
Expand Down
99 changes: 62 additions & 37 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ use futures_util::{

use super::{
connect::{connect_with_auth, RespConnection},
reconnect::{Reconnectable, ReconnectableActions},
ConnectionBuilder,
};

use crate::{
error,
protocol::resp,
reconnect::{reconnect, Reconnect},
task::spawn,
};
use crate::{error, protocol::resp, task::spawn};

/// The state of sending messages to a Redis server
enum SendStatus {
Expand Down Expand Up @@ -194,10 +190,56 @@ impl Future for PairedConnectionInner {
}
}

#[derive(Debug)]
struct PairedConnectionActions {
addr: SocketAddr,
username: Option<Arc<str>>,
password: Option<Arc<str>>,
}

impl ReconnectableActions for PairedConnectionActions {
type WorkPayload = SendPayload;
type ConnectionType = mpsc::UnboundedSender<SendPayload>;

fn do_work(
&self,
con: &Self::ConnectionType,
work: Self::WorkPayload,
) -> Result<(), error::Error> {
con.unbounded_send(work).map_err(|e| e.into())
}

fn do_connection(
&self,
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>> {
let con_f = inner_conn_fn(self.addr, self.username.clone(), self.password.clone());
Box::pin(con_f)
}
}

/// A shareable and cheaply cloneable connection to which Redis commands can be sent
#[derive(Debug, Clone)]
pub struct PairedConnection {
out_tx_c: Arc<Reconnect<SendPayload, mpsc::UnboundedSender<SendPayload>>>,
out_tx_c: Arc<Reconnectable<PairedConnectionActions>>,
}

impl PairedConnection {
async fn init(
addr: SocketAddr,
username: Option<Arc<str>>,
password: Option<Arc<str>>,
) -> Result<Self, error::Error> {
Ok(PairedConnection {
out_tx_c: Arc::new(
Reconnectable::init(PairedConnectionActions {
addr,
username,
password,
})
.await?,
),
})
}
}

async fn inner_conn_fn(
Expand All @@ -221,19 +263,7 @@ impl ConnectionBuilder {
let username = self.username.clone();
let password = self.password.clone();

let work_fn = |con: &mpsc::UnboundedSender<SendPayload>, act| {
con.unbounded_send(act).map_err(|e| e.into())
};

let conn_fn = move || {
let con_f = inner_conn_fn(addr, username.clone(), password.clone());
Box::pin(con_f) as Pin<Box<dyn Future<Output = Result<_, error::Error>> + Send + Sync>>
};

let reconnecting_con = reconnect(work_fn, conn_fn);
reconnecting_con.map_ok(|con| PairedConnection {
out_tx_c: Arc::new(con),
})
PairedConnection::init(addr, username, password)
}
}

Expand Down Expand Up @@ -265,37 +295,32 @@ impl PairedConnection {
/// Behind the scenes the message is queued up and sent to Redis asynchronously before the
/// future is realised. As such, it is guaranteed that messages are sent in the same order
/// that `send` is called.
pub fn send<T>(&self, msg: resp::RespValue) -> impl Future<Output = Result<T, error::Error>>
pub async fn send<T>(&self, msg: resp::RespValue) -> Result<T, error::Error>
where
T: resp::FromResp,
{
match &msg {
resp::RespValue::Array(_) => (),
_ => {
return future::Either::Right(future::ready(Err(error::internal(
"Command must be a RespValue::Array",
))));
return Err(error::internal("Command must be a RespValue::Array"));
}
}

let (tx, rx) = oneshot::channel();
match self.out_tx_c.do_work((msg, tx)) {
Ok(()) => future::Either::Left(async move {
match rx.await {
Ok(v) => Ok(T::from_resp(v)?),
Err(_) => Err(error::internal(
"Connection closed before response received",
)),
}
}),
Err(e) => future::Either::Right(future::ready(Err(e))),
self.out_tx_c.do_work((msg, tx)).await;

match rx.await {
Ok(v) => Ok(T::from_resp(v)?),
Err(_) => Err(error::internal(
"Connection closed before response received",
)),
}
}

pub fn send_and_forget(&self, msg: resp::RespValue) {
let send_f = self.send::<resp::RespValue>(msg);
let forget_f = async {
if let Err(e) = send_f.await {
let pc = self.clone();
let forget_f = async move {
if let Err(e) = pc.send::<resp::RespValue>(msg).await {
log::error!("Error in send_and_forget: {}", e);
}
};
Expand Down
179 changes: 179 additions & 0 deletions src/client/reconnect/holder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright 2020 Ben Ashford
*
* Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
* http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
* <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
* option. This file may not be copied, modified, or distributed
* except according to those terms.
*/

use std::{
future::Future,
sync::Arc,
time::{Duration, Instant},
};

use lwactors::{actor, Action, ActorSender};

use crate::error;

#[derive(Debug)]
pub(crate) struct ConnectionHolder<T>
where
T: Send,
{
queue: ActorSender<ConnectionHolderAction<T>, ConnectionHolderResult<T>, error::Error>,
}

impl<T> ConnectionHolder<T>
where
T: Send + Sync + 'static,
{
pub(crate) fn new(t: T) -> Self {
ConnectionHolder {
queue: actor(ConnectionHolderState::new(t)),
}
}

pub(crate) async fn get_connection(&self) -> Result<GetConnectionState<T>, error::Error> {
match self
.queue
.invoke(ConnectionHolderAction::GetConnection)
.await?
{
ConnectionHolderResult::GetConnection(get_connection_state) => Ok(get_connection_state),
_ => panic!("Wrong response"),
}
}

pub(crate) async fn set_connection(&self, con: T) -> Result<(), error::Error> {
match self
.queue
.invoke(ConnectionHolderAction::SetConnection(con))
.await?
{
ConnectionHolderResult::SetConnection => Ok(()),
_ => panic!("Wrong response"),
}
}

pub(crate) async fn connection_dropped(&self) -> Result<(), error::Error> {
match self
.queue
.invoke(ConnectionHolderAction::ConnectionDropped)
.await?
{
ConnectionHolderResult::ConnectionDropped => Ok(()),
_ => panic!("Wrong response"),
}
}
}

impl<T> Clone for ConnectionHolder<T>
where
T: Send,
{
fn clone(&self) -> Self {
ConnectionHolder {
queue: self.queue.clone(),
}
}
}

const MAX_CONNECTION_DUR: Duration = Duration::from_secs(10);

#[derive(Debug)]
enum ConnectionHolderAction<T> {
GetConnection,
SetConnection(T),
ConnectionDropped,
}

impl<T> Action for ConnectionHolderAction<T> {
type State = ConnectionHolderState<T>;
type Result = ConnectionHolderResult<T>;
type Error = error::Error;

fn act(self, state: &mut Self::State) -> Result<Self::Result, Self::Error> {
let res = match self {
ConnectionHolderAction::GetConnection => {
let gcs = match state {
ConnectionHolderState::NotConnected => {
*state = ConnectionHolderState::Connecting(Instant::now());
GetConnectionState::NotConnected
}
ConnectionHolderState::Connected(ref con) => {
GetConnectionState::Connected(con.clone())
}
ConnectionHolderState::Connecting(ref mut inst) => {
let now = Instant::now();
let dur = now - *inst;
if dur > MAX_CONNECTION_DUR {
*inst = now;
GetConnectionState::NotConnected
} else {
GetConnectionState::Connecting
}
}
};
ConnectionHolderResult::GetConnection(gcs)
}
ConnectionHolderAction::SetConnection(con) => {
match state {
ConnectionHolderState::NotConnected => {
log::warn!("Cannot set state when in NotConnected state");
}
ConnectionHolderState::Connected(_) => {
log::warn!("Cannot set state when in Connected state");
}
ConnectionHolderState::Connecting(_) => {
*state = ConnectionHolderState::Connected(Arc::new(con))
}
}
ConnectionHolderResult::SetConnection
}
ConnectionHolderAction::ConnectionDropped => {
match state {
ConnectionHolderState::NotConnected => (),
ConnectionHolderState::Connected(_) => {
*state = ConnectionHolderState::NotConnected
}
ConnectionHolderState::Connecting(_) => {
log::warn!("Connection already re-connecting...")
}
}
ConnectionHolderResult::ConnectionDropped
}
};

Ok(res)
}
}

#[derive(Debug)]
enum ConnectionHolderState<T> {
NotConnected,
Connecting(Instant),
Connected(Arc<T>),
}

impl<T> ConnectionHolderState<T> {
fn new(t: T) -> Self {
ConnectionHolderState::Connected(Arc::new(t))
}
}

#[derive(Debug)]
pub(crate) enum ConnectionHolderResult<T> {
GetConnection(GetConnectionState<T>),
SetConnection,
ConnectionDropped,
}

#[derive(Debug)]
pub(crate) enum GetConnectionState<T> {
NotConnected,
Connecting,
Connected(Arc<T>),
}
Loading