Skip to content

Commit 8762e25

Browse files
uros-db and mihailomilosevic2001
authored and committed
[SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations
### What changes were proposed in this pull request? ### Why are the changes needed? Currently, all `StringType` arguments passed to built-in string functions in Spark SQL get treated as binary strings. This behaviour is incorrect for almost all collationIds except the default (0), and we should instead warn the user if they try to use an unsupported collation for the given function. Over time, we should implement the appropriate support for these (function, collation) pairs, but until then - we should have a way to fail unsupported statements in query analysis. ### Does this PR introduce _any_ user-facing change? Yes, users will now get appropriate errors when they try to use an unsupported collation with a given string function. ### How was this patch tested? Tests in CollationSuite to check if these functions work for binary collations and throw exceptions for others. ### Was this patch authored or co-authored using generative AI tooling? Yes. Closes #45422 from uros-db/regexp-functions. Lead-authored-by: Uros Bojanic <[email protected]> Co-authored-by: Mihailo Milosevic <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent a3c04ec commit 8762e25

File tree

9 files changed

+637
-29
lines changed

9 files changed

+637
-29
lines changed

sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class StringType private(val collationId: Int) extends AtomicType with Serializa
4040
* equality and hashing).
4141
*/
4242
def isBinaryCollation: Boolean = CollationFactory.fetchCollation(collationId).isBinaryCollation
43+
def isLowercaseCollation: Boolean = collationId == CollationFactory.LOWERCASE_COLLATION_ID
4344

4445
/**
4546
* Type name that is shown to the customer.
@@ -54,8 +55,6 @@ class StringType private(val collationId: Int) extends AtomicType with Serializa
5455

5556
override def hashCode(): Int = collationId.hashCode()
5657

57-
override private[sql] def acceptsType(other: DataType): Boolean = other.isInstanceOf[StringType]
58-
5958
/**
6059
* The default size of a value of the StringType is 20 bytes.
6160
*/
@@ -65,6 +64,8 @@ class StringType private(val collationId: Int) extends AtomicType with Serializa
6564
}
6665

6766
/**
67+
* Use StringType for expressions supporting only binary collation.
68+
*
6869
* @since 1.3.0
6970
*/
7071
@Stable

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ object AnsiTypeCoercion extends TypeCoercionBase {
186186
case (NullType, target) if !target.isInstanceOf[TypeCollection] =>
187187
Some(target.defaultConcreteType)
188188

189+
// If a function expects a StringType, no StringType instance should be implicitly cast to
190+
// StringType with a collation that's not accepted (aka. lockdown unsupported collations).
191+
case (_: StringType, StringType) => None
192+
case (_: StringType, _: StringTypeCollated) => None
193+
189194
// This type coercion system will allow implicit converting String type as other
190195
// primitive types, in case of breaking too many existing Spark SQL queries.
191196
case (StringType, a: AtomicType) =>
@@ -215,6 +220,16 @@ object AnsiTypeCoercion extends TypeCoercionBase {
215220
None
216221
}
217222

223+
// "canANSIStoreAssign" doesn't account for targets extending StringTypeCollated, but
224+
// ANSIStoreAssign is generally expected to work with StringTypes
225+
case (_, st: StringTypeCollated) =>
226+
if (Cast.canANSIStoreAssign(inType, st.defaultConcreteType)) {
227+
Some(st.defaultConcreteType)
228+
}
229+
else {
230+
None
231+
}
232+
218233
// When we reach here, input type is not acceptable for any types in this type collection,
219234
// try to find the first one we can implicitly cast.
220235
case (_, TypeCollection(types)) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -995,7 +995,9 @@ object TypeCoercion extends TypeCoercionBase {
995995
case (StringType, AnyTimestampType) => AnyTimestampType.defaultConcreteType
996996
case (StringType, BinaryType) => BinaryType
997997
// Cast any atomic type to string.
998-
case (any: AtomicType, StringType) if any != StringType => StringType
998+
case (any: AtomicType, StringType) if !any.isInstanceOf[StringType] => StringType
999+
case (any: AtomicType, st: StringTypeCollated)
1000+
if !any.isInstanceOf[StringType] => st.defaultConcreteType
9991001

10001002
// When we reach here, input type is not acceptable for any types in this type collection,
10011003
// try to find the first one we can implicitly cast.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
21+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
22+
import org.apache.spark.sql.catalyst.util.CollationFactory
23+
import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
24+
25+
object CollationTypeConstraints {

  /**
   * Checks that every string-typed argument uses the same collation as the calling expression.
   *
   * @param collationId collation expected by the expression
   * @param dataTypes   data types of the expression's children
   * @return `TypeCheckSuccess` when no string argument disagrees; otherwise a
   *         `DataTypeMismatch` with error subclass `COLLATION_MISMATCH` naming both collations
   */
  def checkCollationCompatibility(collationId: Int, dataTypes: Seq[DataType]): TypeCheckResult = {
    val expectedName = CollationFactory.fetchCollation(collationId).collationName
    // Find the first string-typed argument whose collation disagrees with the expected one.
    val mismatchedId = dataTypes.collectFirst {
      case stringType: StringType if stringType.collationId != collationId =>
        stringType.collationId
    }
    mismatchedId match {
      case Some(otherId) =>
        DataTypeMismatch(
          errorSubClass = "COLLATION_MISMATCH",
          messageParameters = Map(
            "collationNameLeft" -> expectedName,
            "collationNameRight" -> CollationFactory.fetchCollation(otherId).collationName
          )
        )
      case None => TypeCheckResult.TypeCheckSuccess
    }
  }

}
44+
45+
/**
 * StringTypeCollated is an abstract class for StringType with collation support.
 * Concrete objects extending it narrow which collations an expression accepts.
 */
abstract class StringTypeCollated extends AbstractDataType {
  // Implicit coercion to this abstract type materializes as the default StringType.
  override private[sql] def defaultConcreteType: DataType = StringType
}
51+
52+
/**
 * Use StringTypeBinary for expressions supporting only binary collation.
 */
case object StringTypeBinary extends StringTypeCollated {
  override private[sql] def simpleString: String = "string_binary"
  // Pattern match instead of isInstanceOf/asInstanceOf: accept only binary-collated strings.
  override private[sql] def acceptsType(other: DataType): Boolean = other match {
    case st: StringType => st.isBinaryCollation
    case _ => false
  }
}
60+
61+
/**
 * Use StringTypeBinaryLcase for expressions supporting only binary and lowercase collation.
 */
case object StringTypeBinaryLcase extends StringTypeCollated {
  override private[sql] def simpleString: String = "string_binary_lcase"
  // Pattern match replaces the original's repeated isInstanceOf/asInstanceOf casts.
  override private[sql] def acceptsType(other: DataType): Boolean = other match {
    case st: StringType => st.isBinaryCollation || st.isLowercaseCollation
    case _ => false
  }
}
70+
71+
/**
 * Use StringTypeAnyCollation for expressions supporting all possible collation types.
 */
case object StringTypeAnyCollation extends StringTypeCollated {
  override private[sql] def simpleString: String = "string_any_collation"
  // Any StringType is accepted, whatever its collation id.
  override private[sql] def acceptsType(other: DataType): Boolean = other match {
    case _: StringType => true
    case _ => false
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collationExpressions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ case class Collate(child: Expression, collationName: String)
8282
extends UnaryExpression with ExpectsInputTypes {
8383
private val collationId = CollationFactory.collationNameToId(collationName)
8484
override def dataType: DataType = StringType(collationId)
85-
override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
85+
override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
8686

8787
override protected def withNewChildInternal(
8888
newChild: Expression): Expression = copy(newChild)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,8 @@ trait String2StringExpression extends ImplicitCastInputTypes {
427427

428428
def convert(v: UTF8String): UTF8String
429429

430-
override def dataType: DataType = StringType
431-
override def inputTypes: Seq[DataType] = Seq(StringType)
430+
override def dataType: DataType = child.dataType
431+
override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
432432

433433
protected override def nullSafeEval(input: Any): Any =
434434
convert(input.asInstanceOf[UTF8String])
@@ -501,26 +501,15 @@ abstract class StringPredicate extends BinaryExpression
501501

502502
def compare(l: UTF8String, r: UTF8String): Boolean
503503

504-
override def inputTypes: Seq[DataType] = Seq(StringType, StringType)
504+
override def inputTypes: Seq[AbstractDataType] =
505+
Seq(StringTypeAnyCollation, StringTypeAnyCollation)
505506

506507
override def checkInputDataTypes(): TypeCheckResult = {
507-
val checkResult = super.checkInputDataTypes()
508-
if (checkResult.isFailure) {
509-
return checkResult
510-
}
511-
// Additional check needed for collation compatibility
512-
val rightCollationId: Int = right.dataType.asInstanceOf[StringType].collationId
513-
if (collationId != rightCollationId) {
514-
DataTypeMismatch(
515-
errorSubClass = "COLLATION_MISMATCH",
516-
messageParameters = Map(
517-
"collationNameLeft" -> CollationFactory.fetchCollation(collationId).collationName,
518-
"collationNameRight" -> CollationFactory.fetchCollation(rightCollationId).collationName
519-
)
520-
)
521-
} else {
522-
TypeCheckResult.TypeCheckSuccess
508+
val defaultCheck = super.checkInputDataTypes()
509+
if (defaultCheck.isFailure) {
510+
return defaultCheck
523511
}
512+
CollationTypeConstraints.checkCollationCompatibility(collationId, children.map(_.dataType))
524513
}
525514

526515
protected override def nullSafeEval(input1: Any, input2: Any): Any =
@@ -1976,7 +1965,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
19761965
override def dataType: DataType = str.dataType
19771966

19781967
override def inputTypes: Seq[AbstractDataType] =
1979-
Seq(TypeCollection(StringType, BinaryType), IntegerType, IntegerType)
1968+
Seq(TypeCollection(StringTypeAnyCollation, BinaryType), IntegerType, IntegerType)
19801969

19811970
override def first: Expression = str
19821971
override def second: Expression = pos

0 commit comments

Comments
 (0)