Closed
24 commits
993f6a4
window function bug fix
xin-aurora Apr 10, 2025
b9177ec
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 10, 2025
b147e77
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 15, 2025
997cc49
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 15, 2025
e8fe757
update comments
xin-aurora Apr 15, 2025
bfcf962
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 16, 2025
df1d849
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 17, 2025
2cc882c
Handle literal and non-literal defaults separately
xin-aurora Apr 17, 2025
20d46cb
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 17, 2025
10951fa
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 17, 2025
f193945
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 17, 2025
f4dbd85
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 18, 2025
0ddb79d
remove duplicate val
xin-aurora Apr 18, 2025
855f955
fix early evaluation of onlyLiteralNulls
xin-aurora Apr 18, 2025
9243bb5
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 23, 2025
ee28495
update onlyLiteralNulls
xin-aurora Apr 23, 2025
a4aad86
test merge
xin-aurora Apr 23, 2025
fc6d271
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 24, 2025
4d36ca5
fix val onlyLiterals for any foldable expression
xin-aurora Apr 24, 2025
6ad5f40
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 24, 2025
8994beb
update onlyLiterals
xin-aurora Apr 24, 2025
e99ddda
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 24, 2025
469baa9
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 25, 2025
650cc1b
Merge branch 'apache:master' into windowFuncFix
xin-aurora Apr 25, 2025
@@ -183,10 +183,21 @@ abstract class OffsetWindowFunctionFrameBase(
}
}

/** Indicates whether all default values are absent or foldable (e.g., Literal), so they can be evaluated without an input row. */
protected lazy val onlyLiterals = expressions.forall { e =>
e.default == null || e.default.foldable
}

override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
resetStates(rows)
if (absOffset > rows.length) {
fillDefaultValue(EmptyRow)
Contributor

If the code is not needed, just remove it, and add a comment explaining the reason.

Contributor

Is this the only place? It seems we should never run fillDefaultValue in prepare, since the default value can be an expression that references attributes.

Contributor

It seems all the existing tests only cover Literal default values.

Contributor

Also, I think this is not related to the partition size.
The cause is evaluating an expression against an empty row.
We can check the default expression: if it is not a Literal, apply fillDefaultValue(currentRow) in write; if it is a Literal, apply it in prepare.
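
To illustrate the cause described above, here is a minimal sketch in plain Scala. It is a toy model, not Spark's Catalyst classes: Expr, Lit, ColRef and emptyRow are hypothetical names, and the empty row is modeled as null, which is an assumption of this sketch.

```scala
object EmptyRowSketch {
  // Hypothetical stand-in for a row of column values.
  type Row = Seq[Any]

  sealed trait Expr {
    def foldable: Boolean
    def eval(row: Row): Any
  }

  // A constant default: never reads the input row.
  final case class Lit(value: Any) extends Expr {
    val foldable = true
    def eval(row: Row): Any = value
  }

  // A column-reference default: must read the input row.
  final case class ColRef(ordinal: Int) extends Expr {
    val foldable = false
    def eval(row: Row): Any = row(ordinal)
  }

  // Assumption: the "empty row" carries no data and is modeled as null.
  val emptyRow: Row = null
}
```

With this model, Lit(7).eval(emptyRow) returns 7, while ColRef(0).eval(emptyRow) throws a NullPointerException because there is no row to read from. The same ColRef evaluates fine against a real row.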

Contributor Author

If the code is not needed, just remove it, and add a comment explaining the reason.

I edited the comments to include the reason.

Contributor Author

Is this the only place? It seems we should never run fillDefaultValue in prepare, since the default value can be an expression that references attributes.

I found that this is the only place that runs fillDefaultValue in prepare. I updated write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue as well.
I noticed that write in UnboundedPrecedingOffsetWindowFunctionFrame also relies on prepare to handle cases where the offset exceeds the window group size. However, I haven't found a query that triggers this method. It's possible that write in UnboundedPrecedingOffsetWindowFunctionFrame also needs to be updated.

Contributor Author

Also, I think this is not related to the partition size. The cause is evaluating an expression against an empty row. We can check the default expression: if it is not a Literal, apply fillDefaultValue(currentRow) in write; if it is a Literal, apply it in prepare.

Thank you for the suggestions! We had a similar idea for fixing this issue. I've updated write in FrameLessOffsetWindowFunctionFrame to call fillDefaultValue(currentRow).

// Avoid evaluating non-literal defaults with EmptyRow,
// which causes NullPointerException.
// Check whether defaults are Literal.
if (onlyLiterals) {
fillDefaultValue(EmptyRow)
}
// Handle non-literal defaults in write().
} else {
if (ignoreNulls) {
prepareForIgnoreNulls()
@@ -312,7 +323,12 @@ class FrameLessOffsetWindowFunctionFrame(

override def write(index: Int, current: InternalRow): Unit = {
if (absOffset > input.length) {
// Already use default values in prepare.
if (!onlyLiterals) {
// Handle non-literal defaults, e.g., column references
// Use default values since the offset row does not exist.
fillDefaultValue(current)
}
// Literal default values were already evaluated in prepare().
} else {
doWrite(current)
}
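
The split between prepare and write in the two hunks above can be sketched with a toy frame in plain Scala. This is only an illustration under stated assumptions: Default, Frame and slot are hypothetical names, the frame models lead only, and the real classes evaluate Catalyst expressions into a target row rather than returning a value.

```scala
object OffsetFrameSketch {
  type Row = Seq[Any]

  // Hypothetical default: `foldable` says whether `eval` ignores its input row.
  final case class Default(foldable: Boolean, eval: Row => Any)

  final class Frame(absOffset: Int, default: Default) {
    private var rows: Seq[Row] = Seq.empty
    private var slot: Any = null // stands in for the output buffer
    private lazy val onlyLiterals = default.foldable

    def prepare(input: Seq[Row]): Unit = {
      rows = input
      // Only constant defaults are safe to evaluate without a current row.
      if (absOffset > rows.length && onlyLiterals) {
        slot = default.eval(null)
      }
    }

    def write(index: Int, current: Row): Any = {
      if (absOffset > rows.length) {
        // Row-dependent defaults are evaluated per row, against the current row.
        if (!onlyLiterals) slot = default.eval(current)
      } else {
        // Simplified lead: take the offset row if it exists, else the default.
        val target = index + absOffset
        slot = if (target < rows.length) rows(target)(0) else default.eval(current)
      }
      slot
    }
  }
}
```

For a two-row partition Seq(Seq(4), Seq(5)) with offset 3, a column-reference default (row => row(0)) yields 4 and 5 from write, while a constant default is computed once in prepare and reused for every row.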
@@ -155,6 +155,37 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
Row(2, default, default, default, default) :: Nil)
}

test("lead/lag with column reference as default when offset exceeds window group size") {
val df = spark.range(0, 10, 1, 1).toDF("id")
val window = Window.partitionBy(expr("div(id, 2)")).orderBy($"id")

val result = df.select(
$"id",
lead($"id", 1, $"id").over(window).as("lead_1"),
lead($"id", 3, $"id").over(window).as("lead_3"),
lag($"id", 1, $"id").over(window).as("lag_1"),
lag($"id", 3, $"id").over(window).as("lag_3")
).orderBy("id")

// check the output in one table
// col0: id, col1: lead_1 result, col2: lead_3 result,
// col3: lag_1 result, col4: lag_3 result
val expected = Seq(
Row(0, 1, 0, 0, 0),
Row(1, 1, 1, 0, 1),
Row(2, 3, 2, 2, 2),
Row(3, 3, 3, 2, 3),
Row(4, 5, 4, 4, 4),
Row(5, 5, 5, 4, 5),
Row(6, 7, 6, 6, 6),
Row(7, 7, 7, 6, 7),
Row(8, 9, 8, 8, 8),
Row(9, 9, 9, 8, 9)
)

checkAnswer(result, expected)
}
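
As a cross-check of the expected table in the test above, the same rows can be derived without Spark. The following plain-Scala sketch is not part of the PR; LeadLagReference, shifted and expectedRows are hypothetical names. It partitions ids by id / 2 and falls back to the row's own id whenever the offset row is outside the partition.

```scala
object LeadLagReference {
  // Value `offset` positions away within the partition, or the row's own id
  // when that position does not exist (the column-reference default).
  def shifted(partition: Vector[Int], index: Int, offset: Int): Int = {
    val target = index + offset
    if (target >= 0 && target < partition.length) partition(target)
    else partition(index)
  }

  // Columns: id, lead_1, lead_3, lag_1, lag_3.
  def expectedRows(ids: Range): Seq[Seq[Int]] =
    ids.toVector.groupBy(_ / 2).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      val sorted = part.sorted
      sorted.indices.map { i =>
        Seq(
          sorted(i),
          shifted(sorted, i, 1),
          shifted(sorted, i, 3),
          shifted(sorted, i, -1),
          shifted(sorted, i, -3)
        )
      }
    }
}
```

Calling LeadLagReference.expectedRows(0 until 10) reproduces the ten rows listed in the test, which confirms that with partitions of size 2 an offset of 3 always falls back to the default.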

test("rows/range between with empty data frame") {
val df = Seq.empty[(String, Int)].toDF("key", "value")
val window = Window.partitionBy($"key").orderBy($"value")