4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -850,7 +850,7 @@ mod roundtrip_tests {
use core::panic;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::object_store::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
ChunkObjectReader, FileMetaStream, ListEntryStream, ObjectStore, SizedFile,
};
use datafusion::error::DataFusionError;
use datafusion::{
@@ -895,7 +895,7 @@ mod roundtrip_tests {
fn file_reader(
&self,
_file: SizedFile,
) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
) -> datafusion::error::Result<ChunkObjectReader> {
Err(DataFusionError::NotImplemented(
"this is only a test object store".to_string(),
))
6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/avro.rs
@@ -27,7 +27,7 @@ use futures::StreamExt;

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ChunkObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
@@ -49,15 +49,15 @@ impl FileFormat for AvroFormat {
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
let mut schemas = vec![];
while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ChunkObjectReader) -> Result<Statistics> {
Ok(Statistics::default())
}

6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/csv.rs
@@ -26,7 +26,7 @@ use async_trait::async_trait;
use futures::StreamExt;

use super::FileFormat;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ChunkObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
@@ -98,7 +98,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);

while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
@@ -119,7 +119,7 @@
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ChunkObjectReader) -> Result<Statistics> {
Ok(Statistics::default())
}

6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/json.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;

use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ChunkObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::NdJsonExec;
@@ -64,7 +64,7 @@ impl FileFormat for JsonFormat {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
while let Some(obj_reader) = readers.next().await {
let mut reader = BufReader::new(obj_reader?.sync_reader()?);
let mut reader = BufReader::new(obj_reader?);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
let should_take = records_to_read > 0;
@@ -81,7 +81,7 @@ impl FileFormat for JsonFormat {
Ok(Arc::new(schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ChunkObjectReader) -> Result<Statistics> {
Ok(Statistics::default())
}

5 changes: 3 additions & 2 deletions datafusion/src/datasource/file_format/mod.rs
@@ -32,9 +32,10 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};

use crate::datasource::object_store::ChunkObjectReader;
use async_trait::async_trait;

use super::object_store::{ObjectReader, ObjectReaderStream};
use super::object_store::ObjectReaderStream;

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -53,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {

/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
async fn infer_stats(&self, reader: ChunkObjectReader) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
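Editorial sketch, not part of this diff: with the new signature, callers of `infer_stats` hand over a `ChunkObjectReader` by value rather than an `Arc<dyn ObjectReader>`. A minimal call site, assuming `CsvFormat::default()` and a placeholder file path, might look like:

```rust
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::local::local_object_reader;
use datafusion::physical_plan::Statistics;

// Hedged sketch: the path below is a placeholder and must point at an existing
// file, because `local_object_reader` opens it eagerly.
async fn stats_for_local_file() -> datafusion::error::Result<Statistics> {
    let format = CsvFormat::default();
    // `local_object_reader` now yields a ChunkObjectReader directly.
    format
        .infer_stats(local_object_reader("/tmp/example.csv".to_string()))
        .await
}
```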
27 changes: 10 additions & 17 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -18,7 +18,6 @@
//! Parquet format abstractions

use std::any::Any;
use std::io::Read;
use std::sync::Arc;

use arrow::datatypes::Schema;
@@ -40,7 +39,7 @@ use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
};
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ChunkObjectReader, ObjectReaderStream};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::DataFusionError;
use crate::error::Result;
@@ -98,7 +97,7 @@ impl FileFormat for ParquetFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, reader: ChunkObjectReader) -> Result<Statistics> {
let stats = fetch_statistics(reader)?;
Ok(stats)
}
@@ -268,19 +267,17 @@ fn summarize_min_max(
}

/// Read and parse the schema of the Parquet file at location `path`
fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_schema(object_reader: ChunkObjectReader) -> Result<Schema> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;

Ok(schema)
}

/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_statistics(object_reader: ChunkObjectReader) -> Result<Statistics> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
let num_fields = schema.fields().len();
@@ -336,22 +333,18 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
Ok(statistics)
}

/// A wrapper around the object reader to make it implement `ChunkReader`
pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);

impl Length for ChunkObjectReader {
fn len(&self) -> u64 {
self.0.length()
self.0.chunk_length()
}
}

impl ChunkReader for ChunkObjectReader {
type T = Box<dyn Read + Send + Sync>;
type T = ChunkObjectReader;

fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.sync_chunk_reader(start, length)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
Ok(ChunkObjectReader(self.0.slice(start, length)
.map_err(|e| ParquetError::ArrowError(e.to_string()))?))
}
}
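
Editorial note, not part of this diff: since `ChunkObjectReader` implements parquet's `Length` and `ChunkReader` directly, it can be passed straight to `SerializedFileReader`, which is exactly what the updated `fetch_schema` and `fetch_statistics` above do. A hedged usage sketch, with placeholder path and size:

```rust
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{ObjectStore, SizedFile};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;

fn parquet_row_count() -> datafusion::error::Result<i64> {
    // `file_reader` now returns a ChunkObjectReader instead of Arc<dyn ObjectReader>.
    let reader = LocalFileSystem.file_reader(SizedFile {
        path: "/tmp/example.parquet".to_string(), // placeholder path
        size: 4096,                               // assumed to match the file on disk
    })?;
    // No extra wrapping step: the reader is already a parquet ChunkReader.
    let file_reader = SerializedFileReader::new(reader)?;
    Ok(file_reader.metadata().file_metadata().num_rows())
}
```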

69 changes: 48 additions & 21 deletions datafusion/src/datasource/object_store/local.rs
@@ -23,9 +23,11 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
use parking_lot::Mutex;

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
ChunkObjectReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore,
};
use crate::datasource::PartitionedFile;
use crate::error::{DataFusionError, Result};
@@ -55,18 +57,54 @@ impl ObjectStore for LocalFileSystem {
todo!()
}

fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
Ok(Arc::new(LocalFileReader::new(file)?))
fn file_reader(&self, file: SizedFile) -> Result<ChunkObjectReader> {
Ok(ChunkObjectReader(Box::new(LocalFileReader::new(file)?)))
}
}

struct LocalFileReader {
file: SizedFile,
r: Arc<Mutex<BufReader<File>>>,
total_size: u64,
current_pos: u64,
chunk_range: Option<(u64, u64)>,
}

impl LocalFileReader {
fn new(file: SizedFile) -> Result<Self> {
Ok(Self { file })
Ok(Self {
r: Arc::new(Mutex::new(BufReader::new(File::open(file.path)?))),
total_size: file.size,
current_pos: 0,
chunk_range: None,
})
}

fn slice(&self, start: u64, length: usize) -> Result<Box<dyn ObjectReader>> {
let end = start + length as u64;
assert!(end <= self.total_size);

let r = self.r.clone();
r.lock().seek(SeekFrom::Start(start))?;
Ok(Box::new(LocalFileReader {
r,
total_size: self.total_size,
current_pos: start,
chunk_range: Some((start, end)),
}))
}

fn chunk_length(&self) -> u64 {
self.chunk_range.map_or(self.total_size, |r| r.1 - r.0)
}
}

impl Read for LocalFileReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let chunk_end = self.chunk_range.map_or(self.total_size, |r| r.1);
let read_len = std::cmp::min(buf.len(), (chunk_end - self.current_pos) as usize);
let read_len = self.r.lock().read(&mut buf[..read_len])?;
self.current_pos += read_len as u64;
Ok(read_len)
}
}

@@ -82,23 +120,12 @@ impl ObjectReader for LocalFileReader {
)
}

fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
// A new file descriptor is opened for each chunk reader.
// This okay because chunks are usually fairly large.
let mut file = File::open(&self.file.path)?;
file.seek(SeekFrom::Start(start))?;

let file = BufReader::new(file.take(length as u64));

Ok(Box::new(file))
fn slice(&self, start: u64, length: usize) -> Result<Box<dyn ObjectReader>> {
self.slice(start, length)
}

fn length(&self) -> u64 {
self.file.size
fn chunk_length(&self) -> u64 {
self.chunk_length()
}
}

@@ -167,7 +194,7 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
}

/// Helper method to convert a file location to a `LocalFileReader`
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
pub fn local_object_reader(file: String) -> ChunkObjectReader {
LocalFileSystem
.file_reader(local_unpartitioned_file(file).file_meta.sized_file)
.expect("File not found")
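Editorial sketch, not part of this diff: chunked reads on the local store now go through `slice` and `chunk_length` instead of the removed `sync_chunk_reader`. The path and size below are placeholders for a real local file:

```rust
use std::io::Read;

use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{ObjectStore, SizedFile};

fn read_first_bytes() -> datafusion::error::Result<Vec<u8>> {
    let reader = LocalFileSystem.file_reader(SizedFile {
        path: "/tmp/data.bin".to_string(), // placeholder path
        size: 1024,                        // assumed actual size of the file
    })?;
    // `slice` returns a reader bounded to bytes [0, 16);
    // `chunk_length` reports the slice length rather than the whole file size.
    let mut chunk = reader.0.slice(0, 16)?;
    assert_eq!(chunk.chunk_length(), 16);
    let mut buf = vec![0u8; 16];
    chunk.read_exact(&mut buf)?;
    Ok(buf)
}
```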
32 changes: 16 additions & 16 deletions datafusion/src/datasource/object_store/mod.rs
@@ -19,7 +19,7 @@

pub mod local;

use parking_lot::RwLock;
use parking_lot::{RwLock};
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::Read;
@@ -39,25 +39,25 @@ use crate::error::{DataFusionError, Result};
/// Note that the dynamic dispatch on the reader might
/// have some performance impacts.
#[async_trait]
pub trait ObjectReader: Send + Sync {
pub trait ObjectReader: Read + Send + Sync {
/// Get reader for a part [start, start + length] in the file asynchronously
async fn chunk_reader(&self, start: u64, length: usize)
-> Result<Box<dyn AsyncRead>>;

/// Get reader for a part [start, start + length] in the file
fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;
/// get reader for slice [start, start + length] of the file
fn slice(&self, start: u64, length: usize) -> Result<Box<dyn ObjectReader>>;

/// Get reader for the entire file
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.sync_chunk_reader(0, self.length() as usize)
}
/// length of the current chunk, if it's a whole file, then the file length
fn chunk_length(&self) -> u64;
}

/// Get the size of the file
fn length(&self) -> u64;
/// Chunked Reader
pub struct ChunkObjectReader(pub Box<dyn ObjectReader>);

impl Read for ChunkObjectReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}

/// Represents a specific file or a prefix (folder) that may
Expand Down Expand Up @@ -123,7 +123,7 @@ pub type ListEntryStream =

/// Stream readers opened on a given object store
pub type ObjectReaderStream =
Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
Pin<Box<dyn Stream<Item = Result<ChunkObjectReader>> + Send + Sync>>;

/// A ObjectStore abstracts access to an underlying file/object storage.
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
@@ -158,7 +158,7 @@ pub trait ObjectStore: Sync + Send + Debug {
) -> Result<ListEntryStream>;

/// Get object reader for one file
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
fn file_reader(&self, file: SizedFile) -> Result<ChunkObjectReader>;
}

static LOCAL_SCHEME: &str = "file";
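
Editorial sketch, not part of this diff: under the reshaped trait an `ObjectReader` is itself a `std::io::Read`, `slice` returns another reader restricted to a byte range, and `chunk_length` reports the size of the current slice (or of the whole object). A hypothetical in-memory implementation, with every name below invented for illustration, could look like:

```rust
use std::io::Read;

use async_trait::async_trait;
use datafusion::datasource::object_store::ObjectReader;
use datafusion::error::{DataFusionError, Result};
use futures::AsyncRead;

struct InMemoryReader {
    data: Vec<u8>,
    pos: usize,
}

impl Read for InMemoryReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n)
    }
}

#[async_trait]
impl ObjectReader for InMemoryReader {
    async fn chunk_reader(&self, _start: u64, _length: usize) -> Result<Box<dyn AsyncRead>> {
        // Async reads are out of scope for this sketch.
        Err(DataFusionError::NotImplemented(
            "async chunk reads are not part of this sketch".to_string(),
        ))
    }

    fn slice(&self, start: u64, length: usize) -> Result<Box<dyn ObjectReader>> {
        // A slice is simply another reader over the requested byte range.
        let start = (start as usize).min(self.data.len());
        let end = (start + length).min(self.data.len());
        Ok(Box::new(InMemoryReader {
            data: self.data[start..end].to_vec(),
            pos: 0,
        }))
    }

    fn chunk_length(&self) -> u64 {
        self.data.len() as u64
    }
}
```

Wrapping such a reader is then just `ChunkObjectReader(Box::new(InMemoryReader { data, pos: 0 }))`, matching what `LocalFileSystem::file_reader` does above.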