1 change: 1 addition & 0 deletions common/utils/src/main/resources/error/README.md
@@ -34,6 +34,7 @@ The terms error class, state, and condition come from the SQL standard.
* Error condition: `AS_OF_JOIN`
* Error sub-condition: `TOLERANCE_IS_NON_NEGATIVE`
* Error sub-condition: `TOLERANCE_IS_UNFOLDABLE`
* Error sub-condition: `UNSUPPORTED_DIRECTION`

### Inconsistent Use of the Term "Error Class"

32 changes: 17 additions & 15 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -90,6 +90,11 @@
"message" : [
"The input argument `tolerance` must be a constant."
]
},
"UNSUPPORTED_DIRECTION" : {
"message" : [
"Unsupported as-of join direction '<direction>'. Supported as-of join direction include: <supported>."
]
}
},
"sqlState" : "42604"
@@ -2358,6 +2363,12 @@
},
"sqlState" : "42K0K"
},
"INVALID_JOIN_TYPE_FOR_JOINWITH" : {
"message" : [
"Invalid join type in joinWith: <joinType>."
],
"sqlState" : "42613"
},
"INVALID_JSON_DATA_TYPE" : {
"message" : [
"Failed to convert the JSON string '<invalidType>' to a data type. Please enter a valid data type."
@@ -4686,6 +4697,12 @@
},
"sqlState" : "42809"
},
"UNSUPPORTED_JOIN_TYPE" : {
"message" : [
"Unsupported join type '<typ>'. Supported join types include: <supported>."
],
"sqlState" : "0A000"
},
"UNSUPPORTED_MERGE_CONDITION" : {
"message" : [
"MERGE operation contains unsupported <condName> condition."
@@ -6162,11 +6179,6 @@
"Invalid partition transformation: <expr>."
]
},
"_LEGACY_ERROR_TEMP_1319" : {
"message" : [
"Invalid join type in joinWith: <joinType>."
]
},
"_LEGACY_ERROR_TEMP_1320" : {
"message" : [
"Typed column <typedCol> that needs input type and schema cannot be passed in untyped `select` API. Use the typed `Dataset.select` API instead."
@@ -8146,16 +8158,6 @@
"Expected a Boolean type expression in replaceNullWithFalse, but got the type <dataType> in <expr>."
]
},
"_LEGACY_ERROR_TEMP_3216" : {
"message" : [
"Unsupported join type '<typ>'. Supported join types include: <supported>."
]
},
"_LEGACY_ERROR_TEMP_3217" : {
"message" : [
"Unsupported as-of join direction '<direction>'. Supported as-of join direction include: <supported>."
]
},
"_LEGACY_ERROR_TEMP_3218" : {
"message" : [
"Must be 2 children: <others>"
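The conditions defined above surface through `SparkThrowable`, so callers and tests can match on the condition name and typed parameters rather than on message text. A minimal sketch of the user-visible behavior (not part of this diff; assumes an active `spark` session, and `"sideways"` is just an invented invalid join type):

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.SparkThrowable

try {
  // Deliberately invalid join type string.
  spark.range(1).as("a").join(spark.range(1).as("b"), Seq("id"), "sideways")
} catch {
  case e: SparkThrowable =>
    assert(e.getErrorClass == "UNSUPPORTED_JOIN_TYPE")
    assert(e.getSqlState == "0A000")
    // The <typ> and <supported> placeholders in the JSON template are
    // substituted from this parameter map when the message is rendered.
    assert(e.getMessageParameters.asScala("typ") == "sideways")
}
```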
4 changes: 2 additions & 2 deletions python/pyspark/sql/connect/plan.py
@@ -55,9 +55,9 @@
from pyspark.sql.connect.expressions import Expression
from pyspark.sql.connect.types import pyspark_types_to_proto_types, UnparsedDataType
from pyspark.errors import (
AnalysisException,
PySparkValueError,
PySparkPicklingError,
IllegalArgumentException,
)

if TYPE_CHECKING:
@@ -887,7 +887,7 @@ def __init__(
elif how == "cross":
join_type = proto.Join.JoinType.JOIN_TYPE_CROSS
else:
raise IllegalArgumentException(
raise AnalysisException(
error_class="UNSUPPORTED_JOIN_TYPE",
message_parameters={"join_type": how},
)
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests/test_dataframe.py
@@ -529,7 +529,7 @@ def test_join_without_on(self):
def test_invalid_join_method(self):
df1 = self.spark.createDataFrame([("Alice", 5), ("Bob", 8)], ["name", "age"])
df2 = self.spark.createDataFrame([("Alice", 80), ("Bob", 90)], ["name", "height"])
self.assertRaises(IllegalArgumentException, lambda: df1.join(df2, how="invalid-join-type"))
self.assertRaises(AnalysisException, lambda: df1.join(df2, how="invalid-join-type"))

# Cartesian products require cross join syntax
def test_require_cross(self):
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -19,10 +19,22 @@ package org.apache.spark.sql.catalyst.plans

import java.util.Locale

import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperationException}
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Attribute

object JoinType {

val supported = Seq(
"inner",
"outer", "full", "fullouter", "full_outer",
"leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi", "semi",
"leftanti", "left_anti", "anti",
"cross"
)

def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
case "inner" => Inner
case "outer" | "full" | "fullouter" => FullOuter
@@ -32,20 +44,12 @@ object JoinType {
case "leftanti" | "anti" => LeftAnti
case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter", "full_outer",
"leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi", "semi",
"leftanti", "left_anti", "anti",
"cross")

throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3216",
throw new AnalysisException(
errorClass = "UNSUPPORTED_JOIN_TYPE",
messageParameters = Map(
"typ" -> typ,
"supported" -> supported.mkString("'", "', '", "'")))
"supported" -> supported.mkString("'", "', '", "'"))
)
}
}

@@ -129,15 +133,16 @@ object LeftSemiOrAnti {

object AsOfJoinDirection {

val supported = Seq("forward", "backward", "nearest")

def apply(direction: String): AsOfJoinDirection = {
direction.toLowerCase(Locale.ROOT) match {
case "forward" => Forward
case "backward" => Backward
case "nearest" => Nearest
case _ =>
val supported = Seq("forward", "backward", "nearest")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3217",
throw new AnalysisException(
errorClass = "AS_OF_JOIN.UNSUPPORTED_DIRECTION",
messageParameters = Map(
"direction" -> direction,
"supported" -> supported.mkString("'", "', '", "'")))
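Hoisting `supported` to a `val` on each companion object lets the new tests reference the exact list that appears in the message. For reference, a tiny illustrative sketch of what the `mkString` quoting produces:

```scala
// mkString(start, separator, end): the single-quote start/end plus the
// "', '" separator yields each element wrapped in quotes, comma-joined,
// matching the <supported> placeholder text.
val supported = Seq("forward", "backward", "nearest")
val rendered = supported.mkString("'", "', '", "'")
assert(rendered == "'forward', 'backward', 'nearest'")
```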
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3316,7 +3316,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat

def invalidJoinTypeInJoinWithError(joinType: JoinType): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1319",
errorClass = "INVALID_JOIN_TYPE_FOR_JOINWITH",
messageParameters = Map("joinType" -> joinType.sql))
}

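Note that the message parameter here is `joinType.sql`, the SQL rendering of the resolved plan join type, not the raw string the user passed; several user-facing aliases therefore collapse to one rendered value, which is why the `DatasetSuite` test below computes `JoinType(joinType).sql` for its expected parameters. A quick sketch (assumes the Catalyst classes are on the classpath):

```scala
import org.apache.spark.sql.catalyst.plans.JoinType

// Aliases normalize to the same JoinType, and .sql is what lands in
// the INVALID_JOIN_TYPE_FOR_JOINWITH message.
assert(JoinType("left_semi").sql == "LEFT SEMI")
assert(JoinType("semi").sql == "LEFT SEMI")
assert(JoinType("left_anti").sql == "LEFT ANTI")
```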
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/JoinTypesTest.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException

class JoinTypesTest extends SparkFunSuite {

@@ -61,4 +62,18 @@ class JoinTypesTest extends SparkFunSuite {
assert(JoinType("cross") === Cross)
}

test("unsupported join type") {
val joinType = "unknown"
checkError(
exception = intercept[AnalysisException](
JoinType(joinType)
),
errorClass = "UNSUPPORTED_JOIN_TYPE",
sqlState = "0A000",
parameters = Map(
"typ" -> joinType,
"supported" -> JoinType.supported.mkString("'", "', '", "'")
)
)
}
}
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.plans.AsOfJoinDirection
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
@@ -123,6 +124,24 @@ class DataFrameAsOfJoinSuite extends QueryTest
parameters = Map.empty)
}

test("as-of join - unsupported direction") {
val (df1, df2) = prepareForAsOfJoin()
val direction = "unknown"
checkError(
exception = intercept[AnalysisException] {
df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
joinType = "inner", tolerance = lit(-1), allowExactMatches = true,
direction = direction)
},
errorClass = "AS_OF_JOIN.UNSUPPORTED_DIRECTION",
sqlState = "42604",
parameters = Map(
"direction" -> direction,
"supported" -> AsOfJoinDirection.supported.mkString("'", "', '", "'")
)
)
}

test("as-of join - allowExactMatches = false") {
val (df1, df2) = prepareForAsOfJoin()
checkAnswer(
33 changes: 14 additions & 19 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExam
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, ExpressionEncoder, OuterScopes}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.BoxedIntEncoder
import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
@@ -542,25 +542,20 @@ class DatasetSuite extends QueryTest
val ds1 = Seq(1, 2, 3).toDS().as("a")
val ds2 = Seq(1, 2).toDS().as("b")

val e1 = intercept[AnalysisException] {
ds1.joinWith(ds2, $"a.value" === $"b.value", "left_semi")
}.getMessage
assert(e1.contains("Invalid join type in joinWith: " + LeftSemi.sql))

val e2 = intercept[AnalysisException] {
ds1.joinWith(ds2, $"a.value" === $"b.value", "semi")
}.getMessage
assert(e2.contains("Invalid join type in joinWith: " + LeftSemi.sql))

val e3 = intercept[AnalysisException] {
ds1.joinWith(ds2, $"a.value" === $"b.value", "left_anti")
}.getMessage
assert(e3.contains("Invalid join type in joinWith: " + LeftAnti.sql))
def checkJoinWithJoinType(joinType: String): Unit = {
val expectedParameters = Map("joinType" -> JoinType(joinType).sql)
checkError(
exception = intercept[AnalysisException](
ds1.joinWith(ds2, $"a.value" === $"b.value", joinType)
),
errorClass = "INVALID_JOIN_TYPE_FOR_JOINWITH",
sqlState = "42613",
parameters = expectedParameters
)
}

val e4 = intercept[AnalysisException] {
ds1.joinWith(ds2, $"a.value" === $"b.value", "anti")
}.getMessage
assert(e4.contains("Invalid join type in joinWith: " + LeftAnti.sql))
Seq("leftsemi", "left_semi", "semi", "leftanti", "left_anti", "anti")
.foreach(checkJoinWithJoinType(_))
}

test("groupBy function, keys") {
Expand Down