-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21646][SQL] Add new type coercion to compatible with Hive #18853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
f59a213
bc83848
cedb239
8d37c72
522c4cd
3bec6a2
844aec7
7812018
27d5b13
53d673f
8da0cf0
2ada11a
d34f294
b99fb60
0d9cf69
22d0355
558ff90
663eb35
7802483
dffe5d2
97a071d
408e889
e763330
81067b9
d0a2089
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1460,6 +1460,13 @@ that these options will be deprecated in future release as more optimizations ar | |
| Configures the number of partitions to use when shuffling data for joins or aggregations. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.sql.typeCoercion.mode</code></td> | ||
| <td><code>legacy</code></td> | ||
| <td> | ||
| The <code>legacy</code> type coercion mode was used in Spark prior to 2.3, and so it remains the default to avoid breaking existing behavior. However, it has logical inconsistencies. The <code>hive</code> mode is preferred for most new applications, though it may require additional manual casting. | ||
|
||
| </td> | ||
| </tr> | ||
| </table> | ||
|
|
||
| # Distributed SQL Engine | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._ | |
| import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
|
|
||
|
|
@@ -44,11 +45,9 @@ import org.apache.spark.sql.types._ | |
| */ | ||
| object TypeCoercion { | ||
|
|
||
| val typeCoercionRules = | ||
| private val commonTypeCoercionRules = | ||
| PropagateTypes :: | ||
| InConversion :: | ||
| WidenSetOperationTypes :: | ||
| PromoteStrings :: | ||
| DecimalPrecision :: | ||
| BooleanEquality :: | ||
| FunctionArgumentConversion :: | ||
|
|
@@ -62,6 +61,18 @@ object TypeCoercion { | |
| WindowFrameCoercion :: | ||
| Nil | ||
|
|
||
/**
 * Builds the ordered list of type coercion rules for the given configuration.
 *
 * The shared rules always come first. The IN-conversion and string-promotion rules are
 * placed last: the Hive-compatible variants when `conf.isHiveTypeCoercionMode` is true,
 * the legacy variants otherwise.
 *
 * @param conf the SQL configuration that selects the coercion mode
 * @return the shared rules followed by the two mode-specific rules
 */
def rules(conf: SQLConf): List[Rule[LogicalPlan]] = {
  // Only the trailing two rules depend on the mode, so pick them once and concatenate in a
  // single pass instead of appending to the immutable List one element at a time.
  val modeSpecificRules: List[Rule[LogicalPlan]] =
    if (conf.isHiveTypeCoercionMode) {
      HiveInConversion :: HivePromoteStrings :: Nil
    } else {
      InConversion :: PromoteStrings :: Nil
    }
  commonTypeCoercionRules ::: modeSpecificRules
}
|
|
||
| // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types. | ||
| // The conversion for integral and floating point types have a linear widening hierarchy: | ||
| val numericPrecedence = | ||
|
|
@@ -132,6 +143,25 @@ object TypeCoercion { | |
| case (l, r) => None | ||
| } | ||
|
|
||
/**
 * Returns the common type for a pair of data types under the Hive-compatible coercion
 * mode, or `None` when no rule below applies to the pair.
 *
 * Follows Hive's binary comparison behaviour:
 * https://github.com/apache/hive/blob/rel/storage-release-2.4.0/ql/src/java/
 * org/apache/hadoop/hive/ql/exec/FunctionRegistry.java#L781
 */
val findCommonTypeToCompatibleWithHive: (DataType, DataType) => Option[DataType] = {
  // Cases are tried top to bottom, so the specific string/date/timestamp/null/numeric
  // pairings take precedence over the generic string-vs-atomic fallbacks at the end.
  case (StringType, DateType) | (DateType, StringType) => Some(DateType)
  case (StringType, TimestampType) | (TimestampType, StringType) => Some(TimestampType)
  case (TimestampType, DateType) | (DateType, TimestampType) => Some(TimestampType)
  case (StringType, NullType) | (NullType, StringType) => Some(StringType)
  case (StringType | TimestampType, _: NumericType) => Some(DoubleType)
  case (_: NumericType, StringType | TimestampType) => Some(DoubleType)
  // Any remaining string-vs-atomic pairing resolves to the non-string side.
  case (_: StringType, other: AtomicType) if other != StringType => Some(other)
  case (other: AtomicType, _: StringType) if other != StringType => Some(other)
  case _ => None
}
|
|
||
| /** | ||
| * Case 2 type widening (see the classdoc comment above for TypeCoercion). | ||
| * | ||
|
|
@@ -325,17 +355,18 @@ object TypeCoercion { | |
| } | ||
| } | ||
|
|
||
/**
 * Coerces `expr` to `targetType`.
 *
 * A `NullType` expression is replaced by a null literal of the target type, an expression
 * of any other differing type is wrapped in a `Cast`, and an expression that already has
 * the target type is returned unchanged.
 */
private def castExpr(expr: Expression, targetType: DataType): Expression = {
  expr.dataType match {
    case NullType => Literal.create(null, targetType)
    case sourceType if sourceType != targetType => Cast(expr, targetType)
    case _ => expr
  }
}
|
|
||
| /** | ||
| * Promotes strings that appear in arithmetic expressions. | ||
| */ | ||
| object PromoteStrings extends Rule[LogicalPlan] { | ||
| private def castExpr(expr: Expression, targetType: DataType): Expression = { | ||
| (expr.dataType, targetType) match { | ||
| case (NullType, dt) => Literal.create(null, targetType) | ||
| case (l, dt) if (l != dt) => Cast(expr, targetType) | ||
| case _ => expr | ||
| } | ||
| } | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { | ||
| // Skip nodes who's children have not been resolved yet. | ||
|
|
@@ -352,7 +383,6 @@ object TypeCoercion { | |
| p.makeCopy(Array(Cast(left, TimestampType), right)) | ||
| case p @ Equality(left @ TimestampType(), right @ StringType()) => | ||
| p.makeCopy(Array(left, Cast(right, TimestampType))) | ||
|
|
||
| case p @ BinaryComparison(left, right) | ||
| if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined => | ||
| val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get | ||
|
|
@@ -372,6 +402,52 @@ object TypeCoercion { | |
| } | ||
| } | ||
|
|
||
/**
 * Promotes strings in a way that is compatible with Hive.
 *
 * String operands of binary arithmetic expressions, and string arguments of the unary and
 * aggregate expressions listed below, are cast to double. Both sides of a binary
 * comparison are cast to the type chosen by `findCommonTypeToCompatibleWithHive` when
 * that function returns one.
 */
object HivePromoteStrings extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
    // Skip nodes whose children have not been resolved yet.
    case e if !e.childrenResolved => e

    // A string operand of an arithmetic expression is cast to double.
    case a @ BinaryArithmetic(left @ StringType(), right) =>
      a.makeCopy(Array(Cast(left, DoubleType), right))
    case a @ BinaryArithmetic(left, right @ StringType()) =>
      a.makeCopy(Array(left, Cast(right, DoubleType)))

    // Equality expressions are binary comparisons as well, so this single case covers
    // them too; a separate Equality case with the same guard and body was redundant.
    // The guard keeps the rule a no-op when no common type exists, which is why the
    // lookup is repeated before `.get`.
    case p @ BinaryComparison(left, right)
        if findCommonTypeToCompatibleWithHive(left.dataType, right.dataType).isDefined =>
      val commonType = findCommonTypeToCompatibleWithHive(left.dataType, right.dataType).get
      p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType)))

    // String arguments of these unary and aggregate expressions are cast to double.
    case Abs(e @ StringType()) => Abs(Cast(e, DoubleType))
    case Sum(e @ StringType()) => Sum(Cast(e, DoubleType))
    case Average(e @ StringType()) => Average(Cast(e, DoubleType))
    case StddevPop(e @ StringType()) => StddevPop(Cast(e, DoubleType))
    case StddevSamp(e @ StringType()) => StddevSamp(Cast(e, DoubleType))
    case UnaryMinus(e @ StringType()) => UnaryMinus(Cast(e, DoubleType))
    case UnaryPositive(e @ StringType()) => UnaryPositive(Cast(e, DoubleType))
    case VariancePop(e @ StringType()) => VariancePop(Cast(e, DoubleType))
    case VarianceSamp(e @ StringType()) => VarianceSamp(Cast(e, DoubleType))
    case Skewness(e @ StringType()) => Skewness(Cast(e, DoubleType))
    case Kurtosis(e @ StringType()) => Kurtosis(Cast(e, DoubleType))
  }
}
|
|
||
/**
 * Expands the value side of an IN clause into its individual expressions.
 *
 * Multiple columns in an IN clause are represented as a `CreateNamedStruct`, whose value
 * expressions are returned; any other expression is returned as a one-element sequence.
 */
private def flattenExpr(expr: Expression): Seq[Expression] = expr match {
  case struct: CreateNamedStruct => struct.valExprs
  case single => Seq(single)
}
|
|
||
| /** | ||
| * Handles type coercion for both IN expression with subquery and IN | ||
| * expressions without subquery. | ||
|
|
@@ -387,14 +463,6 @@ object TypeCoercion { | |
| * Analysis Exception will be raised at the type checking phase. | ||
| */ | ||
| object InConversion extends Rule[LogicalPlan] { | ||
| private def flattenExpr(expr: Expression): Seq[Expression] = { | ||
| expr match { | ||
| // Multi columns in IN clause is represented as a CreateNamedStruct. | ||
| // flatten the named struct to get the list of expressions. | ||
| case cns: CreateNamedStruct => cns.valExprs | ||
| case expr => Seq(expr) | ||
| } | ||
| } | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { | ||
| // Skip nodes who's children have not been resolved yet. | ||
|
|
@@ -412,7 +480,66 @@ object TypeCoercion { | |
|
|
||
| val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => | ||
| findCommonTypeForBinaryComparison(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
|
||
|
|
||
| } | ||
|
|
||
| // The number of columns/expressions must match between LHS and RHS of an | ||
| // IN subquery expression. | ||
| if (commonTypes.length == lhs.length) { | ||
| val castedRhs = rhs.zip(commonTypes).map { | ||
| case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() | ||
| case (e, _) => e | ||
| } | ||
| val castedLhs = lhs.zip(commonTypes).map { | ||
| case (e, dt) if e.dataType != dt => Cast(e, dt) | ||
| case (e, _) => e | ||
| } | ||
|
|
||
| // Before constructing the In expression, wrap the multi values in LHS | ||
| // in a CreatedNamedStruct. | ||
| val newLhs = castedLhs match { | ||
| case Seq(lhs) => lhs | ||
| case _ => CreateStruct(castedLhs) | ||
| } | ||
|
|
||
| val newSub = Project(castedRhs, sub) | ||
| In(newLhs, Seq(ListQuery(newSub, children, exprId, newSub.output))) | ||
| } else { | ||
| i | ||
| } | ||
|
|
||
| case i @ In(a, b) if b.exists(_.dataType != a.dataType) => | ||
| findWiderCommonType(i.children.map(_.dataType)) match { | ||
| case Some(finalDataType) => i.withNewChildren(i.children.map(Cast(_, finalDataType))) | ||
| case None => i | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handles type coercion for IN expressions in a way that is compatible with Hive. | ||
| */ | ||
| object HiveInConversion extends Rule[LogicalPlan] { | ||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { | ||
| // Skip nodes who's children have not been resolved yet. | ||
| case e if !e.childrenResolved => e | ||
|
|
||
| // Handle type casting required between value expression and subquery output | ||
| // in IN subquery. | ||
| case i @ In(a, Seq(ListQuery(sub, children, exprId, _))) | ||
| if !i.resolved && flattenExpr(a).length == sub.output.length => | ||
| // LHS is the value expression of IN subquery. | ||
| val lhs = flattenExpr(a) | ||
|
|
||
| // RHS is the subquery output. | ||
| val rhs = sub.output | ||
|
|
||
| val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => | ||
| findCommonTypeToCompatibleWithHive(l.dataType, r.dataType) | ||
| .orElse(findTightestCommonType(l.dataType, r.dataType)) | ||
|
|
||
| } | ||
|
|
||
| // The number of columns/expressions must match between LHS and RHS of an | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -925,6 +925,18 @@ object SQLConf { | |
| .intConf | ||
| .createWithDefault(10000) | ||
|
|
||
// Selects the type coercion mode. The value is lower-cased before validation, only
// "legacy" and "hive" are accepted, and "legacy" is the default.
val typeCoercionMode =
  buildConf("spark.sql.typeCoercion.mode")
    .doc("The 'legacy' typeCoercion mode was used in spark prior to 2.3, and so it " +
      "continues to be the default to avoid breaking behavior. However, it has logical " +
      "inconsistencies. The 'hive' mode is preferred for most new applications, though " +
      "it may require additional manual casting.")
    .stringConf
    .transform(_.toLowerCase(Locale.ROOT))
    .checkValues(Set("legacy", "hive"))
    .createWithDefault("legacy")
|
|
||
| object Deprecated { | ||
| val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" | ||
| } | ||
|
|
@@ -1203,6 +1215,8 @@ class SQLConf extends Serializable with Logging { | |
|
|
||
| def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) | ||
|
|
||
/** True when `spark.sql.typeCoercion.mode` is set to "hive". */
def isHiveTypeCoercionMode: Boolean = getConf(SQLConf.typeCoercionMode) == "hive"
|
|
||
| /** ********************** SQLConf functionality methods ************ */ | ||
|
|
||
| /** Set Spark SQL configuration properties. */ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
->
default