From 4161c624294dbdcb86939350bba61bebf1d99505 Mon Sep 17 00:00:00 2001 From: Sunitha Kambhampati Date: Tue, 2 Jun 2020 11:30:30 +0000 Subject: [PATCH 01/13] [SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow ### What changes were proposed in this pull request? JIRA SPARK-28067: Wrong results are returned for aggregate sum with decimals with whole stage codegen enabled **Repro:** WholeStage enabled enabled -> Wrong results WholeStage disabled -> Returns exception Decimal precision 39 exceeds max precision 38 **Issues:** 1. Wrong results are returned which is bad 2. Inconsistency between whole stage enabled and disabled. **Cause:** Sum does not take care of possibility of overflow for the intermediate steps. ie the updateExpressions and mergeExpressions. This PR makes the following changes: - Add changes to check if overflow occurs for decimal in aggregate Sum and if there is an overflow, it will return null for the Sum operation when spark.sql.ansi.enabled is false. - When spark.sql.ansi.enabled is true, then the sum operation will return an exception if an overflow occurs for the decimal operation in Sum. - This is keeping it consistent with the behavior defined in spark.sql.ansi.enabled property **Before the fix: Scenario 1:** - WRONG RESULTS ``` scala> val df = Seq( | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int] scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df2.show(40,false) +---------------------------------------+ |sum(decNum) | +---------------------------------------+ |20000000000000000000.000000000000000000| +---------------------------------------+ ``` -- **Before fix: Scenario2: Setting spark.sql.ansi.enabled to true** - WRONG RESULTS ``` scala> spark.conf.set("spark.sql.ansi.enabled", "true") scala> val df = Seq( | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int] scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df2.show(40,false) +---------------------------------------+ |sum(decNum) | +---------------------------------------+ |20000000000000000000.000000000000000000| +---------------------------------------+ ``` **After the fix: Scenario1:** ``` scala> val df = Seq( | 
(BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int] scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df2.show(40,false) +-----------+ |sum(decNum)| +-----------+ |null | +-----------+ ``` **After fix: Scenario2: Setting the spark.sql.ansi.enabled to true:** ``` scala> spark.conf.set("spark.sql.ansi.enabled", "true") scala> val df = Seq( | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 1), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2), | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int] scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum")) df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)] scala> df2.show(40,false) 20/02/18 10:57:43 ERROR Executor: Exception in task 5.0 in stage 4.0 (TID 30) java.lang.ArithmeticException: Decimal(expanded,100000000000000000000.000000000000000000,39,18}) cannot be represented as Decimal(38, 18). ``` ### Why are the changes needed? The changes are needed in order to fix the wrong results that are returned for decimal aggregate sum. ### Does this PR introduce any user-facing change? User would see wrong results on aggregate sum that involved decimal overflow prior to this change, but now the user will see null. But if user enables the spark.sql.ansi.enabled flag to true, then the user will see an exception and not incorrect results. ### How was this patch tested? New test has been added and existing tests for sql, catalyst and hive suites were run ok. Closes #27627 from skambha/decaggfixwrongresults. 
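As an illustration of the merge semantics described above, here is a small plain-Scala sketch. It is only a model of the intended behavior, not the Catalyst expressions added in the diff below; `SumBuffer`, `checkedAdd`, and the hard-coded 38-digit cap are made-up stand-ins for the real `sum`/`isEmpty` buffer attributes and the `DecimalType` precision handling done by `CheckOverflowInSum`.

```scala
// Illustrative model of the Sum buffer for decimals: `sum` is None once an overflow has
// happened (or before any non-null input), `isEmpty` is true only while no value was seen.
case class SumBuffer(sum: Option[BigDecimal], isEmpty: Boolean)

// Overflow-check stand-in: the real code uses Spark's Decimal type and the result
// DecimalType's precision, not a fixed 38.
def checkedAdd(a: BigDecimal, b: BigDecimal, maxPrecision: Int = 38): Option[BigDecimal] = {
  val s = a + b
  if (s.precision > maxPrecision) None else Some(s) // None models decimal overflow
}

def merge(left: SumBuffer, right: SumBuffer): SumBuffer = {
  // A null sum together with isEmpty = false marks an earlier overflow; keep it sticky.
  val overflowed = (!left.isEmpty && left.sum.isEmpty) || (!right.isEmpty && right.sum.isEmpty)
  val mergedSum =
    if (overflowed) None
    else (left.sum, right.sum) match {
      case (Some(l), Some(r)) => checkedAdd(l, r)
      case (l, r)             => l.orElse(r)
    }
  SumBuffer(mergedSum, left.isEmpty && right.isEmpty)
}

def evaluate(buf: SumBuffer, ansiEnabled: Boolean): Option[BigDecimal] =
  if (buf.isEmpty) None // no rows, or only nulls -> null result
  else buf.sum match {
    case None if ansiEnabled => throw new ArithmeticException("Overflow in sum of decimals.")
    case other               => other // null on overflow when ANSI is off, otherwise the sum
  }
```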
Lead-authored-by: Sunitha Kambhampati Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../catalyst/expressions/aggregate/Sum.scala | 72 ++++++++--- .../expressions/decimalExpressions.scala | 52 ++++++++ .../org/apache/spark/sql/DataFrameSuite.scala | 112 ++++++++++++++++-- 3 files changed, 211 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala index d2daaac72fc8..6e850267100f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala @@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast private lazy val sum = AttributeReference("sum", sumDataType)() + private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)() + private lazy val zero = Literal.default(sumDataType) - override lazy val aggBufferAttributes = sum :: Nil + override lazy val aggBufferAttributes = resultType match { + case _: DecimalType => sum :: isEmpty :: Nil + case _ => sum :: Nil + } - override lazy val initialValues: Seq[Expression] = Seq( - /* sum = */ Literal.create(null, sumDataType) - ) + override lazy val initialValues: Seq[Expression] = resultType match { + case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType)) + case _ => Seq(Literal(null, resultType)) + } override lazy val updateExpressions: Seq[Expression] = { if (child.nullable) { - Seq( - /* sum = */ - coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum) - ) + val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum) + resultType match { + case _: DecimalType => + Seq(updateSumExpr, isEmpty && child.isNull) + case _ => Seq(updateSumExpr) + } } else { - Seq( - /* sum = */ - coalesce(sum, zero) + child.cast(sumDataType) - ) + val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType) + resultType match { + case _: DecimalType => + Seq(updateSumExpr, Literal(false, BooleanType)) + case _ => Seq(updateSumExpr) + } } } + /** + * For decimal type: + * If isEmpty is false and if sum is null, then it means we have had an overflow. + * + * update of the sum is as follows: + * Check if either portion of the left.sum or right.sum has overflowed + * If it has, then the sum value will remain null. + * If it did not have overflow, then add the sum.left and sum.right + * + * isEmpty: Set to false if either one of the left or right is set to false. This + * means we have seen atleast a value that was not null. + */ override lazy val mergeExpressions: Seq[Expression] = { - Seq( - /* sum = */ - coalesce(coalesce(sum.left, zero) + sum.right, sum.left) - ) + val mergeSumExpr = coalesce(coalesce(sum.left, zero) + sum.right, sum.left) + resultType match { + case _: DecimalType => + val inputOverflow = !isEmpty.right && sum.right.isNull + val bufferOverflow = !isEmpty.left && sum.left.isNull + Seq( + If(inputOverflow || bufferOverflow, Literal.create(null, sumDataType), mergeSumExpr), + isEmpty.left && isEmpty.right) + case _ => Seq(mergeSumExpr) + } } + /** + * If the isEmpty is true, then it means there were no values to begin with or all the values + * were null, so the result will be null. + * If the isEmpty is false, then if sum is null that means an overflow has happened. 
+ * So now, if ansi is enabled, then throw exception, if not then return null. + * If sum is not null, then return the sum. + */ override lazy val evaluateExpression: Expression = resultType match { - case d: DecimalType => CheckOverflow(sum, d, !SQLConf.get.ansiEnabled) + case d: DecimalType => + If(isEmpty, Literal.create(null, sumDataType), + CheckOverflowInSum(sum, d, !SQLConf.get.ansiEnabled)) case _ => sum } - } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala index c2c70b2ab08e..7e4560ab8161 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, EmptyBlock, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -144,3 +145,54 @@ case class CheckOverflow( override def sql: String = child.sql } + +// A variant `CheckOverflow`, which treats null as overflow. This is necessary in `Sum`. +case class CheckOverflowInSum( + child: Expression, + dataType: DecimalType, + nullOnOverflow: Boolean) extends UnaryExpression { + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { + val value = child.eval(input) + if (value == null) { + if (nullOnOverflow) null else throw new ArithmeticException("Overflow in sum of decimals.") + } else { + value.asInstanceOf[Decimal].toPrecision( + dataType.precision, + dataType.scale, + Decimal.ROUND_HALF_UP, + nullOnOverflow) + } + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx) + val nullHandling = if (nullOnOverflow) { + "" + } else { + s""" + |throw new ArithmeticException("Overflow in sum of decimals."); + |""".stripMargin + } + val code = code""" + |${childGen.code} + |boolean ${ev.isNull} = ${childGen.isNull}; + |Decimal ${ev.value} = null; + |if (${childGen.isNull}) { + | $nullHandling + |} else { + | ${ev.value} = ${childGen.value}.toPrecision( + | ${dataType.precision}, ${dataType.scale}, Decimal.ROUND_HALF_UP(), $nullOnOverflow); + | ${ev.isNull} = ${ev.value} == null; + |} + |""".stripMargin + + ev.copy(code = code) + } + + override def toString: String = s"CheckOverflowInSum($child, $dataType, $nullOnOverflow)" + + override def sql: String = child.sql +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 954a4bd9331e..8359dff674a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -192,6 +192,28 @@ class DataFrameSuite extends QueryTest structDf.select(xxhash64($"a", $"record.*"))) } + private def assertDecimalSumOverflow( + df: DataFrame, ansiEnabled: Boolean, expectedAnswer: Row): Unit = { + if (!ansiEnabled) { + try { + checkAnswer(df, expectedAnswer) + } catch { + case e: SparkException if e.getCause.isInstanceOf[ArithmeticException] => + // This is an existing bug that we can write overflowed decimal to UnsafeRow but fail + // to read it. 
+ assert(e.getCause.getMessage.contains("Decimal precision 39 exceeds max precision 38")) + } + } else { + val e = intercept[SparkException] { + df.collect + } + assert(e.getCause.isInstanceOf[ArithmeticException]) + assert(e.getCause.getMessage.contains("cannot be represented as Decimal") || + e.getCause.getMessage.contains("Overflow in sum of decimals") || + e.getCause.getMessage.contains("Decimal precision 39 exceeds max precision 38")) + } + } + test("SPARK-28224: Aggregate sum big decimal overflow") { val largeDecimals = spark.sparkContext.parallelize( DecimalData(BigDecimal("1"* 20 + ".123"), BigDecimal("1"* 20 + ".123")) :: @@ -200,14 +222,90 @@ class DataFrameSuite extends QueryTest Seq(true, false).foreach { ansiEnabled => withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) { val structDf = largeDecimals.select("a").agg(sum("a")) - if (!ansiEnabled) { - checkAnswer(structDf, Row(null)) - } else { - val e = intercept[SparkException] { - structDf.collect + assertDecimalSumOverflow(structDf, ansiEnabled, Row(null)) + } + } + } + + test("SPARK-28067: sum of null decimal values") { + Seq("true", "false").foreach { wholeStageEnabled => + withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) { + Seq("true", "false").foreach { ansiEnabled => + withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled)) { + val df = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d")) + checkAnswer(df.agg(sum($"d")), Row(null)) + } + } + } + } + } + + test("SPARK-28067: Aggregate sum should not return wrong results for decimal overflow") { + Seq("true", "false").foreach { wholeStageEnabled => + withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) { + Seq(true, false).foreach { ansiEnabled => + withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) { + val df0 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + val df1 = Seq( + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + val df = df0.union(df1) + val df2 = df.withColumnRenamed("decNum", "decNum2"). + join(df, "intNum").agg(sum("decNum")) + + val expectedAnswer = Row(null) + assertDecimalSumOverflow(df2, ansiEnabled, expectedAnswer) + + val decStr = "1" + "0" * 19 + val d1 = spark.range(0, 12, 1, 1) + val d2 = d1.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(sum($"d")) + assertDecimalSumOverflow(d2, ansiEnabled, expectedAnswer) + + val d3 = spark.range(0, 1, 1, 1).union(spark.range(0, 11, 1, 1)) + val d4 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(sum($"d")) + assertDecimalSumOverflow(d4, ansiEnabled, expectedAnswer) + + val d5 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d"), + lit(1).as("key")).groupBy("key").agg(sum($"d").alias("sumd")).select($"sumd") + assertDecimalSumOverflow(d5, ansiEnabled, expectedAnswer) + + val nullsDf = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d")) + + val largeDecimals = Seq(BigDecimal("1"* 20 + ".123"), BigDecimal("9"* 20 + ".123")). 
+ toDF("d") + assertDecimalSumOverflow( + nullsDf.union(largeDecimals).agg(sum($"d")), ansiEnabled, expectedAnswer) + + val df3 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("50000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + + val df4 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + + val df5 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("20000000000000000000"), 2)).toDF("decNum", "intNum") + + val df6 = df3.union(df4).union(df5) + val df7 = df6.groupBy("intNum").agg(sum("decNum"), countDistinct("decNum")). + filter("intNum == 1") + assertDecimalSumOverflow(df7, ansiEnabled, Row(1, null, 2)) } - assert(e.getCause.getClass.equals(classOf[ArithmeticException])) - assert(e.getCause.getMessage.contains("cannot be represented as Decimal")) } } } From 00b355b97b9bb55d39f4e32e14c14b428cca6fda Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 2 Jun 2020 11:53:58 +0000 Subject: [PATCH 02/13] [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown ### What changes were proposed in this pull request? 1. Modified `ParquetFilters.valueCanMakeFilterOn()` to accept filters with `java.time.Instant` attributes. 2. Added `ParquetFilters.timestampToMicros()` to support both types `java.sql.Timestamp` and `java.time.Instant` in conversions to microseconds. 3. Re-used `timestampToMicros` in constructing of Parquet filters. ### Why are the changes needed? To support pushed down filters with `java.time.Instant` attributes. Before the changes, date filters are not pushed down to Parquet datasource when `spark.sql.datetime.java8API.enabled` is `true`. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Modified tests to `ParquetFilterSuite` to check the case when Java 8 API is enabled. Closes #28696 from MaxGekk/support-instant-parquet-filters. 
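The conversion added in point 2 can be pictured with a standalone sketch. The name `toMicros` and the plain epoch arithmetic here are illustrative only; the actual helper, `timestampToMicros` in the diff below, delegates to Spark's internal `DateTimeUtils`, which also covers edge cases this version ignores.

```scala
import java.sql.Timestamp
import java.time.Instant

// Accept either the legacy java.sql.Timestamp or the Java 8 java.time.Instant and
// normalize both to microseconds since the epoch, which is what the Parquet
// TIMESTAMP_MICROS filter values need.
def toMicros(v: Any): Long = v match {
  case i: Instant   => Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)
  case t: Timestamp => toMicros(t.toInstant)
}

// The same code path now serves both value classes:
val micros = toMicros(Instant.now())
```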
Authored-by: Max Gekk Signed-off-by: Wenchen Fan --- .../datasources/parquet/ParquetFilters.scala | 34 +++-- .../parquet/ParquetFilterSuite.scala | 119 ++++++++++-------- 2 files changed, 83 insertions(+), 70 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 7900693a8482..491977c61d3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Timestamp} -import java.time.LocalDate +import java.time.{Instant, LocalDate} import java.util.Locale import scala.collection.JavaConverters.asScalaBufferConverter @@ -129,6 +129,11 @@ class ParquetFilters( case ld: LocalDate => DateTimeUtils.localDateToDays(ld) } + private def timestampToMicros(v: Any): JLong = v match { + case i: Instant => DateTimeUtils.instantToMicros(i) + case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + } + private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue() @@ -149,8 +154,7 @@ class ParquetFilters( } private def timestampToMillis(v: Any): JLong = { - val timestamp = v.asInstanceOf[Timestamp] - val micros = DateTimeUtils.fromJavaTimestamp(timestamp) + val micros = timestampToMicros(v) val millis = DateTimeUtils.microsToMillis(micros) millis.asInstanceOf[JLong] } @@ -186,8 +190,7 @@ class ParquetFilters( case ParquetTimestampMicrosType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMicros).orNull) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), @@ -237,8 +240,7 @@ class ParquetFilters( case ParquetTimestampMicrosType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMicros).orNull) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), @@ -280,9 +282,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.lt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v)) @@ -319,9 +319,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.ltEq( - longColumn(n), - 
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v)) @@ -358,9 +356,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v)) @@ -397,9 +393,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gtEq(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gtEq( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v)) @@ -475,7 +469,7 @@ class ParquetFilters( case ParquetDateType => value.isInstanceOf[Date] || value.isInstanceOf[LocalDate] case ParquetTimestampMicrosType | ParquetTimestampMillisType => - value.isInstanceOf[Timestamp] + value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant] case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) => isDecimalMatched(value, decimalMeta) case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c4cf5116c203..d20a07f420e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.time.LocalDate +import java.time.{LocalDate, LocalDateTime, ZoneId} import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ @@ -143,7 +143,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } - private def testTimestampPushdown(data: Seq[Timestamp]): Unit = { + private def testTimestampPushdown(data: Seq[String], java8Api: Boolean): Unit = { + implicit class StringToTs(s: String) { + def ts: Timestamp = Timestamp.valueOf(s) + } assert(data.size === 4) val ts1 = data.head val ts2 = data(1) @@ -151,7 +154,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val ts4 = data(3) import testImplicits._ - withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) => + val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF() + withNestedDataFrame(df) { case (inputDF, colName, fun) => + def resultFun(tsStr: String): Any = { + val 
parsed = if (java8Api) { + LocalDateTime.parse(tsStr.replace(" ", "T")) + .atZone(ZoneId.systemDefault()) + .toInstant + } else { + Timestamp.valueOf(tsStr) + } + fun(parsed) + } withParquetDataFrame(inputDF) { implicit df => val tsAttr = df(colName).expr assert(df(colName).expr.dataType === TimestampType) @@ -160,26 +174,26 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr === ts1, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr <=> ts1, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr =!= ts1, classOf[NotEq[_]], + checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]], Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr < ts2, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr > ts1, classOf[Gt[_]], + checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]], Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr <= ts1, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr >= ts4, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(Literal(ts1) === tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts1) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts2) > tsAttr, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts3) < tsAttr, classOf[Gt[_]], resultFun(ts4)) - checkFilterPredicate(Literal(ts1) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts4) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(!(tsAttr < ts4), classOf[GtEq[_]], resultFun(ts4)) - checkFilterPredicate(tsAttr < ts2 || tsAttr > ts3, classOf[Operators.Or], + checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4)) + checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4)) + checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or], Seq(Row(resultFun(ts1)), Row(resultFun(ts4)))) } } @@ -588,36 +602,41 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } test("filter pushdown - timestamp") { - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS - val millisData = Seq( - Timestamp.valueOf("1000-06-14 08:28:53.123"), - Timestamp.valueOf("1582-06-15 08:28:53.001"), - Timestamp.valueOf("1900-06-16 08:28:53.0"), - Timestamp.valueOf("2018-06-17 08:28:53.999")) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { - testTimestampPushdown(millisData) - } - - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS - 
val microsData = Seq( - Timestamp.valueOf("1000-06-14 08:28:53.123456"), - Timestamp.valueOf("1582-06-15 08:28:53.123456"), - Timestamp.valueOf("1900-06-16 08:28:53.123456"), - Timestamp.valueOf("2018-06-17 08:28:53.123456")) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { - testTimestampPushdown(microsData) - } - - // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.INT96.toString) { - import testImplicits._ - withParquetDataFrame(millisData.map(i => Tuple1(i)).toDF()) { implicit df => - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - assertResult(None) { - createParquetFilters(schema).createFilter(sources.IsNull("_1")) + Seq(true, false).foreach { java8Api => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS + val millisData = Seq( + "1000-06-14 08:28:53.123", + "1582-06-15 08:28:53.001", + "1900-06-16 08:28:53.0", + "2018-06-17 08:28:53.999") + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + testTimestampPushdown(millisData, java8Api) + } + + // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS + val microsData = Seq( + "1000-06-14 08:28:53.123456", + "1582-06-15 08:28:53.123456", + "1900-06-16 08:28:53.123456", + "2018-06-17 08:28:53.123456") + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { + testTimestampPushdown(microsData, java8Api) + } + + // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + ParquetOutputTimestampType.INT96.toString) { + import testImplicits._ + withParquetDataFrame( + millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df => + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + assertResult(None) { + createParquetFilters(schema).createFilter(sources.IsNull("_1")) + } + } } } } From d79a8a88b15645a29fabb245b6db3b2179d0f3c0 Mon Sep 17 00:00:00 2001 From: lipzhu Date: Tue, 2 Jun 2020 21:07:10 +0900 Subject: [PATCH 03/13] [SPARK-31834][SQL] Improve error message for incompatible data types ### What changes were proposed in this pull request? We should use dataType.catalogString to unified the data type mismatch message. Before: ```sql spark-sql> create table SPARK_31834(a int) using parquet; spark-sql> insert into SPARK_31834 select '1'; Error in query: Cannot write incompatible data to table '`default`.`spark_31834`': - Cannot safely cast 'a': StringType to IntegerType; ``` After: ```sql spark-sql> create table SPARK_31834(a int) using parquet; spark-sql> insert into SPARK_31834 select '1'; Error in query: Cannot write incompatible data to table '`default`.`spark_31834`': - Cannot safely cast 'a': string to int; ``` ### How was this patch tested? UT. Closes #28654 from lipzhu/SPARK-31834. 
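As a quick illustration of why the message changes (assuming a Spark shell or any classpath with spark-catalyst available): `DataType.toString` yields the Scala object name, while `catalogString` yields the SQL-facing name. The expected output is shown as comments and matches the before/after messages above.

```scala
import org.apache.spark.sql.types._

Seq(StringType, IntegerType, LongType, DecimalType(2, 1), TimestampType).foreach { dt =>
  println(s"$dt -> ${dt.catalogString}")
}
// StringType -> string
// IntegerType -> int
// LongType -> bigint
// DecimalType(2,1) -> decimal(2,1)
// TimestampType -> timestamp
```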
Authored-by: lipzhu Signed-off-by: HyukjinKwon --- docs/sql-ref-ansi-compliance.md | 2 +- .../org/apache/spark/sql/types/DataType.scala | 7 ++++--- .../analysis/DataSourceV2AnalysisSuite.scala | 10 +++++----- .../DataTypeWriteCompatibilitySuite.scala | 18 +++++++++--------- .../inputs/postgreSQL/window_part1.sql | 2 +- .../inputs/postgreSQL/window_part3.sql | 2 +- .../apache/spark/sql/sources/InsertSuite.scala | 16 ++++++++-------- .../sql/test/DataFrameReaderWriterSuite.scala | 10 +++++----- .../spark/sql/hive/client/VersionsSuite.scala | 2 +- 9 files changed, 35 insertions(+), 34 deletions(-) diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index b62834ebe906..eab194c71ec7 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -95,7 +95,7 @@ CREATE TABLE t (v INT); -- `spark.sql.storeAssignmentPolicy=ANSI` INSERT INTO t VALUES ('1'); org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`t`': -- Cannot safely cast 'v': StringType to IntegerType; +- Cannot safely cast 'v': string to int; -- `spark.sql.storeAssignmentPolicy=LEGACY` (This is a legacy behaviour until Spark 2.x) INSERT INTO t VALUES ('1'); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 7449a28e069d..fe8d7efc9dc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -457,7 +457,7 @@ object DataType { case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT => if (!Cast.canUpCast(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}") false } else { true @@ -467,7 +467,7 @@ object DataType { case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI => if (!Cast.canANSIStoreAssign(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}") false } else { true @@ -477,7 +477,8 @@ object DataType { true case (w, r) => - addError(s"Cannot write '$context': $w is incompatible with $r") + addError(s"Cannot write '$context': " + + s"${w.catalogString} is incompatible with ${r.catalogString}") false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index c01dea96fe2d..e466d558db1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -21,7 +21,7 @@ import java.net.URI import java.util.Locale import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, AttributeReference, Cast, Expression, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, AttributeReference, Cast, LessThanOrEqual, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy @@ -143,7 +143,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends 
DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byName: multiple field errors are reported") { @@ -160,7 +160,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot safely cast", "'x'", "DoubleType to FloatType", + "Cannot safely cast", "'x'", "double to float", "Cannot write nullable values to non-null column", "'x'", "Cannot find data for output column", "'y'")) } @@ -176,7 +176,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byPosition: multiple field errors are reported") { @@ -194,7 +194,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", "Cannot write nullable values to non-null column", "'x'", - "Cannot safely cast", "'x'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "double to float")) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index c47332f5d9fc..1a262d646ca1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -80,7 +80,7 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa test("Check NullType is incompatible with all other types") { allNonNullTypes.foreach { t => assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => - assert(err.contains(s"incompatible with $t")) + assert(err.contains(s"incompatible with ${t.catalogString}")) } } } @@ -145,12 +145,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase test("Conversions between timestamp and long are not allowed") { assertSingleError(LongType, TimestampType, "longToTimestamp", "Should not allow long to timestamp") { err => - assert(err.contains("Cannot safely cast 'longToTimestamp': LongType to TimestampType")) + assert(err.contains("Cannot safely cast 'longToTimestamp': bigint to timestamp")) } assertSingleError(TimestampType, LongType, "timestampToLong", "Should not allow timestamp to long") { err => - assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType")) + assert(err.contains("Cannot safely cast 'timestampToLong': timestamp to bigint")) } } @@ -209,8 +209,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { s"Should not allow writing $w to $r because cast is not safe") { err => assert(err.contains("'t'"), "Should include the field name context") assert(err.contains("Cannot safely cast"), "Should identify unsafe cast") - assert(err.contains(s"$w"), "Should include write type") - assert(err.contains(s"$r"), "Should include read type") + assert(err.contains(s"${w.catalogString}"), 
"Should include write type") + assert(err.contains(s"${r.catalogString}"), "Should include read type") } } } @@ -413,7 +413,7 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) { errs => assert(errs(0).contains("'top.a.element'"), "Should identify bad type") assert(errs(0).contains("Cannot safely cast")) - assert(errs(0).contains("StringType to DoubleType")) + assert(errs(0).contains("string to double")) assert(errs(1).contains("'top.a'"), "Should identify bad type") assert(errs(1).contains("Cannot write nullable elements to array of non-nulls")) @@ -430,11 +430,11 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assert(errs(5).contains("'top.m.key'"), "Should identify bad type") assert(errs(5).contains("Cannot safely cast")) - assert(errs(5).contains("StringType to LongType")) + assert(errs(5).contains("string to bigint")) assert(errs(6).contains("'top.m.value'"), "Should identify bad type") assert(errs(6).contains("Cannot safely cast")) - assert(errs(6).contains("BooleanType to FloatType")) + assert(errs(6).contains("boolean to float")) assert(errs(7).contains("'top.m'"), "Should identify bad type") assert(errs(7).contains("Cannot write nullable values to map of non-nulls")) @@ -452,7 +452,7 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assert(errs(11).contains("'top.x'"), "Should identify bad type") assert(errs(11).contains("Cannot safely cast")) - assert(errs(11).contains("StringType to IntegerType")) + assert(errs(11).contains("string to int")) assert(errs(12).contains("'top'"), "Should identify bad type") assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name mismatch") diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql index 087d7a5befd1..6e95aca7aff6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql @@ -146,7 +146,7 @@ SELECT count(*) OVER (PARTITION BY four) FROM (SELECT * FROM tenk1 WHERE FALSE)s -- mixture of agg/wfunc in the same window -- SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS (PARTITION BY depname ORDER BY salary DESC); --- Cannot safely cast 'enroll_date': StringType to DateType; +-- Cannot safely cast 'enroll_date': string to date; -- SELECT empno, depname, salary, bonus, depadj, MIN(bonus) OVER (ORDER BY empno), MAX(depadj) OVER () FROM( -- SELECT *, -- CASE WHEN enroll_date < '2008-01-01' THEN 2008 - extract(year FROM enroll_date) END * 500 AS bonus, diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql index cd3b74b3aa03..f4b8454da0d8 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql @@ -42,7 +42,7 @@ create table datetimes ( f_timestamp timestamp ) using parquet; --- Spark cannot safely cast StringType to TimestampType +-- Spark cannot safely cast string to timestamp -- [SPARK-29636] Spark can't parse '11:00 BST' or '2000-10-19 10:23:54+01' signatures to timestamp insert into datetimes values (1, timestamp '11:00', cast ('11:00 BST' as timestamp), cast ('1 year' as timestamp), cast ('2000-10-19 10:23:54+01' as timestamp), 
timestamp '2000-10-19 10:23:54'), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 87a4d061b817..abd33ab8a8f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -623,12 +623,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { var msg = intercept[AnalysisException] { sql("insert into t select 1L, 2") }.getMessage - assert(msg.contains("Cannot safely cast 'i': LongType to IntegerType")) + assert(msg.contains("Cannot safely cast 'i': bigint to int")) msg = intercept[AnalysisException] { sql("insert into t select 1, 2.0") }.getMessage - assert(msg.contains("Cannot safely cast 'd': DecimalType(2,1) to DoubleType")) + assert(msg.contains("Cannot safely cast 'd': decimal(2,1) to double")) msg = intercept[AnalysisException] { sql("insert into t select 1, 2.0D, 3") @@ -660,18 +660,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { var msg = intercept[AnalysisException] { sql("insert into t values('a', 'b')") }.getMessage - assert(msg.contains("Cannot safely cast 'i': StringType to IntegerType") && - msg.contains("Cannot safely cast 'd': StringType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': string to int") && + msg.contains("Cannot safely cast 'd': string to double")) msg = intercept[AnalysisException] { sql("insert into t values(now(), now())") }.getMessage - assert(msg.contains("Cannot safely cast 'i': TimestampType to IntegerType") && - msg.contains("Cannot safely cast 'd': TimestampType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': timestamp to int") && + msg.contains("Cannot safely cast 'd': timestamp to double")) msg = intercept[AnalysisException] { sql("insert into t values(true, false)") }.getMessage - assert(msg.contains("Cannot safely cast 'i': BooleanType to IntegerType") && - msg.contains("Cannot safely cast 'd': BooleanType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': boolean to int") && + msg.contains("Cannot safely cast 'd': boolean to double")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 9747840ce403..fe0a8439acc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -333,7 +333,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with var msg = intercept[AnalysisException] { Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': LongType to IntegerType")) + assert(msg.contains("Cannot safely cast 'i': bigint to int")) // Insert into table successfully. 
Seq((1, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") @@ -354,14 +354,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with var msg = intercept[AnalysisException] { Seq(("a", "b")).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': StringType to IntegerType") && - msg.contains("Cannot safely cast 'd': StringType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': string to int") && + msg.contains("Cannot safely cast 'd': string to double")) msg = intercept[AnalysisException] { Seq((true, false)).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': BooleanType to IntegerType") && - msg.contains("Cannot safely cast 'd': BooleanType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': boolean to int") && + msg.contains("Cannot safely cast 'd': boolean to double")) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index d1dd13623650..8642a5ff1681 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -982,7 +982,7 @@ class VersionsSuite extends SparkFunSuite with Logging { """.stripMargin ) - val errorMsg = "Cannot safely cast 'f0': DecimalType(2,1) to BinaryType" + val errorMsg = "Cannot safely cast 'f0': decimal(2,1) to binary" if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" From e5c346391072174f410aa7d824ce005ce9054758 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 2 Jun 2020 08:29:07 -0500 Subject: [PATCH 04/13] [SPARK-31765][WEBUI] Upgrade HtmlUnit >= 2.37.0 ### What changes were proposed in this pull request? This PR upgrades HtmlUnit. Selenium and Jetty also upgraded because of dependency. ### Why are the changes needed? Recently, a security issue which affects HtmlUnit is reported. https://nvd.nist.gov/vuln/detail/CVE-2020-5529 According to the report, arbitrary code can be run by malicious users. HtmlUnit is used for test so the impact might not be large but it's better to upgrade it just in case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing testcases. Closes #28585 from sarutak/upgrade-htmlunit. 
Authored-by: Kousuke Saruta Signed-off-by: Sean Owen --- core/pom.xml | 2 +- .../main/scala/org/apache/spark/ui/JettyUtils.scala | 7 ++++++- .../scala/org/apache/spark/ui/UISeleniumSuite.scala | 2 +- pom.xml | 10 +++++----- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- streaming/pom.xml | 2 +- 7 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index b0f68880f1d8..14b217d7fb22 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -334,7 +334,7 @@ org.seleniumhq.selenium - selenium-htmlunit-driver + htmlunit-driver test diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 4b4788f45324..f1962ef39fc0 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -23,6 +23,7 @@ import javax.servlet.DispatcherType import javax.servlet.http._ import scala.language.implicitConversions +import scala.util.Try import scala.xml.Node import org.eclipse.jetty.client.HttpClient @@ -500,7 +501,11 @@ private[spark] case class ServerInfo( threadPool match { case pool: QueuedThreadPool => // Workaround for SPARK-30385 to avoid Jetty's acceptor thread shrink. - pool.setIdleTimeout(0) + // As of Jetty 9.4.21, the implementation of + // QueuedThreadPool#setIdleTimeout is changed and IllegalStateException + // will be thrown if we try to set idle timeout after the server has started. + // But this workaround works for Jetty 9.4.28 by ignoring the exception. + Try(pool.setIdleTimeout(0)) case _ => } server.stop() diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 909056eab8c5..ecfdf481f4f6 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -24,6 +24,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.io.Source import scala.xml.Node +import com.gargoylesoftware.css.parser.CSSParseException import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler import org.json4s._ import org.json4s.jackson.JsonMethods @@ -33,7 +34,6 @@ import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ import org.scalatestplus.selenium.WebBrowser -import org.w3c.css.sac.CSSParseException import org.apache.spark._ import org.apache.spark.LocalSparkContext._ diff --git a/pom.xml b/pom.xml index bdcb4fd5584f..48f6bef7d065 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ com.twitter 1.6.0 - 9.4.18.v20190429 + 9.4.28.v20200408 3.1.0 0.9.5 2.4.0 @@ -187,8 +187,8 @@ 0.12.0 4.7.1 1.1 - 2.52.0 - 2.22 + 3.141.59 + 2.40.0 @@ -595,8 +595,8 @@ org.seleniumhq.selenium - selenium-htmlunit-driver - ${selenium.version} + htmlunit-driver + ${htmlunit.version} test diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 7c5fcba9c213..e4ef1467a960 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -162,7 +162,7 @@ org.seleniumhq.selenium - selenium-htmlunit-driver + htmlunit-driver test diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 1de2677d5ede..5bf20b209aff 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -95,7 +95,7 @@ org.seleniumhq.selenium - selenium-htmlunit-driver + htmlunit-driver test diff --git a/streaming/pom.xml b/streaming/pom.xml index ea351d449481..53b49dd320e9 100644 --- a/streaming/pom.xml +++ 
b/streaming/pom.xml @@ -109,7 +109,7 @@ org.seleniumhq.selenium - selenium-htmlunit-driver + htmlunit-driver test From 69ba9b662e2ace592380e3cc5de041031dec2254 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 2 Jun 2020 11:04:07 -0700 Subject: [PATCH 05/13] [SPARK-31860][BUILD] only push release tags on succes ### What changes were proposed in this pull request? Only push the release tag after the build has finished. ### Why are the changes needed? If the build fails we don't need a release tag. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Running locally with a fake user in https://github.com/apache/spark/pull/28667 Closes #28700 from holdenk/SPARK-31860-build-master-only-push-tags-on-success. Authored-by: Holden Karau Signed-off-by: Holden Karau --- dev/create-release/do-release.sh | 14 +++++++++++--- dev/create-release/release-build.sh | 7 +++++-- dev/create-release/release-tag.sh | 16 +++------------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/dev/create-release/do-release.sh b/dev/create-release/do-release.sh index 4f18a55a3bce..64fba8a56aff 100755 --- a/dev/create-release/do-release.sh +++ b/dev/create-release/do-release.sh @@ -17,6 +17,8 @@ # limitations under the License. # +set -e + SELF=$(cd $(dirname $0) && pwd) . "$SELF/release-util.sh" @@ -52,9 +54,6 @@ function should_build { if should_build "tag" && [ $SKIP_TAG = 0 ]; then run_silent "Creating release tag $RELEASE_TAG..." "tag.log" \ "$SELF/release-tag.sh" - echo "It may take some time for the tag to be synchronized to github." - echo "Press enter when you've verified that the new tag ($RELEASE_TAG) is available." - read else echo "Skipping tag creation for $RELEASE_TAG." fi @@ -79,3 +78,12 @@ if should_build "publish"; then else echo "Skipping publish step." fi + +if should_build "tag" && [ $SKIP_TAG = 0 ]; then + git push origin $RELEASE_TAG + if [[ $RELEASE_TAG != *"preview"* ]]; then + git push origin HEAD:$GIT_BRANCH + else + echo "It's preview release. We only push $RELEASE_TAG to remote." + fi +fi diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index e3bcb72ab5c6..66c51845cc1d 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -92,9 +92,12 @@ BASE_DIR=$(pwd) init_java init_maven_sbt -rm -rf spark -git clone "$ASF_REPO" +# Only clone repo fresh if not present, otherwise use checkout from the tag step +if [ ! -d spark ]; then + git clone "$ASF_REPO" +fi cd spark +git fetch git checkout $GIT_REF git_hash=`git rev-parse --short HEAD` echo "Checked out Spark git hash $git_hash" diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh index 39856a995595..e37aa27fc0aa 100755 --- a/dev/create-release/release-tag.sh +++ b/dev/create-release/release-tag.sh @@ -25,6 +25,7 @@ function exit_with_usage { cat << EOF usage: $NAME Tags a Spark release on a particular branch. +You must push the tags after. Inputs are specified with the following environment variables: ASF_USERNAME - Apache Username @@ -105,19 +106,8 @@ sed -i".tmp7" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION" git commit -a -m "Preparing development version $NEXT_VERSION" -if ! is_dry_run; then - # Push changes - git push origin $RELEASE_TAG - if [[ $RELEASE_VERSION != *"preview"* ]]; then - git push origin HEAD:$GIT_BRANCH - else - echo "It's preview release. We only push $RELEASE_TAG to remote." - fi - - cd .. - rm -rf spark -else - cd .. +cd .. 
+if is_dry_run; then mv spark spark.tag echo "Clone with version changes and tag available as spark.tag in the output directory." fi From 25702281dc0c7cc333978f51a15ebf9fd02cc684 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 2 Jun 2020 11:11:23 -0700 Subject: [PATCH 06/13] [SPARK-31778][K8S][BUILD] Support cross-building docker images ### What changes were proposed in this pull request? Add cross build support to our docker image script using the new dockerx extension. ### Why are the changes needed? We have a CI for Spark on ARM, we should support building images for ARM and AMD64. ### Does this PR introduce _any_ user-facing change? Yes, a new flag is added to the docker image build script to cross-build ### How was this patch tested? Manually ran build script & pushed to https://hub.docker.com/repository/registry-1.docker.io/holdenk/spark/tags?page=1 verified amd64 & arm64 listed. Closes #28615 from holdenk/cross-build. Lead-authored-by: Holden Karau Co-authored-by: Holden Karau Signed-off-by: Holden Karau --- bin/docker-image-tool.sh | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index 57b86254ab42..8a01b80c4164 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -19,6 +19,8 @@ # This script builds and pushes docker images when run from a release of Spark # with Kubernetes support. +set -x + function error { echo "$@" 1>&2 exit 1 @@ -172,6 +174,7 @@ function build { local BASEDOCKERFILE=${BASEDOCKERFILE:-"kubernetes/dockerfiles/spark/Dockerfile"} local PYDOCKERFILE=${PYDOCKERFILE:-false} local RDOCKERFILE=${RDOCKERFILE:-false} + local ARCHS=${ARCHS:-"--platform linux/amd64,linux/arm64"} (cd $(img_ctx_dir base) && docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark) \ @@ -179,6 +182,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build Spark JVM Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir base) && docker buildx build $ARCHS $NOCACHEARG "${BUILD_ARGS[@]}" \ + -t $(image_ref spark) \ + -f "$BASEDOCKERFILE" .) + fi if [ "${PYDOCKERFILE}" != "false" ]; then (cd $(img_ctx_dir pyspark) && docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ @@ -187,6 +195,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build PySpark Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir pyspark) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ + -t $(image_ref spark-py) \ + -f "$PYDOCKERFILE" .) + fi fi if [ "${RDOCKERFILE}" != "false" ]; then @@ -196,6 +209,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build SparkR Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir sparkr) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ + -t $(image_ref spark-r) \ + -f "$RDOCKERFILE" .) + fi fi } @@ -227,6 +245,8 @@ Options: -n Build docker image with --no-cache -u uid UID to use in the USER directive to set the user the main Spark process runs as inside the resulting container + -X Use docker buildx to cross build. Automatically pushes. + See https://docs.docker.com/buildx/working-with-buildx/ for steps to setup buildx. -b arg Build arg to build or push the image. For multiple build args, this option needs to be used separately for each build arg. 
@@ -252,6 +272,12 @@ Examples: - Build and push JDK11-based image with tag "v3.0.0" to docker.io/myrepo $0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-slim build $0 -r docker.io/myrepo -t v3.0.0 push + + - Build and push JDK11-based image for multiple archs to docker.io/myrepo + $0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build + # Note: buildx, which does cross building, needs to do the push during build + # So there is no seperate push step with -X + EOF } @@ -268,7 +294,8 @@ RDOCKERFILE= NOCACHEARG= BUILD_PARAMS= SPARK_UID= -while getopts f:p:R:mr:t:nb:u: option +CROSS_BUILD="false" +while getopts f:p:R:mr:t:Xnb:u: option do case "${option}" in @@ -279,6 +306,7 @@ do t) TAG=${OPTARG};; n) NOCACHEARG="--no-cache";; b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};; + X) CROSS_BUILD=1;; m) if ! which minikube 1>/dev/null; then error "Cannot find minikube." From 979593d7085cc905ed02a17a618ddfe642ea9a79 Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Tue, 2 Jun 2020 12:46:12 -0700 Subject: [PATCH 07/13] [SPARK-31566][SQL][DOCS] Add SQL Rest API Documentation ### What changes were proposed in this pull request? SQL Rest API exposes query execution details and metrics as Public API. Its documentation will be useful for the end-users. ### Why are the changes needed? SQL Rest API does not exist under Spark Rest API. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Manually build and check Closes #28354 from erenavsarogullari/SPARK-31566. Lead-authored-by: Eren Avsarogullari Co-authored-by: Gengliang Wang Co-authored-by: Eren Avsarogullari Signed-off-by: Gengliang Wang --- docs/monitoring.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/monitoring.md b/docs/monitoring.md index 4da0f8e9d71e..32959b77c477 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -544,6 +544,24 @@ can be identified by their `[attempt-id]`. In the API listed below, when running /applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] Details of the given operation and given batch. + + /applications/[app-id]/sql + A list of all queries for a given application. +
+ ?details=[true (default) | false] lists/hides details of Spark plan nodes. +
+ ?planDescription=[true (default) | false] enables/disables Physical planDescription on demand when Physical Plan size is high. +
+ ?offset=[offset]&length=[len] lists queries in the given range. + + + /applications/[app-id]/sql/[execution-id] + Details for the given query. +
+ ?details=[true (default) | false] lists/hides metric details in addition to given query details. +
+ ?planDescription=[true (default) | false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high. + /applications/[app-id]/environment Environment details of the given application. From 367d94a30d946637529c319eab8817df84e78926 Mon Sep 17 00:00:00 2001 From: William Hyun Date: Tue, 2 Jun 2020 21:58:33 +0000 Subject: [PATCH 08/13] [SPARK-31876][BUILD] Upgrade to Zstd 1.4.5 ### What changes were proposed in this pull request? This PR aims to upgrade to Zstd 1.4.5. ### Why are the changes needed? Zstd 1.4.5 improves performance. https://github.com/facebook/zstd/releases/tag/v1.4.5 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed the Jenkins. Closes #28682 from williamhyun/zstd. Authored-by: William Hyun Signed-off-by: DB Tsai --- dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +- pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 index d0830a1ddd3d..0fd800558273 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 @@ -208,4 +208,4 @@ xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 6d050d8a048f..e4df088e08b6 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -222,4 +222,4 @@ xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 6dab667522ec..7f3f74e3e039 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -236,4 +236,4 @@ xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/pom.xml b/pom.xml index 48f6bef7d065..2275db51c30e 100644 --- a/pom.xml +++ b/pom.xml @@ -665,7 +665,7 @@ com.github.luben zstd-jni - 1.4.4-3 + 1.4.5-2 com.clearspring.analytics From 271eb26c026490f7307ae888200bfc81a645592f Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 2 Jun 2020 16:45:16 -0700 Subject: [PATCH 09/13] [SPARK-31882][WEBUI] DAG-viz is not rendered correctly with pagination ### What changes were proposed in this pull request? This PR fix an issue related to DAG-viz. Because DAG-viz for a job fetches link urls for each stage from the stage table, rendering can fail with pagination. You can reproduce this issue with the following operation. ``` sc.parallelize(1 to 10).map(value => (value ,value)).repartition(1).repartition(1).repartition(1).reduceByKey(_ + _).collect ``` And then, visit the corresponding job page. There are 5 stages so show <5 stages in the paged table. dag-rendering-issue1 dag-rendering-issue2 ### Why are the changes needed? This is a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 
Newly added test case with following command. `build/sbt -Dtest.default.exclude.tags= -Dspark.test.webdriver.chrome.driver=/path/to/chromedriver "testOnly org.apache.spark.ui.ChromeUISeleniumSuite -- -z SPARK-31882"` Closes #28690 from sarutak/fix-dag-rendering-issue. Authored-by: Kousuke Saruta Signed-off-by: Gengliang Wang --- .../apache/spark/ui/static/spark-dag-viz.js | 7 ++---- .../org/apache/spark/ui/static/webui.js | 7 +++++- .../scala/org/apache/spark/ui/UIUtils.scala | 1 + .../spark/ui/RealBrowserUISeleniumSuite.scala | 22 +++++++++++++++++++ 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index ae02defd9bb9..fd4a48d2db33 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -216,7 +216,7 @@ function renderDagVizForJob(svgContainer) { var dot = metadata.select(".dot-file").text(); var stageId = metadata.attr("stage-id"); var containerId = VizConstants.graphPrefix + stageId; - var isSkipped = metadata.attr("skipped") == "true"; + var isSkipped = metadata.attr("skipped") === "true"; var container; if (isSkipped) { container = svgContainer @@ -225,11 +225,8 @@ function renderDagVizForJob(svgContainer) { .attr("skipped", "true"); } else { // Link each graph to the corresponding stage page (TODO: handle stage attempts) - // Use the link from the stage table so it also works for the history server var attemptId = 0; - var stageLink = d3.select("#stage-" + stageId + "-" + attemptId) - .select("a.name-link") - .attr("href"); + var stageLink = uiRoot + appBasePath + "/stages/stage/?id=" + stageId + "&attempt=" + attemptId; container = svgContainer .append("a") .attr("xlink:href", stageLink) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index 0ba461f02317..4f8409ca2b7c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -16,11 +16,16 @@ */ var uiRoot = ""; +var appBasePath = ""; function setUIRoot(val) { uiRoot = val; } +function setAppBasePath(path) { + appBasePath = path; +} + function collapseTablePageLoad(name, table){ if (window.localStorage.getItem(name) == "true") { // Set it to false so that the click function can revert it @@ -33,7 +38,7 @@ function collapseTable(thisName, table){ var status = window.localStorage.getItem(thisName) == "true"; status = !status; - var thisClass = '.' + thisName + var thisClass = '.' + thisName; // Expand the list of additional metrics. var tableDiv = $(thisClass).parent().find('.' 
+ table); diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 90167858df66..087a22d6c614 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -292,6 +292,7 @@ private[spark] object UIUtils extends Logging { {commonHeaderNodes(request)} + {if (showVisualization) vizHeaderNodes(request) else Seq.empty} {if (useDataTables) dataTablesHeaderNodes(request) else Seq.empty} + sc.parallelize(1 to 100).map(v => (v, v)).repartition(10).reduceByKey(_ + _).collect + + eventually(timeout(10.seconds), interval(50.microseconds)) { + val pathWithPagedTable = + "/jobs/job/?id=0&completedStage.page=2&completedStage.sort=Stage+Id&" + + "completedStage.desc=true&completedStage.pageSize=1#completed" + goToUi(sc, pathWithPagedTable) + + // Open DAG Viz. + webDriver.findElement(By.id("job-dag-viz")).click() + val stages = webDriver.findElements(By.cssSelector("svg[class='job'] > a")) + stages.size() should be (3) + + stages.get(0).getAttribute("href") should include ("/stages/stage/?id=0&attempt=0") + stages.get(1).getAttribute("href") should include ("/stages/stage/?id=1&attempt=0") + stages.get(2).getAttribute("href") should include ("/stages/stage/?id=2&attempt=0") + } + } + } + /** * Create a test SparkContext with the SparkUI enabled. * It is safe to `get` the SparkUI directly from the SparkContext returned here. From e4db3b5b1742b4bdfa32937273e5d07a76cde79b Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Tue, 2 Jun 2020 17:26:43 -0700 Subject: [PATCH 10/13] [SPARK-29431][WEBUI] Improve Web UI / Sql tab visualization with cached dataframes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? With this pull request I want to improve the Web UI / SQL tab visualization. The principal problem that I find is when you have a cache in your plan, the SQL visualization don’t show any information about the part of the plan that has been cached. Before the change ![image](https://user-images.githubusercontent.com/12819544/66587418-aa7f6280-eb8a-11e9-80cf-bf10d6c0abea.png) After the change ![image](https://user-images.githubusercontent.com/12819544/66587526-ddc1f180-eb8a-11e9-92de-c3b3f5657b66.png) ### Why are the changes needed? When we have a SQL plan with cached dataframes we lose the graphical information of this dataframe in the sql tab ### Does this PR introduce any user-facing change? Yes, in the sql tab ### How was this patch tested? Unit testing and manual tests throught spark shell Closes #26082 from planga82/feature/SPARK-29431_SQL_Cache_webUI. 
Lead-authored-by: Pablo Langa Co-authored-by: Gengliang Wang Co-authored-by: Unknown Signed-off-by: Gengliang Wang --- .../spark/sql/execution/SparkPlanInfo.scala | 2 + .../sql/execution/ui/SparkPlanInfoSuite.scala | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 357820a9d63d..db587dd98685 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -56,6 +57,7 @@ private[execution] object SparkPlanInfo { case ReusedSubqueryExec(child) => child :: Nil case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil case stage: QueryStageExec => stage.plan :: Nil + case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala new file mode 100644 index 000000000000..a702e00ff9f9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.ui + +import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.test.SharedSparkSession + +class SparkPlanInfoSuite extends SharedSparkSession{ + + import testImplicits._ + + def vaidateSparkPlanInfo(sparkPlanInfo: SparkPlanInfo): Unit = { + sparkPlanInfo.nodeName match { + case "InMemoryTableScan" => assert(sparkPlanInfo.children.length == 1) + case _ => sparkPlanInfo.children.foreach(vaidateSparkPlanInfo) + } + } + + test("SparkPlanInfo creation from SparkPlan with InMemoryTableScan node") { + val dfWithCache = Seq( + (1, 1), + (2, 2) + ).toDF().filter("_1 > 1").cache().repartition(10) + + val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan) + + vaidateSparkPlanInfo(planInfoResult) + } +} From e1d52011401c1989f26b230eb8c82adc63e147e7 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Wed, 3 Jun 2020 12:07:05 +0900 Subject: [PATCH 11/13] [SPARK-31895][PYTHON][SQL] Support DataFrame.explain(extended: str) case to be consistent with Scala side ### What changes were proposed in this pull request? Scala: ```scala scala> spark.range(10).explain("cost") ``` ``` == Optimized Logical Plan == Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B) == Physical Plan == *(1) Range (0, 10, step=1, splits=12) ``` PySpark: ```python >>> spark.range(10).explain("cost") ``` ``` Traceback (most recent call last): File "", line 1, in File "/.../spark/python/pyspark/sql/dataframe.py", line 333, in explain raise TypeError(err_msg) TypeError: extended (optional) should be provided as bool, got ``` In addition, it is consistent with other codes too, for example, `DataFrame.sample` also can support `DataFrame.sample(1.0)` and `DataFrame.sample(False)`. ### Why are the changes needed? To provide the consistent API support across APIs. ### Does this PR introduce _any_ user-facing change? Nope, it's only changes in unreleased branches. If this lands to master only, yes, users will be able to set `mode` as `df.explain("...")` in Spark 3.1. After this PR: ```python >>> spark.range(10).explain("cost") ``` ``` == Optimized Logical Plan == Range (0, 10, step=1, splits=Some(12)), Statistics(sizeInBytes=80.0 B) == Physical Plan == *(1) Range (0, 10, step=1, splits=12) ``` ### How was this patch tested? Unittest was added and manually tested as well to make sure: ```python spark.range(10).explain(True) spark.range(10).explain(False) spark.range(10).explain("cost") spark.range(10).explain(extended="cost") spark.range(10).explain(mode="cost") spark.range(10).explain() spark.range(10).explain(True, "cost") spark.range(10).explain(1.0) ``` Closes #28711 from HyukjinKwon/SPARK-31895. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- python/pyspark/sql/dataframe.py | 35 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 65b902cf3c4d..03e3b9ca4bd0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -276,6 +276,8 @@ def explain(self, extended=None, mode=None): """Prints the (logical and physical) plans to the console for debugging purpose. :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. + When this is a string without specifying the ``mode``, it works as the mode is + specified. :param mode: specifies the expected output format of plans. * ``simple``: Print only a physical plan. 
@@ -306,12 +308,17 @@ def explain(self, extended=None, mode=None): Output [2]: [age#0, name#1] ... + >>> df.explain("cost") + == Optimized Logical Plan == + ...Statistics... + ... + .. versionchanged:: 3.0.0 Added optional argument `mode` to specify the expected output format of plans. """ if extended is not None and mode is not None: - raise Exception("extended and mode can not be specified simultaneously") + raise Exception("extended and mode should not be set together.") # For the no argument case: df.explain() is_no_argument = extended is None and mode is None @@ -319,18 +326,22 @@ def explain(self, extended=None, mode=None): # For the cases below: # explain(True) # explain(extended=False) - is_extended_case = extended is not None and isinstance(extended, bool) + is_extended_case = isinstance(extended, bool) and mode is None - # For the mode specified: df.explain(mode="formatted") - is_mode_case = mode is not None and isinstance(mode, basestring) + # For the case when extended is mode: + # df.explain("formatted") + is_extended_as_mode = isinstance(extended, basestring) and mode is None - if not is_no_argument and not (is_extended_case or is_mode_case): - if extended is not None: - err_msg = "extended (optional) should be provided as bool" \ - ", got {0}".format(type(extended)) - else: # For mode case - err_msg = "mode (optional) should be provided as str, got {0}".format(type(mode)) - raise TypeError(err_msg) + # For the mode specified: + # df.explain(mode="formatted") + is_mode_case = extended is None and isinstance(mode, basestring) + + if not (is_no_argument or is_extended_case or is_extended_as_mode or is_mode_case): + argtypes = [ + str(type(arg)) for arg in [extended, mode] if arg is not None] + raise TypeError( + "extended (optional) and mode (optional) should be a string " + "and bool; however, got [%s]." % ", ".join(argtypes)) # Sets an explain mode depending on a given argument if is_no_argument: @@ -339,6 +350,8 @@ def explain(self, extended=None, mode=None): explain_mode = "extended" if extended else "simple" elif is_mode_case: explain_mode = mode + elif is_extended_as_mode: + explain_mode = extended print(self._sc._jvm.PythonSQLUtils.explainString(self._jdf.queryExecution(), explain_mode)) From baafd4386c89d61c5eb600554df6315466e95fa8 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Wed, 3 Jun 2020 14:15:30 +0900 Subject: [PATCH 12/13] Revert "[SPARK-31765][WEBUI] Upgrade HtmlUnit >= 2.37.0" This reverts commit e5c346391072174f410aa7d824ce005ce9054758. 
--- core/pom.xml | 2 +- .../main/scala/org/apache/spark/ui/JettyUtils.scala | 7 +------ .../scala/org/apache/spark/ui/UISeleniumSuite.scala | 2 +- pom.xml | 10 +++++----- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- streaming/pom.xml | 2 +- 7 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 14b217d7fb22..b0f68880f1d8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -334,7 +334,7 @@ org.seleniumhq.selenium - htmlunit-driver + selenium-htmlunit-driver test diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index f1962ef39fc0..4b4788f45324 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -23,7 +23,6 @@ import javax.servlet.DispatcherType import javax.servlet.http._ import scala.language.implicitConversions -import scala.util.Try import scala.xml.Node import org.eclipse.jetty.client.HttpClient @@ -501,11 +500,7 @@ private[spark] case class ServerInfo( threadPool match { case pool: QueuedThreadPool => // Workaround for SPARK-30385 to avoid Jetty's acceptor thread shrink. - // As of Jetty 9.4.21, the implementation of - // QueuedThreadPool#setIdleTimeout is changed and IllegalStateException - // will be thrown if we try to set idle timeout after the server has started. - // But this workaround works for Jetty 9.4.28 by ignoring the exception. - Try(pool.setIdleTimeout(0)) + pool.setIdleTimeout(0) case _ => } server.stop() diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index ecfdf481f4f6..909056eab8c5 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -24,7 +24,6 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.io.Source import scala.xml.Node -import com.gargoylesoftware.css.parser.CSSParseException import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler import org.json4s._ import org.json4s.jackson.JsonMethods @@ -34,6 +33,7 @@ import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ import org.scalatestplus.selenium.WebBrowser +import org.w3c.css.sac.CSSParseException import org.apache.spark._ import org.apache.spark.LocalSparkContext._ diff --git a/pom.xml b/pom.xml index 2275db51c30e..b3f7b7db1a79 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ com.twitter 1.6.0 - 9.4.28.v20200408 + 9.4.18.v20190429 3.1.0 0.9.5 2.4.0 @@ -187,8 +187,8 @@ 0.12.0 4.7.1 1.1 - 3.141.59 - 2.40.0 + 2.52.0 + 2.22 @@ -595,8 +595,8 @@
org.seleniumhq.selenium - htmlunit-driver - ${htmlunit.version} + selenium-htmlunit-driver + ${selenium.version} test diff --git a/sql/core/pom.xml b/sql/core/pom.xml index e4ef1467a960..7c5fcba9c213 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -162,7 +162,7 @@ org.seleniumhq.selenium - htmlunit-driver + selenium-htmlunit-driver test diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 5bf20b209aff..1de2677d5ede 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -95,7 +95,7 @@ org.seleniumhq.selenium - htmlunit-driver + selenium-htmlunit-driver test diff --git a/streaming/pom.xml b/streaming/pom.xml index 53b49dd320e9..ea351d449481 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -109,7 +109,7 @@ org.seleniumhq.selenium - htmlunit-driver + selenium-htmlunit-driver test From c59f51bcc207725b8cbc4201df9367f874f5915c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 3 Jun 2020 06:07:53 +0000 Subject: [PATCH 13/13] [SPARK-31879][SQL] Using GB as default Locale for datetime formatters # What changes were proposed in this pull request? This PR switches the default Locale from the `US` to `GB` to change the behavior of the first day of the week from Sunday-started to Monday-started as same as v2.4 ### Why are the changes needed? #### cases ```sql spark-sql> select to_timestamp('2020-1-1', 'YYYY-w-u'); 2019-12-29 00:00:00 spark-sql> set spark.sql.legacy.timeParserPolicy=legacy; spark.sql.legacy.timeParserPolicy legacy spark-sql> select to_timestamp('2020-1-1', 'YYYY-w-u'); 2019-12-30 00:00:00 ``` #### reasons These week-based fields need Locale to express their semantics, the first day of the week varies from country to country. From the Java doc of WeekFields ```java /** * Gets the first day-of-week. *

* The first day-of-week varies by culture. * For example, the US uses Sunday, while France and the ISO-8601 standard use Monday. * This method returns the first day using the standard {code DayOfWeek} enum. * * return the first day-of-week, not null */ public DayOfWeek getFirstDayOfWeek() { return firstDayOfWeek; } ``` But for the SimpleDateFormat, the day-of-week is not localized ``` u Day number of week (1 = Monday, ..., 7 = Sunday) Number 1 ``` Currently, the default locale we use is the US, so the result moved a day backward. For other countries, please refer to [First Day of the Week in Different Countries](http://chartsbin.com/view/41671) With this change, it restores the first day of week calculating for functions when using the default locale. ### Does this PR introduce _any_ user-facing change? Yes, but the behavior change is used to restore the old one of v2.4 ### How was this patch tested? add unit tests Closes #28692 from yaooqinn/SPARK-31879. Authored-by: Kent Yao Signed-off-by: Wenchen Fan --- .../sql/catalyst/util/DateFormatter.scala | 8 +++++++- .../sql/catalyst/util/TimestampFormatter.scala | 8 +++++++- .../resources/sql-tests/inputs/datetime.sql | 4 ++++ .../sql-tests/results/ansi/datetime.sql.out | 18 +++++++++++++++++- .../sql-tests/results/datetime-legacy.sql.out | 18 +++++++++++++++++- .../sql-tests/results/datetime.sql.out | 18 +++++++++++++++++- 6 files changed, 69 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 06e1cdc27e7d..fe20e546f5d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -117,7 +117,13 @@ class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyD object DateFormatter { import LegacyDateFormats._ - val defaultLocale: Locale = Locale.US + /** + * Before Spark 3.0, the first day-of-week is always Monday. Since Spark 3.0, it depends on the + * locale. + * We pick GB as the default locale instead of US, to be compatible with Spark 2.x, as US locale + * uses Sunday as the first day-of-week. See SPARK-31879. + */ + val defaultLocale: Locale = new Locale("en", "GB") val defaultPattern: String = "yyyy-MM-dd" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 3e302e217039..1f14c70164c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -278,7 +278,13 @@ object LegacyDateFormats extends Enumeration { object TimestampFormatter { import LegacyDateFormats._ - val defaultLocale: Locale = Locale.US + /** + * Before Spark 3.0, the first day-of-week is always Monday. Since Spark 3.0, it depends on the + * locale. + * We pick GB as the default locale instead of US, to be compatible with Spark 2.x, as US locale + * uses Sunday as the first day-of-week. See SPARK-31879. 
+ */ + val defaultLocale: Locale = new Locale("en", "GB") def defaultPattern(): String = s"${DateFormatter.defaultPattern} HH:mm:ss" diff --git a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql index 9bd936f6f441..5636e0b67036 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql @@ -164,3 +164,7 @@ select from_csv('26/October/2015', 'date Date', map('dateFormat', 'dd/MMMMM/yyyy select from_unixtime(1, 'yyyyyyyyyyy-MM-dd'); select date_format(timestamp '2018-11-17 13:33:33', 'yyyyyyyyyy-MM-dd HH:mm:ss'); select date_format(date '2018-11-17', 'yyyyyyyyyyy-MM-dd'); + +-- SPARK-31879: the first day of week +select date_format('2020-01-01', 'YYYY-MM-dd uu'); +select date_format('2020-01-01', 'YYYY-MM-dd uuuu'); diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out index ca04b008d653..3803460f3f08 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -1025,3 +1025,19 @@ struct<> -- !query output org.apache.spark.SparkUpgradeException You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyyyyyyyyy-MM-dd' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 Wednesday diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out index fe932d3a706a..99dd14d21e6f 100644 --- a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -980,3 +980,19 @@ select date_format(date '2018-11-17', 'yyyyyyyyyyy-MM-dd') struct -- !query output 00000002018-11-17 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 0003 diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out index 06a41da2671e..c8c568c736d7 100755 --- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -997,3 +997,19 @@ struct<> -- !query output org.apache.spark.SparkUpgradeException You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyyyyyyyyy-MM-dd' pattern in the 
DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 Wednesday
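
For reference, the week-numbering difference that motivates the en-GB default in the last patch can be reproduced outside Spark with plain `java.time`. The sketch below is illustrative only (the object name is made up and it is not part of the patch); it shows how `WeekFields` resolves the first day-of-week and the localized day-of-week number for 2020-01-01 under the US and en-GB locales, which is why the `uu` pattern in the tests above yields `03` (Monday-started week) rather than `04` (Sunday-started week).

```scala
import java.time.LocalDate
import java.time.temporal.WeekFields
import java.util.Locale

// Not part of the patch: a standalone demo of locale-dependent week fields.
object FirstDayOfWeekDemo {
  def main(args: Array[String]): Unit = {
    val wednesday = LocalDate.of(2020, 1, 1) // 2020-01-01 is a Wednesday

    // US locale: the week starts on Sunday, so a Wednesday is day-of-week number 4.
    val us = WeekFields.of(Locale.US)
    println(s"US first day = ${us.getFirstDayOfWeek}, day-of-week = ${wednesday.get(us.dayOfWeek())}")

    // en-GB locale (the new default): the week starts on Monday, so the same Wednesday is day 3.
    val gb = WeekFields.of(new Locale("en", "GB"))
    println(s"GB first day = ${gb.getFirstDayOfWeek}, day-of-week = ${wednesday.get(gb.dayOfWeek())}")
  }
}
```

Running this prints SUNDAY/4 for the US locale and MONDAY/3 for en-GB, matching the `2020-01-01 03` output recorded in the updated `datetime.sql.out` golden files.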