Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update file size handling
  • Loading branch information
alamb committed Apr 7, 2025
commit cabfb589f52b40f3de6d2946025f85f99baf58bf
13 changes: 7 additions & 6 deletions datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ impl FileGroupPartitioner {
return None;
}

let target_partition_size = (total_size as usize).div_ceil(target_partitions);
let target_partition_size =
(total_size as u64).div_ceil(target_partitions as u64);

let current_partition_index: usize = 0;
let current_partition_size: usize = 0;
let current_partition_size: u64 = 0;

// Partition byte range evenly for all `PartitionedFile`s
let repartitioned_files = flattened_files
Expand Down Expand Up @@ -497,15 +498,15 @@ struct ToRepartition {
/// the index from which the original file will be taken
source_index: usize,
/// the size of the original file
file_size: usize,
file_size: u64,
/// indexes of which group(s) will this be distributed to (including `source_index`)
new_groups: Vec<usize>,
}

impl ToRepartition {
// how big will each file range be when this file is read in its new groups?
fn range_size(&self) -> usize {
self.file_size / self.new_groups.len()
/// How big will each file range be when this file is read in its new groups?
fn range_size(&self) -> u64 {
self.file_size / (self.new_groups.len() as u64)
}
}

Expand Down
24 changes: 15 additions & 9 deletions datafusion/datasource/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result};
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use file_meta::FileMeta;
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -123,7 +123,7 @@ impl PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(path.into()),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
size,
e_tag: None,
version: None,
},
Expand All @@ -141,7 +141,7 @@ impl PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(path),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
size,
e_tag: None,
version: None,
},
Expand Down Expand Up @@ -224,7 +224,7 @@ impl From<ObjectMeta> for PartitionedFile {
/// Indicates that the range calculation determined no further action is
/// necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
Range(Option<Range<usize>>),
Range(Option<Range<u64>>),
TerminateEarly,
}

Expand All @@ -250,7 +250,12 @@ pub async fn calculate_range(
match file_meta.range {
None => Ok(RangeCalculation::Range(None)),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);
let start: u64 = start.try_into().map_err(|_| {
exec_datafusion_err!("Expect start range to fit in u64, got {start}")
})?;
let end: u64 = end.try_into().map_err(|_| {
exec_datafusion_err!("Expect end range to fit in u64, got {end}")
})?;

let start_delta = if start != 0 {
find_first_newline(store, location, start - 1, file_size, newline).await?
Expand Down Expand Up @@ -289,10 +294,10 @@ pub async fn calculate_range(
async fn find_first_newline(
object_store: &Arc<dyn ObjectStore>,
location: &Path,
start: usize,
end: usize,
start: u64,
end: u64,
newline: u8,
) -> Result<usize> {
) -> Result<u64> {
let options = GetOptions {
range: Some(GetRange::Bounded(start..end)),
..Default::default()
Expand All @@ -305,10 +310,11 @@ async fn find_first_newline(

while let Some(chunk) = result_stream.next().await.transpose()? {
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
let position = position as u64;
return Ok(index + position);
}

index += chunk.len();
index += chunk.len() as u64;
}

Ok(index)
Expand Down