@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.apache.spark.sql.Row
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand, ShowViewsCommand}
import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec}
@@ -37,30 +37,45 @@ object HiveResult {
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
  case ExecutedCommandExec(_: DescribeCommandBase) =>
    formatDescribeTableOutput(executedPlan.executeCollectPublic())
  case _: DescribeTableExec =>
    formatDescribeTableOutput(executedPlan.executeCollectPublic())
  // SHOW TABLES in Hive only output table names while our v1 command outputs
  // database, table name, isTemp.
  case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
    command.executeCollect().map(_.getString(1))
  // SHOW TABLES in Hive only output table names while our v2 command outputs
  // namespace and table name.
  case command: ShowTablesExec =>
    command.executeCollect().map(_.getString(1))
  // SHOW VIEWS in Hive only outputs view names while our v1 command outputs
  // namespace, viewName, and isTemporary.
  case command @ ExecutedCommandExec(_: ShowViewsCommand) =>
    command.executeCollect().map(_.getString(1))
  case other =>
    val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
    // We need the types so we can output struct field names
    val types = executedPlan.output.map(_.dataType)
    // Reformat to match hive tab delimited output.
    result.map(_.zip(types).map(e => toHiveString(e)))
Contributor:
@MaxGekk, thinking about what I just wrote to @cloud-fan, would the toHiveString here already handle conversion using the correct session timezone, not the JVM timezone?
Or is there some other case that doesn't work? E.g. about hybrid vs. proleptic calendar?

If that is the case, then we should also set DATETIME_JAVA8API_ENABLED in the withLocalProperties around Thriftserver JDBC/ODBC operations, to make it work correctly there as well.

Member Author:
The problem is in dateFormatter. Currently, the legacy formatter used for java.sql.Date doesn't respect the Spark session time zone and depends on the JVM time zone. It works fine for the Java 8 LocalDate and respects the session time zone.

I tried to fix the issue in #28709, but the fix brings more trouble than it fixes.

Member Author:
I do believe the proper solution is to switch to the Java 8 API.

Contributor:
Does date formatting depend on the time zone for output formatting?
I thought the time zone is only needed for date parsing, for special cases such as 'now', 'today', or 'yesterday'?
Or does the hybrid/proleptic calendar output formatting depend on the time zone?

Contributor:
Following that example:

$ export TZ="Europe/Moscow"
$ ./bin/spark-sql -S
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone	America/Los_Angeles
spark-sql> select date '2020-06-03';
2020-06-02
spark-sql> select make_date(2020, 6, 3);
2020-06-02

Could you explain why make_date(2020, 6, 3) -> 2020-06-02 happens?
Does make_date create a date of midnight 2020-6-3 in the Moscow TZ, which is then returned in America/Los_Angeles, where it is still 2020-6-2?
Could you explain step by step, with examples, what type and what time zone are used during parsing, during collecting, and for the string display, before and after the changes?

Member Author:
  • The date literal '2020-06-03' (and make_date(2020, 6, 3)) is converted to the number of days since the epoch '1970-01-01'. The result is 18416, and it doesn't depend on the time zone. You get the same via the Java 8 API:

    scala> println(LocalDate.of(2020, 6, 3).toEpochDay)
    18416

    The number is stored as the date value internally in Spark.

  • To print it out, we have to collect it and convert it to a string. The following steps are for the Java 8 API turned off:

  1. The days are converted to java.sql.Date by toJavaDate(), which is called from

     override def toScala(catalystValue: Any): Date =
       if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
     override def toScalaImpl(row: InternalRow, column: Int): Date =
       DateTimeUtils.toJavaDate(row.getInt(column))

  2. toJavaDate() has to create an instance of java.sql.Date from milliseconds since the epoch 1970-01-01 00:00:00Z in the UTC time zone. It converts the 18416 days to milliseconds via 18416 * 86400000 and gets 1591142400000.

  3. 1591142400000 is interpreted as local milliseconds in the JVM time zone Europe/Moscow, which has a wall-clock offset of 10800000 millis, or 3 hours. So, 1591142400000 is shifted by 10800000 to get the "UTC timestamp". The result is 1591131600000, which is:

     • 2020-06-02T21:00:00 in UTC
     • 2020-06-03T00:00:00 in Europe/Moscow
     • 2020-06-02T14:00:00 in America/Los_Angeles

  4. new Date(1591131600000) is collected and formatted in toHiveString using the legacy date formatter. Currently, the legacy date formatter ignores the Spark session time zone America/Los_Angeles and uses the JVM time zone Europe/Moscow. In this way, it converts new Date(1591131600000) = 2020-06-03T00:00:00 in Europe/Moscow to 2020-06-03. That looks fine, but after PR [SPARK-31901][SQL] Use the session time zone in legacy date formatters #28709, it takes America/Los_Angeles and performs the conversion from 2020-06-02T14:00:00 America/Los_Angeles to 2020-06-02.

So, the problem is in toJavaDate(), which still uses the default JVM time zone.
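
To make the arithmetic above concrete, here is a small standalone sketch (plain Scala with java.time, not Spark code; it assumes the fixed +03:00 offset that Europe/Moscow has on 2020-06-03) reproducing steps 2 and 3:

    import java.time.{Instant, LocalDate, ZoneId}

    object DaysToJavaDateSketch {
      def main(args: Array[String]): Unit = {
        val days = LocalDate.of(2020, 6, 3).toEpochDay        // 18416, independent of any time zone
        val utcMillis = days * 86400000L                      // 1591142400000 (step 2)
        val moscowOffsetMillis = 3L * 60 * 60 * 1000          // +03:00 wall-clock offset of the JVM zone
        val javaDateMillis = utcMillis - moscowOffsetMillis   // 1591131600000 (step 3)
        val instant = Instant.ofEpochMilli(javaDateMillis)
        println(instant.atZone(ZoneId.of("UTC")))                  // 2020-06-02T21:00 in UTC
        println(instant.atZone(ZoneId.of("Europe/Moscow")))        // 2020-06-03T00:00 in Europe/Moscow
        println(instant.atZone(ZoneId.of("America/Los_Angeles")))  // 2020-06-02T14:00 in America/Los_Angeles
      }
    }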

Member Author:
And one more nuance: the legacy type java.sql.Date is not a local date the way the Java 8 type java.time.LocalDate is. It is actually a timestamp in UTC linked to the JVM time zone. Using it as a local date is not a good idea at all, but this is Spark's legacy code.
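
As a small illustration of that nuance (plain JVM code, not from this PR; it reuses the 1591131600000 millis value from the walkthrough above), the string form of java.sql.Date changes with the default JVM time zone, while java.time.LocalDate carries no time zone at all:

    import java.time.LocalDate
    import java.util.TimeZone

    object LegacyDateVsLocalDate {
      def main(args: Array[String]): Unit = {
        val millis = 1591131600000L // the instant produced by toJavaDate() in the example above
        for (tz <- Seq("Europe/Moscow", "America/Los_Angeles")) {
          TimeZone.setDefault(TimeZone.getTimeZone(tz))
          // java.sql.Date wraps an instant; toString renders it in the current default JVM time zone.
          println(s"$tz: ${new java.sql.Date(millis)}") // 2020-06-03 in Moscow, 2020-06-02 in Los Angeles
        }
        // LocalDate is a genuine local date and prints the same regardless of the JVM time zone.
        println(s"LocalDate: ${LocalDate.ofEpochDay(18416)}") // 2020-06-03
      }
    }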

Contributor:
So everything is OK now? Is enabling the Java 8 time API only for better performance and for supporting negative years?

BTW, I doubt we can support negative years in the Thrift server. Even if the server side can generate the datetime string correctly, the client side parses the string using Timestamp.of, which doesn't support negative years.

Member Author:
Everything is OK when the JVM time zone on the executors (where toJavaDate is called) is equal to the JVM time zone on the driver (where HiveResult is initialized), and both JVM time zones are equal to Spark's session time zone.

      .map(_.mkString("\t"))
def hiveResultString(ds: Dataset[_]): Seq[String] = {
  val executedPlan = ds.queryExecution.executedPlan
  executedPlan match {
    case ExecutedCommandExec(_: DescribeCommandBase) =>
      formatDescribeTableOutput(executedPlan.executeCollectPublic())
    case _: DescribeTableExec =>
      formatDescribeTableOutput(executedPlan.executeCollectPublic())
    // SHOW TABLES in Hive only output table names while our v1 command outputs
    // database, table name, isTemp.
    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
      command.executeCollect().map(_.getString(1))
    // SHOW TABLES in Hive only output table names while our v2 command outputs
    // namespace and table name.
    case command: ShowTablesExec =>
      command.executeCollect().map(_.getString(1))
    // SHOW VIEWS in Hive only outputs view names while our v1 command outputs
    // namespace, viewName, and isTemporary.
    case command @ ExecutedCommandExec(_: ShowViewsCommand) =>
      command.executeCollect().map(_.getString(1))
    case _ =>
      val sessionWithJava8DatetimeEnabled = {
        val cloned = ds.sparkSession.cloneSession()
        cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
        cloned
      }
      sessionWithJava8DatetimeEnabled.withActive {
        // We cannot collect the original dataset because its encoders could be created
        // with disabled Java 8 date-time API.
        val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
          .queryExecution
          .executedPlan
          .executeCollectPublic().map(_.toSeq).toSeq
        // We need the types so we can output struct field names
        val types = executedPlan.output.map(_.dataType)
        // Reformat to match hive tab delimited output.
        result.map(_.zip(types).map(e => toHiveString(e)))
          .map(_.mkString("\t"))
      }
  }
Comment on lines +60 to +78
Contributor:
SparkSQLCLIDriver is the only non-test user of this function, and if we want the CLI to always use the new Java 8 date-time API, I think we'd better set it explicitly there, rather than cloning the session and cloning the Dataset here to do it.

Contributor:
+1. This also reminds me of #28671. Is it possible to always enable the Java 8 time API in the Thrift server?

Member Author:
This is what I did originally in the previous PR (916838a), but somehow we came to the conclusion of cloning the session and setting the Java 8 API locally in hiveResultString().

}

private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = {
@@ -584,7 +584,7 @@ select make_date(-44, 3, 15)
-- !query schema
struct<make_date(-44, 3, 15):date>
-- !query output
0045-03-15
-0044-03-15


-- !query
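Side note on the golden-file change above: the old output 0045-03-15 is the legacy formatter's year-of-era rendering (45 BC), while the Java 8 API prints the signed proleptic year. A minimal standalone illustration of the new behavior (plain java.time, not Spark code):

    import java.time.LocalDate

    // java.time uses the proleptic ISO year, so year -44 keeps its sign when printed.
    println(LocalDate.of(-44, 3, 15)) // -0044-03-15
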
@@ -515,7 +515,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
val schema = df.schema.catalogString
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
hiveResultString(df).map(replaceNotIncludedMsg)
}

// If the output is not pre-sorted, sort it.
@@ -17,36 +17,53 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.connector.InMemoryTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}

class HiveResultSuite extends SharedSparkSession {
import testImplicits._

private def withOutstandingZoneIds(f: => Unit): Unit = {
for {
jvmZoneId <- outstandingZoneIds
sessionZoneId <- outstandingZoneIds
Comment on lines +30 to +31
Member Author:
This should test the cases when the JVM default time zone and the session time zone are not in sync.

} {
withDefaultTimeZone(jvmZoneId) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionZoneId.getId) {
f
}
}
}
}

test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
assert(result == dates)
val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
val result2 = HiveResult.hiveResultString(executedPlan2)
assert(result2 == dates.map(x => s"[$x]"))
withOutstandingZoneIds {
Member Author:
After enabling the Java 8 API, the result is always the same regardless of the JVM and Spark session time zones. Before, that wasn't possible.

Member Author:
So, when the Java 8 API is off, the test will fail because the Java 7-based date/timestamp conversions depend on the JVM time zone on the executor side and on the HiveResult side. If they are not in sync with each other and with the Spark session time zone, the result can be wrong.

val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val result = HiveResult.hiveResultString(df)
assert(result == dates)
val df2 = df.selectExpr("array(b)")
val result2 = HiveResult.hiveResultString(df2)
assert(result2 == dates.map(x => s"[$x]"))
}
}

test("timestamp formatting in hive result") {
val timestamps = Seq(
"2018-12-28 01:02:03",
"1582-10-03 01:02:03",
"1582-10-04 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
assert(result == timestamps)
val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
val result2 = HiveResult.hiveResultString(executedPlan2)
assert(result2 == timestamps.map(x => s"[$x]"))
withOutstandingZoneIds {
val timestamps = Seq(
"2018-12-28 01:02:03",
"1582-10-03 01:02:03",
"1582-10-04 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
val result = HiveResult.hiveResultString(df)
assert(result == timestamps)
val df2 = df.selectExpr("array(b)")
val result2 = HiveResult.hiveResultString(df2)
assert(result2 == timestamps.map(x => s"[$x]"))
}
}

test("toHiveString correctly handles UDTs") {
@@ -58,15 +75,14 @@ class HiveResultSuite extends SharedSparkSession {
test("decimal formatting in hive result") {
val df = Seq(new java.math.BigDecimal("1")).toDS()
Seq(2, 6, 18).foreach { scala =>
val executedPlan =
df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan)
val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
val result = HiveResult.hiveResultString(decimalDf)
assert(result.head.split("\\.").last.length === scala)
}

val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
.selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan)
val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
.selectExpr(s"CAST(value AS decimal(38, 8))")
val result = HiveResult.hiveResultString(df2)
assert(result.head === "0.00000000")
}

@@ -77,8 +93,7 @@
withTable(s"$ns.$tbl") {
spark.sql(s"CREATE TABLE $ns.$tbl (id bigint) USING $source")
val df = spark.sql(s"SHOW TABLES FROM $ns")
val executedPlan = df.queryExecution.executedPlan
assert(HiveResult.hiveResultString(executedPlan).head == tbl)
assert(HiveResult.hiveResultString(df).head == tbl)
}
}
}
@@ -91,11 +106,10 @@
withTable(s"$ns.$tbl") {
spark.sql(s"CREATE TABLE $ns.$tbl (id bigint COMMENT 'col1') USING $source")
val df = spark.sql(s"DESCRIBE $ns.$tbl")
val executedPlan = df.queryExecution.executedPlan
val expected = "id " +
"\tbigint " +
"\tcol1 "
assert(HiveResult.hiveResultString(executedPlan).head == expected)
assert(HiveResult.hiveResultString(df).head == expected)
}
}
}
@@ -144,15 +144,14 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
withClue(s"Function '${info.getName}', Expression class '$className'") {
val example = info.getExamples
checkExampleSyntax(example)
example.split(" > ").toList.foreach {
example.split(" > ").toList.foreach(_ match {
case exampleRe(sql, output) =>
val df = clonedSpark.sql(sql)
val actual = unindentAndTrim(
hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
val expected = unindentAndTrim(output)
assert(actual === expected)
case _ =>
}
})
}
}
}
@@ -60,9 +60,10 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
// TODO unify the error code
try {
context.sparkContext.setJobDescription(command)
val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
val df = context.sql(command)
val execution = df.queryExecution
hiveResponse = SQLExecution.withNewExecutionId(execution) {
hiveResultString(execution.executedPlan)
hiveResultString(df)
}
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
@@ -345,7 +345,9 @@ abstract class HiveComparisonTest extends SparkFunSuite with BeforeAndAfterAll {
val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
def getResult(): Seq[String] = {
SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
SQLExecution.withNewExecutionId(query) {
hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
}
}
try { (query, prepareAnswer(query, getResult())) } catch {
case e: Throwable =>