Merged
27 commits
741bbf6
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
BugenZhao Jul 16, 2024
8f76248
Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily acciden…
XiangpengHao Jul 16, 2024
bb5f12b
Make display of interval types more pretty (#6006)
Rachelint Jul 16, 2024
756b1fb
Update snafu (#5930)
Jesse-Bakker Jul 16, 2024
fe04e09
Update Parquet thrift generated structures (#6045)
etseidl Jul 16, 2024
2e7f7ef
Revert "Revert "Write Bloom filters between row groups instead of the…
alamb Jul 16, 2024
effccc1
Revert "Update snafu (#5930)" (#6069)
alamb Jul 16, 2024
649d09d
Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
crepererum Jul 17, 2024
05e681d
remove repeated codes to make the codes more concise. (#6080)
Rachelint Jul 18, 2024
e40b311
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
etseidl Jul 19, 2024
81c34ac
Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
dependabot[bot] Jul 23, 2024
3bc9987
Deprecate read_page_locations() and simplify offset index in `Parquet…
etseidl Jul 23, 2024
095130f
Merge remote-tracking branch 'apache/master' into 53.0.0-dev
alamb Jul 25, 2024
a6353d1
Update parquet/src/column/writer/mod.rs
alamb Jul 25, 2024
eeccaca
Upgrade protobuf definitions to flightsql 17.0 (#6133)
djanderson Jul 27, 2024
b07d057
Add `ParquetMetadataWriter` allow ad-hoc encoding of `ParquetMetadata`
adriangb Jul 24, 2024
e2be8d3
fix loading in test by etseidl
adriangb Jul 31, 2024
0175d53
add rough equivalence test
etseidl Jul 31, 2024
f188bf8
one more check
etseidl Jul 31, 2024
57b85d7
make clippy happy
etseidl Jul 31, 2024
1f3eb0b
Merge pull request #1 from etseidl/pr_6000_ets
adriangb Jul 31, 2024
4d1651c
separate tests that require arrow into a separate module
etseidl Jul 31, 2024
8691903
Merge remote-tracking branch 'origin/master' into test_merge5
etseidl Aug 1, 2024
241ee02
add histograms to to_thrift()
etseidl Aug 1, 2024
0b53d55
Merge pull request #2 from etseidl/fix_compile_check
adriangb Aug 5, 2024
4d7158f
Merge pull request #3 from etseidl/fix_checks_and_merge
adriangb Aug 5, 2024
590c4ed
Merge remote-tracking branch 'apache/master' into add-encode_metadata
alamb Aug 6, 2024
Revert "Revert "Write Bloom filters between row groups instead of the…
… end (#…" (#5933)

This reverts commit 22e0b44.
alamb authored Jul 16, 2024
commit 2e7f7ef9b68f69e8707d01dbbf5ad0e57f190979
8 changes: 8 additions & 0 deletions parquet/Cargo.toml
@@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.30.12", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -114,12 +115,19 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]

[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"

[[example]]
name = "write_parquet"
required-features = ["cli", "sysinfo"]
path = "./examples/write_parquet.rs"

[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
131 changes: 131 additions & 0 deletions parquet/examples/write_parquet.rs
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow::array::{StructArray, UInt64Builder};
use arrow::datatypes::DataType::UInt64;
use arrow::datatypes::{Field, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::basic::Encoding;
use parquet::errors::Result;
use parquet::file::properties::{BloomFilterPosition, WriterProperties};
use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};

#[derive(ValueEnum, Clone)]
enum BloomFilterPositionArg {
    End,
    AfterRowGroup,
}

#[derive(Parser)]
#[command(version)]
/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
struct Args {
    #[arg(long, default_value_t = 1000)]
    /// Number of batches to write
    iterations: u64,

    #[arg(long, default_value_t = 1000000)]
    /// Number of rows in each batch
    batch: u64,

    #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
    /// Where to write Bloom Filters
    bloom_filter_position: BloomFilterPositionArg,

    /// Path to the file to write
    path: PathBuf,
}

fn now() -> String {
    chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
}

fn mem(system: &mut System) -> String {
    let pid = Pid::from(std::process::id() as usize);
    system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
    system
        .process(pid)
        .map(|proc| format!("{}MB", proc.memory() / 1_000_000))
        .unwrap_or("N/A".to_string())
}

fn main() -> Result<()> {
    let args = Args::parse();

    let bloom_filter_position = match args.bloom_filter_position {
        BloomFilterPositionArg::End => BloomFilterPosition::End,
        BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
    };

    let properties = WriterProperties::builder()
        .set_column_bloom_filter_enabled("id".into(), true)
        .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
        .set_bloom_filter_position(bloom_filter_position)
        .build();
    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
    // Create parquet file that will be read.
    let file = File::create(args.path).unwrap();
    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;

    let mut system =
        System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
    eprintln!(
        "{} Writing {} batches of {} rows. RSS = {}",
        now(),
        args.iterations,
        args.batch,
        mem(&mut system)
    );

    let mut array_builder = UInt64Builder::new();
    let mut last_log = Instant::now();
    for i in 0..args.iterations {
        if Instant::now() - last_log > Duration::new(10, 0) {
            last_log = Instant::now();
            eprintln!(
                "{} Iteration {}/{}. RSS = {}",
                now(),
                i + 1,
                args.iterations,
                mem(&mut system)
            );
        }
        for j in 0..args.batch {
            array_builder.append_value(i + j);
        }
        writer.write(
            &StructArray::new(
                schema.fields().clone(),
                vec![Arc::new(array_builder.finish())],
                None,
            )
            .into(),
        )?;
    }
    writer.flush()?;
    writer.close()?;

    eprintln!("{} Done. RSS = {}", now(), mem(&mut system));

    Ok(())
}
28 changes: 25 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -204,7 +204,7 @@ impl<W: Write + Send> ArrowWriter<W> {
    }

    /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

@@ -1097,7 +1097,9 @@ mod tests {
    use crate::file::metadata::ParquetMetaData;
    use crate::file::page_index::index::Index;
    use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
+    use crate::file::properties::{
+        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
+    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
Expand Down Expand Up @@ -1745,6 +1747,7 @@ mod tests {
        values: ArrayRef,
        schema: SchemaRef,
        bloom_filter: bool,
        bloom_filter_position: BloomFilterPosition,
    }

    impl RoundTripOptions {
@@ -1755,6 +1758,7 @@
                values,
                schema: Arc::new(schema),
                bloom_filter: false,
                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
            }
        }
    }
@@ -1774,6 +1778,7 @@
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        let encodings = match values.data_type() {
@@ -1814,6 +1819,7 @@
                    .set_dictionary_page_size_limit(dictionary_size.max(1))
                    .set_encoding(*encoding)
                    .set_bloom_filter_enabled(bloom_filter)
                    .set_bloom_filter_position(bloom_filter_position)
                    .build();

                files.push(roundtrip_opts(&expected_batch, props))
@@ -2171,6 +2177,22 @@
        values_required::<BinaryViewArray, _>(many_vecs_iter);
    }

    #[test]
    fn i32_column_bloom_filter_at_end() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
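The `flushed_row_groups` change above means callers now receive a slice of plain `RowGroupMetaData` values rather than `Arc`-wrapped `RowGroupMetaDataPtr` values. A minimal sketch of a caller inspecting flushed row groups under the new signature; the setup, column name, and values here are illustrative assumptions, not part of this diff:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

fn write_and_inspect(file: std::fs::File) -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "id",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.flush()?; // force the current row group to be written

    // Each item is now a `RowGroupMetaData` value, not an `Arc` pointer.
    for rg in writer.flushed_row_groups() {
        println!("flushed row group with {} rows", rg.num_rows());
    }
    writer.close()?;
    Ok(())
}

The same pattern applies to `AsyncArrowWriter::flushed_row_groups`, changed identically in the async module below.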
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
-    file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
+    file::{metadata::RowGroupMetaData, properties::WriterProperties},
    format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
    }

    /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

5 changes: 5 additions & 0 deletions parquet/src/file/metadata/mod.rs
@@ -358,6 +358,11 @@ impl RowGroupMetaData {
        &self.columns
    }

    /// Returns mutable slice of column chunk metadata.
    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
        &mut self.columns
    }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
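A brief sketch of how the new `columns_mut` accessor can be used. `patch_column` is a hypothetical helper standing in for whatever per-column fix-up a caller needs (for example, rewriting offsets after Bloom filters are relocated); it is not a parquet API:

use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};

// Hypothetical per-column fix-up; a real pass might patch offsets here.
fn patch_column(column: &mut ColumnChunkMetaData) {
    let _path = column.column_path(); // stub: read-only placeholder
}

// `columns_mut` (added above) allows in-place edits where previously
// only the read-only `columns()` slice was available.
fn patch_row_group(row_group: &mut RowGroupMetaData) {
    for column in row_group.columns_mut() {
        patch_column(column);
    }
}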
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
@@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::bloom_filter_position`]
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -88,6 +90,24 @@ impl FromStr for WriterVersion {
    }
}

/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// This is a writer-side setting and is not part of the Thrift definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
    /// Write each row group's Bloom filters immediately after the row group
    ///
    /// This saves memory by writing each filter as soon as it is computed, at
    /// the cost of data locality for readers
    AfterRowGroup,
    /// Write all Bloom filters at the end of the file
    ///
    /// This allows better data locality for readers, at the cost of memory usage
    /// for writers.
    End,
}

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

@@ -132,6 +152,7 @@ pub struct WriterProperties {
    data_page_row_count_limit: usize,
    write_batch_size: usize,
    max_row_group_size: usize,
    bloom_filter_position: BloomFilterPosition,
    writer_version: WriterVersion,
    created_by: String,
    pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -219,6 +240,11 @@
        self.max_row_group_size
    }

    /// Returns where in the final file Bloom filters will be written.
    pub fn bloom_filter_position(&self) -> BloomFilterPosition {
        self.bloom_filter_position
    }

    /// Returns configured writer version.
    pub fn writer_version(&self) -> WriterVersion {
        self.writer_version
@@ -340,6 +366,7 @@ pub struct WriterPropertiesBuilder {
    data_page_row_count_limit: usize,
    write_batch_size: usize,
    max_row_group_size: usize,
    bloom_filter_position: BloomFilterPosition,
    writer_version: WriterVersion,
    created_by: String,
    key_value_metadata: Option<Vec<KeyValue>>,
@@ -359,6 +386,7 @@ impl WriterPropertiesBuilder {
            data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
            write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
            max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
            bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
            writer_version: DEFAULT_WRITER_VERSION,
            created_by: DEFAULT_CREATED_BY.to_string(),
            key_value_metadata: None,
@@ -378,6 +406,7 @@
            data_page_row_count_limit: self.data_page_row_count_limit,
            write_batch_size: self.write_batch_size,
            max_row_group_size: self.max_row_group_size,
            bloom_filter_position: self.bloom_filter_position,
            writer_version: self.writer_version,
            created_by: self.created_by,
            key_value_metadata: self.key_value_metadata,
@@ -489,6 +518,12 @@
        self
    }

    /// Sets where in the final file Bloom filters are written (default `AfterRowGroup`)
    pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
        self.bloom_filter_position = value;
        self
    }

    /// Sets "created by" property (defaults to `parquet-rs version <VERSION>`).
    pub fn set_created_by(mut self, value: String) -> Self {
        self.created_by = value;
@@ -1054,6 +1089,7 @@ mod tests {
        );
        assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
        assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
        assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
        assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
        assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
        assert_eq!(props.key_value_metadata(), None);
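Putting the new property together, a minimal sketch exercising only the default, getter, and setter added in this diff (plus the existing `set_bloom_filter_enabled`):

use parquet::file::properties::{BloomFilterPosition, WriterProperties};

fn main() {
    // The default keeps writer memory bounded by flushing filters early.
    let default_props = WriterProperties::builder().build();
    assert_eq!(
        default_props.bloom_filter_position(),
        BloomFilterPosition::AfterRowGroup
    );

    // Opt into end-of-file placement for better reader data locality.
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .set_bloom_filter_position(BloomFilterPosition::End)
        .build();
    assert_eq!(props.bloom_filter_position(), BloomFilterPosition::End);
}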