From b7cb0d4468c700b226bd9b47e270dd6a56c5ef3b Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 02:34:34 +0800 Subject: [PATCH 1/5] rename error class name --- .../resources/error/error-conditions.json | 32 ++++++++-------- .../spark/sql/catalyst/plans/joinTypes.scala | 37 +++++++++++-------- .../sql/errors/QueryCompilationErrors.scala | 2 +- .../sql/catalyst/plans/JoinTypesTest.scala | 15 ++++++++ .../spark/sql/DataFrameAsOfJoinSuite.scala | 19 ++++++++++ .../org/apache/spark/sql/DatasetSuite.scala | 33 +++++++---------- 6 files changed, 87 insertions(+), 51 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 35dfa7a6c349..734758821f5c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -90,6 +90,11 @@ "message" : [ "The input argument `tolerance` must be a constant." ] + }, + "UNSUPPORTED_DIRECTION" : { + "message" : [ + "Unsupported as-of join direction ''. Supported as-of join direction include: ." + ] } }, "sqlState" : "42604" @@ -2358,6 +2363,12 @@ }, "sqlState" : "42K0K" }, + "INVALID_JOIN_TYPE_FOR_JOINWITH" : { + "message" : [ + "Invalid join type in joinWith: ." + ], + "sqlState" : "42613" + }, "INVALID_JSON_DATA_TYPE" : { "message" : [ "Failed to convert the JSON string '' to a data type. Please enter a valid data type." @@ -4653,6 +4664,12 @@ ], "sqlState" : "42K0E" }, + "UNSUPPORTED_JOIN_TYPE" : { + "message" : [ + "Unsupported join type ''. Supported join types include: ." + ], + "sqlState" : "0A000" + }, "UNSUPPORTED_INSERT" : { "message" : [ "Can't insert into the target." @@ -6162,11 +6179,6 @@ "Invalid partition transformation: ." ] }, - "_LEGACY_ERROR_TEMP_1319" : { - "message" : [ - "Invalid join type in joinWith: ." - ] - }, "_LEGACY_ERROR_TEMP_1320" : { "message" : [ "Typed column that needs input type and schema cannot be passed in untyped `select` API. Use the typed `Dataset.select` API instead." @@ -8146,16 +8158,6 @@ "Expected a Boolean type expression in replaceNullWithFalse, but got the type in ." ] }, - "_LEGACY_ERROR_TEMP_3216" : { - "message" : [ - "Unsupported join type ''. Supported join types include: ." - ] - }, - "_LEGACY_ERROR_TEMP_3217" : { - "message" : [ - "Unsupported as-of join direction ''. Supported as-of join direction include: ." - ] - }, "_LEGACY_ERROR_TEMP_3218" : { "message" : [ "Must be 2 children: " diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index f123258683ec..d9da255eccc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -19,10 +19,22 @@ package org.apache.spark.sql.catalyst.plans import java.util.Locale -import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperationException} +import org.apache.spark.SparkUnsupportedOperationException +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.Attribute object JoinType { + + val supported = Seq( + "inner", + "outer", "full", "fullouter", "full_outer", + "leftouter", "left", "left_outer", + "rightouter", "right", "right_outer", + "leftsemi", "left_semi", "semi", + "leftanti", "left_anti", "anti", + "cross" + ) + def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match { case "inner" => Inner case "outer" | "full" | "fullouter" => FullOuter @@ -32,20 +44,12 @@ object JoinType { case "leftanti" | "anti" => LeftAnti case "cross" => Cross case _ => - val supported = Seq( - "inner", - "outer", "full", "fullouter", "full_outer", - "leftouter", "left", "left_outer", - "rightouter", "right", "right_outer", - "leftsemi", "left_semi", "semi", - "leftanti", "left_anti", "anti", - "cross") - - throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3216", + throw new AnalysisException( + errorClass = "UNSUPPORTED_JOIN_TYPE", messageParameters = Map( "typ" -> typ, - "supported" -> supported.mkString("'", "', '", "'"))) + "supported" -> supported.mkString("'", "', '", "'")) + ) } } @@ -129,15 +133,16 @@ object LeftSemiOrAnti { object AsOfJoinDirection { + val supported = Seq("forward", "backward", "nearest") + def apply(direction: String): AsOfJoinDirection = { direction.toLowerCase(Locale.ROOT) match { case "forward" => Forward case "backward" => Backward case "nearest" => Nearest case _ => - val supported = Seq("forward", "backward", "nearest") - throw new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_3217", + throw new AnalysisException( + errorClass = "AS_OF_JOIN.UNSUPPORTED_DIRECTION", messageParameters = Map( "direction" -> direction, "supported" -> supported.mkString("'", "', '", "'"))) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 7b9eb2020a5f..f55759820b52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3316,7 +3316,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def invalidJoinTypeInJoinWithError(joinType: JoinType): Throwable = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1319", + errorClass = "INVALID_JOIN_TYPE_FOR_JOINWITH", messageParameters = Map("joinType" -> joinType.sql)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala index 43221bf60ca3..886b043ad79e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException class JoinTypesTest extends SparkFunSuite { @@ -61,4 +62,18 @@ class JoinTypesTest extends SparkFunSuite { assert(JoinType("cross") === Cross) } + test("unsupported join type") { + val joinType = "unknown" + checkError( + exception = intercept[AnalysisException]( + JoinType(joinType) + ), + errorClass = "UNSUPPORTED_JOIN_TYPE", + sqlState = "0A000", + parameters = Map( + "typ" -> joinType, + "supported" -> JoinType.supported.mkString("'", "', '", "'") + ) + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala index 280eb095dc75..a03f08312355 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import scala.jdk.CollectionConverters._ +import org.apache.spark.sql.catalyst.plans.AsOfJoinDirection import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSparkSession @@ -123,6 +124,24 @@ class DataFrameAsOfJoinSuite extends QueryTest parameters = Map.empty) } + test("as-of join - unsupported direction") { + val (df1, df2) = prepareForAsOfJoin() + val direction = "unknown" + checkError( + exception = intercept[AnalysisException] { + df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty, + joinType = "inner", tolerance = lit(-1), allowExactMatches = true, + direction = direction) + }, + errorClass = "AS_OF_JOIN.UNSUPPORTED_DIRECTION", + sqlState = "42604", + parameters = Map( + "direction" -> direction, + "supported" -> AsOfJoinDirection.supported.mkString("'", "', '", "'") + ) + ) + } + test("as-of join - allowExactMatches = false") { val (df1, df2) = prepareForAsOfJoin() checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 97546ba2dac9..b939ed40c7db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExam import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, ExpressionEncoder, OuterScopes} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.BoxedIntEncoder import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, GenericRowWithSchema} -import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} +import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution} @@ -542,25 +542,20 @@ class DatasetSuite extends QueryTest val ds1 = Seq(1, 2, 3).toDS().as("a") val ds2 = Seq(1, 2).toDS().as("b") - val e1 = intercept[AnalysisException] { - ds1.joinWith(ds2, $"a.value" === $"b.value", "left_semi") - }.getMessage - assert(e1.contains("Invalid join type in joinWith: " + LeftSemi.sql)) - - val e2 = intercept[AnalysisException] { - ds1.joinWith(ds2, $"a.value" === $"b.value", "semi") - }.getMessage - assert(e2.contains("Invalid join type in joinWith: " + LeftSemi.sql)) - - val e3 = intercept[AnalysisException] { - ds1.joinWith(ds2, $"a.value" === $"b.value", "left_anti") - }.getMessage - assert(e3.contains("Invalid join type in joinWith: " + LeftAnti.sql)) + def checkJoinWithJoinType(joinType: String): Unit = { + val semiErrorParameters = Map("joinType" -> JoinType(joinType).sql) + checkError( + exception = intercept[AnalysisException]( + ds1.joinWith(ds2, $"a.value" === $"b.value", joinType) + ), + errorClass = "INVALID_JOIN_TYPE_FOR_JOINWITH", + sqlState = "42613", + parameters = semiErrorParameters + ) + } - val e4 = intercept[AnalysisException] { - ds1.joinWith(ds2, $"a.value" === $"b.value", "anti") - }.getMessage - assert(e4.contains("Invalid join type in joinWith: " + LeftAnti.sql)) + Seq("leftsemi", "left_semi", "semi", "leftanti", "left_anti", "anti") + .foreach(checkJoinWithJoinType(_)) } test("groupBy function, keys") { From 6f44c667ca63accbc7951b16c6ee56732c0cda4e Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 02:42:20 +0800 Subject: [PATCH 2/5] update README --- common/utils/src/main/resources/error/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md index adb631ccdca7..575e2ebad35a 100644 --- a/common/utils/src/main/resources/error/README.md +++ b/common/utils/src/main/resources/error/README.md @@ -34,6 +34,7 @@ The terms error class, state, and condition come from the SQL standard. * Error condition: `AS_OF_JOIN` * Error sub-condition: `TOLERANCE_IS_NON_NEGATIVE` * Error sub-condition: `TOLERANCE_IS_UNFOLDABLE` + * Error sub-condition: `UNSUPPORTED_DIRECTION` ### Inconsistent Use of the Term "Error Class" From 249999e8252508a98134d4dad636374d3099f61c Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 04:19:03 +0800 Subject: [PATCH 3/5] fix --- .../src/main/resources/error/error-conditions.json | 12 ++++++------ python/pyspark/sql/tests/test_dataframe.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 734758821f5c..77bb484f2884 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4664,12 +4664,6 @@ ], "sqlState" : "42K0E" }, - "UNSUPPORTED_JOIN_TYPE" : { - "message" : [ - "Unsupported join type ''. Supported join types include: ." - ], - "sqlState" : "0A000" - }, "UNSUPPORTED_INSERT" : { "message" : [ "Can't insert into the target." @@ -4703,6 +4697,12 @@ }, "sqlState" : "42809" }, + "UNSUPPORTED_JOIN_TYPE" : { + "message" : [ + "Unsupported join type ''. Supported join types include: ." + ], + "sqlState" : "0A000" + }, "UNSUPPORTED_MERGE_CONDITION" : { "message" : [ "MERGE operation contains unsupported condition." diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 36a856b62719..d7b31bbc5215 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -529,7 +529,7 @@ def test_join_without_on(self): def test_invalid_join_method(self): df1 = self.spark.createDataFrame([("Alice", 5), ("Bob", 8)], ["name", "age"]) df2 = self.spark.createDataFrame([("Alice", 80), ("Bob", 90)], ["name", "height"]) - self.assertRaises(IllegalArgumentException, lambda: df1.join(df2, how="invalid-join-type")) + self.assertRaises(AnalysisException, lambda: df1.join(df2, how="invalid-join-type")) # Cartesian products require cross join syntax def test_require_cross(self): From b573ddf0dd18118569666e3c5b08d03c1f1c2b34 Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 21:38:18 +0800 Subject: [PATCH 4/5] update --- python/pyspark/sql/connect/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index b5a71c4d2536..a75dcda22924 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -887,7 +887,7 @@ def __init__( elif how == "cross": join_type = proto.Join.JoinType.JOIN_TYPE_CROSS else: - raise IllegalArgumentException( + raise AnalysisException( error_class="UNSUPPORTED_JOIN_TYPE", message_parameters={"join_type": how}, ) From ceceff5a6e9e7a49d7c2c91b9fa6e4addd87602d Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 22:07:50 +0800 Subject: [PATCH 5/5] update --- python/pyspark/sql/connect/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index a75dcda22924..19377515ed28 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -55,9 +55,9 @@ from pyspark.sql.connect.expressions import Expression from pyspark.sql.connect.types import pyspark_types_to_proto_types, UnparsedDataType from pyspark.errors import ( + AnalysisException, PySparkValueError, PySparkPicklingError, - IllegalArgumentException, ) if TYPE_CHECKING: