-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21646][SQL] Add new type coercion to compatible with Hive #18853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
f59a213
bc83848
cedb239
8d37c72
522c4cd
3bec6a2
844aec7
7812018
27d5b13
53d673f
8da0cf0
2ada11a
d34f294
b99fb60
0d9cf69
22d0355
558ff90
663eb35
7802483
dffe5d2
97a071d
408e889
e763330
81067b9
d0a2089
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -132,6 +132,28 @@ object TypeCoercion { | |
| case (l, r) => None | ||
| } | ||
|
|
||
| /** | ||
| * Follow hive's binary comparison action: | ||
| * https://github.com/apache/hive/blob/rel/storage-release-2.4.0/ql/src/java/ | ||
| * org/apache/hadoop/hive/ql/exec/FunctionRegistry.java#L781 | ||
| */ | ||
| val findCommonTypeCompatibleWithHive: (DataType, DataType) => | ||
| Option[DataType] = { | ||
| case (StringType, DateType) => Some(DateType) | ||
| case (DateType, StringType) => Some(DateType) | ||
| case (StringType, TimestampType) => Some(TimestampType) | ||
| case (TimestampType, StringType) => Some(TimestampType) | ||
| case (TimestampType, DateType) => Some(TimestampType) | ||
| case (DateType, TimestampType) => Some(TimestampType) | ||
| case (StringType, NullType) => Some(StringType) | ||
| case (NullType, StringType) => Some(StringType) | ||
| case (StringType | TimestampType, r: NumericType) => Some(DoubleType) | ||
| case (l: NumericType, StringType | TimestampType) => Some(DoubleType) | ||
| case (l: StringType, r: AtomicType) if r != StringType => Some(r) | ||
| case (l: AtomicType, r: StringType) if l != StringType => Some(l) | ||
| case (l, r) => None | ||
| } | ||
|
|
||
| /** | ||
| * Case 2 type widening (see the classdoc comment above for TypeCoercion). | ||
| * | ||
|
|
@@ -352,11 +374,16 @@ object TypeCoercion { | |
| p.makeCopy(Array(Cast(left, TimestampType), right)) | ||
| case p @ Equality(left @ TimestampType(), right @ StringType()) => | ||
| p.makeCopy(Array(left, Cast(right, TimestampType))) | ||
|
|
||
| case p @ BinaryComparison(left, right) | ||
| if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => | ||
| if !plan.conf.binaryComparisonCompatibleWithHive && | ||
| findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => | ||
| val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get | ||
| p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) | ||
| case p @ BinaryComparison(left, right) | ||
| if plan.conf.binaryComparisonCompatibleWithHive && | ||
| findCommonTypeCompatibleWithHive(left.dataType, right.dataType).isDefined => | ||
| val commonType = findCommonTypeCompatibleWithHive(left.dataType, right.dataType).get | ||
| p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) | ||
|
|
||
| case Abs(e @ StringType()) => Abs(Cast(e, DoubleType)) | ||
| case Sum(e @ StringType()) => Sum(Cast(e, DoubleType)) | ||
|
|
@@ -411,8 +438,13 @@ object TypeCoercion { | |
| val rhs = sub.output | ||
|
|
||
| val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => | ||
| findCommonTypeForBinaryComparison(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
| if (plan.conf.binaryComparisonCompatibleWithHive) { | ||
| findCommonTypeCompatibleWithHive(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
| } else { | ||
| findCommonTypeForBinaryComparison(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
|
||
| } | ||
| } | ||
|
|
||
| // The number of columns/expressions must match between LHS and RHS of an | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -925,6 +925,12 @@ object SQLConf { | |
| .intConf | ||
| .createWithDefault(10000) | ||
|
|
||
| val BINARY_COMPARISON_COMPATIBLE_WITH_HIVE = | ||
| buildConf("spark.sql.binary.comparison.compatible.with.hive") | ||
|
||
| .doc("Whether compatible with Hive when binary comparison.") | ||
|
||
| .booleanConf | ||
| .createWithDefault(true) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has to be |
||
|
|
||
| object Deprecated { | ||
| val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" | ||
| } | ||
|
|
@@ -1203,6 +1209,9 @@ class SQLConf extends Serializable with Logging { | |
|
|
||
| def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) | ||
|
|
||
| def binaryComparisonCompatibleWithHive: Boolean = | ||
| getConf(SQLConf.BINARY_COMPARISON_COMPATIBLE_WITH_HIVE) | ||
|
|
||
| /** ********************** SQLConf functionality methods ************ */ | ||
|
|
||
| /** Set Spark SQL configuration properties. */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1966,7 +1966,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { | |
|
|
||
| test("SPARK-17913: compare long and string type column may return confusing result") { | ||
| val df = Seq(123L -> "123", 19157170390056973L -> "19157170390056971").toDF("i", "j") | ||
| checkAnswer(df.select($"i" === $"j"), Row(true) :: Row(false) :: Nil) | ||
| checkAnswer(df.select($"i" === $"j"), Row(true) :: Row(true) :: Nil) | ||
|
||
| } | ||
|
|
||
| test("SPARK-19691 Calculating percentile of decimal column fails with ClassCastException") { | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is hard to maintain and debug. Instead of mixing them together, could you separate it from the current rule Spark uses?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs 2
PromoteStringsand 2InConversionif separate from current rule, So I merge the logic tofindCommonTypeForBinaryComparison.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine to have two separate rules, but we can call the shared functions.