diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index be92e93d25..d627b6a63d 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -51,6 +51,8 @@ pub struct IcebergTableScan { projection: Option<Vec<usize>>, /// Filters to apply to the table scan predicates: Option<Predicate>, + /// Optional limit on the number of rows to return + limit: Option<usize>, } impl IcebergTableScan { @@ -61,6 +63,7 @@ impl IcebergTableScan { schema: ArrowSchemaRef, projection: Option<&Vec<usize>>, filters: &[Expr], + limit: Option<usize>, ) -> Self { let output_schema = match projection { None => schema.clone(), @@ -76,6 +79,7 @@ impl IcebergTableScan { plan_properties, projection, predicates, + limit, } } @@ -95,6 +99,10 @@ impl IcebergTableScan { self.predicates.as_ref() } + pub fn limit(&self) -> Option<usize> { + self.limit + } + /// Computes [`PlanProperties`] used in query optimization. fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { // TODO: @@ -146,9 +154,29 @@ impl ExecutionPlan for IcebergTableScan { ); let stream = futures::stream::once(fut).try_flatten(); + // Apply limit if specified + let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> = + if let Some(limit) = self.limit { + let mut remaining = limit; + Box::pin(stream.try_filter_map(move |batch| { + futures::future::ready(if remaining == 0 { + Ok(None) + } else if batch.num_rows() <= remaining { + remaining -= batch.num_rows(); + Ok(Some(batch)) + } else { + let limited_batch = batch.slice(0, remaining); + remaining = 0; + Ok(Some(limited_batch)) + }) + })) + } else { + Box::pin(stream) + }; + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), - stream, + limited_stream, ))) } } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index ad616542a4..ae87342fa5 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ 
b/crates/integrations/datafusion/src/table/mod.rs @@ -127,7 +127,7 @@ impl TableProvider for IcebergTableProvider { _state: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], - _limit: Option<usize>, + limit: Option<usize>, ) -> DFResult<Arc<dyn ExecutionPlan>> { // Load fresh table metadata from catalog let table = self @@ -143,6 +143,7 @@ impl TableProvider for IcebergTableProvider { self.schema.clone(), projection, filters, + limit, ))) } @@ -311,7 +312,7 @@ impl TableProvider for IcebergStaticTableProvider { _state: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], - _limit: Option<usize>, + limit: Option<usize>, ) -> DFResult<Arc<dyn ExecutionPlan>> { // Use cached table (no refresh) Ok(Arc::new(IcebergTableScan::new( @@ -320,6 +321,7 @@ impl TableProvider for IcebergStaticTableProvider { self.schema.clone(), projection, filters, + limit, ))) } @@ -774,4 +776,96 @@ mod tests { "Plan should contain SortExec when fanout is disabled" ); } + + #[tokio::test] + async fn test_limit_pushdown_static_provider() { + use datafusion::datasource::TableProvider; + + let table = get_test_table_from_metadata_file().await; + let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + // Test scan with limit + let scan_plan = table_provider + .scan(&state, None, &[], Some(10)) + .await + .unwrap(); + + // Verify that the scan plan is an IcebergTableScan + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::<IcebergTableScan>() + .expect("Expected IcebergTableScan"); + + // Verify the limit is set + assert_eq!( + iceberg_scan.limit(), + Some(10), + "Limit should be set to 10 in the scan plan" + ); + } + + #[tokio::test] + async fn test_limit_pushdown_catalog_backed_provider() { + use datafusion::datasource::TableProvider; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); 
+ + let ctx = SessionContext::new(); + let state = ctx.state(); + + // Test scan with limit + let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap(); + + // Verify that the scan plan is an IcebergTableScan + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::<IcebergTableScan>() + .expect("Expected IcebergTableScan"); + + // Verify the limit is set + assert_eq!( + iceberg_scan.limit(), + Some(5), + "Limit should be set to 5 in the scan plan" + ); + } + + #[tokio::test] + async fn test_no_limit_pushdown() { + use datafusion::datasource::TableProvider; + + let table = get_test_table_from_metadata_file().await; + let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + // Test scan without limit + let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap(); + + // Verify that the scan plan is an IcebergTableScan + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::<IcebergTableScan>() + .expect("Expected IcebergTableScan"); + + // Verify the limit is None + assert_eq!( + iceberg_scan.limit(), + None, + "Limit should be None when not specified" + ); + } }