1717
1818//! Contains reader which reads parquet data into arrow [`RecordBatch`]
1919
20- use std:: collections:: VecDeque ;
21- use std:: sync:: Arc ;
22-
20+ use arrow_array:: builder:: UInt64Builder ;
2321use arrow_array:: cast:: AsArray ;
24- use arrow_array:: Array ;
22+ use arrow_array:: { Array , ArrayRef } ;
2523use arrow_array:: { RecordBatch , RecordBatchReader } ;
26- use arrow_schema:: { ArrowError , DataType as ArrowType , Schema , SchemaRef } ;
24+ use arrow_schema:: { ArrowError , DataType as ArrowType , Field , FieldRef , Schema , SchemaRef } ;
2725use arrow_select:: filter:: prep_null_mask_filter;
2826pub use filter:: { ArrowPredicate , ArrowPredicateFn , RowFilter } ;
2927pub use selection:: { RowSelection , RowSelector } ;
28+ use std:: collections:: VecDeque ;
29+ use std:: sync:: Arc ;
3030
3131pub use crate :: arrow:: array_reader:: RowGroups ;
3232use crate :: arrow:: array_reader:: { build_array_reader, ArrayReader } ;
@@ -72,6 +72,10 @@ pub struct ArrowReaderBuilder<T> {
7272 pub ( crate ) limit : Option < usize > ,
7373
7474 pub ( crate ) offset : Option < usize > ,
75+
76+ pub ( crate ) row_id : Option < FieldRef > ,
77+
78+ pub ( crate ) prefetch : Option < ProjectionMask > ,
7579}
7680
7781impl < T > ArrowReaderBuilder < T > {
@@ -88,6 +92,8 @@ impl<T> ArrowReaderBuilder<T> {
8892 selection : None ,
8993 limit : None ,
9094 offset : None ,
95+ row_id : None ,
96+ prefetch : None ,
9197 }
9298 }
9399
@@ -114,6 +120,15 @@ impl<T> ArrowReaderBuilder<T> {
114120 Self { batch_size, ..self }
115121 }
116122
123+ /// Project a column into the result with name `field_name` that will contain the row ID
124+ /// for each row. The row ID will be the row offset of the row in the underlying file
125+ pub fn with_row_id ( self , field_name : impl Into < String > ) -> Self {
126+ Self {
127+ row_id : Some ( RowId :: field_ref ( field_name) ) ,
128+ ..self
129+ }
130+ }
131+
117132 /// Only read data from the provided row group indexes
118133 ///
119134 /// This is also called row group filtering
@@ -132,6 +147,15 @@ impl<T> ArrowReaderBuilder<T> {
132147 }
133148 }
134149
150+ /// If evaluating a `RowFilter` also prefetch the columns in `mask`
151+ /// while fetching row filter columns
152+ pub fn with_prefetch ( self , mask : Option < ProjectionMask > ) -> Self {
153+ Self {
154+ prefetch : mask,
155+ ..self
156+ }
157+ }
158+
135159 /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
136160 /// data into memory.
137161 ///
@@ -623,6 +647,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
623647 batch_size,
624648 array_reader,
625649 apply_range ( selection, reader. num_rows ( ) , self . offset , self . limit ) ,
650+ // TODO what do we do here?
651+ None ,
626652 ) )
627653 }
628654}
@@ -684,13 +710,55 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
684710
685711impl < T : ChunkReader + ' static > PageIterator for ReaderPageIterator < T > { }
686712
713+ pub ( crate ) struct RowId {
714+ offset : u64 ,
715+ field : FieldRef ,
716+ buffer : UInt64Builder ,
717+ }
718+
719+ impl RowId {
720+ pub fn new ( offset : u64 , field : FieldRef , batch_size : usize ) -> Self {
721+ Self {
722+ offset,
723+ field,
724+ buffer : UInt64Builder :: with_capacity ( batch_size) ,
725+ }
726+ }
727+
728+ pub fn field_ref ( name : impl Into < String > ) -> FieldRef {
729+ Arc :: new ( Field :: new ( name, ArrowType :: UInt64 , false ) )
730+ }
731+
732+ pub fn skip ( & mut self , n : usize ) {
733+ self . offset += n as u64 ;
734+ }
735+
736+ pub fn field ( & self ) -> FieldRef {
737+ self . field . clone ( )
738+ }
739+
740+ fn read ( & mut self , n : usize ) {
741+ // SAFETY: We are appending a `Range<u64>` which has a trusted length
742+ unsafe {
743+ self . buffer
744+ . append_trusted_len_iter ( self . offset ..self . offset + n as u64 )
745+ }
746+ self . offset += n as u64 ;
747+ }
748+
749+ fn consume ( & mut self ) -> ArrayRef {
750+ Arc :: new ( self . buffer . finish ( ) )
751+ }
752+ }
753+
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
    // Maximum number of rows per emitted batch
    batch_size: usize,
    // Produces the projected columns as a struct array
    array_reader: Box<dyn ArrayReader>,
    // Schema of the emitted batches; includes the row-ID field first when
    // `row_id` is set
    schema: SchemaRef,
    // Remaining row selectors to apply, if a `RowSelection` was provided
    selection: Option<VecDeque<RowSelector>>,
    // When present, generates a row-ID column prepended to each batch
    row_id: Option<RowId>,
}
695763
696764impl Iterator for ParquetRecordBatchReader {
@@ -708,6 +776,10 @@ impl Iterator for ParquetRecordBatchReader {
708776 Err ( e) => return Some ( Err ( e. into ( ) ) ) ,
709777 } ;
710778
779+ if let Some ( row_id) = self . row_id . as_mut ( ) {
780+ row_id. skip ( skipped) ;
781+ }
782+
711783 if skipped != front. row_count {
712784 return Some ( Err ( general_err ! (
713785 "failed to skip rows, expected {}, got {}" ,
@@ -738,16 +810,24 @@ impl Iterator for ParquetRecordBatchReader {
738810 } ;
739811 match self . array_reader . read_records ( to_read) {
740812 Ok ( 0 ) => break ,
741- Ok ( rec) => read_records += rec,
813+ Ok ( rec) => {
814+ if let Some ( rowid) = self . row_id . as_mut ( ) {
815+ rowid. read ( rec) ;
816+ }
817+ read_records += rec
818+ }
742819 Err ( error) => return Some ( Err ( error. into ( ) ) ) ,
743820 }
744821 }
745822 }
746- None => {
747- if let Err ( error) = self . array_reader . read_records ( self . batch_size ) {
748- return Some ( Err ( error. into ( ) ) ) ;
823+ None => match self . array_reader . read_records ( self . batch_size ) {
824+ Ok ( n) => {
825+ if let Some ( rowid) = self . row_id . as_mut ( ) {
826+ rowid. read ( n) ;
827+ }
749828 }
750- }
829+ Err ( error) => return Some ( Err ( error. into ( ) ) ) ,
830+ } ,
751831 } ;
752832
753833 match self . array_reader . consume_batch ( ) {
@@ -761,7 +841,23 @@ impl Iterator for ParquetRecordBatchReader {
761841
762842 match struct_array {
763843 Err ( err) => Some ( Err ( err) ) ,
764- Ok ( e) => ( e. len ( ) > 0 ) . then ( || Ok ( RecordBatch :: from ( e) ) ) ,
844+ Ok ( e) => {
845+ if e. len ( ) > 0 {
846+ Some ( Ok ( match self . row_id . as_mut ( ) {
847+ Some ( rowid) => {
848+ let columns = std:: iter:: once ( rowid. consume ( ) )
849+ . chain ( e. columns ( ) . iter ( ) . cloned ( ) )
850+ . collect ( ) ;
851+
852+ RecordBatch :: try_new ( self . schema . clone ( ) , columns)
853+ . expect ( "invalid schema" )
854+ }
855+ None => RecordBatch :: from ( e) ,
856+ } ) )
857+ } else {
858+ None
859+ }
860+ }
765861 }
766862 }
767863 }
@@ -806,6 +902,7 @@ impl ParquetRecordBatchReader {
806902 array_reader,
807903 schema : Arc :: new ( Schema :: new ( levels. fields . clone ( ) ) ) ,
808904 selection : selection. map ( |s| s. trim ( ) . into ( ) ) ,
905+ row_id : None ,
809906 } )
810907 }
811908
@@ -816,17 +913,29 @@ impl ParquetRecordBatchReader {
816913 batch_size : usize ,
817914 array_reader : Box < dyn ArrayReader > ,
818915 selection : Option < RowSelection > ,
916+ rowid : Option < RowId > ,
819917 ) -> Self {
820- let schema = match array_reader. get_data_type ( ) {
821- ArrowType :: Struct ( ref fields) => Schema :: new ( fields. clone ( ) ) ,
918+ let struct_fields = match array_reader. get_data_type ( ) {
919+ ArrowType :: Struct ( ref fields) => fields. clone ( ) ,
822920 _ => unreachable ! ( "Struct array reader's data type is not struct!" ) ,
823921 } ;
824922
923+ let schema = match rowid. as_ref ( ) {
924+ Some ( rowid) => {
925+ let fields: Vec < _ > = std:: iter:: once ( rowid. field ( ) )
926+ . chain ( struct_fields. iter ( ) . cloned ( ) )
927+ . collect ( ) ;
928+ Schema :: new ( fields)
929+ }
930+ None => Schema :: new ( struct_fields) ,
931+ } ;
932+
825933 Self {
826934 batch_size,
827935 array_reader,
828936 schema : Arc :: new ( schema) ,
829937 selection : selection. map ( |s| s. trim ( ) . into ( ) ) ,
938+ row_id : rowid,
830939 }
831940 }
832941}
@@ -887,7 +996,8 @@ pub(crate) fn evaluate_predicate(
887996 input_selection : Option < RowSelection > ,
888997 predicate : & mut dyn ArrowPredicate ,
889998) -> Result < RowSelection > {
890- let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
999+ let reader =
1000+ ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) , None ) ;
8911001 let mut filters = vec ! [ ] ;
8921002 for maybe_batch in reader {
8931003 let maybe_batch = maybe_batch?;
@@ -935,7 +1045,8 @@ pub(crate) async fn evaluate_predicate_coop(
9351045) -> Result < RowSelection > {
9361046 let mut budget = DECODE_BUDGET ;
9371047
938- let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
1048+ let reader =
1049+ ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) , None ) ;
9391050 let mut filters = vec ! [ ] ;
9401051 for maybe_batch in reader {
9411052 let maybe_batch = maybe_batch?;
0 commit comments