diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index abe88db12672..8c94a2c66c96 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1769,6 +1769,11 @@
     ],
     "sqlState" : "42000"
   },
+  "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+    "message" : [
+      "Window function is not supported in <windowFunc> (as column <columnName>) on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the WINDOW function. (window specification: <windowSpec>)"
+    ]
+  },
   "NOT_ALLOWED_IN_FROM" : {
     "message" : [
       "Not allowed in the FROM clause:"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index daa7c0d54b77..2a09d85d8f26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
@@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging {
       case Sample(_, _, _, _, child) if child.isStreaming =>
         throwError("Sampling is not supported on streaming DataFrames/Datasets")
 
-      case Window(_, _, _, child) if child.isStreaming =>
-        throwError("Non-time-based windows are not supported on streaming DataFrames/Datasets")
+      case Window(windowExpression, _, _, child) if child.isStreaming =>
+        val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e =>
+          e.collect {
+            case we: WindowExpression =>
+              (we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql)
+          }
+        }.unzip3
+        throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError(
+          windowFuncList,
+          columnNameList,
+          windowSpecList,
+          subPlan.origin)
 
       case ReturnAnswer(child) if child.isStreaming =>
         throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
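A note on the extraction above: `collect` gathers every `WindowExpression` inside each projected expression as a `(function, column, spec)` triple, and `unzip3` splits the collected triples into the three parallel lists handed to the error builder. Below is a minimal, self-contained sketch of that step; plain strings stand in for `we.windowFunction.toString`, `e.toAttribute.sql`, and `we.windowSpec.sql`, and the sample values are invented for illustration:

```scala
// Simplified stand-in for the checker's extraction: each window expression
// contributes a (function, column, spec) triple, and unzip3 turns the
// collected triples into three parallel lists.
object Unzip3Sketch extends App {
  // Hypothetical sample values, not taken from a real plan.
  val triples: Seq[(String, String, String)] = Seq(
    ("row_number()", "`rn_col`", "(PARTITION BY col1 ORDER BY col2 ASC)"),
    ("rank()", "`rk_col`", "(PARTITION BY col1 ORDER BY col2 ASC)"))

  val (windowFuncList, columnNameList, windowSpecList) = triples.unzip3

  println(windowFuncList)  // List(row_number(), rank())
  println(columnNameList)  // List(`rn_col`, `rk_col`)
  println(windowSpecList)  // List((PARTITION BY col1 ORDER BY col2 ASC), ...)
}
```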
Queries " + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 59b66bd4343e..74c29cabbe18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval -import org.apache.spark.sql.catalyst.trees.{SQLQueryContext, TreeNode} +import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext, TreeNode} import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateTimeUtils, FailFastMode} import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -724,6 +724,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("className" -> className, "operator" -> operator)) } + def nonTimeWindowNotSupportedInStreamingError( + windowFuncList: Seq[String], + columnNameList: Seq[String], + windowSpecList: Seq[String], + origin: Origin): AnalysisException = { + new AnalysisException( + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING", + messageParameters = Map( + "windowFunc" -> windowFuncList.map(toSQLStmt(_)).mkString(","), + "columnName" -> columnNameList.map(toSQLId(_)).mkString(","), + "windowSpec" -> windowSpecList.map(toSQLStmt(_)).mkString(",")), + origin = origin) + } + def multiplePathsSpecifiedError(allPaths: Seq[String]): SparkIllegalArgumentException = { new SparkIllegalArgumentException( errorClass = "_LEGACY_ERROR_TEMP_2050", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index f9fd02b86e90..32c9a3aa17e6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { testUnaryOperatorInStreamingPlan( "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling") testUnaryOperatorInStreamingPlan( - "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows") + "window", + Window(Nil, Nil, Nil, _), + errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING") // Output modes with aggregation and non-aggregation plans testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true) @@ -870,7 +872,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { operationName: String, logicalPlanGenerator: LogicalPlan => LogicalPlan, outputMode: OutputMode = Append, - expectedMsg: String = ""): Unit = { + expectedMsg: String = "", + errorClass: String = ""): Unit = { val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg) @@ -878,7 +881,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { s"$operationName with stream relation", wrapInStreaming(logicalPlanGenerator(streamRelation)), 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index f9fd02b86e90..32c9a3aa17e6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -738,7 +738,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _), expectedMsg = "sampling")
   testUnaryOperatorInStreamingPlan(
-    "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
+    "window",
+    Window(Nil, Nil, Nil, _),
+    errorClass = "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING")
 
   // Output modes with aggregation and non-aggregation plans
   testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
@@ -870,7 +872,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       operationName: String,
       logicalPlanGenerator: LogicalPlan => LogicalPlan,
       outputMode: OutputMode = Append,
-      expectedMsg: String = ""): Unit = {
+      expectedMsg: String = "",
+      errorClass: String = ""): Unit = {
 
     val expectedMsgs = if (expectedMsg.isEmpty) Seq(operationName) else Seq(expectedMsg)
 
@@ -878,7 +881,8 @@
     assertNotSupportedInStreamingPlan(
       s"$operationName with stream relation",
       wrapInStreaming(logicalPlanGenerator(streamRelation)),
       outputMode,
-      expectedMsgs)
+      expectedMsgs,
+      errorClass)
 
     assertSupportedInStreamingPlan(
       s"$operationName with batch relation",
@@ -1025,10 +1029,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       name: String,
       plan: LogicalPlan,
       outputMode: OutputMode,
-      expectedMsgs: Seq[String]): Unit = {
+      expectedMsgs: Seq[String],
+      errorClass: String = ""): Unit = {
     testError(
       s"streaming plan - $name: not supported",
-      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported") {
+      expectedMsgs :+ "streaming" :+ "DataFrame" :+ "Dataset" :+ "not supported",
+      errorClass) {
       UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
     }
   }
@@ -1090,7 +1096,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
    * Test whether the body of code will fail. If it does fail, then check if it has expected
    * messages.
    */
-  def testError(testName: String, expectedMsgs: Seq[String])(testBody: => Unit): Unit = {
+  def testError(
+      testName: String,
+      expectedMsgs: Seq[String],
+      errorClass: String = "")(testBody: => Unit): Unit = {
     test(testName) {
       val e = intercept[AnalysisException] {
         testBody
@@ -1100,5 +1109,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
             s"actual exception message:\n\t'${e.getMessage}'")
         }
       }
+      if (!errorClass.isEmpty) {
+        assert(e.getErrorClass == errorClass)
+      }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6fd63454e82e..0ee44a098f7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
@@ -686,6 +687,33 @@ class StreamSuite extends StreamTest {
     assert(query.exception.isEmpty)
   }
 
+  test("SPARK-44044: non-time-window") {
+    val inputData = MemoryStream[(Int, Int)]
+    val e = intercept[AnalysisException] {
+      val agg = inputData
+        .toDF()
+        .selectExpr("CAST(_1 AS timestamp) AS col1", "_2 AS col2")
+        .withWatermark("col1", "10 seconds")
+        .withColumn("rn_col", row_number().over(Window
+          .partitionBy("col1")
+          .orderBy(col("col2"))))
+        .select("rn_col", "col1", "col2")
+        .writeStream
+        .format("console")
+        .start()
+    }
+    checkError(
+      e,
+      "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING",
+      parameters = Map(
+        "windowFunc" -> "ROW_NUMBER()",
+        "columnName" -> "`rn_col`",
+        "windowSpec" ->
+          ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING " +
+          "AND CURRENT ROW)")))
+  }
+
+
   test("SPARK-19873: streaming aggregation with change in number of partitions") {
     val inputData = MemoryStream[(Int, Int)]
     val agg = inputData.toDS().groupBy("_1").count()
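For contrast with the failing query in the new StreamSuite test, here is a sketch of the pattern the error message points users to: time-window aggregation with the built-in `window()` function. This is illustrative only and not part of the patch; it assumes a local SparkSession and uses the public "rate" source (which emits `timestamp`/`value` rows) instead of the test-only `MemoryStream`:

```scala
// Sketch (not part of the patch): the aggregation style Structured Streaming
// does support -- a time window built with window() -- rather than a
// Window.partitionBy(...).orderBy(...) specification.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TimeWindowSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("time-window-sketch")
    .getOrCreate()

  // The rate source produces (timestamp, value) rows continuously.
  val counts = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window(col("timestamp"), "10 seconds")) // time window: supported
    .count()

  // Unlike the row_number() query in the test above, this starts without
  // raising NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING.
  val query = counts.writeStream
    .outputMode("append")
    .format("console")
    .start()
  query.awaitTermination(15000) // run briefly for demonstration, then exit
}
```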