From 3c619dfb94723bd7a7d6a0811ab6329bf107f81b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 19 Mar 2017 22:42:25 +0900 Subject: [PATCH 1/4] Pivot with timestamp and count should not print internal representation --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 ++++-- .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 9 +++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8cf407382619..4f5b6fb3e5c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -486,14 +486,16 @@ class Analyzer( case Pivot(groupByExprs, pivotColumn, pivotValues, aggregates, child) => val singleAgg = aggregates.size == 1 def outputName(value: Literal, aggregate: Expression): String = { + val scalaValue = CatalystTypeConverters.convertToScala(value.value, value.dataType) + val stringValue = Option(scalaValue).getOrElse("null").toString if (singleAgg) { - value.toString + stringValue } else { val suffix = aggregate match { case n: NamedExpression => n.name case _ => toPrettySQL(aggregate) } - value + "_" + suffix + stringValue + "_" + suffix } } if (aggregates.forall(a => PivotFirst.supportsDataType(a.dataType))) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index ca3cb5676742..840faf8c2ac6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -230,4 +230,13 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ .groupBy($"a").pivot("a").agg(min($"b")), Row(null, Seq(null, 7), null) :: Row(1, null, Seq(1, 7)) :: 
Nil) } + + test("pivot with timestamp and count should not print internal representation") { + val timestamp = "2012-12-31 16:00:10.011" + val df = Seq(java.sql.Timestamp.valueOf(timestamp)).toDF("a").groupBy("a").pivot("a").count() + val expected = StructType( + StructField("a", TimestampType) :: + StructField(timestamp, LongType) :: Nil) + assert(df.schema == expected) + } } From 93f05f3545d9af335ca1f6c711b6f84b9938b95e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 22 Mar 2017 10:55:58 +0900 Subject: [PATCH 2/4] Consider timezone in external representation in TimestampType --- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../apache/spark/sql/DataFramePivotSuite.scala | 16 ++++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4f5b6fb3e5c7..342668dbbb37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -486,8 +486,8 @@ class Analyzer( case Pivot(groupByExprs, pivotColumn, pivotValues, aggregates, child) => val singleAgg = aggregates.size == 1 def outputName(value: Literal, aggregate: Expression): String = { - val scalaValue = CatalystTypeConverters.convertToScala(value.value, value.dataType) - val stringValue = Option(scalaValue).getOrElse("null").toString + val utf8val = Cast(value, StringType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) + val stringValue: String = Option(utf8val).map(_.toString).getOrElse("null") if (singleAgg) { stringValue } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index 840faf8c2ac6..d4c87b23da48 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -232,11 +232,15 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ } test("pivot with timestamp and count should not print internal representation") { - val timestamp = "2012-12-31 16:00:10.011" - val df = Seq(java.sql.Timestamp.valueOf(timestamp)).toDF("a").groupBy("a").pivot("a").count() - val expected = StructType( - StructField("a", TimestampType) :: - StructField(timestamp, LongType) :: Nil) - assert(df.schema == expected) + val ts = "2012-12-31 16:00:10.011" + val tsWithZone = "2013-01-01 00:00:10.011" + + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { + val df = Seq(java.sql.Timestamp.valueOf(ts)).toDF("a").groupBy("a").pivot("a").count() + val expected = StructType( + StructField("a", TimestampType) :: + StructField(tsWithZone, LongType) :: Nil) + assert(df.schema == expected) + } } } From 4e4cfa76727cc213f71d2f9b5bf2bf7c5905c54e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 22 Mar 2017 11:08:38 +0900 Subject: [PATCH 3/4] Make the name consistent --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 342668dbbb37..4db5c48ef3bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -486,8 +486,8 @@ class Analyzer( case Pivot(groupByExprs, pivotColumn, pivotValues, aggregates, child) => val singleAgg = aggregates.size == 1 def outputName(value: Literal, aggregate: Expression): String = { - val utf8val = Cast(value, StringType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) - val stringValue: 
String = Option(utf8val).map(_.toString).getOrElse("null") + val utf8Value = Cast(value, StringType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) + val stringValue: String = Option(utf8Value).map(_.toString).getOrElse("null") if (singleAgg) { stringValue } else { From 803a094a7b91ca73853e481c896edb5e5c0e2b80 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 22 Mar 2017 22:22:16 +0900 Subject: [PATCH 4/4] Fix tests to make sure string representations are the same --- .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index d4c87b23da48..6ca9ee57e8f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ -class DataFramePivotSuite extends QueryTest with SharedSQLContext{ +class DataFramePivotSuite extends QueryTest with SharedSQLContext { import testImplicits._ test("pivot courses") { @@ -241,6 +241,9 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ StructField("a", TimestampType) :: StructField(tsWithZone, LongType) :: Nil) assert(df.schema == expected) + // String representation of timestamp with timezone should take the time difference + // into account. + checkAnswer(df.select($"a".cast(StringType)), Row(tsWithZone)) } } }