Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
25e42c8
update dependencies
emgeee Sep 9, 2024
9cea1fb
update get_logical_plan signature
emgeee Sep 9, 2024
6fca28b
remove row_number() function
emgeee Sep 9, 2024
f2b3d3b
remove unneeded dependency
emgeee Sep 9, 2024
4b45a4b
fix pyo3 warnings
emgeee Sep 10, 2024
6353aa9
update object_store dependency
emgeee Sep 10, 2024
815b6d7
change PyExpr -> PySortExpr
emgeee Sep 10, 2024
92806a8
comment out key.extract::<&PyTuple>() condition statement
emgeee Sep 10, 2024
e2fa24e
change more instances of PyExpr > PySortExpr
emgeee Sep 10, 2024
21013a7
update function signatures to use _bound versions
emgeee Sep 10, 2024
142e4ed
remove clone
emgeee Sep 10, 2024
e971add
Working through some of the sort requirement changes
timsaucer Sep 10, 2024
c89357e
remove unused import
emgeee Sep 10, 2024
8255f09
expr.display_name is deprecated, used format!() + schema_name() instead
emgeee Sep 10, 2024
df46054
expr.canonical_name() is deprecated, use format!() expr instead
emgeee Sep 10, 2024
6c27614
remove comment
emgeee Sep 10, 2024
70546e2
fix tuple extraction in dataframe.__getitem__()
emgeee Sep 10, 2024
836061f
remove unneeded import
emgeee Sep 10, 2024
4945661
Add docstring comments to SortExpr python class
emgeee Sep 10, 2024
cd04c44
change extract() to downcast()
emgeee Sep 10, 2024
afcc9f1
deprecate Expr::display_name
Michael-J-Ward Aug 10, 2024
7f6187a
fix lint errors
emgeee Sep 10, 2024
8aebaea
update datafusion commit hash
emgeee Sep 11, 2024
afa303f
fix type in cargo file for arrow features
emgeee Sep 17, 2024
f4574ec
upgrade to datafusion 42
emgeee Sep 17, 2024
88ccbd8
cleanup
emgeee Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change PyExpr -> PySortExpr
  • Loading branch information
emgeee committed Sep 10, 2024
commit 815b6d74f27e25366ef2384aab179b04fb152817
8 changes: 4 additions & 4 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::errors::{py_datafusion_err, DataFusionError};
use crate::expr::PyExpr;
use crate::expr::sort_expr::PySortExpr;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
Expand Down Expand Up @@ -333,7 +333,7 @@ impl PySessionContext {
table_partition_cols: Vec<(String, String)>,
file_extension: &str,
schema: Option<PyArrowType<Schema>>,
file_sort_order: Option<Vec<Vec<PyExpr>>>,
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyResult<()> {
let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
Expand Down Expand Up @@ -589,7 +589,7 @@ impl PySessionContext {
file_extension: &str,
skip_metadata: bool,
schema: Option<PyArrowType<Schema>>,
file_sort_order: Option<Vec<Vec<PyExpr>>>,
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyResult<()> {
let mut options = ParquetReadOptions::default()
Expand Down Expand Up @@ -890,7 +890,7 @@ impl PySessionContext {
file_extension: &str,
skip_metadata: bool,
schema: Option<PyArrowType<Schema>>,
file_sort_order: Option<Vec<Vec<PyExpr>>>,
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyResult<PyDataFrame> {
let mut options = ParquetReadOptions::default()
Expand Down
6 changes: 3 additions & 3 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ use pyo3::types::{PyCapsule, PyTuple};
use tokio::task::JoinHandle;

use crate::errors::py_datafusion_err;
use crate::expr::to_sort_expressions;
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
use crate::utils::{get_tokio_runtime, wait_for_future};
use crate::{errors::DataFusionError, expr::PyExpr};
use crate::{errors::DataFusionError, expr::{PyExpr, sort_expr::PySortExpr}};

/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
Expand Down Expand Up @@ -196,7 +196,7 @@ impl PyDataFrame {
}

#[pyo3(signature = (*exprs))]
fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
fn sort(&self, exprs: Vec<PySortExpr>) -> PyResult<Self> {
let exprs = to_sort_expressions(exprs);
let df = self.df.as_ref().clone().sort(exprs)?;
Ok(Self::new(df))
Expand Down
17 changes: 4 additions & 13 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub mod unnest;
pub mod unnest_expr;
pub mod window;

use sort_expr::{PySortExpr, to_sort_expressions};

/// A PyExpr that can be used on a DataFrame
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -518,7 +520,7 @@ impl PyExpr {

// Expression Function Builder functions

pub fn order_by(&self, order_by: Vec<PyExpr>) -> PyExprFuncBuilder {
pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
self.expr
.clone()
.order_by(to_sort_expressions(order_by))
Expand Down Expand Up @@ -562,20 +564,9 @@ impl From<ExprFuncBuilder> for PyExprFuncBuilder {
}
}

/// Converts a list of Python-facing expressions into DataFusion expressions
/// usable as sort keys.
///
/// An expression that is already an `Expr::Sort` is passed through untouched;
/// any other expression is wrapped by calling `sort(true, true)` on it
/// (presumably ascending with nulls first -- confirm against the DataFusion
/// `Expr::sort` signature).
pub fn to_sort_expressions(order_by: Vec<PyExpr>) -> Vec<Expr> {
    let mut sort_exprs = Vec::with_capacity(order_by.len());
    for py_expr in &order_by {
        let inner = py_expr.expr.clone();
        let sort_key = if matches!(inner, Expr::Sort(_)) {
            inner
        } else {
            inner.sort(true, true)
        };
        sort_exprs.push(sort_key);
    }
    sort_exprs
}

#[pymethods]
impl PyExprFuncBuilder {
pub fn order_by(&self, order_by: Vec<PyExpr>) -> PyExprFuncBuilder {
pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
self.builder
.clone()
.order_by(to_sort_expressions(order_by))
Expand Down
9 changes: 8 additions & 1 deletion src/expr/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ impl Display for PySortExpr {
}
}

/// Unwraps Python-facing sort expressions into their underlying [`SortExpr`] values.
///
/// The input vector is taken by value, so each wrapped `SortExpr` is moved out
/// of its `PySortExpr` instead of being cloned.
pub fn to_sort_expressions(order_by: Vec<PySortExpr>) -> Vec<SortExpr> {
    order_by.into_iter().map(|e| e.sort).collect()
}

#[pymethods]
impl PySortExpr {
fn expr(&self) -> PyResult<PyExpr> {
Ok((*self.sort.expr).clone().into())
Ok(self.sort.expr.clone().into())
}

fn ascending(&self) -> PyResult<bool> {
Expand Down
78 changes: 41 additions & 37 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::common::data_type::NullTreatment;
use crate::context::PySessionContext;
use crate::errors::DataFusionError;
use crate::expr::conditional_expr::PyCaseBuilder;
use crate::expr::to_sort_expressions;
use crate::expr::sort_expr::to_sort_expressions;
use crate::expr::sort_expr::PySortExpr;
use crate::expr::window::PyWindowFrame;
use crate::expr::PyExpr;
use datafusion::common::{Column, ScalarValue, TableReference};
Expand All @@ -35,15 +36,15 @@ use datafusion::functions_aggregate;
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
use datafusion::logical_expr::{
expr::{find_df_window_func, Sort, WindowFunction},
expr::{find_df_window_func, WindowFunction},
lit, Expr, WindowFunctionDefinition,
};

fn add_builder_fns_to_aggregate(
agg_fn: Expr,
distinct: Option<bool>,
filter: Option<PyExpr>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
null_treatment: Option<NullTreatment>,
) -> PyResult<PyExpr> {
// Since ExprFuncBuilder::new() is private, we can guarantee initializing
Expand Down Expand Up @@ -174,14 +175,16 @@ fn regexp_replace(
}
/// Creates a new Sort Expr
#[pyfunction]
fn order_by(expr: PyExpr, asc: bool, nulls_first: bool) -> PyResult<PyExpr> {
Ok(PyExpr {
expr: datafusion::logical_expr::Expr::Sort(Sort {
expr: Box::new(expr.expr),
asc,
nulls_first,
}),
})
fn order_by(expr: PyExpr, asc: bool, nulls_first: bool) -> PyResult<PySortExpr> {
Ok(
PySortExpr::from(
datafusion::logical_expr::expr::Sort {
expr: expr.expr,
asc,
nulls_first,
}
)
)
}

/// Creates a new Alias Expr
Expand Down Expand Up @@ -342,7 +345,7 @@ macro_rules! aggregate_function {
$($arg: PyExpr),*,
distinct: Option<bool>,
filter: Option<PyExpr>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
null_treatment: Option<NullTreatment>
) -> PyResult<PyExpr> {
let agg_fn = functions_aggregate::expr_fn::$NAME($($arg.into()),*);
Expand All @@ -363,7 +366,7 @@ macro_rules! aggregate_function_vec_args {
$($arg: PyExpr),*,
distinct: Option<bool>,
filter: Option<PyExpr>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
null_treatment: Option<NullTreatment>
) -> PyResult<PyExpr> {
let agg_fn = functions_aggregate::expr_fn::$NAME(vec![$($arg.into()),*]);
Expand Down Expand Up @@ -677,7 +680,7 @@ pub fn first_value(
expr: PyExpr,
distinct: Option<bool>,
filter: Option<PyExpr>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
null_treatment: Option<NullTreatment>,
) -> PyResult<PyExpr> {
// If we initialize the UDAF with order_by directly, then it gets over-written by the builder
Expand All @@ -687,19 +690,20 @@ pub fn first_value(
}

// nth_value requires a non-expr argument
/// Builds an `nth_value` aggregate over `expr`, selecting position `n`, and then
/// applies the optional `distinct`, `filter`, `order_by` and `null_treatment`
/// settings through `add_builder_fns_to_aggregate`.
#[pyfunction]
#[pyo3(signature = (expr, n, distinct=None, filter=None, order_by=None, null_treatment=None))]
pub fn nth_value(
    expr: PyExpr,
    n: i64,
    distinct: Option<bool>,
    filter: Option<PyExpr>,
    order_by: Option<Vec<PyExpr>>,
    null_treatment: Option<NullTreatment>,
) -> PyResult<PyExpr> {
    // `n` is not an expression on the Python side, so it is lifted into a literal here.
    let args = vec![expr.expr, lit(n)];
    add_builder_fns_to_aggregate(
        datafusion::functions_aggregate::nth_value::nth_value(args),
        distinct,
        filter,
        order_by,
        null_treatment,
    )
}
// #[pyfunction]
// #[pyo3(signature = (expr, n, distinct=None, filter=None, order_by=None, null_treatment=None))]
// pub fn nth_value(
// expr: PyExpr,
// n: i64,
// distinct: Option<bool>,
// filter: Option<PyExpr>,
// order_by: Option<Vec<PySortExpr>>,
// null_treatment: Option<NullTreatment>,
// ) -> PyResult<PyExpr> {
// // @todo: Commenting this function out for now as it requires some reworking
// let agg_fn = datafusion::functions_aggregate::nth_value::nth_value(vec![expr.expr, lit(n)]);
// add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment)
// }

// string_agg requires a non-expr argument
#[pyfunction]
Expand All @@ -709,7 +713,7 @@ pub fn string_agg(
delimiter: String,
distinct: Option<bool>,
filter: Option<PyExpr>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
null_treatment: Option<NullTreatment>,
) -> PyResult<PyExpr> {
let agg_fn = datafusion::functions_aggregate::string_agg::string_agg(expr.expr, lit(delimiter));
Expand All @@ -719,7 +723,7 @@ pub fn string_agg(
fn add_builder_fns_to_window(
window_fn: Expr,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
// Since ExprFuncBuilder::new() is private, set an empty partition and then
// override later if appropriate.
Expand Down Expand Up @@ -749,7 +753,7 @@ pub fn lead(
shift_offset: i64,
default_value: Option<ScalarValue>,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value);

Expand All @@ -763,7 +767,7 @@ pub fn lag(
shift_offset: i64,
default_value: Option<ScalarValue>,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value);

Expand All @@ -772,7 +776,7 @@ pub fn lag(

#[pyfunction]
#[pyo3(signature = (partition_by=None, order_by=None))]
pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PyExpr>>) -> PyResult<PyExpr> {
pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PySortExpr>>) -> PyResult<PyExpr> {
let window_fn = window_function::rank();

add_builder_fns_to_window(window_fn, partition_by, order_by)
Expand All @@ -782,7 +786,7 @@ pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PyExpr>>) ->
#[pyo3(signature = (partition_by=None, order_by=None))]
pub fn dense_rank(
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::dense_rank();

Expand All @@ -793,7 +797,7 @@ pub fn dense_rank(
#[pyo3(signature = (partition_by=None, order_by=None))]
pub fn percent_rank(
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::percent_rank();

Expand All @@ -804,7 +808,7 @@ pub fn percent_rank(
#[pyo3(signature = (partition_by=None, order_by=None))]
pub fn cume_dist(
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::cume_dist();

Expand All @@ -816,7 +820,7 @@ pub fn cume_dist(
pub fn ntile(
arg: PyExpr,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
) -> PyResult<PyExpr> {
let window_fn = window_function::ntile(arg.into());

Expand Down Expand Up @@ -965,7 +969,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(regr_syy))?;
m.add_wrapped(wrap_pyfunction!(first_value))?;
m.add_wrapped(wrap_pyfunction!(last_value))?;
m.add_wrapped(wrap_pyfunction!(nth_value))?;
// m.add_wrapped(wrap_pyfunction!(nth_value))?;
m.add_wrapped(wrap_pyfunction!(bit_and))?;
m.add_wrapped(wrap_pyfunction!(bit_or))?;
m.add_wrapped(wrap_pyfunction!(bit_xor))?;
Expand Down