refactor streaming
cosmicexplorer committed Jul 16, 2024
commit 27ba4d9060b2fbfafa5fc4fb2055397bb3d1650e
21 changes: 12 additions & 9 deletions src/read.rs
@@ -1041,13 +1041,8 @@ pub(crate) fn central_header_to_zip_file_inner<R: Read>(
aes_extra_data_start: 0,
extra_fields: Vec::new(),
};
match parse_extra_field(&mut result) {
Ok(stripped_extra_field) => {
result.extra_field = stripped_extra_field;
}
Err(ZipError::Io(..)) => {}
Err(e) => return Err(e),
}
let stripped_extra_field = parse_extra_field(&mut result)?;
result.extra_field = stripped_extra_field;

let aes_enabled = result.compression_method == CompressionMethod::AES;
if aes_enabled && result.aes_mode.is_none() {
@@ -1076,9 +1071,17 @@ pub(crate) fn parse_extra_field(file: &mut ZipFileData) -> ZipResult<Option<Arc<

/* TODO: codify this structure into Zip64ExtraFieldBlock fields! */
let mut position = reader.position() as usize;
while (position) < len {
while position < len {
let old_position = position;
let remove = parse_single_extra_field(file, &mut reader, position as u64, false)?;
let remove = match parse_single_extra_field(file, &mut reader, position as u64, false) {
Ok(b) => b,
/* If we get an error reading too far, then assume this is an extra field we don't know
* how to handle, and just return the remaining amount. */
Err(ZipError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Ok(Some(processed_extra_field))
}
Err(e) => return Err(e),
};
position = reader.position() as usize;
if remove {
let remaining = len - (position - old_position);
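A minimal, self-contained sketch of the tolerant scan this file now performs over the extra-field data: walk (id, size, data) records and, when a record claims more bytes than remain, keep what was parsed instead of failing the whole read. The record layout and names below are illustrative stand-ins, not the crate's own types.

use std::io::{self, Cursor, Read};

fn scan_extra_field_records(raw: &[u8]) -> io::Result<Vec<(u16, Vec<u8>)>> {
    let mut reader = Cursor::new(raw);
    let mut records = Vec::new();
    loop {
        let mut header = [0u8; 4];
        match reader.read_exact(&mut header) {
            Ok(()) => {}
            // Clean end of the buffer, or a truncated trailing field:
            // return what we have instead of propagating the error.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(records),
            Err(e) => return Err(e),
        }
        let id = u16::from_le_bytes([header[0], header[1]]);
        let size = u16::from_le_bytes([header[2], header[3]]) as usize;
        let mut data = vec![0u8; size];
        match reader.read_exact(&mut data) {
            Ok(()) => records.push((id, data)),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(records),
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    // One well-formed record followed by a truncated one: the truncated tail
    // is dropped rather than failing the parse.
    let raw = [0x01, 0x00, 0x02, 0x00, 0xAA, 0xBB, 0x99, 0x99, 0xFF, 0xFF];
    let records = scan_extra_field_records(&raw)?;
    assert_eq!(records.len(), 1);
    assert_eq!(records[0].0, 1);
    Ok(())
}
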
132 changes: 76 additions & 56 deletions src/unstable/read.rs
@@ -488,7 +488,7 @@ pub mod streaming {
pub struct StreamingArchive<R> {
reader: R,
remaining_before_next_entry: u64,
first_metadata_block: Option<Box<[u8]>>,
first_metadata_block: Option<[u8; mem::size_of::<ZipLocalEntryBlock>()]>,
}

impl<R> StreamingArchive<R> {
@@ -510,15 +510,11 @@
where
R: Read,
{
pub fn next_entry(&mut self) -> ZipResult<Option<StreamingZipEntry<impl Read + '_>>> {
// We can't use the typical ::parse() method, as we follow separate code paths depending
// on the "magic" value (since the magic value will be from the central directory header
// if we've finished iterating over all the actual files).
/* TODO: smallvec? */
fn drain_remaining(&mut self) -> Result<(), ZipError> {
let Self {
ref mut reader,
ref mut remaining_before_next_entry,
ref mut first_metadata_block,
..
} = self;
if *remaining_before_next_entry > 0 {
io::copy(
@@ -527,36 +523,50 @@
)?;
*remaining_before_next_entry = 0;
}
Ok(())
}

pub fn next_entry(&mut self) -> ZipResult<Option<StreamingZipEntry<impl Read + '_>>> {
// We can't use the typical ::parse() method, as we follow separate code paths depending
// on the "magic" value (since the magic value will be from the central directory header
// if we've finished iterating over all the actual files).
self.drain_remaining()?;
let Self {
ref mut reader,
ref mut remaining_before_next_entry,
ref mut first_metadata_block,
} = self;
assert_eq!(0, *remaining_before_next_entry);

let mut block = [0u8; mem::size_of::<ZipLocalEntryBlock>()];
reader.read_exact(&mut block)?;
let block: Box<[u8]> = block.into();

let signature = spec::Magic::from_first_le_bytes(&block);

let signature = spec::Magic::from_first_le_bytes(block.as_ref());
match signature {
spec::Magic::LOCAL_FILE_HEADER_SIGNATURE => (),
spec::Magic::CENTRAL_DIRECTORY_HEADER_SIGNATURE => {
ZipLocalEntryBlock::MAGIC => (),
/* If the signature corresponds to the first central directory entry, then we are
* out of file entries. We can't seek backwards in a stream, so we save the block
* we just read to our mutable state. */
ZipCentralEntryBlock::MAGIC => {
assert!(
first_metadata_block.is_none(),
"metadata block should never be set except exactly once"
);
assert!(
mem::size_of::<ZipLocalEntryBlock>()
< mem::size_of::<ZipCentralEntryBlock>()
);
*first_metadata_block = Some(block);
return Ok(None);
}
_ => return Err(ZipLocalEntryBlock::WRONG_MAGIC_ERROR),
}

let block = ZipLocalEntryBlock::interpret(&block)?;

let mut data = ZipFileData::from_local_block(block, reader)?;

match parse_extra_field(&mut data) {
/* FIXME: check for the right error type here instead of accepting any old i/o
* error. */
Ok(..) | Err(ZipError::Io(..)) => {}
Err(e) => return Err(e),
}
let stripped_extra_field = parse_extra_field(&mut data)?;
data.extra_field = stripped_extra_field;

let limit_reader =
DrainWrapper::new(data.compressed_size, remaining_before_next_entry, reader);
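The stash of already-read header bytes changes here from Box<[u8]> to an inline [u8; mem::size_of::<ZipLocalEntryBlock>()]. A minimal sketch of why that works (size_of is a const fn, so it can size an array directly) and of the magic-dispatch flow above, using stand-in types and the standard ZIP magic values rather than the crate's own definitions:

use std::io::{self, Read};
use std::mem;

// Stand-in for the crate's fixed-layout local header block; only its size
// (30 bytes for the fixed part of a ZIP local header) matters here.
#[repr(C, packed)]
struct LocalBlock {
    magic: u32,
    rest: [u8; 26],
}

const LOCAL_MAGIC: u32 = 0x0403_4b50; // "PK\x03\x04"
const CENTRAL_MAGIC: u32 = 0x0201_4b50; // "PK\x01\x02"

struct Streamer<R> {
    reader: R,
    // Fixed-size stash instead of Box<[u8]>: the array is sized by the header
    // type itself, so there is no heap allocation and no way for the stored
    // length to disagree with the struct.
    first_metadata_block: Option<[u8; mem::size_of::<LocalBlock>()]>,
}

impl<R: Read> Streamer<R> {
    /// Returns Ok(true) if a local file header follows, Ok(false) once the
    /// first central directory header is seen (stashing the bytes already
    /// consumed, since a plain stream cannot seek backwards).
    fn peek_next_header(&mut self) -> io::Result<bool> {
        let mut block = [0u8; mem::size_of::<LocalBlock>()];
        self.reader.read_exact(&mut block)?;
        let magic = u32::from_le_bytes([block[0], block[1], block[2], block[3]]);
        match magic {
            LOCAL_MAGIC => Ok(true),
            CENTRAL_MAGIC => {
                self.first_metadata_block = Some(block);
                Ok(false)
            }
            _ => Err(io::Error::new(io::ErrorKind::InvalidData, "bad header magic")),
        }
    }
}
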
@@ -570,46 +580,55 @@
}

pub fn next_metadata_entry(&mut self) -> ZipResult<Option<ZipStreamFileMetadata>> {
/* We should only need to drain remaining exactly once (if at all), since that's only
* employed if the end user fails to read the entire contents of a streaming file
* entry. */
self.drain_remaining()?;
let Self {
ref mut reader,
ref mut remaining_before_next_entry,
ref remaining_before_next_entry,
ref mut first_metadata_block,
} = self;
if *remaining_before_next_entry > 0 {
io::copy(
&mut reader.by_ref().take(*remaining_before_next_entry),
&mut io::sink(),
)?;
*remaining_before_next_entry = 0;
}

// Parse central header
let block = match first_metadata_block.take() {
None => match ZipCentralEntryBlock::parse(reader) {
Ok(block) => block,
Err(ZipError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Ok(None);
assert_eq!(0, *remaining_before_next_entry);

/* Get the bytes out of the stream necessary to create a parseable central block. */
let block: [u8; mem::size_of::<ZipCentralEntryBlock>()] =
match first_metadata_block.take() {
/* If we have a block we tried to parse earlier from .next_entry(), get the
* data from there, then read the additional bytes necessary to construct
* a central directory entry. This should always happen exactly once. */
Some(block) => {
assert!(
mem::size_of::<ZipLocalEntryBlock>()
< mem::size_of::<ZipCentralEntryBlock>()
);
assert_eq!(block.len(), mem::size_of::<ZipLocalEntryBlock>());

let mut remaining_block = [0u8; mem::size_of::<ZipCentralEntryBlock>()
- mem::size_of::<ZipLocalEntryBlock>()];
reader.read_exact(remaining_block.as_mut())?;

let mut joined_block = [0u8; mem::size_of::<ZipCentralEntryBlock>()];
joined_block[..block.len()].copy_from_slice(&block);
joined_block[block.len()..].copy_from_slice(&remaining_block);
joined_block
}
Err(e) => return Err(e),
},
Some(block) => {
assert_eq!(block.len(), mem::size_of::<ZipLocalEntryBlock>());
assert!(
mem::size_of::<ZipLocalEntryBlock>()
< mem::size_of::<ZipCentralEntryBlock>()
);

let mut remaining_block = [0u8; mem::size_of::<ZipCentralEntryBlock>()
- mem::size_of::<ZipLocalEntryBlock>()];
reader.read_exact(remaining_block.as_mut())?;

let mut joined_block = [0u8; mem::size_of::<ZipCentralEntryBlock>()];
joined_block[..block.len()].copy_from_slice(&block);
joined_block[block.len()..].copy_from_slice(&remaining_block);

ZipCentralEntryBlock::interpret(&joined_block)?
}
};
/* After the first central block is parsed, we should always go into this
* branch, reading the necessary bytes from the stream. */
None => {
let mut block = [0u8; mem::size_of::<ZipCentralEntryBlock>()];
match reader.read_exact(&mut block) {
Ok(()) => (),
/* The reader is done! This is expected to happen exactly once when the
* stream is completely finished. */
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e.into()),
};
block
}
};
// Parse central header
let block = ZipCentralEntryBlock::interpret(&block)?;

// Give archive_offset and central_header_start dummy value 0, since
// they are not used in the output.
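A sketch of the splice performed above when a stashed local-header-sized block is present: read only the missing bytes of the central header and glue the two pieces into one fixed array. The constants 30 and 46 stand in for mem::size_of::<ZipLocalEntryBlock>() and mem::size_of::<ZipCentralEntryBlock>(); everything else is an illustrative reduction of the code in this hunk.

use std::io::{self, Read};

const LOCAL_LEN: usize = 30;
const CENTRAL_LEN: usize = 46;

fn read_central_block<R: Read>(
    reader: &mut R,
    stashed: Option<[u8; LOCAL_LEN]>,
) -> io::Result<Option<[u8; CENTRAL_LEN]>> {
    let block = match stashed {
        // LOCAL_LEN bytes of the first central header were already consumed
        // while looking for another file entry; read only the remainder and
        // join the two pieces back together.
        Some(prefix) => {
            let mut rest = [0u8; CENTRAL_LEN - LOCAL_LEN];
            reader.read_exact(&mut rest)?;
            let mut joined = [0u8; CENTRAL_LEN];
            joined[..LOCAL_LEN].copy_from_slice(&prefix);
            joined[LOCAL_LEN..].copy_from_slice(&rest);
            joined
        }
        // Every later header is read from the stream in one shot; a clean
        // UnexpectedEof here means the central directory is exhausted.
        None => {
            let mut block = [0u8; CENTRAL_LEN];
            match reader.read_exact(&mut block) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
                Err(e) => return Err(e),
            }
            block
        }
    };
    Ok(Some(block))
}
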
@@ -654,10 +673,11 @@ pub mod streaming {
R: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
assert!(!buf.is_empty());
let to_read = self.remaining().min(buf.len());
/* If the input is exhausted, or `buf` was empty, just forward any error. */
/* If the input is exhausted, or `buf` was empty, we are done. */
if to_read == 0 {
return self.inner.read(&mut []);
return Ok(0);
}

let count = self.inner.read(&mut buf[..to_read])?;
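For context, a hedged usage sketch of the streaming reader as refactored in this commit. The method names and signatures (next_entry, next_metadata_entry) come from the diff; the import path and the availability of a Read impl on the returned entry are assumptions, not confirmed by this diff.

use std::io::Read;

use zip::result::ZipResult;
use zip::unstable::read::streaming::StreamingArchive; // path is an assumption

/// Two-pass consumption of a non-seekable archive: file contents first, then
/// the per-file metadata parsed out of the central directory.
fn sizes_from_stream<R: Read>(mut archive: StreamingArchive<R>) -> ZipResult<Vec<u64>> {
    let mut sizes = Vec::new();
    while let Some(mut entry) = archive.next_entry()? {
        // Count the bytes of each entry; anything left unread is drained
        // automatically before the next header is parsed.
        sizes.push(std::io::copy(&mut entry, &mut std::io::sink())?);
    }
    // Once next_entry() returns None, the central directory has begun and
    // next_metadata_entry() yields ZipStreamFileMetadata records until EOF.
    while let Some(metadata) = archive.next_metadata_entry()? {
        let _ = metadata;
    }
    Ok(sizes)
}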