-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21646][SQL] Add new type coercion to compatible with Hive #18853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
f59a213
bc83848
cedb239
8d37c72
522c4cd
3bec6a2
844aec7
7812018
27d5b13
53d673f
8da0cf0
2ada11a
d34f294
b99fb60
0d9cf69
22d0355
558ff90
663eb35
7802483
dffe5d2
97a071d
408e889
e763330
81067b9
d0a2089
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -115,21 +115,46 @@ object TypeCoercion { | |
| * is a String and the other is not. It also handles when one op is a Date and the | ||
| * other is a Timestamp by making the target type to be String. | ||
| */ | ||
| val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = { | ||
| // We should cast all relative timestamp/date/string comparison into string comparisons | ||
| // This behaves as a user would expect because timestamp strings sort lexicographically. | ||
| // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true | ||
| case (StringType, DateType) => Some(StringType) | ||
| case (DateType, StringType) => Some(StringType) | ||
| case (StringType, TimestampType) => Some(StringType) | ||
| case (TimestampType, StringType) => Some(StringType) | ||
| case (TimestampType, DateType) => Some(StringType) | ||
| case (DateType, TimestampType) => Some(StringType) | ||
| case (StringType, NullType) => Some(StringType) | ||
| case (NullType, StringType) => Some(StringType) | ||
| case (l: StringType, r: AtomicType) if r != StringType => Some(r) | ||
| case (l: AtomicType, r: StringType) if (l != StringType) => Some(l) | ||
| case (l, r) => None | ||
| private def findCommonTypeForBinaryComparison( | ||
| plan: LogicalPlan, | ||
| l: DataType, | ||
| r: DataType): Option[DataType] = | ||
| if (!plan.conf.isHiveTypeCoercionMode) { | ||
| (l, r) match { | ||
| // We should cast all relative timestamp/date/string comparison into string comparisons | ||
| // This behaves as a user would expect because timestamp strings sort lexicographically. | ||
| // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true | ||
|
||
| case (StringType, DateType) => Some(StringType) | ||
| case (DateType, StringType) => Some(StringType) | ||
| case (StringType, TimestampType) => Some(StringType) | ||
| case (TimestampType, StringType) => Some(StringType) | ||
| case (TimestampType, DateType) => Some(StringType) | ||
| case (DateType, TimestampType) => Some(StringType) | ||
| case (StringType, NullType) => Some(StringType) | ||
| case (NullType, StringType) => Some(StringType) | ||
| case (l: StringType, r: AtomicType) if r != StringType => Some(r) | ||
| case (l: AtomicType, r: StringType) if (l != StringType) => Some(l) | ||
| case (l, r) => None | ||
| } | ||
| } else { | ||
| (l, r) match { | ||
| // Follow hive's binary comparison action: | ||
| // https://github.com/apache/hive/blob/rel/storage-release-2.4.0/ql/src/java/ | ||
| // org/apache/hadoop/hive/ql/exec/FunctionRegistry.java#L781 | ||
| case (StringType, DateType) => Some(DateType) | ||
| case (DateType, StringType) => Some(DateType) | ||
| case (StringType, TimestampType) => Some(TimestampType) | ||
| case (TimestampType, StringType) => Some(TimestampType) | ||
| case (TimestampType, DateType) => Some(TimestampType) | ||
| case (DateType, TimestampType) => Some(TimestampType) | ||
| case (StringType, NullType) => Some(StringType) | ||
| case (NullType, StringType) => Some(StringType) | ||
| case (StringType | TimestampType, r: NumericType) => Some(DoubleType) | ||
| case (l: NumericType, StringType | TimestampType) => Some(DoubleType) | ||
| case (l: StringType, r: AtomicType) if r != StringType => Some(r) | ||
| case (l: AtomicType, r: StringType) if l != StringType => Some(l) | ||
| case _ => None | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -352,10 +377,9 @@ object TypeCoercion { | |
| p.makeCopy(Array(Cast(left, TimestampType), right)) | ||
| case p @ Equality(left @ TimestampType(), right @ StringType()) => | ||
| p.makeCopy(Array(left, Cast(right, TimestampType))) | ||
|
|
||
| case p @ BinaryComparison(left, right) | ||
| if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => | ||
| val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get | ||
| if findCommonTypeForBinaryComparison(plan, left.dataType, right.dataType).isDefined => | ||
| val commonType = findCommonTypeForBinaryComparison(plan, left.dataType, right.dataType).get | ||
| p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType))) | ||
|
|
||
| case Abs(e @ StringType()) => Abs(Cast(e, DoubleType)) | ||
|
|
@@ -411,8 +435,9 @@ object TypeCoercion { | |
| val rhs = sub.output | ||
|
|
||
| val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => | ||
| findCommonTypeForBinaryComparison(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
| findCommonTypeForBinaryComparison(plan, l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
|
|
||
| } | ||
|
|
||
| // The number of columns/expressions must match between LHS and RHS of an | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ package org.apache.spark.sql | |
| import java.io.File | ||
| import java.math.MathContext | ||
| import java.net.{MalformedURLException, URL} | ||
| import java.sql.Timestamp | ||
| import java.sql.{Date, Timestamp} | ||
| import java.util.concurrent.atomic.AtomicBoolean | ||
|
|
||
| import org.apache.spark.{AccumulatorSuite, SparkException} | ||
|
|
@@ -2677,4 +2677,142 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
| checkAnswer(df, Row(1, 1, 1)) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs NumericType") { | ||
| withTempView("v") { | ||
| val str1 = Long.MaxValue.toString + "1" | ||
| val str2 = Int.MaxValue.toString + "1" | ||
| val str3 = "10" | ||
| Seq(str1, str2, str3).toDF("c1").createOrReplaceTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { | ||
| checkAnswer(sql("SELECT c1 from v where c1 > 0"), | ||
| Row(str1) :: Row(str2) :: Row(str3) :: Nil) | ||
| checkAnswer(sql("SELECT c1 from v where c1 > 0L"), | ||
| Row(str1) :: Row(str2) :: Row(str3) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "default") { | ||
| checkAnswer(sql("SELECT c1 from v where c1 > 0"), Row(str3) :: Nil) | ||
| checkAnswer(sql("SELECT c1 from v where c1 > 0L"), Row(str2) :: Row(str3) :: Nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: DoubleType vs IntegerType") { | ||
| withTempView("v") { | ||
| Seq(("0", 1), ("-0.4", 2), ("0.6", 3)).toDF("c1", "c2").createOrReplaceTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) | ||
| checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(1) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "default") { | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) | ||
| checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) | ||
| checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(0) :: Nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs DateType") { | ||
| withTempView("v") { | ||
| val v1 = Date.valueOf("2017-09-22") | ||
| val v2 = Date.valueOf("2017-09-09") | ||
| Seq(v1, v2).toDF("c1").createTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v1) :: Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), | ||
| Row(v1) :: Row(v2) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "default") { | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), | ||
| Row(v1) :: Row(v2) :: Nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs TimestampType") { | ||
| withTempView("v") { | ||
| val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123") | ||
| val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123") | ||
| Seq(v1, v2).toDF("c1").createTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), | ||
| Row(v2) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "default") { | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), | ||
| Row(v2) :: Nil) | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: TimestampType vs DateType") { | ||
| withTempView("v") { | ||
| val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123") | ||
| val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123") | ||
| Seq(v1, v2).toDF("c1").createTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "Hive") { | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), | ||
| Row(v2) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "Default") { | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), | ||
| Row(v2) :: Nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-21646: CommonTypeForBinaryComparison: TimestampType vs NumericType") { | ||
| withTempView("v") { | ||
| val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123") | ||
| val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123") | ||
| Seq(v1, v2).toDF("c1").createTempView("v") | ||
| withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { | ||
| checkAnswer(sql("select c1 from v where c1 > 1"), Row(v1) :: Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) :: Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-08-01'"), Row(v2) :: Nil) | ||
| checkAnswer( | ||
| sql("select * from v where c1 > cast(cast('2017-08-01' as timestamp) as double)"), | ||
| Row(v2) :: Nil) | ||
| } | ||
|
|
||
| withSQLConf(SQLConf.typeCoercionMode.key -> "default") { | ||
| val e1 = intercept[AnalysisException] { | ||
| sql("select * from v where c1 > 1") | ||
| } | ||
| assert(e1.getMessage.contains("data type mismatch")) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-08-01'"), Row(v2) :: Nil) | ||
| val e2 = intercept[AnalysisException] { | ||
| sql("select * from v where c1 > cast(cast('2017-08-01' as timestamp) as double)") | ||
| } | ||
| assert(e2.getMessage.contains("data type mismatch")) | ||
| } | ||
|
|
||
| val e1 = intercept[AnalysisException] { | ||
| sql("select * from v where c1 > 1") | ||
| } | ||
| assert(e1.getMessage.contains("data type mismatch")) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) | ||
| checkAnswer(sql("select c1 from v where c1 > '2017-08-01'"), Row(v2) :: Nil) | ||
| val e2 = intercept[AnalysisException] { | ||
| sql("select * from v where c1 > cast(cast('2017-08-01' as timestamp) as double)") | ||
| } | ||
| assert(e2.getMessage.contains("data type mismatch")) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This description feels inadequate to me. I think most users will think "hive" means "old, legacy way of doing things and "default" means "new, better, spark way of doing things". But I haven't heard an argument in favor of the "default" behavior, just that we don't want to have a breaking change of behavior.
So (a) I'd advocate that we rename "default" to "legacy", or something else along those lines. I do think it should be the default value, to avoid changing behavior.
and (b) I think the doc section here should more clearly indicate the difference, eg. "The 'legacy' typeCoercion mode was used in spark prior to 2.3, and so it continues to be the default to avoid breaking behavior. However, it has logical inconsistencies. The 'hive' mode is preferred for most new applications, though it may require additional manual casting.
I am even wondering if we should have a 3rd option, to not implicit cast across type categories, eg. like postgres, as this avoids nasty surprises for the user. While the casts are convenient, when it doesn't work there is very little indication to the user that anything went wrong -- most likely they'll just keep continue processing data though the results don't actually have the semantics they want.