Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl SessionState {
}
}

let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
let query = self.build_sql_query_planner(&provider);
query.statement_to_plan(statement)
}

Expand Down Expand Up @@ -658,8 +658,7 @@ impl SessionState {
tables: HashMap::new(),
};

let query = SqlToRel::new_with_options(&provider, self.get_parser_options());

let query = self.build_sql_query_planner(&provider);
query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new())
}

Expand Down Expand Up @@ -943,6 +942,31 @@ impl SessionState {
let udtf = self.table_functions.remove(name);
Ok(udtf.map(|x| x.function().clone()))
}

fn build_sql_query_planner<'a, S>(&self, provider: &'a S) -> SqlToRel<'a, S>
where
S: ContextProvider,
{
let query = SqlToRel::new_with_options(provider, self.get_parser_options());

// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
{
let array_planner =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Arc::new(functions_array::planner::ArrayFunctionPlanner::default()) as _;

let field_access_planner =
Arc::new(functions_array::planner::FieldAccessPlanner::default()) as _;

query
.with_user_defined_planner(array_planner)
.with_user_defined_planner(field_access_planner)
}
#[cfg(not(feature = "array_expressions"))]
{
query
}
}
}

struct SessionContextProvider<'a> {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod function;
pub mod groups_accumulator;
pub mod interval_arithmetic;
pub mod logical_plan;
pub mod planner;
pub mod registry;
pub mod simplify;
pub mod sort_properties;
Expand Down
133 changes: 133 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! SQL query planner module
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{
config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema,
Result, TableReference,
};

use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource, WindowUDF};

/// The ContextProvider trait allows the query planner to obtain meta-data about tables and
/// functions referenced in SQL statements
pub trait ContextProvider {
/// Getter for a datasource
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;

/// Getter for a file type, looked up by `_ext` (presumably a file extension)
///
/// The default implementation returns a "not implemented" error.
fn get_file_type(&self, _ext: &str) -> Result<Arc<dyn FileType>> {
not_impl_err!("Registered file types are not supported")
}

/// Getter for a table function
///
/// The default implementation returns a "not implemented" error.
fn get_table_function_source(
&self,
_name: &str,
_args: Vec<Expr>,
) -> Result<Arc<dyn TableSource>> {
not_impl_err!("Table Functions are not supported")
}

/// This provides a worktable (an intermediate table that is used to store the results of a CTE during execution)
/// We don't directly implement this in the logical plan's `SqlToRel`
/// because the sql code needs access to a table that contains execution-related types that can't be a direct dependency
/// of the sql crate (namely, the `CteWorktable`).
/// The [`ContextProvider`] provides a way to "hide" this dependency.
///
/// The default implementation returns a "not implemented" error.
fn create_cte_work_table(
&self,
_name: &str,
_schema: SchemaRef,
) -> Result<Arc<dyn TableSource>> {
not_impl_err!("Recursive CTE is not implemented")
}

/// Getter for a UDF description
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
/// Getter for a UDAF description
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>;
/// Getter for a UDWF
fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>>;
/// Getter for system/user-defined variable type
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType>;

/// Get configuration options
fn options(&self) -> &ConfigOptions;

/// Get all user defined scalar function names
fn udf_names(&self) -> Vec<String>;

/// Get all user defined aggregate function names
fn udaf_names(&self) -> Vec<String>;

/// Get all user defined window function names
fn udwf_names(&self) -> Vec<String>;
}

/// This trait allows users to customize the behavior of the SQL planner
pub trait UserDefinedPlanner {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about different names for this struct that might be more specific to SQL and thus make it easier to find

How about UserDefinedSQLPlanner? or SQLPlannerExtensions?

/// Plan the binary operation between two expressions, returns OriginalBinaryExpr if not possible
fn plan_binary_op(
&self,
expr: BinaryExpr,
_schema: &DFSchema,
) -> Result<PlannerSimplifyResult> {
Ok(PlannerSimplifyResult::OriginalBinaryExpr(expr))
}

/// Plan the field access expression, returns OriginalFieldAccessExpr if not possible
fn plan_field_access(
&self,
expr: FieldAccessExpr,
_schema: &DFSchema,
) -> Result<PlannerSimplifyResult> {
Ok(PlannerSimplifyResult::OriginalFieldAccessExpr(expr))
}

/// Plan the array literal, returns OriginalArray if not possible
fn plan_array_literal(
&self,
exprs: Vec<Expr>,
_schema: &DFSchema,
) -> Result<PlannerSimplifyResult> {
Ok(PlannerSimplifyResult::OriginalArray(exprs))
}
}

pub struct BinaryExpr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be confusing if this struct is also called BinaryExpr given https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.BinaryExpr.html. What do you think about calling this RawBinaryExpr or maybe SqlBinaryExpr (and then renaming FieldAccessExpr similarly)?

Here are some suggested comments to explain the structure (it would be good to do the same for FieldAccessExpr too):

Suggested change
pub struct BinaryExpr {
/// An operator with two arguments to plan
///
/// Note `left` and `right` are DataFusion [`Expr`]s but the `op` is the SQL AST operator.
/// This structure is used by [`UserDefinedPlanner`] to plan operators with custom expressions.
pub struct BinaryExpr {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also struggle with this naming too 😆 . I will take RawBinaryExpr

pub op: sqlparser::ast::BinaryOperator,
pub left: Expr,
pub right: Expr,
}

/// A field access to plan: a base expression plus the access applied to it.
///
/// This is the input to [`UserDefinedPlanner::plan_field_access`]; the default
/// implementation of that method hands it back unchanged inside
/// [`PlannerSimplifyResult::OriginalFieldAccessExpr`].
pub struct FieldAccessExpr {
/// The access being performed on `expr` (see [`GetFieldAccess`] for the possible kinds)
pub field_access: GetFieldAccess,
/// The expression whose field or element is being accessed
pub expr: Expr,
}

pub enum PlannerSimplifyResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this isn't really "simplifying" anything maybe we could call it PlannerResult instead of PlannerSimplifyResult

/// The function call was simplified to an entirely new Expr
Simplified(Expr),
/// the function call could not be simplified, and the arguments
/// are returned unmodified.
OriginalBinaryExpr(BinaryExpr),
OriginalFieldAccessExpr(FieldAccessExpr),
OriginalArray(Vec<Expr>),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be easier to use if it were generic, so that the call sites would know exactly the type of the returned Original value.

So something like

Suggested change
pub enum PlannerSimplifyResult {
/// The function call was simplified to an entirely new Expr
Simplified(Expr),
/// the function call could not be simplified, and the arguments
/// are return unmodified.
OriginalBinaryExpr(BinaryExpr),
OriginalFieldAccessExpr(FieldAccessExpr),
OriginalArray(Vec<Expr>),
}
pub enum PlannerSimplifyResult<T> {
/// The structure was planned as an entirely new Expr by the planner
Simplified(Expr),
/// the planner did not handle planning the structure, and it is returned
/// unmodified.
OriginalBinaryExpr(T),
}

1 change: 1 addition & 0 deletions datafusion/functions-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ datafusion-functions = { workspace = true }
itertools = { version = "0.12", features = ["use_std"] }
log = { workspace = true }
paste = "1.0.14"
sqlparser = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since datafusion-expr already has an explicit dependency on sqlparser --

sqlparser = { workspace = true }

I think it would be cleaner here to avoid the explicit dependency on sqlparser and instead export it in datafusion-expr.

So, for example, in datafusion/expr/src/lib.rs add a pub use sqlparser;

That would ensure the versions always matched (though I think workspace mostly takes care of this) as well as make it clear that the functions-array really depends on the API in datafusion-expr rather than sqlparser directly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, do we also need to clean up datafusion-sql, since it imports both datafusion-expr and sqlparser too?


[dev-dependencies]
criterion = { version = "0.5", features = ["async_tokio"] }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod extract;
pub mod flatten;
pub mod length;
pub mod make_array;
pub mod planner;
pub mod position;
pub mod range;
pub mod remove;
Expand All @@ -50,7 +51,6 @@ pub mod set_ops;
pub mod sort;
pub mod string;
pub mod utils;

use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;
Expand Down
173 changes: 173 additions & 0 deletions datafusion/functions-array/src/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_common::{utils::list_ndims, DFSchema, Result};
use datafusion_expr::{
planner::{BinaryExpr, FieldAccessExpr, PlannerSimplifyResult, UserDefinedPlanner},
AggregateFunction, Expr, ExprSchemable, GetFieldAccess,
};
use datafusion_functions::expr_fn::get_field;

use crate::{
array_has::array_has_all,
expr_fn::{array_append, array_concat, array_prepend},
extract::{array_element, array_slice},
make_array::make_array,
};

/// A [`UserDefinedPlanner`] that plans array-related SQL syntax as array function calls:
/// the `||`, `@>` and `<@` operators when list-typed operands are involved, and array literals.
#[derive(Default)]
pub struct ArrayFunctionPlanner {}

impl UserDefinedPlanner for ArrayFunctionPlanner {
    /// Plans the `||`, `@>` and `<@` operators when list-typed operands are involved.
    ///
    /// Returns `Simplified` with the equivalent array function call when a rewrite
    /// applies, otherwise `OriginalBinaryExpr` carrying the untouched operator and
    /// operands. Errors from resolving the operand types against `schema` are propagated.
    fn plan_binary_op(
        &self,
        expr: BinaryExpr,
        schema: &DFSchema,
    ) -> Result<PlannerSimplifyResult> {
        use sqlparser::ast::BinaryOperator;
        use std::cmp::Ordering;

        let BinaryExpr { op, left, right } = expr;

        match &op {
            BinaryOperator::StringConcat => {
                let lhs_type = left.get_type(schema)?;
                let rhs_type = right.get_type(schema)?;
                let lhs_dims = list_ndims(&lhs_type);
                let rhs_dims = list_ndims(&rhs_type);

                // The target function is picked by comparing list dimensions:
                //   list || list     -> array_concat()
                //   list || non-list -> array_append()
                //   non-list || list -> array_prepend()
                // The comparison is not exact but sufficient: the functions themselves
                // perform the precise validity check, so rewriting e.g. a 3d list
                // appended with a 1d list is fine.
                //
                // When neither side is a list (e.g. string || string) nothing is
                // rewritten here.
                // TODO: the concat function ignores nulls while the string concat
                // operator takes them into consideration; rewriting to concat is only
                // possible once concat can be configured to behave like the operator.
                if lhs_dims + rhs_dims != 0 {
                    let planned = match lhs_dims.cmp(&rhs_dims) {
                        Ordering::Equal => array_concat(vec![left, right]),
                        Ordering::Greater => array_append(left, right),
                        Ordering::Less => array_prepend(left, right),
                    };
                    return Ok(PlannerSimplifyResult::Simplified(planned));
                }
            }
            BinaryOperator::AtArrow | BinaryOperator::ArrowAt => {
                let lhs_type = left.get_type(schema)?;
                let rhs_type = right.get_type(schema)?;
                let lhs_dims = list_ndims(&lhs_type);
                let rhs_dims = list_ndims(&rhs_type);

                // Only rewrite when both operands are lists.
                if lhs_dims > 0 && rhs_dims > 0 {
                    // array1 @> array2 -> array_has_all(array1, array2)
                    // array1 <@ array2 -> array_has_all(array2, array1)
                    let (container, contained) = if matches!(op, BinaryOperator::AtArrow)
                    {
                        (left, right)
                    } else {
                        (right, left)
                    };
                    return Ok(PlannerSimplifyResult::Simplified(array_has_all(
                        container, contained,
                    )));
                }
            }
            _ => {}
        }

        // No rewrite applied: hand the pieces back unchanged.
        let untouched = BinaryExpr { op, left, right };
        Ok(PlannerSimplifyResult::OriginalBinaryExpr(untouched))
    }

    /// Plans an array literal as a `make_array` call over its element expressions.
    fn plan_array_literal(
        &self,
        exprs: Vec<Expr>,
        _schema: &DFSchema,
    ) -> Result<PlannerSimplifyResult> {
        let array_expr = make_array(exprs);
        Ok(PlannerSimplifyResult::Simplified(array_expr))
    }
}

/// A [`UserDefinedPlanner`] that plans field access expressions (named struct field,
/// list index and list range) as `get_field`, `array_element` / `NTH_VALUE` and
/// `array_slice` calls.
#[derive(Default)]
pub struct FieldAccessPlanner {}

impl UserDefinedPlanner for FieldAccessPlanner {
    /// Plans every kind of field access as a function call; always returns `Simplified`.
    fn plan_field_access(
        &self,
        expr: FieldAccessExpr,
        _schema: &DFSchema,
    ) -> Result<PlannerSimplifyResult> {
        let FieldAccessExpr {
            expr: base,
            field_access,
        } = expr;

        let planned = match field_access {
            // base["field"] => get_field(base, "field")
            GetFieldAccess::NamedStructField { name } => get_field(base, name),
            // base[idx] => array_element(base, idx)
            GetFieldAccess::ListIndex { key } => match base {
                // Special case: array_agg(x)[idx] => NTH_VALUE(x, idx), keeping the
                // aggregate's distinct / filter / order by / null treatment settings.
                Expr::AggregateFunction(agg) if is_array_agg(&agg) => {
                    let mut nth_value_args = agg.args;
                    nth_value_args.push(*key);
                    Expr::AggregateFunction(
                        datafusion_expr::expr::AggregateFunction::new(
                            AggregateFunction::NthValue,
                            nth_value_args,
                            agg.distinct,
                            agg.filter,
                            agg.order_by,
                            agg.null_treatment,
                        ),
                    )
                }
                other => array_element(other, *key),
            },
            // base[start, stop, stride] => array_slice(base, start, stop, stride)
            GetFieldAccess::ListRange {
                start,
                stop,
                stride,
            } => array_slice(base, *start, *stop, Some(*stride)),
        };

        Ok(PlannerSimplifyResult::Simplified(planned))
    }
}

/// Returns `true` when `agg_func` is the built-in `ArrayAgg` aggregate.
fn is_array_agg(agg_func: &datafusion_expr::expr::AggregateFunction) -> bool {
    matches!(
        agg_func.func_def,
        datafusion_expr::expr::AggregateFunctionDefinition::BuiltIn(
            AggregateFunction::ArrayAgg
        )
    )
}
Loading