Merged

Changes from 14 commits
177 changes: 83 additions & 94 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 26 additions & 8 deletions Cargo.toml
@@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.17", default-features = false }
arrow = { version = "54.3.0", features = [
arrow = { version = "55.0.0", features = [
"prettyprint",
"chrono-tz",
] }
arrow-buffer = { version = "54.3.0", default-features = false }
arrow-flight = { version = "54.3.0", features = [
arrow-buffer = { version = "55.0.0", default-features = false }
arrow-flight = { version = "55.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "54.3.0", default-features = false, features = [
arrow-ipc = { version = "55.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "54.3.0", default-features = false }
arrow-schema = { version = "54.3.0", default-features = false }
arrow-ord = { version = "55.0.0", default-features = false }
arrow-schema = { version = "55.0.0", default-features = false }
async-trait = "0.1.88"
bigdecimal = "0.4.7"
bytes = "1.10"
@@ -147,9 +147,9 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.8.0"
itertools = "0.14"
log = "^0.4"
object_store = { version = "0.11.0", default-features = false }
object_store = { version = "0.12.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "54.3.0", default-features = false, features = [
parquet = { version = "55.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
@@ -213,3 +213,21 @@ used_underscore_binding = "warn"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_qualifications = "deny"


## Temporary arrow-rs patch until 55
## https://github.com/apache/arrow-rs/pull/7391

[patch.crates-io]
arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-array = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-buffer = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-cast = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-data = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-ipc = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-schema = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-select = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-string = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-ord = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_for_55.0.0" }
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -63,7 +63,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.23.5", optional = true }
pyo3 = { version = "0.24.0", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }
20 changes: 10 additions & 10 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -305,7 +305,7 @@ impl FileOpener for ArrowOpener {
)?;
// read footer according to footer_len
let get_option = GetOptions {
range: Some(GetRange::Suffix(10 + footer_len)),
range: Some(GetRange::Suffix(10 + (footer_len as u64))),
Contributor Author: The changes to usize/u64 are for better wasm support, see
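
As a minimal sketch of the motivation (illustrative, not part of this PR): on wasm32, usize is only 32 bits, so byte offsets held as usize wrap for objects larger than 4 GiB, while u64 is wide enough on every target.

    // Hypothetical helper, for illustration only: keeping byte offsets in
    // u64 avoids truncation on 32-bit targets such as wasm32.
    fn byte_range(offset: u64, len: u64) -> std::ops::Range<u64> {
        offset..offset + len
    }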

..Default::default()
};
let get_result = object_store
@@ -332,9 +332,9 @@ impl FileOpener for ArrowOpener {
.iter()
.flatten()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
let block_len =
block.bodyLength() as u64 + block.metaDataLength() as u64;
let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
@@ -354,19 +354,19 @@ impl FileOpener for ArrowOpener {
.iter()
.flatten()
.filter(|block| {
let block_offset = block.offset() as usize;
block_offset >= range.start as usize
&& block_offset < range.end as usize
let block_offset = block.offset() as u64;
block_offset >= range.start as u64
&& block_offset < range.end as u64
})
.copied()
.collect_vec();

let recordbatch_ranges = recordbatches
.iter()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
let block_len =
block.bodyLength() as u64 + block.metaDataLength() as u64;
let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
4 changes: 2 additions & 2 deletions datafusion/core/src/test/object_store.rs
@@ -66,7 +66,7 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
size: metadata.len() as usize,
size: metadata.len(),
e_tag: None,
version: None,
}
@@ -166,7 +166,7 @@ impl ObjectStore for BlockingObjectStore {
fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}

2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/parquet.rs
@@ -102,7 +102,7 @@ impl TestParquetFile {

println!("Generated test dataset with {num_rows} rows");

let size = std::fs::metadata(&path)?.len() as usize;
let size = std::fs::metadata(&path)?.len();

let mut canonical_path = path.canonicalize()?;

3 changes: 2 additions & 1 deletion datafusion/datasource-parquet/src/file_format.rs
@@ -739,6 +739,7 @@ impl MetadataFetch for ObjectStoreFetch<'_> {
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
let range = range.start as u64..range.end as u64;
async {
self.store
.get_range(&self.meta.location, range)
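
Since usize-to-u64 conversion is lossless on both 32- and 64-bit targets, the plain casts above are safe. An equivalent standalone helper (hypothetical, for illustration) would be:

    // Widen a Range<usize> into the Range<u64> that object_store 0.12 expects.
    fn widen(r: std::ops::Range<usize>) -> std::ops::Range<u64> {
        r.start as u64..r.end as u64
    }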
@@ -765,7 +766,7 @@ pub async fn fetch_parquet_metadata(

ParquetMetaDataReader::new()
.with_prefetch_hint(size_hint)
.load_and_finish(fetch, file_size)
.load_and_finish(fetch, file_size as usize)
.await
.map_err(DataFusionError::from)
}
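
Note that the cast into load_and_finish narrows the other way: this parquet API still takes usize, so on a 32-bit target a file size above 4 GiB would wrap. A more defensive variant (a sketch of an alternative, not what this code does) could fail loudly instead:

    // Hypothetical guard: reject file sizes that do not fit in usize.
    fn checked_file_size(file_size: u64) -> Result<usize, String> {
        usize::try_from(file_size)
            .map_err(|_| format!("file size {file_size} does not fit in usize"))
    }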
16 changes: 9 additions & 7 deletions datafusion/datasource-parquet/src/reader.rs
@@ -18,19 +18,19 @@
//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
//! low level control of parquet file readers

use crate::ParquetFileMetrics;
use bytes::Bytes;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

use crate::ParquetFileMetrics;

/// Interface for reading parquet files.
///
/// The combined implementations of [`ParquetFileReaderFactory`] and
@@ -114,10 +114,11 @@ impl AsyncFileReader for ParquetFileReader {
self.inner.get_byte_ranges(ranges)
}

fn get_metadata(
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
self.inner.get_metadata()
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
self.inner.get_metadata(options)
}
}
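
For callers, passing None keeps the default metadata-reading behavior. A minimal usage sketch, assuming the parquet 55 AsyncFileReader trait shown above:

    // Load parquet metadata with default options; works for any
    // AsyncFileReader, including the ParquetFileReader defined here.
    async fn load_metadata<R: AsyncFileReader>(
        reader: &mut R,
    ) -> parquet::errors::Result<std::sync::Arc<ParquetMetaData>> {
        reader.get_metadata(None).await
    }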

@@ -135,7 +136,8 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
metrics,
);
let store = Arc::clone(&self.store);
let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
let mut inner = ParquetObjectReader::new(store, file_meta.object_meta.location)
.with_file_size(file_meta.object_meta.size as usize);

if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
13 changes: 7 additions & 6 deletions datafusion/datasource/src/file_groups.rs
@@ -224,10 +224,11 @@ impl FileGroupPartitioner {
return None;
}

let target_partition_size = (total_size as usize).div_ceil(target_partitions);
let target_partition_size =
(total_size as u64).div_ceil(target_partitions as u64);

let current_partition_index: usize = 0;
let current_partition_size: usize = 0;
let current_partition_size: u64 = 0;

// Partition byte range evenly for all `PartitionedFile`s
let repartitioned_files = flattened_files
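
As a worked example of the ceiling division (numbers are illustrative only): 100 bytes across 3 target partitions gives a target size of 34, so every byte is assigned and only the final range comes up short.

    // Illustrative only: div_ceil rounds up, so 3 ranges of at most 34
    // bytes cover all 100 bytes (34 + 34 + 32).
    fn main() {
        let total_size: u64 = 100;
        let target_partitions: u64 = 3;
        assert_eq!(total_size.div_ceil(target_partitions), 34);
    }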
@@ -497,15 +498,15 @@ struct ToRepartition {
/// the index from which the original file will be taken
source_index: usize,
/// the size of the original file
file_size: usize,
file_size: u64,
/// indexes of which group(s) will this be distributed to (including `source_index`)
new_groups: Vec<usize>,
}

impl ToRepartition {
// how big will each file range be when this file is read in its new groups?
fn range_size(&self) -> usize {
self.file_size / self.new_groups.len()
/// How big will each file range be when this file is read in its new groups?
fn range_size(&self) -> u64 {
self.file_size / (self.new_groups.len() as u64)
}
}
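
range_size uses truncating integer division; with illustrative numbers, a 10-byte file distributed over 3 groups yields 3-byte ranges, and how the remainder is absorbed is left to the surrounding repartitioning code (not shown here).

    // Illustrative only: integer division truncates, so 10 / 3 == 3.
    fn main() {
        assert_eq!(10u64 / 3, 3);
    }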

24 changes: 15 additions & 9 deletions datafusion/datasource/src/mod.rs
@@ -52,7 +52,7 @@ pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result};
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use file_meta::FileMeta;
use futures::{Stream, StreamExt};
@@ -123,7 +123,7 @@ impl PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(path.into()),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
size,
e_tag: None,
version: None,
},
@@ -141,7 +141,7 @@ impl PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(path),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
size,
e_tag: None,
version: None,
},
@@ -224,7 +224,7 @@ impl From<ObjectMeta> for PartitionedFile {
/// Indicates that the range calculation determined no further action is
/// necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
Range(Option<Range<usize>>),
Range(Option<Range<u64>>),
TerminateEarly,
}

@@ -250,7 +250,12 @@ pub async fn calculate_range(
match file_meta.range {
None => Ok(RangeCalculation::Range(None)),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);
let start: u64 = start.try_into().map_err(|_| {
exec_datafusion_err!("Expect start range to fit in u64, got {start}")
})?;
let end: u64 = end.try_into().map_err(|_| {
exec_datafusion_err!("Expect end range to fit in u64, got {end}")
})?;

let start_delta = if start != 0 {
find_first_newline(store, location, start - 1, file_size, newline).await?
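
The try_into guards fire exactly when a FileRange endpoint is negative, since every non-negative i64 fits in u64. For illustration:

    // Illustrative only: non-negative i64 values convert cleanly; negatives fail.
    fn main() {
        assert_eq!(u64::try_from(5i64).unwrap(), 5u64);
        assert!(u64::try_from(-1i64).is_err());
    }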
@@ -289,10 +294,10 @@ async fn find_first_newline(
async fn find_first_newline(
object_store: &Arc<dyn ObjectStore>,
location: &Path,
start: usize,
end: usize,
start: u64,
end: u64,
newline: u8,
) -> Result<usize> {
) -> Result<u64> {
let options = GetOptions {
range: Some(GetRange::Bounded(start..end)),
..Default::default()
@@ -305,10 +310,11 @@ async fn find_first_newline(

while let Some(chunk) = result_stream.next().await.transpose()? {
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
let position = position as u64;
return Ok(index + position);
}

index += chunk.len();
index += chunk.len() as u64;
}

Ok(index)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/Cargo.toml
@@ -44,7 +44,7 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
object_store = { workspace = true, features = ["fs"] }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
28 changes: 26 additions & 2 deletions datafusion/functions-aggregate/benches/array_agg.rs
@@ -19,17 +19,23 @@ use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, ArrowPrimitiveType, AsArray, ListArray, NullBufferBuilder,
PrimitiveArray,
};
use arrow::datatypes::{Field, Int64Type};
use arrow::util::bench_util::create_primitive_array;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate::array_agg::ArrayAggAccumulator;

use arrow::buffer::OffsetBuffer;
use arrow::util::test_util::seedable_rng;
use rand::distributions::{Distribution, Standard};
use rand::prelude::StdRng;
use rand::Rng;
use rand::SeedableRng;

Contributor Author: I inlined the small amount of code from bench_util so this benchmark is standalone and it is easier to understand what is being tested.

/// Returns fixed seedable RNG
pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) {
let list_item_data_type = values.as_list::<i32>().values().data_type().clone();
@@ -46,6 +52,24 @@ fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) {
});
}

pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowPrimitiveType,
Standard: Distribution<T::Native>,
{
let mut rng = seedable_rng();

(0..size)
.map(|_| {
if rng.gen::<f32>() < null_density {
None
} else {
Some(rng.gen())
}
})
.collect()
}
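
A usage sketch for the inlined helpers (the call below is illustrative, not part of the benchmark):

    // Build a 1024-element Int64 array where roughly 10% of slots are null;
    // the fixed seed keeps repeated benchmark runs comparable.
    fn example() {
        let values = create_primitive_array::<Int64Type>(1024, 0.1);
        assert_eq!(values.len(), 1024);
    }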

/// Create List array with the given item data type, null density, null locations and zero length lists density
/// Creates a random (but fixed-seeded) array of a given size and null density
pub fn create_list_array<T>(