Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge branch 'master' of github.com:Peng-Lei/spark into SPARK-35139
  • Loading branch information
Peng-Lei committed Apr 27, 2021
commit b6f1dc00c7a8ae9eb5ef9133596142b6c3cd58ac
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,8 @@ private static class IntervalDayAccessor extends ArrowVectorAccessor {
@Override
long getLong(int rowId) {
accessor.get(rowId, intervalDayHolder);
return Math.addExact(
intervalDayHolder.days * MICROS_PER_DAY,
intervalDayHolder.milliseconds * MICROS_PER_MILLIS);
return Math.addExact(Math.multiplyExact(intervalDayHolder.days, MICROS_PER_DAY),
intervalDayHolder.milliseconds * MICROS_PER_MILLIS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit, Types}
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}

import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -54,8 +54,8 @@ private[sql] object ArrowUtils {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
}
case NullType => ArrowType.Null.INSTANCE
case YearMonthIntervalType => Types.MinorType.INTERVALYEAR.getType
case DayTimeIntervalType => Types.MinorType.INTERVALDAY.getType
case YearMonthIntervalType => new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
case DayTimeIntervalType => new ArrowType.Interval(IntervalUnit.DAY_TIME)
case _ =>
throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ArrowUtilsSuite extends SparkFunSuite {
roundtrip(BinaryType)
roundtrip(DecimalType.SYSTEM_DEFAULT)
roundtrip(DateType)
roundtrip(YearMonthIntervalType)
roundtrip(DayTimeIntervalType)
val tsExMsg = intercept[UnsupportedOperationException] {
roundtrip(TimestampType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,5 +421,4 @@ private[arrow] class IntervalDayWriter(val valueVector: IntervalDayVector)
val millis = (totalMicroseconds % MICROS_PER_DAY) / MICROS_PER_MILLIS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, do we lose micro seconds part? I think this is another reason to use duration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. we lose micro seconds part, end with millisecond. It's inconsistent with that convert java.time.Duration to DayTimeIntervalType that drop any excess presision that greater than microsecond precision.

valueVector.set(count, days.toInt, millis.toInt)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.arrow

import org.apache.arrow.vector.IntervalDayVector

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util._
Expand Down Expand Up @@ -75,12 +77,33 @@ class ArrowWriterSuite extends SparkFunSuite {
check(DateType, Seq(0, 1, 2, null, 4))
check(TimestampType, Seq(0L, 3.6e9.toLong, null, 8.64e10.toLong), "America/Los_Angeles")
check(NullType, Seq(null, null, null))
check(YearMonthIntervalType,
Seq(null, 0, 1, -1, scala.Int.MaxValue, scala.Int.MinValue))
check(DayTimeIntervalType,
Seq(null, 0L, 1000L, -1000L,
(scala.Long.MaxValue - 807L),
(scala.Long.MinValue + 808L)))
check(YearMonthIntervalType, Seq(null, 0, 1, -1, Int.MaxValue, Int.MinValue))
check(DayTimeIntervalType, Seq(null, 0L, 1000L, -1000L, (Long.MaxValue - 807L),
(Long.MinValue + 808L)))
}

test("long overflow for DayTimeIntervalType")
{
val schema = new StructType().add("value", DayTimeIntervalType, nullable = true)
val writer = ArrowWriter.create(schema, null)
val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
val valueVector = writer.root.getFieldVectors().get(0).asInstanceOf[IntervalDayVector]

valueVector.set(0, 106751992, 0)
valueVector.set(1, 106751991, Int.MaxValue)

// first long overflow for test Math.multiplyExact()
val msg = intercept[java.lang.ArithmeticException] {
reader.getLong(0)
}.getMessage
assert(msg.equals("long overflow"))

// second long overflow for test Math.addExact()
val msg1 = intercept[java.lang.ArithmeticException] {
reader.getLong(1)
}.getMessage
assert(msg1.equals("long overflow"))
writer.root.close()
}

test("get multiple") {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.