Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Write Bloom filters between row groups instead of at the end
This avoids having to keep Bloom filters in memory, which can save
significant space when writing long files.
  • Loading branch information
progval committed Jun 10, 2024
commit a417f014ce6ff38081a584676e2f13c58cc110fe
3 changes: 2 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ impl<W: Write + Send> ArrowWriter<W> {
for chunk in in_progress.close()? {
chunk.append_to_row_group(&mut row_group_writer)?;
}
row_group_writer.close()?;
let row_group_metadata = row_group_writer.close()?;
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?;
Ok(())
}

Expand Down
13 changes: 9 additions & 4 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,19 @@ impl<W: Write + Send> SerializedFileWriter<W> {
}

/// Serialize all the bloom filters to the file
fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
pub fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
// iter each column
// write bloom filter to the file
for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
for row_group in row_groups.iter_mut() {
let row_group_idx: u16 = row_group
.ordinal
.expect("Missing row group ordinal")
.try_into()
.expect("Negative row group ordinal");
let row_group_idx = row_group_idx as usize;
for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
match &self.bloom_filters[row_group_idx][column_idx] {
match self.bloom_filters[row_group_idx][column_idx].take() {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
bloom_filter.write(&mut self.buf)?;
Expand Down Expand Up @@ -338,7 +344,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();

self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
Expand Down