Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Include window spec
  • Loading branch information
siying committed Jun 30, 2023
commit ad9c2cecd4233e8ab30041fb5bbfbc99a027b98d
Original file line number Diff line number Diff line change
Expand Up @@ -509,19 +509,25 @@ object UnsupportedOperationChecker extends Logging {
throwError("Sampling is not supported on streaming DataFrames/Datasets")

case Window(_, _, _, child) if child.isStreaming =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case w @ Window(_, _, _, child) if child.isStreaming =>

w would be same as subPlan.asInstanceOf[Window]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it doesn't even need to... we can just extract the value out from the pattern.

case Window(windowExpressions, _, _, child) if child.isStreaming =>

and then remove L512 and use windowExpressions directly.

val windowFuncs = subPlan.asInstanceOf[Window].windowExpressions.flatMap { e =>
val windowExpression = subPlan.asInstanceOf[Window].windowExpressions
val windowFuncs = windowExpression.flatMap { e =>
e.collect {
case we: WindowExpression => s"${we.windowFunction} as column ${e.toAttribute.sql}"
case we: WindowExpression =>
s"'${we.windowFunction}' as column '${e.toAttribute.sql}'"
}
}.mkString(", ")
val windowSpec = windowExpression.flatMap { e =>
e.collect {
case we: WindowExpression => we.windowSpec.sql
}
}.mkString(", ")
throw new AnalysisException(
s"Unsupported window function in '$windowFuncs'. Structured " +
"Streaming only supports time-window aggregation using the `window` function.",
s"Unsupported window function in $windowFuncs. Structured " +
"Streaming only supports time-window aggregation using the `window` function. " +
s"(window specification: '$windowSpec')",
subPlan.origin.line,
subPlan.origin.startPosition)



case ReturnAnswer(child) if child.isStreaming =>
throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
"with streaming DataFrames/Datasets must be executed with writeStream.start().")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ class StreamSuite extends StreamTest {
.start()
}
assert(e.getMessage.contains(
"Unsupported window function found in column 'row_number() AS rn_col'"))
"Unsupported window function in 'row_number()' as column 'rn_col'"))
}


Expand Down