Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add WriterProperties::bloom_filter_position
  • Loading branch information
progval committed Jun 10, 2024
commit 6effa7f90ea7eb71c841b81dd9f15f0cae853997
9 changes: 7 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::column::writer::{
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::properties::{BloomFilterPosition, WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
Expand Down Expand Up @@ -264,7 +264,11 @@ impl<W: Write + Send> ArrowWriter<W> {
chunk.append_to_row_group(&mut row_group_writer)?;
}
let row_group_metadata = row_group_writer.close()?;
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?;
match self.writer.properties().bloom_filter_position() {
BloomFilterPosition::AfterRowGroup =>
self.writer.write_bloom_filters(&mut [row_group_metadata.to_thrift()])?,
BloomFilterPosition::End => (),
}
Ok(())
}

Expand Down Expand Up @@ -1757,6 +1761,7 @@ mod tests {
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
.set_bloom_filter_position(BloomFilterPosition::End)
.build();

files.push(roundtrip_opts(&expected_batch, props))
Expand Down
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::bloom_filter_position`]
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
Expand Down Expand Up @@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
}
}

/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// Basic constant, which is not part of the Thrift definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
/// Write Bloom Filters of each row group right after the row group
///
/// This saves memory by writing it as soon as it is computed, at the cost
/// of data locality for readers
AfterRowGroup,
/// Write Bloom Filters at the end of the file
///
/// This allows better data locality for readers, at the cost of memory usage
/// for writers.
End,
}

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

Expand Down Expand Up @@ -131,6 +151,7 @@ pub struct WriterProperties {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
Expand Down Expand Up @@ -217,6 +238,11 @@ impl WriterProperties {
self.max_row_group_size
}

/// Returns maximum number of rows in a row group.
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
self.bloom_filter_position
}

/// Returns configured writer version.
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
Expand Down Expand Up @@ -337,6 +363,7 @@ pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
key_value_metadata: Option<Vec<KeyValue>>,
Expand All @@ -356,6 +383,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: usize::MAX,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
Expand All @@ -375,6 +403,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
key_value_metadata: self.key_value_metadata,
Expand Down Expand Up @@ -479,6 +508,12 @@ impl WriterPropertiesBuilder {
self
}

/// Sets where in the final file Bloom Filters are written
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
self.bloom_filter_position = value;
self
}

/// Sets "created by" property.
pub fn set_created_by(mut self, value: String) -> Self {
self.created_by = value;
Expand Down Expand Up @@ -991,6 +1026,7 @@ mod tests {
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();

self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
Expand Down