Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Cleanups
  • Loading branch information
alamb committed Dec 1, 2025
commit 3a81112ba9aad0234e8e91e0c97de67a5f71ba66
21 changes: 9 additions & 12 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ mod test {
compute::cast,
datatypes::{DataType, Field, Schema, SchemaRef},
};
use arrow::record_batch::RecordBatch;
use bytes::{BufMut, Bytes, BytesMut};
use datafusion_common::{
assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
Expand All @@ -796,6 +797,7 @@ mod test {
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use futures::stream::BoxStream;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
Expand All @@ -812,7 +814,7 @@ mod test {
async fn count_batches_and_rows(
mut stream: std::pin::Pin<
Box<
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
dyn Stream<Item = Result<RecordBatch, DataFusionError>>
+ Send,
>,
>,
Expand All @@ -827,13 +829,8 @@ mod test {
}

async fn collect_batches(
mut stream: std::pin::Pin<
Box<
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+ Send,
>,
>,
) -> Vec<arrow::array::RecordBatch> {
mut stream: BoxStream<'_, Result<RecordBatch, DataFusionError>>
) -> Vec<RecordBatch> {
let mut batches = vec![];
while let Some(Ok(batch)) = stream.next().await {
batches.push(batch);
Expand All @@ -844,7 +841,7 @@ mod test {
async fn write_parquet(
store: Arc<dyn ObjectStore>,
filename: &str,
batch: arrow::record_batch::RecordBatch,
batch: RecordBatch,
) -> usize {
let mut out = BytesMut::new().writer();
{
Expand All @@ -862,7 +859,7 @@ mod test {
async fn write_parquet_with_properties(
store: Arc<dyn ObjectStore>,
filename: &str,
batch: arrow::record_batch::RecordBatch,
batch: RecordBatch,
properties: WriterProperties,
) -> (usize, Bytes) {
let mut out = BytesMut::new().writer();
Expand Down Expand Up @@ -1650,8 +1647,8 @@ mod test {
impl SchemaMapper for CustomSchemaMapper {
fn map_batch(
&self,
batch: arrow::array::RecordBatch,
) -> datafusion_common::Result<arrow::array::RecordBatch> {
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let a_column = cast(batch.column(0), &DataType::UInt64)?;
// Add in a new column "b" with default value 0.0
let b_column =
Expand Down