Merged
Add struct for in-memory row group with only selected pages
thinkharderdev committed Aug 17, 2022
commit 7f2201cde0e5a3729e67da3ea4bf036c9dddba35
1 change: 1 addition & 0 deletions object_store/src/local.rs
@@ -1068,6 +1068,7 @@ mod tests {
integration.head(&path).await.unwrap();
}

#[ignore]
Contributor:
?

Contributor Author:

Sorry, this test fails on my machine because of a permissions issue. Meant to revert before submitting.

#[tokio::test]
async fn test_list_root() {
let integration = LocalFileSystem::new();
9 changes: 7 additions & 2 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -21,8 +21,9 @@ use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;

/// [`RowSelector`] represents a range of rows to scan from a parquet file
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy)]
pub struct RowSelector {
/// The number of rows
pub row_count: usize,
Expand Down Expand Up @@ -116,6 +117,10 @@ impl RowSelection {
Self { selectors }
}

/// Returns the list of [`RowSelector`] backing this [`RowSelection`]
pub fn selectors(&self) -> &[RowSelector] {
&self.selectors
}

/// Splits off the first `row_count` from this [`RowSelection`]
pub fn split_off(&mut self, row_count: usize) -> Self {
let mut total_count = 0;
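For illustration, a minimal sketch of how a caller might use the new `selectors()` accessor. It is not part of this diff, and it assumes the crate's public `RowSelector` fields and a `From<Vec<RowSelector>>` impl for `RowSelection`:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Scan the first 100 rows, then skip the next 900
    let selection: RowSelection = vec![
        RowSelector { row_count: 100, skip: false },
        RowSelector { row_count: 900, skip: true },
    ]
    .into();

    // The new accessor exposes the selectors read-only, e.g. to count
    // how many rows will actually be scanned
    let selected: usize = selection
        .selectors()
        .iter()
        .filter(|s| !s.skip)
        .map(|s| s.row_count)
        .sum();
    assert_eq!(selected, 100);
}
```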
168 changes: 166 additions & 2 deletions parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::{Cursor, SeekFrom};
use std::io::{Cursor, Read, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -88,7 +88,7 @@ use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use parquet_format::{PageHeader, PageType};
use parquet_format::{PageHeader, PageLocation, PageType};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow::datatypes::SchemaRef;
@@ -544,6 +544,170 @@ impl RowGroupCollection for InMemoryRowGroup {
}
}

/// An in-memory row group that materializes only the pages covered by a
/// [`RowSelection`]
struct SparseInMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<SparseInMemoryColumnChunk>>,
row_count: usize,
}

/// A column chunk in which only the selected pages are resident; `pages` is
/// parallel to `page_locations`, with `None` for pages that were not fetched
struct SparseInMemoryColumnChunk {
num_values: i64,
compression: Compression,
physical_type: crate::basic::Type,
pages: Vec<Option<Bytes>>,
page_locations: Vec<PageLocation>,
}
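As a note on the layout: `pages` is kept parallel to `page_locations`, with `None` marking pages that were never fetched. A hypothetical constructor (for illustration only, not part of this diff) makes that invariant explicit; it assumes the crate's existing `ColumnChunkMetaData` accessors and module context:

```rust
// Hypothetical helper: build an empty sparse chunk in which every page
// starts out unfetched.
fn empty_sparse_chunk(
    meta: &ColumnChunkMetaData,
    page_locations: Vec<PageLocation>,
) -> SparseInMemoryColumnChunk {
    SparseInMemoryColumnChunk {
        num_values: meta.num_values(),
        compression: meta.compression(),
        physical_type: meta.column_type(),
        pages: vec![None; page_locations.len()], // parallel to `page_locations`
        page_locations,
    }
}
```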

impl SparseInMemoryColumnChunk {
async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
selection: &RowSelection,
page_locations: &[PageLocation],
) -> Result<()> {
let num_pages = page_locations.len();

let mut fetch_ranges = Vec::with_capacity(num_pages);
let mut fetch_pages = Vec::with_capacity(num_pages);

let mut page_offset = 0;
let mut row_offset = 0;

for selector in selection.selectors() {
// Advance `row_offset` past the rows covered by this selector
row_offset += selector.row_count;
// If the page at `page_offset` was already fetched for a previous
// selector, step past it rather than fetching it twice
if fetch_pages
.last()
.map(|idx| *idx == page_offset)
.unwrap_or(false)
{
page_offset += 1;
}

// Walk all pages starting at or before the end of this selector,
// fetching their byte ranges only when the rows are selected
while page_offset < page_locations.len()
&& page_locations[page_offset].first_row_index <= row_offset as i64
{
if !selector.skip {
let range_start = page_locations[page_offset].offset;
let range_end = range_start
+ page_locations[page_offset].compressed_page_size as i64;
fetch_ranges.push(range_start as usize..range_end as usize);
fetch_pages.push(page_offset);
}
page_offset += 1;
}
}

// Issue one vectored read for all selected pages, then slot each buffer
// into its corresponding page position
let chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in fetch_pages.into_iter().zip(chunk_data) {
self.pages[idx] = Some(chunk);
}

Ok(())
}
}
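To make the fetch flow concrete, a sketch of driving it end to end. The surrounding plumbing is assumed (module context, and `PageLocation`s that would normally come from the file's offset index); this is illustrative, not part of the diff:

```rust
// Illustrative only: fetch just the selected pages of one column chunk.
async fn fetch_selected<T: AsyncFileReader + Send>(
    input: &mut T,
    chunk: &mut SparseInMemoryColumnChunk,
    selection: &RowSelection,
) -> Result<()> {
    let locations = chunk.page_locations.clone();
    chunk.fetch(input, selection, &locations).await?;

    // Only pages overlapping a non-skip selector should now be resident
    let resident = chunk.pages.iter().filter(|p| p.is_some()).count();
    debug_assert!(resident <= locations.len());
    Ok(())
}
```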

/// A [`PageReader`] over a [`SparseInMemoryColumnChunk`] that returns an error
/// if an unfetched page is requested
struct SparseInMemoryColumnChunkReader {
page_set: SparseInMemoryColumnChunk,
decompressor: Option<Box<dyn Codec>>,
offset: usize,
seen_num_values: i64,
}

impl SparseInMemoryColumnChunkReader {
fn new(page_set: SparseInMemoryColumnChunk) -> Result<Self> {
let decompressor = create_codec(page_set.compression)?;
let result = Self {
page_set,
decompressor,
offset: 0,
seen_num_values: 0,
};
Ok(result)
}
}

impl Iterator for SparseInMemoryColumnChunkReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}

impl PageReader for SparseInMemoryColumnChunkReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
self.page_set
.pages
.get(self.offset)
.map(|data| match data {
Some(data) => {
let mut cursor = Cursor::new(data);

let page_header = read_page_header(&mut cursor)?;
self.offset += 1;

let compressed_size = page_header.compressed_page_size as usize;

let start_offset = cursor.position() as usize;
let end_offset = start_offset + compressed_size;

let buffer = cursor.into_inner().slice(start_offset..end_offset);

let result = match page_header.type_ {
PageType::DataPage | PageType::DataPageV2 => {
let decoded = decode_page(
page_header,
buffer.into(),
self.page_set.physical_type,
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
decoded
}
_ => {
return Err(ParquetError::ArrowError(format!(
"Error reading next page, page at index {} is not a data page",
self.offset - 1
)))
}
};

Ok(result)
}
None => Err(ParquetError::ArrowError(format!(
"Error reading next page, page at index {} not fetched",
self.offset
))),
})
.transpose()
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
// If another page follows the current one, its row count is the gap
// between consecutive `first_row_index` values; the final page falls
// back to the chunk's total value count. `offset + 1` comparisons
// avoid underflow when the location list is short.
if self.offset + 1 < self.page_set.page_locations.len() {
Ok(Some(PageMetadata {
num_rows: self.page_set.page_locations[self.offset + 1].first_row_index
as usize
- self.page_set.page_locations[self.offset].first_row_index as usize,
is_dict: false,
}))
} else if self.offset + 1 == self.page_set.page_locations.len() {
Ok(Some(PageMetadata {
num_rows: self.page_set.num_values as usize
- self.page_set.page_locations[self.offset].first_row_index as usize,
is_dict: false,
}))
} else {
Ok(None)
}
}

fn skip_next_page(&mut self) -> Result<()> {
self.offset += 1;
Ok(())
}
}
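A sketch of consuming the reader through the `PageReader` trait, assuming a chunk already populated by `fetch` above (illustrative only):

```rust
// Illustrative only: walk the chunk page by page without decoding,
// summing the per-page row counts reported by `peek_next_page`.
fn count_selected_rows(chunk: SparseInMemoryColumnChunk) -> Result<usize> {
    let mut reader = SparseInMemoryColumnChunkReader::new(chunk)?;
    let mut rows = 0;
    while let Some(meta) = reader.peek_next_page()? {
        rows += meta.num_rows;
        // A real consumer would call `get_next_page` here to decode;
        // skipping is enough to demonstrate the cursor movement
        reader.skip_next_page()?;
    }
    Ok(rows)
}
```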

/// Data for a single column chunk
#[derive(Clone)]
struct InMemoryColumnChunk {
1 change: 1 addition & 0 deletions parquet/src/column/page.rs
@@ -195,6 +195,7 @@ impl PageWriteSpec {
}

/// Contains metadata for a page
#[derive(Clone)]
pub struct PageMetadata {
/// The number of rows in this page
pub num_rows: usize,