Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a653b15
ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply…
adriangb Apr 3, 2025
a5f998c
wip
adriangb Apr 3, 2025
7e2db66
fix tests
adriangb Apr 3, 2025
cb1f830
fix
adriangb Apr 3, 2025
e92d8b5
fix
adriangb Apr 3, 2025
ca391c1
fix doc
adriangb Apr 3, 2025
c78a590
fix doc
adriangb Apr 3, 2025
34c8285
Improve doc comments of `filter-pushdown-apis` (#22)
alamb Apr 5, 2025
e15374f
Apply suggestions from code review
adriangb Apr 5, 2025
2ceec35
simplify according to pr feedback
adriangb Apr 5, 2025
3fbf379
Add missing file
adriangb Apr 5, 2025
e6721d1
Add tests
adriangb Apr 5, 2025
b7b588b
pipe config in
adriangb Apr 5, 2025
d1f01dd
docstrings
adriangb Apr 5, 2025
5929d03
Update datafusion/physical-plan/src/filter_pushdown.rs
adriangb Apr 5, 2025
24483bc
fix
adriangb Apr 5, 2025
d0295ed
fix
adriangb Apr 6, 2025
2d46289
fmt
adriangb Apr 6, 2025
4318267
fix doc
adriangb Apr 6, 2025
7d29056
add example usage of config
adriangb Apr 6, 2025
d382bd3
fix test
adriangb Apr 6, 2025
2dfa8b8
convert exec API and optimizer rule
berkaysynnada Apr 14, 2025
cda6e8d
re-add docs
adriangb Apr 14, 2025
e4d8a8c
dbg
berkaysynnada Apr 16, 2025
3ec1b2a
dbg 2
berkaysynnada Apr 16, 2025
a2df5e0
avoid clones
adriangb Apr 16, 2025
6938d52
part 3
berkaysynnada Apr 16, 2025
6836dd4
fix lint
adriangb Apr 16, 2025
28bb8ea
Merge branch 'filter-pushdown-apis' into filter-pushdown-apis
berkaysynnada Apr 16, 2025
7e95283
tests pass
berkaysynnada Apr 16, 2025
e2f8c12
Update filter.rs
berkaysynnada Apr 16, 2025
bff47be
update projection tests
berkaysynnada Apr 16, 2025
ce49ad4
update slt files
adriangb Apr 16, 2025
d5792bc
Merge branch 'main' into filter-pushdown-apis
adriangb Apr 16, 2025
834f33e
fix
adriangb Apr 16, 2025
9e59246
fix references
adriangb Apr 16, 2025
57a1230
improve impls and update tests
berkaysynnada Apr 17, 2025
367377f
apply stop logic
berkaysynnada Apr 17, 2025
616165d
update slt's
berkaysynnada Apr 17, 2025
b30953f
update other tests
berkaysynnada Apr 17, 2025
ec54cca
minor
berkaysynnada Apr 17, 2025
6345315
rename modules to match logical optimizer, tweak docs
adriangb Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
simplify according to pr feedback
  • Loading branch information
adriangb committed Apr 5, 2025
commit 2ceec35c3c09213f5fddc348f41301e201c298df
30 changes: 15 additions & 15 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use datafusion::{
},
scalar::ScalarValue,
};
use datafusion_common::internal_err;
use datafusion_common::{config::ConfigOptions, Statistics};
use datafusion_common::{internal_err, Result};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::{
Expand All @@ -40,8 +40,8 @@ use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::{
displayable, execution_plan::FilterSupport, metrics::ExecutionPlanMetricsSet,
DisplayFormatType, ExecutionPlan,
displayable, filter_pushdown::FilterPushdownSupport,
metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
};
use object_store::ObjectStore;
use std::sync::{Arc, OnceLock};
Expand All @@ -53,13 +53,13 @@ use std::{
/// A placeholder data source that accepts filter pushdown
#[derive(Clone)]
struct TestSource {
support: FilterSupport,
support: FilterPushdownSupport,
predicate: Option<PhysicalExprRef>,
statistics: Option<Statistics>,
}

impl TestSource {
fn new(support: FilterSupport) -> Self {
fn new(support: FilterPushdownSupport) -> Self {
Self {
support,
predicate: None,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl FileSource for TestSource {
todo!("should not be called")
}

fn statistics(&self) -> datafusion_common::Result<Statistics> {
fn statistics(&self) -> Result<Statistics> {
Ok(self
.statistics
.as_ref()
Expand Down Expand Up @@ -137,23 +137,23 @@ impl FileSource for TestSource {
}
}

fn push_down_filters(
fn try_pushdown_filters(
&self,
filters: &[PhysicalExprRef],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for convenience and consistency, can we rename this as parent_filters as well?

) -> datafusion_common::Result<Option<FileSourceFilterPushdownResult>> {
) -> Result<FileSourceFilterPushdownResult> {
let new = Arc::new(TestSource {
support: self.support,
predicate: Some(conjunction(filters.iter().map(Arc::clone))),
statistics: self.statistics.clone(),
});
Ok(Some(FileSourceFilterPushdownResult::new(
Ok(FileSourceFilterPushdownResult::new(
new,
vec![self.support; filters.len()],
)))
))
}
}

fn test_scan(support: FilterSupport) -> Arc<dyn ExecutionPlan> {
fn test_scan(support: FilterPushdownSupport) -> Arc<dyn ExecutionPlan> {
let schema = schema();
let source = Arc::new(TestSource::new(support));
let base_config = FileScanConfigBuilder::new(
Expand All @@ -167,7 +167,7 @@ fn test_scan(support: FilterSupport) -> Arc<dyn ExecutionPlan> {

#[test]
fn test_pushdown_into_scan() {
let scan = test_scan(FilterSupport::HandledExact);
let scan = test_scan(FilterPushdownSupport::Exact);
let predicate = col_lit_predicate("a", "foo", schema());
let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());

Expand All @@ -187,9 +187,9 @@ fn test_pushdown_into_scan() {
}

#[test]
test_filter_collapse() {
fn test_filter_collapse() {
// filter should be pushed down into the parquet scan with two filters
let scan = test_scan(FilterSupport::HandledExact);
let scan = test_scan(FilterPushdownSupport::Exact);
let predicate1 = col_lit_predicate("a", "foo", schema());
let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
let predicate2 = col_lit_predicate("b", "bar", schema());
Expand All @@ -205,7 +205,7 @@ test_filter_collapse() {
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
output:
Ok:
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=a@0 = foo AND b@1 = bar
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, predicate=b@1 = bar AND a@0 = foo
"
);
}
Expand Down
20 changes: 7 additions & 13 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalExprRef};
use datafusion_physical_plan::execution_plan::FilterPushdownResult;
use datafusion_physical_plan::filter_pushdown::FilterPushdownResult;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

Expand Down Expand Up @@ -58,7 +58,7 @@ pub trait FileSource: Send + Sync {
/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;
/// Return projected statistics
fn statistics(&self) -> datafusion_common::Result<Statistics>;
fn statistics(&self) -> Result<Statistics>;
/// String representation of file source such as "csv", "json", "parquet"
fn file_type(&self) -> &str;
/// Format FileType specific information
Expand All @@ -76,7 +76,7 @@ pub trait FileSource: Send + Sync {
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
) -> Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
return Ok(None);
}
Expand All @@ -95,17 +95,11 @@ pub trait FileSource: Send + Sync {
Ok(None)
}

/// Push down filters to the file source if supported.
///
/// Returns `Ok(None)` by default. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
fn push_down_filters(
fn try_pushdown_filters(
&self,
_filters: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<FileSourceFilterPushdownResult>> {
Ok(None)
) -> Result<FileSourceFilterPushdownResult> {
Ok(FileSourceFilterPushdownResult::NotPushed)
}
}

Expand Down
30 changes: 18 additions & 12 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use log::{debug, warn};

use crate::{
display::FileGroupsDisplay,
file::FileSource,
file::{FileSource, FileSourceFilterPushdownResult},
file_compression_type::FileCompressionType,
file_stream::FileStream,
source::{DataSource, DataSourceExec},
Expand Down Expand Up @@ -585,19 +585,25 @@ impl DataSource for FileScanConfig {
}))
}

fn push_down_filters(
fn try_pushdown_filters(
&self,
filters: &[PhysicalExprRef],
) -> Result<Option<DataSourceFilterPushdownResult>> {
if let Some(file_source_result) = self.file_source.push_down_filters(filters)? {
let mut new_self = self.clone();
new_self.file_source = file_source_result.inner;
Ok(Some(DataSourceFilterPushdownResult {
inner: Arc::new(new_self) as Arc<dyn DataSource>,
support: file_source_result.support,
}))
} else {
Ok(None)
) -> Result<DataSourceFilterPushdownResult> {
match self.file_source.try_pushdown_filters(filters)? {
FileSourceFilterPushdownResult::NotPushed => {
Ok(DataSourceFilterPushdownResult::NotPushed)
}
FileSourceFilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(
FileScanConfigBuilder::from(self.clone())
.with_source(inner)
.build(),
);
Ok(DataSourceFilterPushdownResult::Pushed {
inner: new_self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than saying "inner", can we emphasize this is the new self having the filter pushed down into it? Maybe renaming it as "updated" ? WDYT?

support,
})
}
}
}
}
Expand Down
62 changes: 22 additions & 40 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use datafusion_physical_plan::execution_plan::{
Boundedness, EmissionType, ExecutionPlanFilterPushdownResult, FilterPushdownResult,
FilterSupport,
};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanFilterPushdownResult,
FilterPushdownResult, PlanProperties,
};

use crate::file_scan_config::FileScanConfig;
Expand Down Expand Up @@ -83,24 +81,11 @@ pub trait DataSource: Send + Sync + Debug {
_projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;

/// Push down filters into this `DataSource`.
///
/// Returns `Ok(None)` if the filters cannot be evaluated within the
/// `DataSource`.
///
/// If the filters can be evaluated by the `DataSource`,
/// return a [`FilterPushdownResult`] containing an updated
/// `DataSource` and the support level for each filter (exact or inexact).
///
/// Default implementation returns `Ok(None)`. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::push_down_filters`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
fn push_down_filters(
fn try_pushdown_filters(
&self,
_filters: &[PhysicalExprRef],
) -> Result<Option<DataSourceFilterPushdownResult>> {
Ok(None)
) -> Result<DataSourceFilterPushdownResult> {
Ok(DataSourceFilterPushdownResult::NotPushed)
}
}

Expand Down Expand Up @@ -218,25 +203,22 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.try_swapping_with_projection(projection)
}

fn with_filter_pushdown_result(
self: Arc<Self>,
own_filters_result: &[FilterSupport],
parent_filters_remaining: &[PhysicalExprRef],
) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
// We didn't give out any filters, this should be empty!
assert!(own_filters_result.is_empty());
// Forward filter pushdown to our data source.
if let Some(pushdown_result) = self
.data_source
.push_down_filters(parent_filters_remaining)?
{
let new_self = Arc::new(DataSourceExec::new(pushdown_result.inner));
Ok(Some(ExecutionPlanFilterPushdownResult::new(
new_self,
pushdown_result.support,
)))
} else {
Ok(None)
fn try_pushdown_filters(
&self,
_plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
) -> Result<ExecutionPlanFilterPushdownResult> {
match self.data_source.try_pushdown_filters(parent_filters)? {
DataSourceFilterPushdownResult::NotPushed => {
Ok(ExecutionPlanFilterPushdownResult::NotPushed)
}
DataSourceFilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(DataSourceExec::new(inner));
Ok(ExecutionPlanFilterPushdownResult::Pushed {
inner: new_self,
support,
})
}
}
}
}
Expand Down
Loading
Loading