Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rewrite example with constants as parameters and fewer dependencies
  • Loading branch information
progval committed Jun 13, 2024
commit d2a7ab8d6e7ba1771311352ec5c58f4914b36551
10 changes: 4 additions & 6 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }

dsi-progress-logger = { version = "0.2.4", optional = true }
simplelog = { version = "0.12.2", optional = true }
sysinfo = { version = "0.30.12", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -117,8 +115,8 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
# Enable progress logging
log = ["dep:simplelog", "dep:dsi-progress-logger"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]

[[example]]
name = "read_parquet"
Expand All @@ -127,7 +125,7 @@ path = "./examples/read_parquet.rs"

[[example]]
name = "write_parquet"
required-features = ["log"]
required-features = ["cli", "sysinfo"]
path = "./examples/write_parquet.rs"

[[example]]
Expand Down
91 changes: 75 additions & 16 deletions parquet/examples/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,100 @@
// under the License.

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use dsi_progress_logger::prelude::*;
use std::time::{Duration, Instant};

use arrow::array::{StructArray, UInt64Builder};
use arrow::datatypes::DataType::UInt64;
use arrow::datatypes::{Field, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::basic::Encoding;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::properties::{BloomFilterPosition, WriterProperties};
use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System, MemoryRefreshKind};

// Command-line representation of the Bloom filter placement option.
// `main` translates each variant into the `BloomFilterPosition` variant of the
// same name before handing it to the writer properties builder.
// NOTE(review): plain `//` comments are used on purpose; clap's derive may surface
// `///` doc comments in the generated `--help` output — confirm before converting.
#[derive(ValueEnum, Clone)]
enum BloomFilterPositionArg {
// Mapped to `BloomFilterPosition::End` in `main`.
End,
// Mapped to `BloomFilterPosition::AfterRowGroup` in `main`; this is the default
// used by the `bloom_filter_position` argument of `Args`.
AfterRowGroup,
}

// Command-line options of the example, parsed with clap's derive API.
// The `///` comments below are the help texts shown to the user, so they are
// kept verbatim; explanatory notes for maintainers use plain `//` comments.
#[derive(Parser)]
/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
#[command(version)]
struct Args {
    /// Number of batches to write
    #[arg(long, default_value_t = 1000)]
    iterations: u64,

    /// Number of rows in each batch
    #[arg(long, default_value_t = 1_000_000)]
    batch: u64,

    /// Where to write Bloom Filters
    #[arg(long, value_enum, default_value_t = BloomFilterPositionArg::AfterRowGroup)]
    bloom_filter_position: BloomFilterPositionArg,

    // Positional argument (no `long` flag): destination of the Parquet file.
    /// Path to the file to write
    path: PathBuf,
}

/// Returns the current local time rendered as `YYYY-MM-DD HH:MM:SS`.
///
/// Used as the timestamp prefix of the progress lines printed by this example.
fn now() -> String {
    let local_time = chrono::Local::now();
    let rendered = local_time.format("%Y-%m-%d %H:%M:%S");
    rendered.to_string()
}

/// Returns a short human-readable memory figure for the current process.
///
/// Refreshes only the memory information of this process in `system`, then
/// formats the reported value divided by 1,000,000 with an `MB` suffix
/// (presumably `Process::memory` reports bytes — confirm against the sysinfo
/// documentation). Yields `"N/A"` when the process cannot be found in `system`.
fn mem(system: &mut System) -> String {
    let own_pid = Pid::from(std::process::id() as usize);

    // Restrict the refresh to memory data for this single process.
    let memory_only = ProcessRefreshKind::new().with_memory();
    system.refresh_process_specifics(own_pid, memory_only);

    match system.process(own_pid) {
        Some(process) => format!("{}MB", process.memory() / 1_000_000),
        None => "N/A".to_string(),
    }
}

fn main() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could add some comments here explaining what this example is trying to show

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, along with a Clap argument parser:

$ cargo run --release --features="cli sysinfo" --example write_parquet -- -h
Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage

Usage: write_parquet [OPTIONS] <PATH>

Arguments:
  <PATH>  Path to the file to write

Options:
      --iterations <ITERATIONS>                        Number of batches to write [default: 1000]
      --batch <BATCH>                                  Number of rows in each batch [default: 1000000]
      --bloom-filter-position <BLOOM_FILTER_POSITION>  Where to write Bloom Filters [default: after-row-group] [possible values: end, after-row-group]
  -h, --help                                           Print help
  -V, --version                                        Print version

let _ = simplelog::SimpleLogger::init(simplelog::LevelFilter::Info, Default::default());
let args = Args::parse();

let bloom_filter_position = match args.bloom_filter_position {
BloomFilterPositionArg::End => BloomFilterPosition::End,
BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
};

let properties = WriterProperties::builder()
.set_column_bloom_filter_enabled("id".into(), true)
.set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
.set_bloom_filter_position(bloom_filter_position)
.build();
let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
// Create parquet file that will be read.
let path = "/tmp/test.parquet";
let file = File::create(path).unwrap();
let file = File::create(args.path).unwrap();
let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;

let num_iterations = 3000;
let mut pl = progress_logger!(
item_name = "iterations",
display_memory = true,
expected_updates = Some(num_iterations as usize)
let mut system = System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
eprintln!(
"{} Writing {} batches of {} rows. RSS = {}",
now(),
args.iterations,
args.batch,
mem(&mut system)
);
pl.start("Writing batches");

let mut array_builder = UInt64Builder::new();
for i in 0..num_iterations {
pl.update();
for j in 0..1_000_000 {
let mut last_log = Instant::now();
for i in 0..args.iterations {
if Instant::now() - last_log > Duration::new(10, 0) {
last_log = Instant::now();
eprintln!(
"{} Iteration {}/{}. RSS = {}",
now(),
i + 1,
args.iterations,
mem(&mut system)
);
}
for j in 0..args.batch {
array_builder.append_value(i + j);
}
writer.write(
Expand All @@ -65,7 +123,8 @@ fn main() -> Result<()> {
}
writer.flush()?;
writer.close()?;
pl.done();

eprintln!("{} Done. RSS = {}", now(), mem(&mut system));

Ok(())
}