Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove one layer of unnecessary redirection
  • Loading branch information
benashford committed Jan 3, 2021
commit 7a5490fc27b1aeb0cc36a3893bc97c14e5e9505f
27 changes: 11 additions & 16 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use futures_util::{

use super::{
connect::{connect_with_auth, RespConnection},
reconnect::{ActionWork, Reconnectable, ReconnectableActions},
reconnect::{ActionWork, Reconnectable, ReconnectableActions, ReconnectableConnectionFuture},
ConnectionBuilder,
};

Expand Down Expand Up @@ -190,21 +190,11 @@ impl Future for PairedConnectionInner {
}
}

#[derive(Debug)]
struct PairedConnectionWork {
payload: SendPayload,
}

impl ActionWork for PairedConnectionWork {
type WorkPayload = SendPayload;
impl ActionWork for SendPayload {
type ConnectionType = mpsc::UnboundedSender<SendPayload>;

fn init(payload: Self::WorkPayload) -> Self {
PairedConnectionWork { payload }
}

fn call(self, con: &Self::ConnectionType) -> Result<(), error::Error> {
con.unbounded_send(self.payload).map_err(|e| e.into())
con.unbounded_send(self).map_err(|e| e.into())
}
}

Expand All @@ -217,12 +207,10 @@ struct PairedConnectionActions {

impl ReconnectableActions for PairedConnectionActions {
type WorkPayload = SendPayload;
type WorkFn = PairedConnectionWork;
type ConnectionType = mpsc::UnboundedSender<SendPayload>;

fn do_connection(
&self,
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>> {
) -> ReconnectableConnectionFuture<mpsc::UnboundedSender<SendPayload>, error::Error> {
let con_f = inner_conn_fn(self.addr, self.username.clone(), self.password.clone());
Box::pin(con_f)
}
Expand Down Expand Up @@ -321,6 +309,13 @@ impl PairedConnection {
})
}

// pub fn send_batch(
// &self,
// msgs: Vec<resp::RespValue>,
// ) -> impl Future<Output = Result<Vec<resp::RespValue>, error::Error>> {

// }

pub fn send_and_forget(&self, msg: resp::RespValue) {
let _ = self.send::<resp::RespValue>(msg);
}
Expand Down
22 changes: 6 additions & 16 deletions src/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use crate::{
task::spawn,
};

use super::reconnect::{ActionWork, Reconnectable, ReconnectableActions};
use super::reconnect::{
ActionWork, Reconnectable, ReconnectableActions, ReconnectableConnectionFuture,
};

#[derive(Debug)]
enum PubsubEvent {
Expand Down Expand Up @@ -322,21 +324,11 @@ impl Future for PubsubConnectionInner {
}
}

#[derive(Debug)]
struct PubsubWork {
payload: PubsubEvent,
}

impl ActionWork for PubsubWork {
type WorkPayload = PubsubEvent;
impl ActionWork for PubsubEvent {
type ConnectionType = mpsc::UnboundedSender<PubsubEvent>;

fn init(payload: Self::WorkPayload) -> Self {
PubsubWork { payload }
}

fn call(self, con: &Self::ConnectionType) -> Result<(), error::Error> {
con.unbounded_send(self.payload).map_err(|e| e.into())
con.unbounded_send(self).map_err(|e| e.into())
}
}

Expand All @@ -351,12 +343,10 @@ struct PubsubConnectionActions {

impl ReconnectableActions for PubsubConnectionActions {
type WorkPayload = PubsubEvent;
type WorkFn = PubsubWork;
type ConnectionType = mpsc::UnboundedSender<PubsubEvent>;

fn do_connection(
&self,
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>> {
) -> ReconnectableConnectionFuture<mpsc::UnboundedSender<PubsubEvent>, error::Error> {
let con_f = inner_conn_fn(self.addr, self.username.clone(), self.password.clone());
Box::pin(con_f)
}
Expand Down
39 changes: 20 additions & 19 deletions src/client/reconnect/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@ use super::ActionWork;

/// A standalone actor which holds a Redis connection
#[derive(Debug)]
pub(crate) struct ConnectionHolder<T, F> {
queue: ActorSender<
ConnectionHolderAction<T, F>,
ConnectionHolderResult<error::Error>,
error::Error,
>,
pub(crate) struct ConnectionHolder<F>
where
F: ActionWork,
{
queue:
ActorSender<ConnectionHolderAction<F>, ConnectionHolderResult<error::Error>, error::Error>,
}

impl<T, F> ConnectionHolder<T, F>
impl<F> ConnectionHolder<F>
where
T: Send + Sync + 'static,
F: ActionWork<ConnectionType = T> + Send + 'static,
F: ActionWork + Send + 'static,
{
pub(crate) fn new(t: T) -> Self {
pub(crate) fn new(t: F::ConnectionType) -> Self {
ConnectionHolder {
queue: actor(ConnectionHolderState::new(t)),
}
Expand All @@ -62,7 +61,7 @@ where
}

/// Set a new connection if previously advised to attempt re-connection.
pub(crate) async fn set_connection(&self, con: T) -> Result<(), error::Error> {
pub(crate) async fn set_connection(&self, con: F::ConnectionType) -> Result<(), error::Error> {
match self
.queue
.invoke(ConnectionHolderAction::SetConnection(con))
Expand All @@ -85,9 +84,9 @@ where
}
}

impl<T, F> Clone for ConnectionHolder<T, F>
impl<F> Clone for ConnectionHolder<F>
where
T: Send,
F: ActionWork,
{
fn clone(&self) -> Self {
ConnectionHolder {
Expand All @@ -100,18 +99,20 @@ where
const MAX_CONNECTION_DUR: Duration = Duration::from_secs(10);

#[derive(Debug)]
enum ConnectionHolderAction<T, F> {
enum ConnectionHolderAction<F>
where
F: ActionWork,
{
DoWork(F),
SetConnection(T),
SetConnection(F::ConnectionType),
SetConnectionFailed,
}

impl<T, F> Action for ConnectionHolderAction<T, F>
impl<F> Action for ConnectionHolderAction<F>
where
T: Send,
F: ActionWork<ConnectionType = T>,
F: ActionWork,
{
type State = ConnectionHolderState<T>;
type State = ConnectionHolderState<F::ConnectionType>;
type Result = ConnectionHolderResult<error::Error>;
type Error = error::Error;

Expand Down
25 changes: 13 additions & 12 deletions src/client/reconnect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,32 @@

mod holder;

use std::{future::Future, pin::Pin};
use std::{fmt, future::Future, pin::Pin};

use crate::{error, task::spawn};

use holder::ConnectionHolder;

/// A trait to be implemented by the chunks of work that are sent to a Redis connection
pub(crate) trait ActionWork {
type WorkPayload;
type ConnectionType;

fn init(payload: Self::WorkPayload) -> Self;
type ConnectionType: Send + fmt::Debug;

fn call(self, con: &Self::ConnectionType) -> Result<(), error::Error>;
}

pub(crate) type ReconnectableConnectionFuture<C, E> =
Pin<Box<dyn Future<Output = Result<C, E>> + Send>>;

/// A trait to be implemented to allow a connection to be re-established should it be lost
pub(crate) trait ReconnectableActions {
type WorkPayload;
type WorkFn: ActionWork<WorkPayload = Self::WorkPayload, ConnectionType = Self::ConnectionType>;
type ConnectionType: Send + Sync + 'static;
type WorkPayload: ActionWork + 'static;

fn do_connection(
&self,
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>>;
) -> ReconnectableConnectionFuture<
<Self::WorkPayload as ActionWork>::ConnectionType,
error::Error,
>;
}

/// A wrapper around a Redis connection that will automatically try and re-connect should the
Expand All @@ -44,14 +45,14 @@ pub(crate) struct Reconnectable<A>
where
A: ReconnectableActions,
{
con: ConnectionHolder<A::ConnectionType, A::WorkFn>,
con: ConnectionHolder<A::WorkPayload>,
actions: A,
}

impl<A> Reconnectable<A>
where
A: ReconnectableActions,
A::WorkFn: Send + 'static,
A::WorkPayload: Send,
{
pub(crate) async fn init(actions: A) -> Result<Self, error::Error> {
let t = actions.do_connection().await?;
Expand All @@ -65,7 +66,7 @@ where
&self,
work: A::WorkPayload,
) -> impl Future<Output = Result<(), error::Error>> + '_ {
let work_f = self.con.do_work(A::WorkFn::init(work));
let work_f = self.con.do_work(work);

async move {
if work_f.await? {
Expand Down