From 403809c5c02e56e5c96d459a37dcc5aedf2df76b Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Fri, 5 Jun 2015 14:51:23 +0800 Subject: [PATCH 1/6] promote non-string to string when can not found tighestCommonTypeOfTwo --- .../catalyst/analysis/HiveTypeCoercion.scala | 43 +++++++++++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 10 +++++ 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 6ed192360dd6..aded64a27466 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -67,6 +67,42 @@ object HiveTypeCoercion { }) } + /** + * Implicit promote the DataType to StringType if + * one of the data type in (dt1, dt2) is StringType, and the other is not either + * BooleanType or BinaryType, the TightestCommonType should be StringType + * eg: 1. CaseWhenLike case when ... then dt1 else dt2 end + * 2. Coalesce(null, dt1, dt2) + + */ + val promoteToStringTypeOfTwo: (DataType, DataType) => Option[DataType] = { + case (t1: StringType, t2: DataType) if (t2 != BinaryType && t2 != BooleanType) => + Some(StringType) + + case (t1: DataType, t2: StringType) if (t1 != BinaryType && t1 != BooleanType) => + Some(StringType) + + case _ => None + } + + /** + * Find the tightest common type of a set of types by continuously applying + * `findTightestCommonTypeOfTwo` on these types. 
+ */ + private def findTightestCommonTypeAndTryPromoteToString(types: Seq[DataType]) = { + types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match { + case None => None + case Some(d) => + val dt = findTightestCommonTypeOfTwo(d, c) + if (dt == None) { + promoteToStringTypeOfTwo(d, c) + } else { + dt + } + }) + } + + /** * Find the tightest common type of a set of types by continuously applying * `findTightestCommonTypeOfTwo` on these types. @@ -599,7 +635,7 @@ trait HiveTypeCoercion { // compatible with every child column. case Coalesce(es) if es.map(_.dataType).distinct.size > 1 => val types = es.map(_.dataType) - findTightestCommonType(types) match { + findTightestCommonTypeAndTryPromoteToString(types) match { case Some(finalDataType) => Coalesce(es.map(Cast(_, finalDataType))) case None => sys.error(s"Could not determine return type of Coalesce for ${types.mkString(",")}") @@ -634,7 +670,7 @@ trait HiveTypeCoercion { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case c: CaseWhenLike if c.childrenResolved && !c.valueTypesEqual => logDebug(s"Input values for null casting ${c.valueTypes.mkString(",")}") - val maybeCommonType = findTightestCommonType(c.valueTypes) + val maybeCommonType = findTightestCommonTypeAndTryPromoteToString(c.valueTypes) maybeCommonType.map { commonType => val castedBranches = c.branches.grouped(2).map { case Seq(when, value) if value.dataType != commonType => @@ -650,7 +686,8 @@ trait HiveTypeCoercion { }.getOrElse(c) case c: CaseKeyWhen if c.childrenResolved && !c.resolved => - val maybeCommonType = findTightestCommonType((c.key +: c.whenList).map(_.dataType)) + val maybeCommonType = + findTightestCommonTypeAndTryPromoteToString((c.key +: c.whenList).map(_.dataType)) maybeCommonType.map { commonType => val castedBranches = c.branches.grouped(2).map { case Seq(when, then) if when.dataType != commonType => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 14ecd4e9a77d..e29bdf3b8f43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -45,6 +45,16 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { Row("one", 6) :: Row("three", 3) :: Nil) } + test("SPARK-8010: promote numeric to string") { + val df = Seq((1,1)).toDF("key","value") + df.registerTempTable("src") + val queryCaseWhen = sql("select case when true then 1.0 else '1' end from src ") + val queryCoalesce = sql("select coalesce(null, 1, '1') from src ") + + checkAnswer(queryCaseWhen, Row("1.0") :: Nil) + checkAnswer(queryCoalesce, Row("1") :: Nil) + } + test("SPARK-6743: no columns from cache") { Seq( (83, 0, 38), From 95cbd58a36b6e947ac7ded9fd5282caed4a6d870 Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Fri, 5 Jun 2015 14:57:07 +0800 Subject: [PATCH 2/6] fix style --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e29bdf3b8f43..0b265f34055e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -46,7 +46,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { } test("SPARK-8010: promote numeric to string") { - val df = Seq((1,1)).toDF("key","value") + val df = Seq((1, 1)).toDF("key", "value") df.registerTempTable("src") val queryCaseWhen = sql("select case when true then 1.0 else '1' end from src ") val queryCoalesce = sql("select coalesce(null, 1, '1') from src ") From df365d22b06d85d53240f9f7c1ed5ae3b96bca15 Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Fri, 5 Jun 2015 17:35:00 +0800 Subject: [PATCH 3/6] 
refine --- .../sql/catalyst/analysis/HiveTypeCoercion.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index aded64a27466..804eb34ace89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -85,20 +85,11 @@ object HiveTypeCoercion { case _ => None } - /** - * Find the tightest common type of a set of types by continuously applying - * `findTightestCommonTypeOfTwo` on these types. - */ private def findTightestCommonTypeAndTryPromoteToString(types: Seq[DataType]) = { types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match { case None => None case Some(d) => - val dt = findTightestCommonTypeOfTwo(d, c) - if (dt == None) { - promoteToStringTypeOfTwo(d, c) - } else { - dt - } + findTightestCommonTypeOfTwo(d, c).orElse(promoteToStringTypeOfTwo(d, c)) }) } From 4cd561819b3270317302ad43803091d879bd771f Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Fri, 5 Jun 2015 23:26:10 +0800 Subject: [PATCH 4/6] limit the data type to primitive type --- .../sql/catalyst/analysis/HiveTypeCoercion.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 804eb34ace89..31009b2e47fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -68,19 +68,18 @@ object HiveTypeCoercion { } /** - * Implicit promote the DataType to StringType if + * Implicit promote the 
AtomicType to StringType if * one of the data type in (dt1, dt2) is StringType, and the other is not either * BooleanType or BinaryType, the TightestCommonType should be StringType * eg: 1. CaseWhenLike case when ... then dt1 else dt2 end * 2. Coalesce(null, dt1, dt2) - */ val promoteToStringTypeOfTwo: (DataType, DataType) => Option[DataType] = { - case (t1: StringType, t2: DataType) if (t2 != BinaryType && t2 != BooleanType) => - Some(StringType) + case (t1: StringType, t2: AtomicType) if (t2 != BinaryType && t2 != BooleanType) => + Some(StringType) - case (t1: DataType, t2: StringType) if (t1 != BinaryType && t1 != BooleanType) => - Some(StringType) + case (t1: AtomicType, t2: StringType) if (t1 != BinaryType && t1 != BooleanType) => + Some(StringType) case _ => None } From 6018613e7fdf48a5f42e86a67954f1abd10b4e0d Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Mon, 8 Jun 2015 11:14:45 +0800 Subject: [PATCH 5/6] convert function to method --- .../sql/catalyst/analysis/HiveTypeCoercion.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 31009b2e47fb..c88fdc90d23a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -74,7 +74,7 @@ object HiveTypeCoercion { * eg: 1. CaseWhenLike case when ... then dt1 else dt2 end * 2. 
Coalesce(null, dt1, dt2) */ - val promoteToStringTypeOfTwo: (DataType, DataType) => Option[DataType] = { + private def promoteToStringType(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match { case (t1: StringType, t2: AtomicType) if (t2 != BinaryType && t2 != BooleanType) => Some(StringType) @@ -84,11 +84,11 @@ object HiveTypeCoercion { case _ => None } - private def findTightestCommonTypeAndTryPromoteToString(types: Seq[DataType]) = { + private def findTightestCommonTypeAndPromoteToString(types: Seq[DataType]): Option[DataType] = { types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match { case None => None case Some(d) => - findTightestCommonTypeOfTwo(d, c).orElse(promoteToStringTypeOfTwo(d, c)) + findTightestCommonTypeOfTwo(d, c).orElse(promoteToStringType(d, c)) }) } @@ -625,7 +625,7 @@ trait HiveTypeCoercion { // compatible with every child column. case Coalesce(es) if es.map(_.dataType).distinct.size > 1 => val types = es.map(_.dataType) - findTightestCommonTypeAndTryPromoteToString(types) match { + findTightestCommonTypeAndPromoteToString(types) match { case Some(finalDataType) => Coalesce(es.map(Cast(_, finalDataType))) case None => sys.error(s"Could not determine return type of Coalesce for ${types.mkString(",")}") @@ -660,7 +660,7 @@ trait HiveTypeCoercion { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case c: CaseWhenLike if c.childrenResolved && !c.valueTypesEqual => logDebug(s"Input values for null casting ${c.valueTypes.mkString(",")}") - val maybeCommonType = findTightestCommonTypeAndTryPromoteToString(c.valueTypes) + val maybeCommonType = findTightestCommonTypeAndPromoteToString(c.valueTypes) maybeCommonType.map { commonType => val castedBranches = c.branches.grouped(2).map { case Seq(when, value) if value.dataType != commonType => @@ -677,7 +677,7 @@ trait HiveTypeCoercion { case c: CaseKeyWhen if c.childrenResolved && !c.resolved => val maybeCommonType = - 
findTightestCommonTypeAndTryPromoteToString((c.key +: c.whenList).map(_.dataType)) + findTightestCommonTypeAndPromoteToString((c.key +: c.whenList).map(_.dataType)) maybeCommonType.map { commonType => val castedBranches = c.branches.grouped(2).map { case Seq(when, then) if when.dataType != commonType => From 7a209d7d02af0fc6427a967ac35145bc5e3e3383 Mon Sep 17 00:00:00 2001 From: OopsOutOfMemory Date: Fri, 12 Jun 2015 14:53:47 +0800 Subject: [PATCH 6/6] rebase master --- .../catalyst/analysis/HiveTypeCoercion.scala | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index c88fdc90d23a..7858c94fef0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -68,27 +68,14 @@ object HiveTypeCoercion { } /** - * Implicit promote the AtomicType to StringType if - * one of the data type in (dt1, dt2) is StringType, and the other is not either - * BooleanType or BinaryType, the TightestCommonType should be StringType - * eg: 1. CaseWhenLike case when ... then dt1 else dt2 end - * 2. Coalesce(null, dt1, dt2) + * Similar to [[findTightestCommonType]], but if no tightest common type is found, falls back + * to [[findTightestCommonTypeToString]] to find a common type. 
*/ - private def promoteToStringType(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match { - case (t1: StringType, t2: AtomicType) if (t2 != BinaryType && t2 != BooleanType) => - Some(StringType) - - case (t1: AtomicType, t2: StringType) if (t1 != BinaryType && t1 != BooleanType) => - Some(StringType) - - case _ => None - } - private def findTightestCommonTypeAndPromoteToString(types: Seq[DataType]): Option[DataType] = { types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match { case None => None case Some(d) => - findTightestCommonTypeOfTwo(d, c).orElse(promoteToStringType(d, c)) + findTightestCommonTypeOfTwo(d, c).orElse(findTightestCommonTypeToString(d, c)) }) }