Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ case class CurrentBatchTimestamp(
def toLiteral: Literal = dataType match {
case _: TimestampType =>
Literal(DateTimeUtils.fromJavaTimestamp(new Timestamp(timestampMs)), TimestampType)
case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs, zoneId), DateType)
case _: DateType => Literal(DateTimeUtils.microsToDays(timestampMs, zoneId), DateType)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ trait LegacyDateFormatter extends DateFormatter {

override def parse(s: String): Int = {
val milliseconds = parseToDate(s).getTime
DateTimeUtils.millisToDays(milliseconds)
DateTimeUtils.microsToDays(milliseconds)
}

override def format(days: Int): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ object DateTimeUtils {
}

// we should use the exact day as Int, for example, (year, month, day) -> day
def millisToDays(millisUtc: Long): SQLDate = {
millisToDays(millisUtc, defaultTimeZone().toZoneId)
def microsToDays(microsUtc: Long): SQLDate = {
microsToDays(microsUtc, defaultTimeZone().toZoneId)
}

def millisToDays(millisUtc: Long, zoneId: ZoneId): SQLDate = {
val instant = microsToInstant(Math.multiplyExact(millisUtc, MICROS_PER_MILLIS))
def microsToDays(microsUtc: Long, zoneId: ZoneId): SQLDate = {
val instant = microsToInstant(microsUtc)
localDateToDays(LocalDateTime.ofInstant(instant, zoneId).toLocalDate)
}

// reverse of millisToDays
def daysToMillis(days: SQLDate): Long = {
daysToMillis(days, defaultTimeZone().toZoneId)
def daysToMicros(days: SQLDate): Long = {
daysToMicros(days, defaultTimeZone().toZoneId)
}

def daysToMillis(days: SQLDate, zoneId: ZoneId): Long = {
def daysToMicros(days: SQLDate, zoneId: ZoneId): Long = {
val instant = daysToLocalDate(days).atStartOfDay(zoneId).toInstant
instantToMicros(instant) / MICROS_PER_MILLIS
instantToMicros(instant)
}

// Converts Timestamp to string according to Hive TimestampWritable convention.
Expand All @@ -88,14 +88,14 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
millisToDays(date.getTime)
microsToDays(date.getTime)
}

/**
* Returns a java.sql.Date from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
new Date(daysToMillis(daysSinceEpoch))
new Date(daysToMicros(daysSinceEpoch))
}

/**
Expand Down Expand Up @@ -572,10 +572,10 @@ object DateTimeUtils {
time2: SQLTimestamp,
roundOff: Boolean,
zoneId: ZoneId): Double = {
val millis1 = MICROSECONDS.toMillis(time1)
val millis2 = MICROSECONDS.toMillis(time2)
val date1 = millisToDays(millis1, zoneId)
val date2 = millisToDays(millis2, zoneId)
val micros1 = MICROSECONDS.toMicros(time1)
val micros2 = MICROSECONDS.toMicros(time2)
val date1 = microsToDays(micros1, zoneId)
val date2 = microsToDays(micros2, zoneId)
val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
val (year2, monthInYear2, dayInMonth2, daysToMonthEnd2) = splitDate(date2)

Expand All @@ -589,8 +589,8 @@ object DateTimeUtils {
}
// using milliseconds can cause precision loss with more than 8 digits
// we follow Hive's implementation which uses seconds
val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, zoneId))
val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, zoneId))
val secondsInDay1 = MICROSECONDS.toSeconds(micros1 - daysToMicros(date1, zoneId))
val secondsInDay2 = MICROSECONDS.toSeconds(micros2 - daysToMicros(date2, zoneId))
val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2
val secondsInMonth = DAYS.toSeconds(31)
val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
Expand Down Expand Up @@ -712,18 +712,18 @@ object DateTimeUtils {
case TRUNC_TO_HOUR => truncToUnit(t, zoneId, ChronoUnit.HOURS)
case TRUNC_TO_DAY => truncToUnit(t, zoneId, ChronoUnit.DAYS)
case _ =>
val millis = MICROSECONDS.toMillis(t)
val micros = MICROSECONDS.toMicros(t)
val truncated = level match {
case TRUNC_TO_MILLISECOND => millis
case TRUNC_TO_MILLISECOND => MICROSECONDS.toMillis(t) * MICROS_PER_MILLIS
case TRUNC_TO_SECOND =>
millis - Math.floorMod(millis, MILLIS_PER_SECOND)
micros - Math.floorMod(micros, MICROS_PER_SECOND)
case TRUNC_TO_MINUTE =>
millis - Math.floorMod(millis, MILLIS_PER_MINUTE)
micros - Math.floorMod(micros, MICROS_PER_MINUTE)
case _ => // Try to truncate date levels
val dDays = millisToDays(millis, zoneId)
daysToMillis(truncDate(dDays, level), zoneId)
val dDays = microsToDays(micros, zoneId)
daysToMicros(truncDate(dDays, level), zoneId)
}
truncated * MICROS_PER_MILLIS
truncated
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
val expectedDate = format.parse(customDate).getTime
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate, ZoneOffset.UTC))
assert(castedDate == DateTimeUtils.microsToDays(expectedDate, ZoneOffset.UTC))

val timestamp = "2015-01-01 00:00:00"
timestampsOptions = new CSVOptions(Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val ts = new Timestamp(toMillis(time))

test("datetime function current_date") {
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val d0 = DateTimeUtils.microsToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val d1 = DateTimeUtils.microsToDays(System.currentTimeMillis(), ZoneOffset.UTC)
assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)

val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
Expand Down Expand Up @@ -789,14 +789,14 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))),
Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(UnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
MILLISECONDS.toSeconds(DateTimeUtils.daysToMicros(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = UnixTimestamp(
CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
Expand All @@ -815,7 +815,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
}
Expand Down Expand Up @@ -853,15 +853,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal(sdf2.format(new Timestamp(-1000000))),
Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(ToUnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
MILLISECONDS.toSeconds(DateTimeUtils.daysToMicros(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = ToUnixTimestamp(
CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
Expand All @@ -877,7 +877,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(ToUnixTimestamp(
Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
DateTimeUtils.daysToMicros(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal("2015-07-24"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
test("analyzer should replace current_date with literals") {
val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())

val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
val min = DateTimeUtils.microsToDays(System.currentTimeMillis())
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
val max = DateTimeUtils.microsToDays(System.currentTimeMillis())

val lits = new scala.collection.mutable.ArrayBuffer[Int]
plan.transformAllExpressions { case e: Literal =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
test("SPARK-6785: java date conversion before and after epoch") {
def format(d: Date): String = {
TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId)
.format(d.getTime * MICROS_PER_MILLIS)
.format(d.getTime)
}
def checkFromToJavaDate(d1: Date): Unit = {
val d2 = toJavaDate(fromJavaDate(d1))
Expand Down Expand Up @@ -584,16 +584,16 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}

test("daysToMillis and millisToDays") {
val input = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, 16, zid = zonePST))
assert(millisToDays(input, zonePST) === 16800)
assert(millisToDays(input, ZoneOffset.UTC) === 16801)
assert(millisToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)
val input = TimeUnit.MICROSECONDS.toMicros(date(2015, 12, 31, 16, zid = zonePST))
assert(microsToDays(input, zonePST) === 16800)
assert(microsToDays(input, ZoneOffset.UTC) === 16801)
assert(microsToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)

var expected = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, zid = zonePST))
assert(daysToMillis(16800, zonePST) === expected)
var expected = TimeUnit.MICROSECONDS.toMicros(date(2015, 12, 31, zid = zonePST))
assert(daysToMicros(16800, zonePST) === expected)

expected = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, zid = zoneGMT))
assert(daysToMillis(16800, ZoneOffset.UTC) === expected)
expected = TimeUnit.MICROSECONDS.toMicros(date(2015, 12, 31, zid = zoneGMT))
assert(daysToMicros(16800, ZoneOffset.UTC) === expected)

// There are some days are skipped entirely in some timezone, skip them here.
val skipped_days = Map[String, Set[Int]](
Expand All @@ -608,7 +608,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
val skipped = skipped_days.getOrElse(tz.getID, Set.empty)
(-20000 to 20000).foreach { d =>
if (!skipped.contains(d)) {
assert(millisToDays(daysToMillis(d, tz.toZoneId), tz.toZoneId) === d,
assert(microsToDays(daysToMicros(d, tz.toZoneId), tz.toZoneId) === d,
s"Round trip of ${d} did not work in tz ${tz}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {

test("function current_date") {
val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
val d0 = DateTimeUtils.microsToDays(System.currentTimeMillis())
val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
val d2 = DateTimeUtils.fromJavaDate(
sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
val d3 = DateTimeUtils.microsToDays(System.currentTimeMillis())
assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX")))

val ISO8601Date = "1970-01-01"
checkTypePromotion(DateTimeUtils.millisToDays(32400000),
checkTypePromotion(DateTimeUtils.microsToDays(32400000),
enforceCorrectType(ISO8601Date, DateType))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ class StreamSuite extends StreamTest {
}

var lastTimestamp = System.currentTimeMillis()
val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
val currentDate = DateTimeUtils.microsToDays(lastTimestamp)
testStream(df) (
AddData(input, 1),
CheckLastBatch { rows: Seq[Row] =>
Expand Down