diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 05074d97efa4..767dacfde073 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -131,10 +131,12 @@ case class CurrentBatchTimestamp(
    */
   override protected def evalInternal(input: InternalRow): Any = toLiteral.value
 
-  def toLiteral: Literal = dataType match {
-    case _: TimestampType =>
-      Literal(DateTimeUtils.fromJavaTimestamp(new Timestamp(timestampMs)), TimestampType)
-    case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs, zoneId), DateType)
+  def toLiteral: Literal = {
+    val timestampUs = millisToMicros(timestampMs)
+    dataType match {
+      case _: TimestampType => Literal(timestampUs, TimestampType)
+      case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
+    }
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 2cf82d1cfa17..941c8fcccd1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -23,7 +23,8 @@ import java.util.{Date, Locale}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, localDateToDays}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 
 sealed trait DateFormatter extends Serializable {
@@ -57,8 +58,8 @@ trait LegacyDateFormatter extends DateFormatter {
   def formatDate(d: Date): String
 
   override def parse(s: String): Int = {
-    val milliseconds = parseToDate(s).getTime
-    DateTimeUtils.millisToDays(milliseconds)
+    val micros = DateTimeUtils.millisToMicros(parseToDate(s).getTime)
+    DateTimeUtils.microsToDays(micros)
   }
 
   override def format(days: Int): String = {
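Editor's note: the two hunks above move the legacy date path from epoch milliseconds to Spark's internal microsecond values before deriving a day count. The standalone Scala sketch below is not part of the patch; the object and helper names are made up, it deliberately avoids Spark's internal DateTimeUtils, and it only illustrates the conversion chain (millis, then micros, then local date, then epoch day) with plain java.time:

import java.text.SimpleDateFormat
import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.TimeZone

object DateParseSketch {
  // Widen epoch millis to epoch micros, as the new parse path does first.
  def millisToMicros(millis: Long): Long = Math.multiplyExact(millis, 1000L)

  // Derive the epoch-day count from micros in a zone. Floor division keeps
  // pre-1970 instants on the correct local date.
  def microsToDays(micros: Long, zoneId: ZoneId): Int = {
    val instant = Instant.ofEpochSecond(
      Math.floorDiv(micros, 1000000L), Math.floorMod(micros, 1000000L) * 1000L)
    LocalDateTime.ofInstant(instant, zoneId).toLocalDate.toEpochDay.toInt
  }

  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
    val parsedMillis = sdf.parse("2015-07-24").getTime
    println(microsToDays(millisToMicros(parsedMillis), ZoneId.of("UTC")))  // 16640
  }
}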
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 2e4daa20ad51..731aa3502753 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -59,24 +59,22 @@ object DateTimeUtils {
     TimeZone.getTimeZone(getZoneId(timeZoneId))
   }
 
-  // we should use the exact day as Int, for example, (year, month, day) -> day
-  def millisToDays(millisUtc: Long): SQLDate = {
-    millisToDays(millisUtc, defaultTimeZone().toZoneId)
+  def microsToDays(timestamp: SQLTimestamp): SQLDate = {
+    microsToDays(timestamp, defaultTimeZone().toZoneId)
   }
 
-  def millisToDays(millisUtc: Long, zoneId: ZoneId): SQLDate = {
-    val instant = microsToInstant(fromMillis(millisUtc))
+  def microsToDays(timestamp: SQLTimestamp, zoneId: ZoneId): SQLDate = {
+    val instant = microsToInstant(timestamp)
     localDateToDays(LocalDateTime.ofInstant(instant, zoneId).toLocalDate)
   }
 
-  // reverse of millisToDays
-  def daysToMillis(days: SQLDate): Long = {
-    daysToMillis(days, defaultTimeZone().toZoneId)
+  def daysToMicros(days: SQLDate): SQLTimestamp = {
+    daysToMicros(days, defaultTimeZone().toZoneId)
   }
 
-  def daysToMillis(days: SQLDate, zoneId: ZoneId): Long = {
+  def daysToMicros(days: SQLDate, zoneId: ZoneId): SQLTimestamp = {
     val instant = daysToLocalDate(days).atStartOfDay(zoneId).toInstant
-    toMillis(instantToMicros(instant))
+    instantToMicros(instant)
   }
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
@@ -88,14 +86,14 @@ object DateTimeUtils {
    * Returns the number of days since epoch from java.sql.Date.
    */
   def fromJavaDate(date: Date): SQLDate = {
-    millisToDays(date.getTime)
+    microsToDays(millisToMicros(date.getTime))
   }
 
   /**
    * Returns a java.sql.Date from number of days since epoch.
    */
   def toJavaDate(daysSinceEpoch: SQLDate): Date = {
-    new Date(daysToMillis(daysSinceEpoch))
+    new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
   }
 
   /**
@@ -138,7 +136,7 @@
    * Converts the timestamp to milliseconds since epoch. In spark timestamp values have microseconds
    * precision, so this conversion is lossy.
    */
-  def toMillis(us: SQLTimestamp): Long = {
+  def microsToMillis(us: SQLTimestamp): Long = {
     // When the timestamp is negative i.e before 1970, we need to adjust the millseconds portion.
     // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
     // In millis precision the above needs to be represented as (-157700927877).
@@ -148,7 +146,7 @@
   /*
    * Converts milliseconds since epoch to SQLTimestamp.
    */
-  def fromMillis(millis: Long): SQLTimestamp = {
+  def millisToMicros(millis: Long): SQLTimestamp = {
     Math.multiplyExact(millis, MICROS_PER_MILLIS)
   }
 
@@ -574,10 +572,8 @@
       time2: SQLTimestamp,
       roundOff: Boolean,
       zoneId: ZoneId): Double = {
-    val millis1 = toMillis(time1)
-    val millis2 = toMillis(time2)
-    val date1 = millisToDays(millis1, zoneId)
-    val date2 = millisToDays(millis2, zoneId)
+    val date1 = microsToDays(time1, zoneId)
+    val date2 = microsToDays(time2, zoneId)
     val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
     val (year2, monthInYear2, dayInMonth2, daysToMonthEnd2) = splitDate(date2)
 
@@ -591,8 +587,8 @@
     }
     // using milliseconds can cause precision loss with more than 8 digits
    // we follow Hive's implementation which uses seconds
-    val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, zoneId))
-    val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, zoneId))
+    val secondsInDay1 = MICROSECONDS.toSeconds(time1 - daysToMicros(date1, zoneId))
+    val secondsInDay2 = MICROSECONDS.toSeconds(time2 - daysToMicros(date2, zoneId))
     val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2
     val secondsInMonth = DAYS.toSeconds(31)
     val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
@@ -711,21 +707,17 @@ def truncTimestamp(t: SQLTimestamp, level: Int, zoneId: ZoneId): SQLTimestamp = {
     level match {
       case TRUNC_TO_MICROSECOND => t
+      case TRUNC_TO_MILLISECOND =>
+        t - Math.floorMod(t, MICROS_PER_MILLIS)
+      case TRUNC_TO_SECOND =>
+        t - Math.floorMod(t, MICROS_PER_SECOND)
+      case TRUNC_TO_MINUTE =>
+        t - Math.floorMod(t, MICROS_PER_MINUTE)
       case TRUNC_TO_HOUR =>
         truncToUnit(t, zoneId, ChronoUnit.HOURS)
       case TRUNC_TO_DAY =>
         truncToUnit(t, zoneId, ChronoUnit.DAYS)
-      case _ =>
-        val millis = toMillis(t)
-        val truncated = level match {
-          case TRUNC_TO_MILLISECOND => millis
-          case TRUNC_TO_SECOND =>
-            millis - Math.floorMod(millis, MILLIS_PER_SECOND)
-          case TRUNC_TO_MINUTE =>
-            millis - Math.floorMod(millis, MILLIS_PER_MINUTE)
-          case _ => // Try to truncate date levels
-            val dDays = millisToDays(millis, zoneId)
-            daysToMillis(truncDate(dDays, level), zoneId)
-        }
-        fromMillis(truncated)
+      case _ => // Try to truncate date levels
+        val dDays = microsToDays(t, zoneId)
+        daysToMicros(truncDate(dDays, level), zoneId)
     }
   }
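Editor's note: as a reading aid for the DateTimeUtils hunks above, the renamed pair is asymmetric: microsToMillis is lossy and, per the comment kept in the hunk, must floor toward negative infinity for pre-1970 values, while millisToMicros only widens and can fail solely on overflow; sub-second truncation is now a plain floorMod at microsecond precision. The sketch below is my own standalone illustration of those semantics, not the patch's code, and the constant and object names are invented:

object MicrosMillisSketch {
  val MicrosPerMillis = 1000L
  val MicrosPerSecond = 1000000L
  val MicrosPerMinute = 60L * MicrosPerSecond

  // Lossy: drops the sub-millisecond part, flooring so that values before
  // 1970 (negative micros) still map to the earlier millisecond.
  def microsToMillis(micros: Long): Long = Math.floorDiv(micros, MicrosPerMillis)

  // Exact widening; throws ArithmeticException on overflow.
  def millisToMicros(millis: Long): Long = Math.multiplyExact(millis, MicrosPerMillis)

  // Zone-independent truncation done directly at microsecond precision.
  def truncToMinute(micros: Long): Long = micros - Math.floorMod(micros, MicrosPerMinute)

  def main(args: Array[String]): Unit = {
    println(microsToMillis(-157700927876544L))  // -157700927877, the value the test asserts
    println(millisToMicros(-157700927877L))     // -157700927877000
    println(truncToMinute(61000001L))           // 60000000
  }
}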
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
index 2790f8e99835..7611ab93f561 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.fromMillis
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToMicros
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -705,7 +705,7 @@ object IntervalUtils {
           microseconds = Math.addExact(microseconds, minutesUs)
           i += minuteStr.numBytes()
         } else if (s.matchAt(millisStr, i)) {
-          val millisUs = fromMillis(currentValue)
+          val millisUs = millisToMicros(currentValue)
           microseconds = Math.addExact(microseconds, millisUs)
           i += millisStr.numBytes()
         } else if (s.matchAt(microsStr, i)) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index fd77326d6b01..b70a4edd5386 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -141,7 +141,7 @@ class LegacyFastTimestampFormatter(
     }
     val micros = cal.getMicros()
     cal.set(Calendar.MILLISECOND, 0)
-    Math.addExact(fromMillis(cal.getTimeInMillis), micros)
+    Math.addExact(millisToMicros(cal.getTimeInMillis), micros)
   }
 
   def format(timestamp: SQLTimestamp): String = {
@@ -164,7 +164,7 @@ class LegacySimpleTimestampFormatter(
   }
 
   override def parse(s: String): Long = {
-    fromMillis(sdf.parse(s).getTime)
+    millisToMicros(sdf.parse(s).getTime)
   }
 
   override def format(us: Long): String = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 536c76f042d2..8c5df2660f31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -135,10 +135,10 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       dateOptions.dateFormat,
       TimeZone.getTimeZone(dateOptions.zoneId),
       dateOptions.locale)
-    val expectedDate = format.parse(customDate).getTime
+    val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
     val castedDate = parser.makeConverter("_1", DateType, nullable = true)
       .apply(customDate)
-    assert(castedDate == DateTimeUtils.millisToDays(expectedDate, ZoneOffset.UTC))
+    assert(castedDate == DateTimeUtils.microsToDays(expectedDate, ZoneOffset.UTC))
 
     val timestamp = "2015-01-01 00:00:00"
     timestampsOptions = new CSVOptions(Map(
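Editor's note: the legacy formatter hunks above keep parsing through SimpleDateFormat (millisecond precision) and then widen the result to microseconds. A minimal, self-contained sketch of that parse path, assuming a fixed pattern and an explicit time zone; the names are illustrative rather than Spark's:

import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object LegacyParseSketch {
  // Mirrors the legacy parse path: SimpleDateFormat yields epoch millis,
  // which are widened to the microsecond precision Spark stores internally.
  def parseToMicros(s: String, pattern: String, tz: TimeZone): Long = {
    val sdf = new SimpleDateFormat(pattern, Locale.US)
    sdf.setTimeZone(tz)
    Math.multiplyExact(sdf.parse(s).getTime, 1000L)
  }

  def main(args: Array[String]): Unit = {
    val micros = parseToMicros("2015-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss",
      TimeZone.getTimeZone("UTC"))
    println(micros)  // 1420070400000000
  }
}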
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index bde95f0e3add..302a246c3937 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -271,13 +271,13 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
       checkEvaluation(
         cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
           TimestampType, timeZoneId),
-        fromMillis(c.getTimeInMillis))
+        millisToMicros(c.getTimeInMillis))
       c = Calendar.getInstance(TimeZoneGMT)
       c.set(2015, 10, 1, 2, 30, 0)
       checkEvaluation(
         cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
           TimestampType, timeZoneId),
-        fromMillis(c.getTimeInMillis))
+        millisToMicros(c.getTimeInMillis))
     }
 
     val gmtId = Option("GMT")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index f1ad75321776..e43eb594286c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -47,7 +47,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   def toMillis(timestamp: String): Long = {
     val tf = TimestampFormatter("yyyy-MM-dd HH:mm:ss", ZoneOffset.UTC)
-    DateTimeUtils.toMillis(tf.parse(timestamp))
+    DateTimeUtils.microsToMillis(tf.parse(timestamp))
   }
   val date = "2015-04-08 13:10:15"
   val d = new Date(toMillis(date))
@@ -55,9 +55,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   val ts = new Timestamp(toMillis(time))
 
   test("datetime function current_date") {
-    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
+    val d0 = DateTimeUtils.currentDate(ZoneOffset.UTC)
     val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
-    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
+    val d1 = DateTimeUtils.currentDate(ZoneOffset.UTC)
     assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
 
     val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
@@ -787,15 +787,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         1000L)
       checkEvaluation(
         UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
-        MILLISECONDS.toSeconds(
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
+        MICROSECONDS.toSeconds(
+          DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
       checkEvaluation(
         UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId),
         -1000L)
       checkEvaluation(UnixTimestamp(
         Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
-        MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+        MICROSECONDS.toSeconds(DateTimeUtils.daysToMicros(
           DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
 
       val t1 = UnixTimestamp(
         CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
@@ -813,8 +813,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         null)
       checkEvaluation(
         UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
-        MILLISECONDS.toSeconds(
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
+        MICROSECONDS.toSeconds(
+          DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
       checkEvaluation(
         UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
     }
@@ -851,8 +851,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         1000L)
       checkEvaluation(
         ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
-        MILLISECONDS.toSeconds(
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
+        MICROSECONDS.toSeconds(
+          DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
       checkEvaluation(
         ToUnixTimestamp(
           Literal(sdf2.format(new Timestamp(-1000000))),
@@ -860,7 +860,7 @@
         -1000L)
       checkEvaluation(ToUnixTimestamp(
         Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
-        MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+        MICROSECONDS.toSeconds(DateTimeUtils.daysToMicros(
           DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
       val t1 = ToUnixTimestamp(
         CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
@@ -875,8 +875,8 @@
         null)
       checkEvaluation(ToUnixTimestamp(
         Literal(date1), Literal.create(null, StringType), timeZoneId),
-        MILLISECONDS.toSeconds(
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
+        MICROSECONDS.toSeconds(
+          DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
       checkEvaluation(
         ToUnixTimestamp(
           Literal("2015-07-24"),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 10ed4e46ddd1..db0399d2a73e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import java.time.ZoneId
+
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Literal}
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -51,9 +53,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
 
   test("analyzer should replace current_date with literals") {
     val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
-    val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val min = DateTimeUtils.currentDate(ZoneId.systemDefault())
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
-    val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val max = DateTimeUtils.currentDate(ZoneId.systemDefault())
 
     val lits = new scala.collection.mutable.ArrayBuffer[Int]
     plan.transformAllExpressions {
       case e: Literal =>
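Editor's note: the tests above now bracket CurrentDate with DateTimeUtils.currentDate(zoneId) instead of millisToDays(System.currentTimeMillis()). Assuming currentDate returns the current epoch-day count in the given zone, which is how these tests use it, a standalone equivalent of that bound looks like the sketch below; the object name and the placeholder comment are mine:

import java.time.{LocalDate, ZoneId, ZoneOffset}

object CurrentDateSketch {
  // Standalone equivalent of the epoch-day bound the updated tests rely on:
  // the current date in a zone, expressed as days since 1970-01-01.
  def currentEpochDay(zoneId: ZoneId): Int = LocalDate.now(zoneId).toEpochDay.toInt

  def main(args: Array[String]): Unit = {
    val before = currentEpochDay(ZoneOffset.UTC)
    // ... evaluate the expression under test here ...
    val after = currentEpochDay(ZoneOffset.UTC)
    assert(after - before <= 1)  // at most one midnight can pass in between
  }
}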
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 2e689be01f33..1465b066434b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -89,7 +89,8 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
 
   test("SPARK-6785: java date conversion before and after epoch") {
     def format(d: Date): String = {
-      TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId).format(fromMillis(d.getTime))
+      TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId)
+        .format(millisToMicros(d.getTime))
     }
     def checkFromToJavaDate(d1: Date): Unit = {
       val d2 = toJavaDate(fromJavaDate(d1))
@@ -582,17 +583,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
     }
   }
 
-  test("daysToMillis and millisToDays") {
-    val input = toMillis(date(2015, 12, 31, 16, zid = zonePST))
-    assert(millisToDays(input, zonePST) === 16800)
-    assert(millisToDays(input, ZoneOffset.UTC) === 16801)
-    assert(millisToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)
+  test("daysToMicros and microsToDays") {
+    val input = date(2015, 12, 31, 16, zid = zonePST)
+    assert(microsToDays(input, zonePST) === 16800)
+    assert(microsToDays(input, ZoneOffset.UTC) === 16801)
+    assert(microsToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)
 
-    var expected = toMillis(date(2015, 12, 31, zid = zonePST))
-    assert(daysToMillis(16800, zonePST) === expected)
+    var expected = date(2015, 12, 31, zid = zonePST)
+    assert(daysToMicros(16800, zonePST) === expected)
 
-    expected = toMillis(date(2015, 12, 31, zid = zoneGMT))
-    assert(daysToMillis(16800, ZoneOffset.UTC) === expected)
+    expected = date(2015, 12, 31, zid = zoneGMT)
+    assert(daysToMicros(16800, ZoneOffset.UTC) === expected)
 
     // There are some days are skipped entirely in some timezone, skip them here.
     val skipped_days = Map[String, Set[Int]](
@@ -607,16 +608,16 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       val skipped = skipped_days.getOrElse(tz.getID, Set.empty)
      (-20000 to 20000).foreach { d =>
        if (!skipped.contains(d)) {
-          assert(millisToDays(daysToMillis(d, tz.toZoneId), tz.toZoneId) === d,
+          assert(microsToDays(daysToMicros(d, tz.toZoneId), tz.toZoneId) === d,
            s"Round trip of ${d} did not work in tz ${tz}")
        }
      }
    }
  }
 
-  test("toMillis") {
-    assert(DateTimeUtils.toMillis(-9223372036844776001L) === -9223372036844777L)
-    assert(DateTimeUtils.toMillis(-157700927876544L) === -157700927877L)
+  test("microsToMillis") {
+    assert(DateTimeUtils.microsToMillis(-9223372036844776001L) === -9223372036844777L)
+    assert(DateTimeUtils.microsToMillis(-157700927876544L) === -157700927877L)
   }
 
   test("special timestamp values") {
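Editor's note: the rewritten suite above checks that daysToMicros/microsToDays round-trips for every day in every JVM time zone, skipping days a zone drops entirely. The sketch below is a self-contained java.time version of that round trip for one zone and one day (16800 is 2015-12-31, the value the test uses); the helper names mirror the patch's, but the bodies here are mine:

import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}

object DayRoundTripSketch {
  // daysToMicros: midnight of the given epoch day in the zone, as epoch micros.
  def daysToMicros(days: Int, zone: ZoneId): Long = {
    val instant = LocalDate.ofEpochDay(days).atStartOfDay(zone).toInstant
    Math.addExact(Math.multiplyExact(instant.getEpochSecond, 1000000L), instant.getNano / 1000L)
  }

  // microsToDays: the local date that instant falls on in the zone.
  def microsToDays(micros: Long, zone: ZoneId): Int = {
    val instant = Instant.ofEpochSecond(
      Math.floorDiv(micros, 1000000L), Math.floorMod(micros, 1000000L) * 1000L)
    LocalDateTime.ofInstant(instant, zone).toLocalDate.toEpochDay.toInt
  }

  def main(args: Array[String]): Unit = {
    val zone = ZoneId.of("America/Los_Angeles")
    val d = 16800  // 2015-12-31
    assert(microsToDays(daysToMicros(d, zone), zone) == d)
  }
}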
testSingleUnit("MiNuTe", 3, 0, 0, 3 * MICROS_PER_MINUTE) testSingleUnit("Second", 3, 0, 0, 3 * MICROS_PER_SECOND) - testSingleUnit("MilliSecond", 3, 0, 0, fromMillis(3)) + testSingleUnit("MilliSecond", 3, 0, 0, millisToMicros(3)) testSingleUnit("MicroSecond", 3, 0, 0, 3) checkFromInvalidString(null, "cannot be null") @@ -176,7 +176,7 @@ class IntervalUtilsSuite extends SparkFunSuite with SQLHelper { new CalendarInterval( 0, 10, - 12 * MICROS_PER_MINUTE + fromMillis(888))) + 12 * MICROS_PER_MINUTE + millisToMicros(888))) assert(fromDayTimeString("-3 0:0:0") === new CalendarInterval(0, -3, 0L)) try { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 329465544979..6fccb629a381 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -298,7 +298,7 @@ private void decodeDictionaryIds( for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, - DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i)))); + DateTimeUtils.millisToMicros(dictionary.decodeToLong(dictionaryIds.getDictId(i)))); } } } else { @@ -432,7 +432,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); + column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong())); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index fda4e148b640..637ce68ec05a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -111,7 +111,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString)) case (LENGTH, i) => writer.write(i, status.getLen) case (MODIFICATION_TIME, i) => - writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) + writer.write(i, DateTimeUtils.millisToMicros(status.getModificationTime)) case (CONTENT, i) => if (status.getLen > maxLength) { throw new SparkException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 850adae8a6b9..ff686d024f11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -272,7 +272,7 @@ private[parquet] class ParquetRowConverter( case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { - updater.setLong(DateTimeUtils.fromMillis(value)) + 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 850adae8a6b9..ff686d024f11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -272,7 +272,7 @@ private[parquet] class ParquetRowConverter(
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
-            updater.setLong(DateTimeUtils.fromMillis(value))
+            updater.setLong(DateTimeUtils.millisToMicros(value))
           }
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index f6490614ab05..bfa33ea23739 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -183,7 +183,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
 
       case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
         (row: SpecializedGetters, ordinal: Int) =>
-          val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+          val millis = DateTimeUtils.microsToMillis(row.getLong(ordinal))
           recordConsumer.addLong(millis)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 7dd52c1feabf..20fb06a851dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.toMillis
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -100,7 +100,7 @@ case class EventTimeWatermarkExec(
     child.execute().mapPartitions { iter =>
       val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
       iter.map { row =>
-        eventTimeStats.add(toMillis(getEventTime(row).getLong(0)))
+        eventTimeStats.add(microsToMillis(getEventTime(row).getLong(0)))
         row
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index d40208f02e7b..f29970d5de19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.toMillis
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.unsafe.types.UTF8String
@@ -38,7 +38,7 @@ private object Triggers {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
     val microsInDays = Math.multiplyExact(cal.days, MICROS_PER_DAY)
-    toMillis(Math.addExact(cal.microseconds, microsInDays))
+    microsToMillis(Math.addExact(cal.microseconds, microsInDays))
   }
 
   def convert(interval: Duration): Long = interval.toMillis
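Editor's note: the Triggers change above folds a calendar interval (days plus microseconds; months were rejected just before) into the millisecond period a processing-time trigger runs on, now going through microsToMillis. The sketch below is my reading of that arithmetic with the same overflow checks, not the patch's exact code, and the flooring choice is an assumption:

object TriggerIntervalSketch {
  val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000

  // A (days, microseconds) interval is folded into total microseconds,
  // then reduced to the millisecond granularity the trigger uses.
  def intervalToTriggerMillis(days: Int, microseconds: Long): Long = {
    val microsInDays = Math.multiplyExact(days.toLong, MicrosPerDay)
    Math.floorDiv(Math.addExact(microseconds, microsInDays), 1000L)
  }

  def main(args: Array[String]): Unit = {
    println(intervalToTriggerMillis(days = 0, microseconds = 10L * 1000 * 1000))  // 10000
    println(intervalToTriggerMillis(days = 1, microseconds = 0L))                 // 86400000
  }
}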
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index e66a1fe48a2e..08840496b052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -146,7 +146,7 @@ class RateStreamContinuousPartitionReader(
     }
 
     currentRow = InternalRow(
-      DateTimeUtils.fromMillis(nextReadTime),
+      DateTimeUtils.millisToMicros(nextReadTime),
       currentValue)
 
     true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index eb6baf698a5b..2e24de4f0d14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -191,7 +191,7 @@ class RateStreamMicroBatchPartitionReader(
     val currValue = rangeStart + partitionId + numPartitions * count
     count += 1
     val relative = math.round((currValue - rangeStart) * relativeMsPerValue)
-    InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue)
+    InternalRow(DateTimeUtils.millisToMicros(relative + localStartTimeMs), currValue)
   }
 
   override def close(): Unit = {}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 97a657683251..04431f3d381a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -85,7 +85,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
           TextSocketMicroBatchStream.this.synchronized {
             val newData = (
               UTF8String.fromString(line),
-              DateTimeUtils.fromMillis(Calendar.getInstance().getTimeInMillis)
+              DateTimeUtils.millisToMicros(Calendar.getInstance().getTimeInMillis)
             )
             currentOffset += 1
             batches.append(newData)
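Editor's note: the three streaming sources above all capture wall-clock time in milliseconds and now widen it before writing the event-time column. A small standalone sketch of that capture step; the object and method are hypothetical, and the second print only makes the point that widening adds zeros, not precision:

object EventTimeMicrosSketch {
  // Wall-clock capture as the rate/socket sources do it: millis, then widened.
  def captureEventTimeMicros(): Long =
    Math.multiplyExact(System.currentTimeMillis(), 1000L)

  def main(args: Array[String]): Unit = {
    val micros = captureEventTimeMicros()
    println(micros)                // epoch microseconds
    println(micros % 1000L == 0L)  // always true: no sub-millisecond precision is gained
  }
}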
CURRENT_DATE()""").collect().head.getDate(0)) - val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis()) + val d3 = DateTimeUtils.currentDate(ZoneId.systemDefault()) assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index b6ea26ab9554..3d7777bff09b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -51,10 +51,10 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils private val d2 = Date.valueOf(d2Str) private val t1Str = "2016-05-08 00:00:01.000000" private val t1Internal = date(2016, 5, 8, 0, 0, 1) - private val t1 = new Timestamp(DateTimeUtils.toMillis(t1Internal)) + private val t1 = new Timestamp(DateTimeUtils.microsToMillis(t1Internal)) private val t2Str = "2016-05-09 00:00:02.000000" private val t2Internal = date(2016, 5, 9, 0, 0, 2) - private val t2 = new Timestamp(DateTimeUtils.toMillis(t2Internal)) + private val t2 = new Timestamp(DateTimeUtils.microsToMillis(t2Internal)) /** * Define a very simple 3 row table used for testing column serialization. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 7abe818a29d9..92f862a54297 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -123,7 +123,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"))) val ISO8601Date = "1970-01-01" - checkTypePromotion(DateTimeUtils.millisToDays(32400000), + checkTypePromotion(DateTimeUtils.microsToDays(32400000000L), enforceCorrectType(ISO8601Date, DateType)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index b6618826487c..bbbf7e5c4697 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan} import org.apache.spark.sql.execution.command.ExplainCommand @@ -1216,7 +1217,7 @@ class StreamSuite extends StreamTest { } var lastTimestamp = System.currentTimeMillis() - val currentDate = DateTimeUtils.millisToDays(lastTimestamp) + val currentDate = DateTimeUtils.microsToDays(DateTimeUtils.millisToMicros(lastTimestamp)) testStream(df) ( AddData(input, 1), CheckLastBatch { rows: Seq[Row] =>