Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Readability improvements, use Internal for unreachable and/or bug-ind…
…icating errors
  • Loading branch information
ozankabak committed Sep 28, 2022
commit c582e0113cc4958eb4acf1b531f8eb6d772bd2cf
6 changes: 3 additions & 3 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ macro_rules! impl_common_cases_op {
$RHS,
$OPERATION,
[
// FloatXY coerces everything to fXY:
// Float64 coerces everything to f64:
[Float64, Float32, Float64, f64],
[Float64, Int64, Float64, f64],
[Float64, Int32, Float64, f64],
Expand All @@ -430,11 +430,11 @@ macro_rules! impl_common_cases_op {
[Float64, UInt32, Float64, f64],
[Float64, UInt16, Float64, f64],
[Float64, UInt8, Float64, f64],
// UIntXY coerces all smaller unsigned types to uXY:
// UInt64 coerces all smaller unsigned types to u64:
[UInt64, UInt32, UInt64, u64],
[UInt64, UInt16, UInt64, u64],
[UInt64, UInt8, UInt64, u64],
// IntXY coerces all smaller integral types to iXY:
// Int64 coerces all smaller integral types to i64:
[Int64, Int32, Int64, i64],
[Int64, Int16, Int64, i64],
[Int64, Int8, Int64, i64],
Expand Down
95 changes: 43 additions & 52 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,24 @@ fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
} else {
let is_descending: bool = sort_options
.first()
.ok_or_else(|| DataFusionError::Execution("Array is empty".to_string()))?
.ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
.descending;

current_row_values
.iter()
.map(|value| {
Ok(if value.is_null() {
value.clone()
if value.is_null() {
return Ok(value.clone());
};
let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?;
if SEARCH_SIDE == is_descending {
// TODO: ADD overflow check
value.add(&offset)
} else if value.is_unsigned() && value < &offset {
ScalarValue::try_from_value(&value.get_datatype(), 0)
} else {
let offset =
ScalarValue::try_from_value(&value.get_datatype(), delta)?;
if SEARCH_SIDE == is_descending {
// TODO: ADD overflow check
value.add(&offset)?
} else if value.is_unsigned() && value < &offset {
ScalarValue::try_from_value(&value.get_datatype(), 0)?
} else {
value.sub(&offset)?
}
})
value.sub(&offset)
}
})
.collect::<Result<Vec<ScalarValue>>>()?
};
Expand All @@ -188,7 +186,7 @@ fn calculate_current_window(
window_frame: &WindowFrame,
range_columns: &[ArrayRef],
sort_options: &[SortOptions],
len: usize,
length: usize,
idx: usize,
) -> Result<(usize, usize)> {
match window_frame.units {
Expand Down Expand Up @@ -220,7 +218,7 @@ fn calculate_current_window(
}
// UNBOUNDED FOLLOWING
WindowFrameBound::Following(None) => {
Err(DataFusionError::Execution(format!(
Err(DataFusionError::Internal(format!(
"Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
window_frame
)))
Expand All @@ -229,7 +227,7 @@ fn calculate_current_window(
let end = match window_frame.end_bound {
// UNBOUNDED PRECEDING
WindowFrameBound::Preceding(None) => {
Err(DataFusionError::Execution(format!(
Err(DataFusionError::Internal(format!(
"Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
window_frame
)))
Expand Down Expand Up @@ -257,7 +255,7 @@ fn calculate_current_window(
)
}
// UNBOUNDED FOLLOWING
WindowFrameBound::Following(None) => Ok(len),
WindowFrameBound::Following(None) => Ok(length),
};
Ok((start?, end?))
}
Expand All @@ -273,10 +271,10 @@ fn calculate_current_window(
}
}
WindowFrameBound::CurrentRow => Ok(idx),
WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)),
WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, length)),
// UNBOUNDED FOLLOWING
WindowFrameBound::Following(None) => {
Err(DataFusionError::Execution(format!(
Err(DataFusionError::Internal(format!(
"Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
window_frame
)))
Expand All @@ -285,7 +283,7 @@ fn calculate_current_window(
let end = match window_frame.end_bound {
// UNBOUNDED PRECEDING
WindowFrameBound::Preceding(None) => {
Err(DataFusionError::Execution(format!(
Err(DataFusionError::Internal(format!(
"Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
window_frame
)))
Expand All @@ -299,10 +297,10 @@ fn calculate_current_window(
}
WindowFrameBound::CurrentRow => Ok(idx + 1),
WindowFrameBound::Following(Some(n)) => {
Ok(min(idx + n as usize + 1, len))
Ok(min(idx + n as usize + 1, length))
}
// UNBOUNDED FOLLOWING
WindowFrameBound::Following(None) => Ok(len),
WindowFrameBound::Following(None) => Ok(length),
};
Ok((start?, end?))
}
Expand Down Expand Up @@ -338,11 +336,11 @@ impl AggregateWindowAccumulator {
fn calculate_whole_table(
&mut self,
value_slice: &[ArrayRef],
len: usize,
length: usize,
) -> Result<ArrayRef> {
self.accumulator.update_batch(value_slice)?;
let value = self.accumulator.evaluate()?;
Ok(value.to_array_of_size(len))
Ok(value.to_array_of_size(length))
}

/// This function calculates the running window logic for the rows in `value_range` of `value_slice`.
Expand All @@ -356,25 +354,25 @@ impl AggregateWindowAccumulator {
) -> Result<ArrayRef> {
// We iterate on each row to perform a running calculation.
// First, cur_range is calculated, then it is compared with last_range.
let len = value_range.end - value_range.start;
let length = value_range.end - value_range.start;
let slice_order_columns = order_bys
.iter()
.map(|v| v.slice(value_range.start, value_range.end - value_range.start))
.map(|v| v.slice(value_range.start, length))
.collect::<Vec<_>>();
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();

let updated_zero_offset_value_range = Range {
start: 0,
end: value_range.end - value_range.start,
end: length,
};
let mut row_wise_results: Vec<ScalarValue> = vec![];
let mut last_range: (usize, usize) = (
updated_zero_offset_value_range.start,
updated_zero_offset_value_range.start,
);

for i in 0..len {
for i in 0..length {
let window_frame = self.window_frame.as_ref().ok_or_else(|| {
DataFusionError::Internal(
"Window frame cannot be empty to calculate window ranges".to_string(),
Expand All @@ -384,7 +382,7 @@ impl AggregateWindowAccumulator {
window_frame,
&slice_order_columns,
&sort_options,
len,
length,
i,
)?;

Expand Down Expand Up @@ -428,38 +426,33 @@ impl AggregateWindowAccumulator {
"Value range cannot be empty".to_owned(),
));
}
let len = value_range.end - value_range.start;
let length = value_range.end - value_range.start;
let value_slice = values
.iter()
.map(|v| v.slice(value_range.start, len))
.map(|v| v.slice(value_range.start, length))
.collect::<Vec<_>>();
let wanted_order_columns =
&order_bys[self.partition_by.len()..order_bys.len()].to_vec();
match (wanted_order_columns.len(), self.window_frame) {
let order_columns = &order_bys[self.partition_by.len()..order_bys.len()].to_vec();
match (order_columns.len(), self.window_frame) {
(0, None) => {
// OVER() case
self.calculate_whole_table(&value_slice, len)
// OVER () case
self.calculate_whole_table(&value_slice, length)
}
(_n, None) => {
// OVER(ORDER BY a) case
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
self.window_frame =
Some(AggregateWindowAccumulator::implicit_order_by_window());

self.calculate_running_window(
&value_slice,
wanted_order_columns,
value_range,
)
self.calculate_running_window(&value_slice, order_columns, value_range)
}
(0, Some(frame)) => {
match frame.units {
WindowFrameUnits::Range => {
// OVER(RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW )
self.calculate_whole_table(&value_slice, len)
// OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) case
self.calculate_whole_table(&value_slice, length)
}
WindowFrameUnits::Rows => {
// OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING )
// OVER (ROWS BETWEEN X PRECEDING AND Y FOLLOWING) case
self.calculate_running_window(
&value_slice,
order_bys,
Expand All @@ -471,12 +464,10 @@ impl AggregateWindowAccumulator {
)),
}
}
// OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING )
(_n, _) => self.calculate_running_window(
&value_slice,
wanted_order_columns,
value_range,
),
// OVER (ORDER BY a ROWS/RANGE BETWEEN X PRECEDING AND Y FOLLOWING) case
(_n, _) => {
self.calculate_running_window(&value_slice, order_columns, value_range)
}
}
}
}