Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactoring checkpoint
  • Loading branch information
benashford committed Dec 26, 2020
commit f9ffa59b7c8e901fbb2cb836600308c01a671261
7 changes: 2 additions & 5 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use std::task::{Context, Poll};

use futures_channel::{mpsc, oneshot};
use futures_sink::Sink;
use futures_util::{
future::{self, TryFutureExt},
stream::StreamExt,
};
use futures_util::stream::StreamExt;

use super::{
connect::{connect_with_auth, RespConnection},
Expand Down Expand Up @@ -307,7 +304,7 @@ impl PairedConnection {
}

let (tx, rx) = oneshot::channel();
self.out_tx_c.do_work((msg, tx)).await;
self.out_tx_c.do_work((msg, tx)).await?;

match rx.await {
Ok(v) => Ok(T::from_resp(v)?),
Expand Down
77 changes: 57 additions & 20 deletions src/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use std::task::{Context, Poll};

use futures_channel::{mpsc, oneshot};
use futures_sink::Sink;
use futures_util::{
future::TryFutureExt,
stream::{Fuse, Stream, StreamExt},
};
use futures_util::stream::{Fuse, Stream, StreamExt};

use super::{
connect::{connect_with_auth, RespConnection},
Expand All @@ -30,10 +27,11 @@ use super::{
use crate::{
error::{self, ConnectionReason},
protocol::resp::{self, FromResp},
reconnect::{reconnect, Reconnect},
task::spawn,
};

use super::reconnect::{Reconnectable, ReconnectableActions};

#[derive(Debug)]
enum PubsubEvent {
/// The: topic, sink to send messages through, and a oneshot to signal subscription has
Expand Down Expand Up @@ -324,10 +322,58 @@ impl Future for PubsubConnectionInner {
}
}

// PubsubEvent, mpsc::UnboundedSender<PubsubEvent>

#[derive(Debug)]
struct PubsubConnectionActions {
addr: SocketAddr,
username: Option<Arc<str>>,
password: Option<Arc<str>>,
}

impl ReconnectableActions for PubsubConnectionActions {
type WorkPayload = PubsubEvent;
type ConnectionType = mpsc::UnboundedSender<PubsubEvent>;

fn do_work(
&self,
con: &Self::ConnectionType,
work: Self::WorkPayload,
) -> Result<(), error::Error> {
con.unbounded_send(work).map_err(|e| e.into())
}

fn do_connection(
&self,
) -> Pin<Box<dyn Future<Output = Result<Self::ConnectionType, error::Error>> + Send>> {
let con_f = inner_conn_fn(self.addr, self.username.clone(), self.password.clone());
Box::pin(con_f)
}
}

/// A shareable reference to subscribe to PUBSUB topics
#[derive(Debug, Clone)]
pub struct PubsubConnection {
out_tx_c: Arc<Reconnect<PubsubEvent, mpsc::UnboundedSender<PubsubEvent>>>,
out_tx_c: Arc<Reconnectable<PubsubConnectionActions>>,
}

impl PubsubConnection {
async fn init(
addr: SocketAddr,
username: Option<Arc<str>>,
password: Option<Arc<str>>,
) -> Result<Self, error::Error> {
Ok(PubsubConnection {
out_tx_c: Arc::new(
Reconnectable::init(PubsubConnectionActions {
addr,
username,
password,
})
.await?,
),
})
}
}

async fn inner_conn_fn(
Expand Down Expand Up @@ -355,18 +401,7 @@ impl ConnectionBuilder {
let username = self.username.clone();
let password = self.password.clone();

let reconnecting_f = reconnect(
|con: &mpsc::UnboundedSender<PubsubEvent>, act| {
con.unbounded_send(act).map_err(|e| e.into())
},
move || {
let con_f = inner_conn_fn(addr, username.clone(), password.clone());
Box::pin(con_f)
},
);
reconnecting_f.map_ok(|con| PubsubConnection {
out_tx_c: Arc::new(con),
})
PubsubConnection::init(addr, username, password)
}
}

Expand Down Expand Up @@ -395,7 +430,8 @@ impl PubsubConnection {
let (tx, rx) = mpsc::unbounded();
let (signal_t, signal_r) = oneshot::channel();
self.out_tx_c
.do_work(PubsubEvent::Subscribe(topic.to_owned(), tx, signal_t))?;
.do_work(PubsubEvent::Subscribe(topic.to_owned(), tx, signal_t))
.await?;

match signal_r.await {
Ok(_) => Ok(PubsubStream {
Expand All @@ -411,7 +447,8 @@ impl PubsubConnection {
let (tx, rx) = mpsc::unbounded();
let (signal_t, signal_r) = oneshot::channel();
self.out_tx_c
.do_work(PubsubEvent::Psubscribe(topic.to_owned(), tx, signal_t))?;
.do_work(PubsubEvent::Psubscribe(topic.to_owned(), tx, signal_t))
.await?;

match signal_r.await {
Ok(_) => Ok(PubsubStream {
Expand Down
1 change: 0 additions & 1 deletion src/client/reconnect/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
*/

use std::{
future::Future,
sync::Arc,
time::{Duration, Instant},
};
Expand Down
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,4 @@ pub mod protocol;
pub mod client;
pub mod error;

#[deprecated]
pub(crate) mod reconnect;
mod task;
226 changes: 0 additions & 226 deletions src/reconnect.rs

This file was deleted.

Loading