common/utils/src/main/resources/error/error-classes.json (5 additions, 0 deletions)
@@ -1769,6 +1769,11 @@
     ],
     "sqlState" : "42000"
   },
+  "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+    "message" : [
+      "Window function is not supported in <windowFunc> (as column <columnName>) on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function. (window specification: <windowSpec>)"
+    ]
+  },
   "NOT_ALLOWED_IN_FROM" : {
     "message" : [
       "Not allowed in the FROM clause:"
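Note: once the `<windowFunc>`, `<columnName>`, and `<windowSpec>` placeholders are filled in (with the values pinned by the StreamSuite test below), the rendered message should read roughly as follows, a sketch assuming Spark's usual `[ERROR_CLASS] message` rendering:

```
[NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING] Window function is not supported in
ROW_NUMBER() (as column `rn_col`) on streaming DataFrames/Datasets. Structured
Streaming only supports time-window aggregation using the WINDOW function.
(window specification: (PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
```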
UnsupportedOperationChecker.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

@@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging {
       case Sample(_, _, _, _, child) if child.isStreaming =>
         throwError("Sampling is not supported on streaming DataFrames/Datasets")

-      case Window(_, _, _, child) if child.isStreaming =>
-        throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+      case Window(windowExpression, _, _, child) if child.isStreaming =>
+        val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e =>
+          e.collect {
+            case we: WindowExpression =>
+              (we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql)
+          }
+        }.unzip3
+        throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError(
+          windowFuncList,
+          columnNameList,
+          windowSpecList,
+          subPlan.origin)

       case ReturnAnswer(child) if child.isStreaming =>
         throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
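Note: the `flatMap`/`collect`/`unzip3` combination above turns one list of window expressions into the three parallel parameter lists the error constructor expects. A minimal standalone sketch of the idiom, with plain tuples standing in for the `WindowExpression` metadata (values hypothetical):

```scala
// Each collected tuple is (window function, output column, window spec);
// unzip3 splits a Seq of 3-tuples into three Seqs in one pass.
val collected: Seq[(String, String, String)] = Seq(
  ("row_number()", "`rn_col`", "(PARTITION BY col1 ORDER BY col2)"))
val (funcs, cols, specs) = collected.unzip3
// funcs == Seq("row_number()"); cols == Seq("`rn_col`");
// specs == Seq("(PARTITION BY col1 ORDER BY col2)")
```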
QueryExecutionErrors.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
-import org.apache.spark.sql.catalyst.trees.{SQLQueryContext, TreeNode}
+import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext, TreeNode}
 import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateTimeUtils, FailFastMode}
 import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("className" -> className, "operator" -> operator))
   }

+  def nonTimeWindowNotSupportedInStreamingError(
+      windowFuncList: Seq[String],
+      columnNameList: Seq[String],
+      windowSpecList: Seq[String],
+      origin: Origin): AnalysisException = {
+    new AnalysisException(
+      errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      messageParameters = Map(
+        "windowFunc" -> windowFuncList.map(toSQLStmt(_)).mkString(","),
+        "columnName" -> columnNameList.map(toSQLId(_)).mkString(","),
+        "windowSpec" -> windowSpecList.map(toSQLStmt(_)).mkString(",")),
+      origin = origin)
+  }
+
   def multiplePathsSpecifiedError(allPaths: Seq[String]): SparkIllegalArgumentException = {
     new SparkIllegalArgumentException(
       errorClass = "_LEGACY_ERROR_TEMP_2050",
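Note: `toSQLStmt` and `toSQLId` come from `QueryErrorsBase`; judging from the test expectations below, statements are upper-cased, identifiers end up backtick-quoted, and multiple window expressions collapse into comma-separated strings. A hedged sketch of that formatting (stand-in helpers for illustration, not the real `QueryErrorsBase` API):

```scala
// Stand-ins approximating the observed formatting (an assumption for illustration):
def toSQLStmtSketch(text: String): String = text.toUpperCase // "row_number()" -> "ROW_NUMBER()"
def toSQLIdSketch(id: String): String = s"`$id`"             // rn_col -> `rn_col`

val windowFunc = Seq("row_number()", "rank()").map(toSQLStmtSketch).mkString(",")
// windowFunc == "ROW_NUMBER(),RANK()" -- one comma-separated entry per window expression
```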
UnsupportedOperationsSuite.scala
@@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling")
   testUnaryOperatorInStreamingPlan(
-    "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+    "window",
+    Window(Nil, Nil, Nil, _),
+    errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING")

   // Output modes with aggregation and non-aggregation plans
   testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
@@ -870,15 +872,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       operationName: String,
       logicalPlanGenerator: LogicalPlan => LogicalPlan,
       outputMode: OutputMode = Append,
-      expectedMsg: String = ""): Unit = {
+      expectedMsg: String = "",
+      errorClass: String = ""): Unit = {

     val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)

     assertNotSupportedInStreamingPlan(
       s"$operationName with stream relation",
       wrapInStreaming(logicalPlanGenerator(streamRelation)),
       outputMode,
-      expectedMsgs)
+      expectedMsgs,
+      errorClass)

     assertSupportedInStreamingPlan(
       s"$operationName with batch relation",
@@ -1025,10 +1029,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       name: String,
       plan: LogicalPlan,
       outputMode: OutputMode,
-      expectedMsgs: Seq[String]): Unit = {
+      expectedMsgs: Seq[String],
+      errorClass: String = ""): Unit = {
     testError(
       s"streaming plan - $name: not supported",
-      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") {
+      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported",
+      errorClass) {
       UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
     }
   }
@@ -1090,7 +1096,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
    * Test whether the body of code will fail. If it does fail, then check if it has expected
    * messages.
    */
-  def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+  def testError(
+      testName: String,
+      expectedMsgs: Seq[String],
+      errorClass: String = "")(testBody: => Unit): Unit = {

     test(testName) {
       val e = intercept[AnalysisException] {
@@ -1102,6 +1111,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
             s"actual exception message:\n\t'${e.getMessage}'")
         }
       }
+      if (!errorClass.isEmpty) {
+        assert(e.getErrorClass == errorClass)
+      }
     }
   }
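Note: with the new default parameter, callers can pin the structured error class in addition to message fragments; a hypothetical invocation mirroring the `window` case registered earlier in this suite:

```scala
testError(
  "streaming plan - window: not supported",
  Seq("window", "streaming", "not supported"),
  errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING") {
  // Relies on this suite's streamRelation/wrapInStreaming helpers, as used above.
  UnsupportedOperationChecker.checkForStreaming(
    wrapInStreaming(Window(Nil, Nil, Nil, streamRelation)), Append)
}
```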

StreamSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
@@ -686,6 +687,33 @@ class StreamSuite extends StreamTest {
     assert(query.exception.isEmpty)
   }

+  test("SPARK-44044: non-time-window") {
+    val inputData = MemoryStream[(Int, Int)]
+    val e = intercept[AnalysisException] {
+      val agg = inputData
+        .toDF()
+        .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+        .withWatermark("col1", "10 seconds")
+        .withColumn("rn_col", row_number().over(Window
+          .partitionBy("col1")
+          .orderBy(col("col2"))))
+        .select("rn_col", "col1", "col2")
+        .writeStream
+        .format("console")
+        .start()
+    }
+    checkError(
+      e,
+      "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      parameters = Map(
+        "windowFunc" -> "ROW_NUMBER()",
+        "columnName" -> "`rn_col`",
+        "windowSpec" ->
+          ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING " +
+            "AND CURRENT ROW)")))
+  }
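Note: for contrast, the supported shape on this kind of input is a time-window aggregation via the built-in `window` grouping function from `org.apache.spark.sql.functions` (a sketch, not part of this PR):

```scala
// Supported in Structured Streaming: aggregate over 10-second event-time windows
// instead of applying row_number() over a non-time window (sketch; assumes the
// implicit SQLContext that MemoryStream needs inside this suite).
val inputData = MemoryStream[(Int, Int)]
val counts = inputData
  .toDF()
  .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
  .withWatermark("col1", "10 seconds")
  .groupBy(window(col("col1"), "10 seconds"))
  .count()
```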


   test("SPARK-19873: streaming aggregation with change in number of partitions") {
     val inputData = MemoryStream[(Int, Int)]
     val agg = inputData.toDS().groupBy("_1").count()