Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct IcebergTableScan {
projection: Option<Vec<String>>,
/// Filters to apply to the table scan
predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
limit: Option<usize>,
}

impl IcebergTableScan {
Expand All @@ -61,6 +63,7 @@ impl IcebergTableScan {
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Self {
let output_schema = match projection {
None => schema.clone(),
Expand All @@ -76,6 +79,7 @@ impl IcebergTableScan {
plan_properties,
projection,
predicates,
limit,
}
}

Expand All @@ -95,6 +99,10 @@ impl IcebergTableScan {
self.predicates.as_ref()
}

/// Returns the optional row-count limit pushed down into this scan,
/// or `None` when the query requested all rows.
pub fn limit(&self) -> Option<usize> {
self.limit
}

/// Computes [`PlanProperties`] used in query optimization.
fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
// TODO:
Expand Down Expand Up @@ -146,9 +154,29 @@ impl ExecutionPlan for IcebergTableScan {
);
let stream = futures::stream::once(fut).try_flatten();

// Apply limit if specified
// Wrap the record-batch stream so at most `limit` rows are emitted.
let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
if let Some(limit) = self.limit {
// Mutable row budget captured by the closure; decremented per batch.
let mut remaining = limit;
Box::pin(stream.try_filter_map(move |batch| {
futures::future::ready(if remaining == 0 {
// Budget exhausted: drop the batch.
// NOTE(review): returning Ok(None) skips batches but keeps
// polling (and thus reading) the upstream scan to completion;
// consider terminating the stream instead (e.g. take_while or
// ending with None) so the file scan stops early — confirm
// whether upstream short-circuits elsewhere.
Ok(None)
} else if batch.num_rows() <= remaining {
// Whole batch fits within the remaining budget.
remaining -= batch.num_rows();
Ok(Some(batch))
} else {
// Batch straddles the limit: emit only the first
// `remaining` rows (slice is zero-copy) and mark done.
let limited_batch = batch.slice(0, remaining);
remaining = 0;
Ok(Some(limited_batch))
})
}))
} else {
// No limit pushed down: pass the stream through unchanged.
Box::pin(stream)
};

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
limited_stream,
)))
}
}
}
Expand Down
98 changes: 96 additions & 2 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl TableProvider for IcebergTableProvider {
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Load fresh table metadata from catalog
let table = self
Expand All @@ -143,6 +143,7 @@ impl TableProvider for IcebergTableProvider {
self.schema.clone(),
projection,
filters,
limit,
)))
}

Expand Down Expand Up @@ -311,7 +312,7 @@ impl TableProvider for IcebergStaticTableProvider {
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Use cached table (no refresh)
Ok(Arc::new(IcebergTableScan::new(
Expand All @@ -320,6 +321,7 @@ impl TableProvider for IcebergStaticTableProvider {
self.schema.clone(),
projection,
filters,
limit,
)))
}

Expand Down Expand Up @@ -774,4 +776,96 @@ mod tests {
"Plan should contain SortExec when fanout is disabled"
);
}

#[tokio::test]
async fn test_limit_pushdown_static_provider() {
    use datafusion::datasource::TableProvider;

    // Build a static (non-refreshing) provider around the test table.
    let table = get_test_table_from_metadata_file().await;
    let provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();

    let session = SessionContext::new();

    // Plan a scan that pushes a row limit of 10 down to the provider.
    let plan = provider
        .scan(&session.state(), None, &[], Some(10))
        .await
        .unwrap();

    // The resulting physical plan must be an IcebergTableScan that
    // carries the pushed-down limit.
    let scan = plan
        .as_any()
        .downcast_ref::<IcebergTableScan>()
        .expect("Expected IcebergTableScan");

    assert_eq!(
        scan.limit(),
        Some(10),
        "Limit should be set to 10 in the scan plan"
    );
}

#[tokio::test]
async fn test_limit_pushdown_catalog_backed_provider() {
    use datafusion::datasource::TableProvider;

    // Catalog-backed provider: metadata is refreshed from the catalog on scan.
    let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
    let provider =
        IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
            .await
            .unwrap();

    let session = SessionContext::new();

    // Plan a scan with a pushed-down limit of 5 rows.
    let plan = provider
        .scan(&session.state(), None, &[], Some(5))
        .await
        .unwrap();

    // The physical plan must be an IcebergTableScan carrying that limit.
    let scan = plan
        .as_any()
        .downcast_ref::<IcebergTableScan>()
        .expect("Expected IcebergTableScan");

    assert_eq!(
        scan.limit(),
        Some(5),
        "Limit should be set to 5 in the scan plan"
    );
}

#[tokio::test]
async fn test_no_limit_pushdown() {
    use datafusion::datasource::TableProvider;

    // Static provider over the test table.
    let table = get_test_table_from_metadata_file().await;
    let provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();

    let session = SessionContext::new();

    // Plan a scan with no limit: the scan node must record None.
    let plan = provider
        .scan(&session.state(), None, &[], None)
        .await
        .unwrap();

    let scan = plan
        .as_any()
        .downcast_ref::<IcebergTableScan>()
        .expect("Expected IcebergTableScan");

    assert_eq!(
        scan.limit(),
        None,
        "Limit should be None when not specified"
    );
}
}
Loading