Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Don't expose mutable Vec, remove grow settings
  • Loading branch information
maciejhirsz committed Sep 17, 2020
commit 24ce8d8220016e155876ab7540b9f49e5376a4bd
34 changes: 23 additions & 11 deletions src/capped_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::BufMut;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::io;

pub struct CappedBuffer {
Expand Down Expand Up @@ -45,18 +45,21 @@ impl Deref for CappedBuffer {
}
}

impl DerefMut for CappedBuffer {
fn deref_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
impl io::Write for CappedBuffer {
fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
if buf.len() > self.remaining() {
buf = &buf[..self.remaining()];
}
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
}

impl io::Write for CappedBuffer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.len() < self.remaining_mut() {
self.buf.write(buf)
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
if buf.len() <= self.remaining() {
self.buf.extend_from_slice(buf);
Ok(())
} else {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Input exceeds buffer capacity"))
Err(io::Error::new(io::ErrorKind::InvalidInput, "Exceeded maximum buffer capacity"))
}
}

Expand All @@ -71,10 +74,19 @@ impl BufMut for CappedBuffer {
}

unsafe fn advance_mut(&mut self, cnt: usize) {
assert!(cnt <= self.remaining(), "Exceeded buffer capacity");

self.buf.advance_mut(cnt);
}

unsafe fn bytes_mut(&mut self) -> &mut [u8] {
self.buf.bytes_mut()
let remaining = self.remaining();
let mut bytes = self.buf.bytes_mut();

if bytes.len() > remaining {
bytes = &mut bytes[..remaining];
}

bytes
}
}
51 changes: 23 additions & 28 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,7 @@ where
if !data[..end].ends_with(b"\r\n\r\n") {
return Ok(());
}
println!("Buffer growing??");
self.in_buffer.get_mut().extend(&data[end..]);
self.in_buffer.get_mut().write_all(&data[end..])?;
end
};
res.get_mut().truncate(end);
Expand Down Expand Up @@ -1185,20 +1184,18 @@ where
}

fn check_buffer_out(&mut self, frame: &Frame) -> Result<()> {
if self.out_buffer.get_ref().capacity() <= self.out_buffer.get_ref().len() + frame.len() {
// extend
let mut new = CappedBuffer::new(self.out_buffer.get_ref().capacity(), self.settings.max_out_buffer_capacity);
new.extend(&self.out_buffer.get_ref()[self.out_buffer.position() as usize..]);
if new.len() == new.capacity() {
if self.settings.out_buffer_grow && self.out_buffer.get_ref().remaining() > 0 {
new.reserve(self.settings.out_buffer_capacity)
} else {
return Err(Error::new(
Kind::Capacity,
"Maxed out output buffer for connection.",
));
}
if self.out_buffer.get_ref().remaining() < frame.len() {
// There is no more room to grow, and we can't shift the buffer
if self.out_buffer.position() == 0 {
return Err(Error::new(
Kind::Capacity,
"Maxed out output buffer for connection.",
));
}

// Shift the buffer
let mut new = CappedBuffer::new(self.out_buffer.get_ref().capacity(), self.settings.max_out_buffer_capacity);
new.write_all(&self.out_buffer.get_ref()[self.out_buffer.position() as usize..])?;
self.out_buffer = Cursor::new(new);
}
Ok(())
Expand All @@ -1208,20 +1205,18 @@ where
trace!("Reading buffer for connection to {}.", self.peer_addr());
if let Some(len) = self.socket.try_read_buf(self.in_buffer.get_mut())? {
trace!("Buffered {}.", len);
if self.in_buffer.get_ref().len() == self.in_buffer.get_ref().capacity() {
// extend
let mut new = CappedBuffer::new(self.in_buffer.get_ref().capacity(), self.settings.max_in_buffer_capacity);
new.extend(&self.in_buffer.get_ref()[self.in_buffer.position() as usize..]);
if new.len() == new.capacity() {
if self.settings.in_buffer_grow && self.in_buffer.get_ref().remaining() > 0 {
new.reserve(self.settings.in_buffer_capacity);
} else {
return Err(Error::new(
Kind::Capacity,
"Maxed out input buffer for connection.",
));
}
if self.in_buffer.get_ref().remaining() == 0 {
// There is no more room to grow, and we can't shift the buffer
if self.in_buffer.position() == 0 {
return Err(Error::new(
Kind::Capacity,
"Maxed out input buffer for connection.",
));
}

// Shift the buffer
let mut new = CappedBuffer::new(self.in_buffer.get_ref().capacity(), self.settings.max_in_buffer_capacity);
new.write_all(&self.in_buffer.get_ref()[self.in_buffer.position() as usize..])?;
self.in_buffer = Cursor::new(new);
}
Ok(Some(len))
Expand Down
22 changes: 6 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,26 +162,18 @@ pub struct Settings {
/// The maximum length of acceptable incoming frames. Messages longer than this will be rejected.
/// Default: unlimited
pub max_fragment_size: usize,
/// The size of the incoming buffer. A larger buffer uses more memory but will allow for fewer
/// reallocations.
/// The initial size of the incoming buffer. A larger buffer uses more memory but will allow for
/// fewer reallocations.
/// Default: 2048
pub in_buffer_capacity: usize,
/// Whether to reallocate the incoming buffer when `in_buffer_capacity` is reached. If this is
/// false, a Capacity error will be triggered instead.
/// Default: true
pub in_buffer_grow: bool,
/// The maximum size of the incoming buffer, if `in_buffer_grow` is enabled.
/// The maximum size to which the incoming buffer can grow.
/// Default: unlimited
pub max_in_buffer_capacity: usize,
/// The size of the outgoing buffer. A larger buffer uses more memory but will allow for fewer
/// reallocations.
/// The initial size of the outgoing buffer. A larger buffer uses more memory but will allow for
/// fewer reallocations.
/// Default: 2048
pub out_buffer_capacity: usize,
/// Whether to reallocate the incoming buffer when `out_buffer_capacity` is reached. If this is
/// false, a Capacity error will be triggered instead.
/// Default: true
pub out_buffer_grow: bool,
/// The maximum size of the outgoing buffer, if `out_buffer_grow` is enabled.
/// The maximum size to which the outgoing buffer can grow.
/// Default: unlimited
pub max_out_buffer_capacity: usize,
/// Whether to panic when an Internal error is encountered. Internal errors should generally
Expand Down Expand Up @@ -257,10 +249,8 @@ impl Default for Settings {
fragment_size: u16::max_value() as usize,
max_fragment_size: usize::max_value(),
in_buffer_capacity: 2048,
in_buffer_grow: true,
max_in_buffer_capacity: usize::max_value(),
out_buffer_capacity: 2048,
out_buffer_grow: true,
max_out_buffer_capacity: usize::max_value(),
panic_on_internal: true,
panic_on_capacity: false,
Expand Down
4 changes: 0 additions & 4 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ pub trait TryReadBuf: io::Read {
let res = map_non_block(self.read(unsafe { buf.bytes_mut() }));

if let Ok(Some(cnt)) = res {
if cnt > buf.remaining_mut() {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Exceeded buffer limit"));
}

unsafe {
buf.advance_mut(cnt);
}
Expand Down