Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add optional batch send function which may be easier to use in certai…
…n circumstances
  • Loading branch information
benashford committed Jan 3, 2021
commit 242f06bc390d1d346fc424621b0e58dea7480080
73 changes: 59 additions & 14 deletions src/client/paired.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ enum ReceiveStatus {
NotReady,
}

type Responder = oneshot::Sender<resp::RespValue>;
type SendPayload = (resp::RespValue, Responder);
#[derive(Debug)]
enum SendPayload {
One(resp::RespValue, oneshot::Sender<resp::RespValue>),
Batch(Vec<(resp::RespValue, oneshot::Sender<resp::RespValue>)>),
}

/// The PairedConnectionInner is a spawned future that is responsible for pairing commands and
/// results onto a `RespConnection` that is otherwise unpaired
struct PairedConnectionInner {
/// The underlying connection that talks the RESP protocol
connection: RespConnection,
/// The channel upon which commands are received
out_rx: mpsc::UnboundedReceiver<SendPayload>,
out_rx: mpsc::UnboundedReceiver<(resp::RespValue, oneshot::Sender<resp::RespValue>)>,
/// The queue of waiting oneshot's for commands sent but results not yet received
waiting: VecDeque<Responder>,
waiting: VecDeque<oneshot::Sender<resp::RespValue>>,

/// The status of the underlying connection
send_status: SendStatus,
Expand Down Expand Up @@ -191,10 +194,22 @@ impl Future for PairedConnectionInner {
}

impl ActionWork for SendPayload {
type ConnectionType = mpsc::UnboundedSender<SendPayload>;
type ConnectionType =
mpsc::UnboundedSender<(resp::RespValue, oneshot::Sender<resp::RespValue>)>;

fn call(self, con: &Self::ConnectionType) -> Result<(), error::Error> {
con.unbounded_send(self).map_err(|e| e.into())
match self {
SendPayload::One(value, receiver) => {
con.unbounded_send((value, receiver))?;
}
SendPayload::Batch(batches) => {
for (value, receiver) in batches {
con.unbounded_send((value, receiver))?;
}
}
}

Ok(())
}
}

Expand All @@ -210,7 +225,10 @@ impl ReconnectableActions for PairedConnectionActions {

fn do_connection(
&self,
) -> ReconnectableConnectionFuture<mpsc::UnboundedSender<SendPayload>, error::Error> {
) -> ReconnectableConnectionFuture<
mpsc::UnboundedSender<(resp::RespValue, oneshot::Sender<resp::RespValue>)>,
error::Error,
> {
let con_f = inner_conn_fn(self.addr, self.username.clone(), self.password.clone());
Box::pin(con_f)
}
Expand Down Expand Up @@ -245,7 +263,8 @@ async fn inner_conn_fn(
addr: SocketAddr,
username: Option<Arc<str>>,
password: Option<Arc<str>>,
) -> Result<mpsc::UnboundedSender<SendPayload>, error::Error> {
) -> Result<mpsc::UnboundedSender<(resp::RespValue, oneshot::Sender<resp::RespValue>)>, error::Error>
{
let username = username.as_ref().map(|u| u.as_ref());
let password = password.as_ref().map(|p| p.as_ref());
let connection = connect_with_auth(&addr, username, password).await?;
Expand Down Expand Up @@ -296,7 +315,7 @@ impl PairedConnection {
}

let (tx, rx) = oneshot::channel();
let work_f = self.out_tx_c.do_work((msg, tx));
let work_f = self.out_tx_c.do_work(SendPayload::One(msg, tx));

Either::Right(async {
let _ = work_f.await?;
Expand All @@ -309,12 +328,38 @@ impl PairedConnection {
})
}

// pub fn send_batch(
// &self,
// msgs: Vec<resp::RespValue>,
// ) -> impl Future<Output = Result<Vec<resp::RespValue>, error::Error>> {
pub fn send_batch(
&self,
msgs: Vec<resp::RespValue>,
) -> impl Future<Output = Result<Vec<resp::RespValue>, error::Error>> + '_ {
let batch_size = msgs.len();
let mut work = Vec::with_capacity(batch_size);
let mut receivers = Vec::with_capacity(batch_size);

for msg in msgs {
let (tx, rx) = oneshot::channel();
work.push((msg, tx));
receivers.push(rx);
}

// }
let work_f = self.out_tx_c.do_work(SendPayload::Batch(work));

async move {
let _ = work_f.await?;
let mut results = Vec::with_capacity(batch_size);
for receiver in receivers {
match receiver.await {
Ok(v) => results.push(v),
Err(_) => {
return Err(error::internal(
"Connection closed before response received",
))
}
}
}
Ok(results)
}
}

pub fn send_and_forget(&self, msg: resp::RespValue) {
let _ = self.send::<resp::RespValue>(msg);
Expand Down
6 changes: 5 additions & 1 deletion src/protocol/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
* except according to those terms.
*/

use std::cmp;

use bytes::{BufMut, BytesMut};

use crate::protocol::resp::RespValue;

const DEFAULT_MESSAGE_SIZE: usize = 1024;

fn check_and_reserve(buf: &mut BytesMut, amt: usize) {
let remaining_bytes = buf.remaining_mut();
if remaining_bytes < amt {
buf.reserve(amt);
buf.reserve(cmp::max(amt, DEFAULT_MESSAGE_SIZE));
}
}

Expand Down