Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added comments and minor refactoring
  • Loading branch information
benashford committed Jan 2, 2021
commit 9a2ec7a50d6ecec1bdc1246e8e96b9cea37f3aa6
3 changes: 3 additions & 0 deletions src/client/connect/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
* except according to those terms.
*/

//! Experimental support for a non-Tokio runtime. This hasn't been tested as much as Tokio, so
//! should be considered an unstable feature for the time being.

use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down
1 change: 1 addition & 0 deletions src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn connect(addr: &SocketAddr) -> Result<RespConnection, error::Error>
Ok(RespConnection::new(tcp_stream))
}

/// Connect with optional authentication
pub async fn connect_with_auth(
addr: &SocketAddr,
username: Option<&str>,
Expand Down
4 changes: 2 additions & 2 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ enum ReceiveStatus {
type Responder = oneshot::Sender<resp::RespValue>;
type SendPayload = (resp::RespValue, Responder);

// /// The PairedConnectionInner is a spawned future that is responsible for pairing commands and
// /// results onto a `RespConnection` that is otherwise unpaired
/// The PairedConnectionInner is a spawned future that is responsible for pairing commands and
/// results onto a `RespConnection` that is otherwise unpaired
struct PairedConnectionInner {
/// The underlying connection that talks the RESP protocol
connection: RespConnection,
Expand Down
44 changes: 44 additions & 0 deletions src/client/reconnect/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::error;

use super::ActionWork;

/// A standalone actor which holds a Redis connection
#[derive(Debug)]
pub(crate) struct ConnectionHolder<T, F> {
queue: ActorSender<
Expand All @@ -41,6 +42,11 @@ where
}
}

/// Perform a chunk of work on the available connection, if available.
///
/// Returns a boolean. True means the work was done. False means the work was not done, and the
/// caller must attempt a reconnection. Any other failure will return an error, the caller
/// should not attempt a reconnection.
pub(crate) fn do_work(&self, f: F) -> impl Future<Output = Result<bool, error::Error>> {
self.queue
.invoke(ConnectionHolderAction::DoWork(f))
Expand All @@ -55,6 +61,7 @@ where
})
}

/// Set a new connection if previously advised to attempt re-connection.
pub(crate) async fn set_connection(&self, con: T) -> Result<(), error::Error> {
match self
.queue
Expand All @@ -65,6 +72,17 @@ where
_ => panic!("Wrong response"),
}
}

pub(crate) async fn set_connection_failed(&self) -> Result<(), error::Error> {
match self
.queue
.invoke(ConnectionHolderAction::SetConnectionFailed)
.await?
{
ConnectionHolderResult::SetConnectionFailed => Ok(()),
_ => panic!("Wrong response"),
}
}
}

impl<T, F> Clone for ConnectionHolder<T, F>
Expand All @@ -78,12 +96,14 @@ where
}
}

// TODO - should probably be configurable...
const MAX_CONNECTION_DUR: Duration = Duration::from_secs(10);

#[derive(Debug)]
enum ConnectionHolderAction<T, F> {
DoWork(F),
SetConnection(T),
SetConnectionFailed,
}

impl<T, F> Action for ConnectionHolderAction<T, F>
Expand All @@ -110,6 +130,10 @@ where
}
}
},
ConnectionHolderState::NotConnected => {
*state = ConnectionHolderState::Connecting(Instant::now());
DoWorkState::NotConnected
}
ConnectionHolderState::Connecting(ref mut inst) => {
let now = Instant::now();
let dur = now - *inst;
Expand All @@ -131,9 +155,27 @@ where
ConnectionHolderState::Connecting(_) => {
*state = ConnectionHolderState::Connected(con)
}
ConnectionHolderState::NotConnected => {
log::warn!("This is a valid, but rare sequence of events");
*state = ConnectionHolderState::Connected(con)
}
}
ConnectionHolderResult::SetConnection
}
ConnectionHolderAction::SetConnectionFailed => {
match state {
ConnectionHolderState::Connected(_) => {
log::warn!("Cannot set state when in Connected state");
}
ConnectionHolderState::Connecting(_) => {
*state = ConnectionHolderState::NotConnected
}
ConnectionHolderState::NotConnected => {
log::warn!("Suspicious series of events...");
}
}
ConnectionHolderResult::SetConnectionFailed
}
};

Ok(res)
Expand All @@ -147,6 +189,7 @@ where
{
Connecting(Instant),
Connected(T),
NotConnected,
}

impl<T> ConnectionHolderState<T>
Expand All @@ -162,6 +205,7 @@ where
pub(crate) enum ConnectionHolderResult<E> {
DoWork(DoWorkState<E>),
SetConnection,
SetConnectionFailed,
}

#[derive(Debug)]
Expand Down
12 changes: 11 additions & 1 deletion src/client/reconnect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{error, task::spawn};

use holder::ConnectionHolder;

/// A trait to be implemented by the chunks of work that are sent to a Redis connection
pub(crate) trait ActionWork {
type WorkPayload;
type ConnectionType;
Expand All @@ -25,6 +26,7 @@ pub(crate) trait ActionWork {
fn call(self, con: &Self::ConnectionType) -> Result<(), error::Error>;
}

/// A trait to be implemented to allow a connection to be re-established should it be lost
pub(crate) trait ReconnectableActions {
type WorkPayload;
type WorkFn: ActionWork<WorkPayload = Self::WorkPayload, ConnectionType = Self::ConnectionType>;
Expand All @@ -35,6 +37,8 @@ pub(crate) trait ReconnectableActions {
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>>;
}

/// A wrapper around a Redis connection that will automatically try and re-connect should the
/// connection be lost
#[derive(Debug)]
pub(crate) struct Reconnectable<A>
where
Expand Down Expand Up @@ -84,7 +88,13 @@ where
Ok(()) => (),
Err(e) => log::warn!("Couldn't set new connection: {}", e),
},
Err(e) => log::error!("Could not open connection: {}", e),
Err(e) => {
log::error!("Could not open connection: {}", e);
match con.set_connection_failed().await {
Ok(()) => (),
Err(e) => log::warn!("Couldn't set connection failure: {}", e),
}
}
}
})
}
Expand Down