@@ -131,10 +131,12 @@ case class CurrentBatchTimestamp(
*/
override protected def evalInternal(input: InternalRow): Any = toLiteral.value

def toLiteral: Literal = dataType match {
case _: TimestampType =>
Literal(DateTimeUtils.fromJavaTimestamp(new Timestamp(timestampMs)), TimestampType)
case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs, zoneId), DateType)
def toLiteral: Literal = {
val timestampUs = millisToMicros(timestampMs)
dataType match {
case _: TimestampType => Literal(timestampUs, TimestampType)
case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
}
}
}
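The new path builds the literal directly in microseconds instead of detouring through java.sql.Timestamp. A minimal sketch of the equivalent conversion using only java.time (the batch time value and zone are illustrative, not from the PR):

import java.time.{Instant, ZoneId}

// Illustrative batch time in millis (2016-01-01 00:00:00.123 UTC).
val timestampMs = 1451606400123L
// millisToMicros is an overflow-checked multiplication by 1000.
val timestampUs = Math.multiplyExact(timestampMs, 1000L)
assert(timestampUs == 1451606400123000L)
// For DateType, microsToDays interprets the instant in the session time zone
// and takes the local date's day number.
val zoneId = ZoneId.of("America/Los_Angeles")
val days = Instant.ofEpochMilli(timestampMs).atZone(zoneId).toLocalDate.toEpochDay.toInt
assert(days == 16800) // 2015-12-31 in Los Angeles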

@@ -23,7 +23,8 @@ import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, localDateToDays}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf

sealed trait DateFormatter extends Serializable {
@@ -57,8 +58,8 @@ trait LegacyDateFormatter extends DateFormatter {
def formatDate(d: Date): String

override def parse(s: String): Int = {
val milliseconds = parseToDate(s).getTime
DateTimeUtils.millisToDays(milliseconds)
val micros = DateTimeUtils.millisToMicros(parseToDate(s).getTime)
DateTimeUtils.microsToDays(micros)
}

override def format(days: Int): String = {
@@ -59,24 +59,22 @@ object DateTimeUtils {
TimeZone.getTimeZone(getZoneId(timeZoneId))
}

// we should use the exact day as Int, for example, (year, month, day) -> day
def millisToDays(millisUtc: Long): SQLDate = {
millisToDays(millisUtc, defaultTimeZone().toZoneId)
def microsToDays(timestamp: SQLTimestamp): SQLDate = {
microsToDays(timestamp, defaultTimeZone().toZoneId)
}

def millisToDays(millisUtc: Long, zoneId: ZoneId): SQLDate = {
val instant = microsToInstant(fromMillis(millisUtc))
def microsToDays(timestamp: SQLTimestamp, zoneId: ZoneId): SQLDate = {
val instant = microsToInstant(timestamp)
localDateToDays(LocalDateTime.ofInstant(instant, zoneId).toLocalDate)
}

// reverse of millisToDays
def daysToMillis(days: SQLDate): Long = {
daysToMillis(days, defaultTimeZone().toZoneId)
def daysToMicros(days: SQLDate): SQLTimestamp = {
daysToMicros(days, defaultTimeZone().toZoneId)
}

def daysToMillis(days: SQLDate, zoneId: ZoneId): Long = {
def daysToMicros(days: SQLDate, zoneId: ZoneId): SQLTimestamp = {
val instant = daysToLocalDate(days).atStartOfDay(zoneId).toInstant
toMillis(instantToMicros(instant))
instantToMicros(instant)
}
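The pair is intentionally asymmetric: daysToMicros maps a day number to local midnight in the given zone, while microsToDays maps any instant within that local day back to the same day number. A self-contained sketch of the round trip, assuming only java.time (zone and day are illustrative):

import java.time.{Instant, LocalDate, ZoneId}

val zone = ZoneId.of("America/Los_Angeles")
val days = 16800 // 2015-12-31
// daysToMicros: microseconds at local midnight of that day.
val startOfDay = LocalDate.ofEpochDay(days.toLong).atStartOfDay(zone).toInstant
val micros = Math.multiplyExact(startOfDay.getEpochSecond, 1000000L)
// microsToDays: the local date of the instant, as a day number.
val instant = Instant.ofEpochSecond(Math.floorDiv(micros, 1000000L),
  Math.floorMod(micros, 1000000L) * 1000L)
val roundTrip = instant.atZone(zone).toLocalDate.toEpochDay.toInt
assert(roundTrip == days)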

// Converts Timestamp to string according to Hive TimestampWritable convention.
@@ -88,14 +86,14 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
millisToDays(date.getTime)
microsToDays(millisToMicros(date.getTime))
}

/**
* Returns a java.sql.Date from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
new Date(daysToMillis(daysSinceEpoch))
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
}
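Round-tripping java.sql.Date through microseconds is lossless here: daysToMicros always lands on a whole second (local midnight), so the flooring in microsToMillis cannot discard anything, even for dates before the epoch. A small sanity check under that assumption:

import java.time.{LocalDate, ZoneId}

// Local midnight is a whole second, so the micros value is a multiple of 1000000.
val midnight = LocalDate.of(1965, 1, 1).atStartOfDay(ZoneId.systemDefault()).toInstant
val micros = Math.multiplyExact(midnight.getEpochSecond, 1000000L)
// Flooring division back to millis (what microsToMillis does) is exact here.
assert(Math.floorDiv(micros, 1000L) * 1000L == micros)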

/**
@@ -138,7 +136,7 @@
* Converts the timestamp to milliseconds since epoch. In Spark, timestamp values have
* microsecond precision, so this conversion is lossy.
*/
def toMillis(us: SQLTimestamp): Long = {
def microsToMillis(us: SQLTimestamp): Long = {
// When the timestamp is negative, i.e. before 1970, we need to adjust the milliseconds portion.
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
// In millis precision the above needs to be represented as (-157700927877).
@@ -148,7 +146,7 @@
/*
* Converts milliseconds since epoch to SQLTimestamp.
*/
def fromMillis(millis: Long): SQLTimestamp = {
def millisToMicros(millis: Long): SQLTimestamp = {
Math.multiplyExact(millis, MICROS_PER_MILLIS)
}
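The two directions are not symmetric: millisToMicros is an exact, overflow-checked multiplication, while microsToMillis floors toward negative infinity so that pre-1970 timestamps keep their sub-millisecond fraction on the correct side. A sketch of the documented behavior (the floorDiv form is an assumption about the elided body, consistent with the microsToMillis test in this PR):

// 1965-01-01 10:11:12.123456 in microseconds, from the comment above.
val micros = -157700927876544L
// Flooring division matches the documented result of microsToMillis.
assert(Math.floorDiv(micros, 1000L) == -157700927877L)
// Truncating division would give the wrong answer for negative values.
assert(micros / 1000L == -157700927876L)
// The reverse direction is exact and overflow-checked.
assert(Math.multiplyExact(-157700927877L, 1000L) == -157700927877000L)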

@@ -574,10 +572,8 @@ object DateTimeUtils {
time2: SQLTimestamp,
roundOff: Boolean,
zoneId: ZoneId): Double = {
val millis1 = toMillis(time1)
val millis2 = toMillis(time2)
val date1 = millisToDays(millis1, zoneId)
val date2 = millisToDays(millis2, zoneId)
val date1 = microsToDays(time1, zoneId)
val date2 = microsToDays(time2, zoneId)
val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
val (year2, monthInYear2, dayInMonth2, daysToMonthEnd2) = splitDate(date2)

@@ -591,8 +587,8 @@
}
// using milliseconds can cause precision loss with more than 8 digits
// we follow Hive's implementation which uses seconds
val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, zoneId))
Review thread on this line:
Contributor: shall we call Math.floorDiv?
Member Author: Highly likely, yes. I will prepare a separate fix.
Member Author: The rounding error is invisible when dividing by DAYS.toSeconds(31). At least, I haven't reproduced the issue yet.
Contributor: ah, this is "seconds in day", so it's always positive.
val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, zoneId))
val secondsInDay1 = MICROSECONDS.toSeconds(time1 - daysToMicros(date1, zoneId))
val secondsInDay2 = MICROSECONDS.toSeconds(time2 - daysToMicros(date2, zoneId))
val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2
val secondsInMonth = DAYS.toSeconds(31)
val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
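The review question above about Math.floorDiv is resolved by the sign: time - daysToMicros(date, zoneId) is the offset into the local day, which is non-negative, so the truncating MICROSECONDS.toSeconds agrees with flooring here. A quick illustration of the distinction (values are illustrative):

import java.util.concurrent.TimeUnit.MICROSECONDS

// Offset into the day: 10:11:12.123456 after local midnight, in micros.
val microsIntoDay = 36672123456L
// For non-negative inputs, truncation and flooring agree.
assert(MICROSECONDS.toSeconds(microsIntoDay) == Math.floorDiv(microsIntoDay, 1000000L))
// They diverge only for negative inputs, which cannot occur in this expression.
assert(MICROSECONDS.toSeconds(-1L) == 0L)
assert(Math.floorDiv(-1L, 1000000L) == -1L)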
@@ -711,21 +707,17 @@ object DateTimeUtils {
def truncTimestamp(t: SQLTimestamp, level: Int, zoneId: ZoneId): SQLTimestamp = {
level match {
case TRUNC_TO_MICROSECOND => t
case TRUNC_TO_MILLISECOND =>
t - Math.floorMod(t, MICROS_PER_MILLIS)
case TRUNC_TO_SECOND =>
t - Math.floorMod(t, MICROS_PER_SECOND)
case TRUNC_TO_MINUTE =>
t - Math.floorMod(t, MICROS_PER_MINUTE)
case TRUNC_TO_HOUR => truncToUnit(t, zoneId, ChronoUnit.HOURS)
case TRUNC_TO_DAY => truncToUnit(t, zoneId, ChronoUnit.DAYS)
case _ =>
val millis = toMillis(t)
val truncated = level match {
case TRUNC_TO_MILLISECOND => millis
case TRUNC_TO_SECOND =>
millis - Math.floorMod(millis, MILLIS_PER_SECOND)
case TRUNC_TO_MINUTE =>
millis - Math.floorMod(millis, MILLIS_PER_MINUTE)
case _ => // Try to truncate date levels
val dDays = millisToDays(millis, zoneId)
daysToMillis(truncDate(dDays, level), zoneId)
}
fromMillis(truncated)
case _ => // Try to truncate date levels
val dDays = microsToDays(t, zoneId)
daysToMicros(truncDate(dDays, level), zoneId)
}
}
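After the change, sub-day levels truncate directly in the microsecond domain with floorMod (correct for negative timestamps too), and only date levels detour through the local calendar. A minimal sketch of the time-based path, with the MICROS_PER_SECOND constant written out:

// Truncating a pre-1970 timestamp to seconds: floorMod yields the positive
// sub-second remainder, so the result rounds down in time, not toward zero.
val t = -157700927876544L                      // 1965-01-01 10:11:12.123456
val truncated = t - Math.floorMod(t, 1000000L) // MICROS_PER_SECOND
assert(truncated == -157700928000000L)         // same clock reading, .123456 dropped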

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.fromMillis
import org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToMicros
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -705,7 +705,7 @@ object IntervalUtils {
microseconds = Math.addExact(microseconds, minutesUs)
i += minuteStr.numBytes()
} else if (s.matchAt(millisStr, i)) {
val millisUs = fromMillis(currentValue)
val millisUs = millisToMicros(currentValue)
microseconds = Math.addExact(microseconds, millisUs)
i += millisStr.numBytes()
} else if (s.matchAt(microsStr, i)) {
@@ -141,7 +141,7 @@ class LegacyFastTimestampFormatter(
}
val micros = cal.getMicros()
cal.set(Calendar.MILLISECOND, 0)
Math.addExact(fromMillis(cal.getTimeInMillis), micros)
Math.addExact(millisToMicros(cal.getTimeInMillis), micros)
}

def format(timestamp: SQLTimestamp): String = {
@@ -164,7 +164,7 @@ class LegacySimpleTimestampFormatter(
}

override def parse(s: String): Long = {
fromMillis(sdf.parse(s).getTime)
millisToMicros(sdf.parse(s).getTime)
}

override def format(us: Long): String = {
@@ -135,10 +135,10 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
dateOptions.dateFormat,
TimeZone.getTimeZone(dateOptions.zoneId),
dateOptions.locale)
val expectedDate = format.parse(customDate).getTime
val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate, ZoneOffset.UTC))
assert(castedDate == DateTimeUtils.microsToDays(expectedDate, ZoneOffset.UTC))

val timestamp = "2015-01-01 00:00:00"
timestampsOptions = new CSVOptions(Map(
@@ -271,13 +271,13 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
TimestampType, timeZoneId),
fromMillis(c.getTimeInMillis))
millisToMicros(c.getTimeInMillis))
c = Calendar.getInstance(TimeZoneGMT)
c.set(2015, 10, 1, 2, 30, 0)
checkEvaluation(
cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
TimestampType, timeZoneId),
fromMillis(c.getTimeInMillis))
millisToMicros(c.getTimeInMillis))
}

val gmtId = Option("GMT")
@@ -47,17 +47,17 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

def toMillis(timestamp: String): Long = {
val tf = TimestampFormatter("yyyy-MM-dd HH:mm:ss", ZoneOffset.UTC)
DateTimeUtils.toMillis(tf.parse(timestamp))
DateTimeUtils.microsToMillis(tf.parse(timestamp))
}
val date = "2015-04-08 13:10:15"
val d = new Date(toMillis(date))
val time = "2013-11-08 13:10:15"
val ts = new Timestamp(toMillis(time))

test("datetime function current_date") {
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val d0 = DateTimeUtils.currentDate(ZoneOffset.UTC)
val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val d1 = DateTimeUtils.currentDate(ZoneOffset.UTC)
assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)

val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
@@ -787,15 +787,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
1000L)
checkEvaluation(
UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
MICROSECONDS.toSeconds(
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))),
Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(UnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
MICROSECONDS.toSeconds(DateTimeUtils.daysToMicros(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = UnixTimestamp(
CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
@@ -813,8 +813,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null)
checkEvaluation(
UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
MICROSECONDS.toSeconds(
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
}
@@ -851,16 +851,16 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
1000L)
checkEvaluation(
ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
MICROSECONDS.toSeconds(
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal(sdf2.format(new Timestamp(-1000000))),
Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(ToUnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
MICROSECONDS.toSeconds(DateTimeUtils.daysToMicros(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = ToUnixTimestamp(
CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
@@ -875,8 +875,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null)
checkEvaluation(ToUnixTimestamp(
Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
MICROSECONDS.toSeconds(
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal("2015-07-24"),
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.optimizer

import java.time.ZoneId

import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -51,9 +53,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
test("analyzer should replace current_date with literals") {
val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())

val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
val min = DateTimeUtils.currentDate(ZoneId.systemDefault())
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
val max = DateTimeUtils.currentDate(ZoneId.systemDefault())

val lits = new scala.collection.mutable.ArrayBuffer[Int]
plan.transformAllExpressions { case e: Literal =>
@@ -89,7 +89,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {

test("SPARK-6785: java date conversion before and after epoch") {
def format(d: Date): String = {
TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId).format(fromMillis(d.getTime))
TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId)
.format(millisToMicros(d.getTime))
}
def checkFromToJavaDate(d1: Date): Unit = {
val d2 = toJavaDate(fromJavaDate(d1))
@@ -582,17 +583,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}
}

test("daysToMillis and millisToDays") {
val input = toMillis(date(2015, 12, 31, 16, zid = zonePST))
assert(millisToDays(input, zonePST) === 16800)
assert(millisToDays(input, ZoneOffset.UTC) === 16801)
assert(millisToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)
test("daysToMicros and microsToDays") {
val input = date(2015, 12, 31, 16, zid = zonePST)
assert(microsToDays(input, zonePST) === 16800)
assert(microsToDays(input, ZoneOffset.UTC) === 16801)
assert(microsToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)

var expected = toMillis(date(2015, 12, 31, zid = zonePST))
assert(daysToMillis(16800, zonePST) === expected)
var expected = date(2015, 12, 31, zid = zonePST)
assert(daysToMicros(16800, zonePST) === expected)

expected = toMillis(date(2015, 12, 31, zid = zoneGMT))
assert(daysToMillis(16800, ZoneOffset.UTC) === expected)
expected = date(2015, 12, 31, zid = zoneGMT)
assert(daysToMicros(16800, ZoneOffset.UTC) === expected)

// Some days are skipped entirely in some time zones; skip them here.
val skipped_days = Map[String, Set[Int]](
@@ -607,16 +608,16 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
val skipped = skipped_days.getOrElse(tz.getID, Set.empty)
(-20000 to 20000).foreach { d =>
if (!skipped.contains(d)) {
assert(millisToDays(daysToMillis(d, tz.toZoneId), tz.toZoneId) === d,
assert(microsToDays(daysToMicros(d, tz.toZoneId), tz.toZoneId) === d,
s"Round trip of ${d} did not work in tz ${tz}")
}
}
}
}

test("toMillis") {
assert(DateTimeUtils.toMillis(-9223372036844776001L) === -9223372036844777L)
assert(DateTimeUtils.toMillis(-157700927876544L) === -157700927877L)
test("microsToMillis") {
assert(DateTimeUtils.microsToMillis(-9223372036844776001L) === -9223372036844777L)
assert(DateTimeUtils.microsToMillis(-157700927876544L) === -157700927877L)
}

test("special timestamp values") {