Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema.
- Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.

## Upgrading From Spark SQL 2.2 to 2.3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ import org.apache.spark.sql.types._
object TypeCoercion {

def typeCoercionRules(conf: SQLConf): List[Rule[LogicalPlan]] =
InConversion ::
InConversion(conf) ::
WidenSetOperationTypes ::
PromoteStrings ::
PromoteStrings(conf) ::
DecimalPrecision ::
BooleanEquality ::
FunctionArgumentConversion ::
Expand Down Expand Up @@ -127,27 +127,34 @@ object TypeCoercion {
* is a String and the other is not. It also handles when one op is a Date and the
* other is a Timestamp by making the target type to be String.
*/
val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
private def findCommonTypeForBinaryComparison(
dt1: DataType, dt2: DataType, conf: SQLConf): Option[DataType] = (dt1, dt2) match {
// We should cast all relative timestamp/date/string comparison into string comparisons
// This behaves as a user would expect because timestamp strings sort lexicographically.
// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
case (StringType, DateType) => Some(StringType)
case (DateType, StringType) => Some(StringType)
case (StringType, TimestampType) => Some(StringType)
case (TimestampType, StringType) => Some(StringType)
case (TimestampType, DateType) => Some(StringType)
case (DateType, TimestampType) => Some(StringType)
case (StringType, NullType) => Some(StringType)
case (NullType, StringType) => Some(StringType)

// Cast to TimestampType when we compare DateType with TimestampType
// if conf.compareDateTimestampInTimestamp is true
// i.e. TimeStamp('2017-03-01 00:00:00') eq Date('2017-03-01') = true
case (TimestampType, DateType)
=> if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
case (DateType, TimestampType)
=> if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)

// There is no proper decimal type we can pick,
// using double type is the best we can do.
// See SPARK-22469 for details.
case (n: DecimalType, s: StringType) => Some(DoubleType)
case (s: StringType, n: DecimalType) => Some(DoubleType)

case (l: StringType, r: AtomicType) if r != StringType => Some(r)
case (l: AtomicType, r: StringType) if (l != StringType) => Some(l)
case (l: AtomicType, r: StringType) if l != StringType => Some(l)
case (l, r) => None
}

Expand Down Expand Up @@ -313,7 +320,7 @@ object TypeCoercion {
/**
* Promotes strings that appear in arithmetic expressions.
*/
object PromoteStrings extends TypeCoercionRule {
case class PromoteStrings(conf: SQLConf) extends TypeCoercionRule {
private def castExpr(expr: Expression, targetType: DataType): Expression = {
(expr.dataType, targetType) match {
case (NullType, dt) => Literal.create(null, targetType)
Expand Down Expand Up @@ -342,8 +349,8 @@ object TypeCoercion {
p.makeCopy(Array(left, Cast(right, TimestampType)))

case p @ BinaryComparison(left, right)
if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined =>
val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get
if findCommonTypeForBinaryComparison(left.dataType, right.dataType, conf).isDefined =>
val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType, conf).get
p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType)))

case Abs(e @ StringType()) => Abs(Cast(e, DoubleType))
Expand Down Expand Up @@ -374,7 +381,7 @@ object TypeCoercion {
* operator type is found the original expression will be returned and an
* Analysis Exception will be raised at the type checking phase.
*/
object InConversion extends TypeCoercionRule {
case class InConversion(conf: SQLConf) extends TypeCoercionRule {
private def flattenExpr(expr: Expression): Seq[Expression] = {
expr match {
// Multi columns in IN clause is represented as a CreateNamedStruct.
Expand All @@ -400,7 +407,7 @@ object TypeCoercion {
val rhs = sub.output

val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
findCommonTypeForBinaryComparison(l.dataType, r.dataType)
findCommonTypeForBinaryComparison(l.dataType, r.dataType, conf)
.orElse(findTightestCommonType(l.dataType, r.dataType))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)

val TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP =
buildConf("spark.sql.typeCoercion.compareDateTimestampInTimestamp")
.internal()
.doc("When true (default), compare Date with Timestamp after converting both sides to " +
"Timestamp. This behavior is compatible with Hive 2.2 or later. See HIVE-15236. " +
"When false, restore the behavior prior to Spark 2.4. Compare Date with Timestamp after " +
"converting both sides to string. This config will be removed in spark 3.0")
.booleanConf
.createWithDefault(true)

val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
Expand Down Expand Up @@ -1332,6 +1342,9 @@ class SQLConf extends Serializable with Logging {
def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

def compareDateTimestampInTimestamp : Boolean =
getConf(TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP)

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ class TypeCoercionSuite extends AnalysisTest {
*/
test("make sure rules do not fire early") {
// InConversion
val inConversion = TypeCoercion.InConversion
val inConversion = TypeCoercion.InConversion(conf)
ruleTest(inConversion,
In(UnresolvedAttribute("a"), Seq(Literal(1))),
In(UnresolvedAttribute("a"), Seq(Literal(1)))
Expand Down Expand Up @@ -1251,18 +1251,40 @@ class TypeCoercionSuite extends AnalysisTest {
}

test("binary comparison with string promotion") {
ruleTest(PromoteStrings,
val rule = TypeCoercion.PromoteStrings(conf)
ruleTest(rule,
GreaterThan(Literal("123"), Literal(1)),
GreaterThan(Cast(Literal("123"), IntegerType), Literal(1)))
ruleTest(PromoteStrings,
ruleTest(rule,
LessThan(Literal(true), Literal("123")),
LessThan(Literal(true), Cast(Literal("123"), BooleanType)))
ruleTest(PromoteStrings,
ruleTest(rule,
EqualTo(Literal(Array(1, 2)), Literal("123")),
EqualTo(Literal(Array(1, 2)), Literal("123")))
ruleTest(PromoteStrings,
ruleTest(rule,
GreaterThan(Literal("1.5"), Literal(BigDecimal("0.5"))),
GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")), DoubleType)))
GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")),
DoubleType)))
Seq(true, false).foreach { convertToTS =>
withSQLConf(
"spark.sql.typeCoercion.compareDateTimestampInTimestamp" -> convertToTS.toString) {
val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
if (convertToTS) {
// `Date` should be treated as timestamp at 00:00:00 See SPARK-23549
ruleTest(rule, EqualTo(date0301, timestamp0301000000),
EqualTo(Cast(date0301, TimestampType), timestamp0301000000))
ruleTest(rule, LessThan(date0301, timestamp0301000001),
LessThan(Cast(date0301, TimestampType), timestamp0301000001))
} else {
ruleTest(rule, LessThan(date0301, timestamp0301000000),
LessThan(Cast(date0301, StringType), Cast(timestamp0301000000, StringType)))
ruleTest(rule, LessThan(date0301, timestamp0301000001),
LessThan(Cast(date0301, StringType), Cast(timestamp0301000001, StringType)))
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  test("binary comparison with string promotion") {
    val rule = TypeCoercion.PromoteStrings(conf)
    ruleTest(rule,
      GreaterThan(Literal("123"), Literal(1)),
      GreaterThan(Cast(Literal("123"), IntegerType), Literal(1)))
    ruleTest(rule,
      LessThan(Literal(true), Literal("123")),
      LessThan(Literal(true), Cast(Literal("123"), BooleanType)))
    ruleTest(rule,
      EqualTo(Literal(Array(1, 2)), Literal("123")),
      EqualTo(Literal(Array(1, 2)), Literal("123")))
    ruleTest(rule,
      GreaterThan(Literal("1.5"), Literal(BigDecimal("0.5"))),
      GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")),
        DoubleType)))
    Seq(true, false).foreach { convertToTS =>
      withSQLConf("spark.sql.hive.compareDateTimestampInTimestamp" -> convertToTS.toString) {
        val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
        val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
        val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
        if (convertToTS) {
          // `Date` should be treated as timestamp at 00:00:00 See SPARK-23549
          ruleTest(rule, EqualTo(date0301, timestamp0301000000),
            EqualTo(Cast(date0301, TimestampType), timestamp0301000000))
          ruleTest(rule, LessThan(date0301, timestamp0301000001),
            LessThan(Cast(date0301, TimestampType), timestamp0301000001))
        } else {
          ruleTest(rule, LessThan(date0301, timestamp0301000000),
            LessThan(Cast(date0301, StringType), Cast(timestamp0301000000, StringType)))
          ruleTest(rule, LessThan(date0301, timestamp0301000001),
            LessThan(Cast(date0301, StringType), Cast(timestamp0301000001, StringType)))
        }
      }
    }
  }

}

test("cast WindowFrame boundaries to the type they operate upon") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ select 2.0 <= '2.2';
select 0.5 <= '1.5';
select to_date('2009-07-30 04:17:52') <= to_date('2009-07-30 04:17:52');
select to_date('2009-07-30 04:17:52') <= '2009-07-30 04:17:52';

-- SPARK-23549: Cast to timestamp when comparing timestamp with date
select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00');
select to_timestamp('2017-03-01 00:00:01') > to_date('2017-03-01');
select to_timestamp('2017-03-01 00:00:01') >= to_date('2017-03-01');
select to_date('2017-03-01') < to_timestamp('2017-03-01 00:00:01');
select to_date('2017-03-01') <= to_timestamp('2017-03-01 00:00:01');
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 32
-- Number of queries: 37


-- !query 0
Expand Down Expand Up @@ -256,3 +256,43 @@ select to_date('2009-07-30 04:17:52') <= '2009-07-30 04:17:52'
struct<(CAST(to_date('2009-07-30 04:17:52') AS STRING) <= 2009-07-30 04:17:52):boolean>
-- !query 31 output
true


-- !query 32
select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00')
-- !query 32 schema
struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) = to_timestamp('2017-03-01 00:00:00')):boolean>
-- !query 32 output
true


-- !query 33
select to_timestamp('2017-03-01 00:00:01') > to_date('2017-03-01')
-- !query 33 schema
struct<(to_timestamp('2017-03-01 00:00:01') > CAST(to_date('2017-03-01') AS TIMESTAMP)):boolean>
-- !query 33 output
true


-- !query 34
select to_timestamp('2017-03-01 00:00:01') >= to_date('2017-03-01')
-- !query 34 schema
struct<(to_timestamp('2017-03-01 00:00:01') >= CAST(to_date('2017-03-01') AS TIMESTAMP)):boolean>
-- !query 34 output
true


-- !query 35
select to_date('2017-03-01') < to_timestamp('2017-03-01 00:00:01')
-- !query 35 schema
struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) < to_timestamp('2017-03-01 00:00:01')):boolean>
-- !query 35 output
true


-- !query 36
select to_date('2017-03-01') <= to_timestamp('2017-03-01 00:00:01')
-- !query 36 schema
struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) <= to_timestamp('2017-03-01 00:00:01')):boolean>
-- !query 36 output
true