Closed
Changes from 1 commit
75 commits
1d07034
Implementation of GenericArrayData specialized for primitive type array
kiszk Jun 18, 2016
7b48a30
fix scala style error
kiszk Jun 18, 2016
b5876a6
Introduce GenericRefArrayData
kiszk Jun 21, 2016
ac5b73b
replace 'new GenericArrayData' with 'GenericArrayData.allocate'
kiszk Jun 21, 2016
dab5a8c
Generate GenericArrayData.allocate in NewInstance()
kiszk Jun 22, 2016
a8fe2d8
intial version of Benchmark without performance
kiszk Jun 22, 2016
870dc84
move project of Benchmark program
kiszk Jun 22, 2016
7cdd558
update benchmark program
kiszk Jun 22, 2016
d28e256
addressed review comments
kiszk Jun 22, 2016
e95e137
fix test failures
kiszk Jun 22, 2016
2b27bb3
Enabled all of benchmark suites with performance data
kiszk Jun 22, 2016
878262b
fix test failures
kiszk Jun 22, 2016
6efddf0
update test suite to resolve test failures
kiszk Jun 23, 2016
4d2b6bc
fix compilation error
kiszk Jun 23, 2016
f55aeea
addressed comments
kiszk Jun 25, 2016
78aaf13
fix test failures
kiszk Jun 25, 2016
8b908e2
fix descriptions
kiszk Jun 25, 2016
e9ec382
Better usage of GenericArrayData
kiszk Jun 25, 2016
ae9591e
revert part of changes
kiszk Jun 26, 2016
79f4c95
undo revert
kiszk Jun 26, 2016
f93fbc2
revert changes at 0800fdc5
kiszk Jun 26, 2016
a452bda
add null check after asInstanceOf
kiszk Jun 26, 2016
685d3e7
generate GenericArrayData.allocate in NewInstance
kiszk Jun 26, 2016
29c8519
fix test failure
kiszk Jun 27, 2016
0e43c4b
fix test failure
kiszk Jun 27, 2016
d15a46c
update
kiszk Jul 11, 2016
3e84fdb
replace new GenericArrayData with GenericArrayData.allocate
kiszk Jul 11, 2016
604293e
rebase
kiszk Jul 25, 2016
1dcd582
rebase
kiszk Sep 10, 2016
e60bf40
reimplement without factory method
kiszk Oct 1, 2016
e541f35
update benchmark programs
kiszk Oct 1, 2016
22d310b
fix scala style error
kiszk Oct 1, 2016
8684ad6
fix test failure in OrcQuerySuite
kiszk Oct 1, 2016
aa3ada8
fix test failure (DatasetPrimitiveSuites)
kiszk Oct 1, 2016
a81ee14
update benchmark results
kiszk Oct 1, 2016
681ae03
Implementation of GenericArrayData specialized for primitive type array
kiszk Jun 18, 2016
556e76e
fix scala style error
kiszk Jun 18, 2016
ff45703
Introduce GenericRefArrayData
kiszk Jun 21, 2016
23f9f65
replace 'new GenericArrayData' with 'GenericArrayData.allocate'
kiszk Jun 21, 2016
839bcb8
Generate GenericArrayData.allocate in NewInstance()
kiszk Jun 22, 2016
97068d4
intial version of Benchmark without performance
kiszk Jun 22, 2016
cf2b216
move project of Benchmark program
kiszk Jun 22, 2016
8df1c34
update benchmark program
kiszk Jun 22, 2016
040d9aa
addressed review comments
kiszk Jun 22, 2016
cabc27a
fix test failures
kiszk Jun 22, 2016
80abdbb
Enabled all of benchmark suites with performance data
kiszk Jun 22, 2016
9311664
fix test failures
kiszk Jun 22, 2016
25105f3
update test suite to resolve test failures
kiszk Jun 23, 2016
f328662
fix compilation error
kiszk Jun 23, 2016
16de76f
addressed comments
kiszk Jun 25, 2016
12d138e
fix test failures
kiszk Jun 25, 2016
19c453b
fix descriptions
kiszk Jun 25, 2016
bf34ec4
Better usage of GenericArrayData
kiszk Jun 25, 2016
9cd7776
revert part of changes
kiszk Jun 26, 2016
d0b3f60
undo revert
kiszk Jun 26, 2016
8b19f75
revert changes at 0800fdc5
kiszk Jun 26, 2016
7b0e769
add null check after asInstanceOf
kiszk Jun 26, 2016
1a0486f
generate GenericArrayData.allocate in NewInstance
kiszk Jun 26, 2016
963876f
fix test failure
kiszk Jun 27, 2016
d50ee04
fix test failure
kiszk Jun 27, 2016
d581439
update
kiszk Jul 11, 2016
8063e63
replace new GenericArrayData with GenericArrayData.allocate
kiszk Jul 11, 2016
090869d
update benchmark program
kiszk Jul 19, 2016
1837792
rebase
kiszk Jul 25, 2016
06b07da
rebase
kiszk Sep 10, 2016
06c22ac
reimplement without factory method
kiszk Oct 1, 2016
4c6f41e
update benchmark programs
kiszk Oct 1, 2016
9031224
fix scala style error
kiszk Oct 1, 2016
ecbc32e
fix test failure in OrcQuerySuite
kiszk Oct 1, 2016
503dbde
update benchmark results
kiszk Oct 1, 2016
c7ed68f
fix compilation error
kiszk Nov 8, 2016
c82fbf3
add another use cases from #13909
kiszk Nov 8, 2016
6bf54ec
update benchmark results
kiszk Nov 8, 2016
8bcba32
Revert "update benchmark results"
kiszk Nov 9, 2016
7697e5f
Revert "add another use cases from #13909"
kiszk Nov 9, 2016
rebase
kiszk committed Nov 8, 2016
commit 1dcd58256f2bbb419bb34b600c76c4698f8ee1cc
@@ -143,7 +143,7 @@ case class ApproximatePercentile(
if (result.length == 0) {
null
} else if (returnPercentileArray) {
- new GenericArrayData(result)
+ GenericArrayData.allocate(result)
} else {
result(0)
}
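This hunk, like the ones that follow, applies the pattern the commit series is built around: call sites switch from `new GenericArrayData(...)` to a `GenericArrayData.allocate(...)` factory, so the callee can hand back an implementation specialized for primitive arrays. Below is a minimal, hypothetical sketch of that idea; only the `allocate` entry point mirrors the PR, and the class names are illustrative rather than Spark's actual types.

```scala
// Hypothetical sketch of a factory-selected, primitive-specialized array wrapper.
// None of these class names come from the PR; only the allocate() idea does.
abstract class ArrayDataSketch {
  def numElements(): Int
  def getInt(ordinal: Int): Int
}

// Specialized variant: keeps the Array[Int] as-is, so reads avoid boxing.
final class IntArrayDataSketch(values: Array[Int]) extends ArrayDataSketch {
  override def numElements(): Int = values.length
  override def getInt(ordinal: Int): Int = values(ordinal)
}

// Generic fallback: keeps boxed references, like a plain GenericArrayData.
final class RefArrayDataSketch(values: Array[Any]) extends ArrayDataSketch {
  override def numElements(): Int = values.length
  override def getInt(ordinal: Int): Int = values(ordinal).asInstanceOf[Int]
}

object ArrayDataSketch {
  // Call sites say ArrayDataSketch.allocate(...) instead of `new`, so the
  // factory can pick the specialized variant from the runtime array type.
  def allocate(values: Array[_]): ArrayDataSketch = values match {
    case ints: Array[Int] => new IntArrayDataSketch(ints)
    case refs: Array[Any] => new RefArrayDataSketch(refs)
    case arr              => new RefArrayDataSketch(arr.map(_.asInstanceOf[Any]))
  }
}
```

With such a factory, `allocate(Array(1, 2, 3))` would return the unboxed variant while reference arrays keep the old behavior, which is why the hunks in this diff only have to touch the construction sites.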
@@ -39,30 +39,30 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// test UnsafeRow-backed data
val structEncoder = ExpressionEncoder[Array[Tuple2[java.lang.Integer, java.lang.Integer]]]
val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
- val structExpected = new GenericArrayData(
+ val structExpected = GenericArrayData.allocate(
Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
checkEvalutionWithUnsafeProjection(
structEncoder.serializer.head, structExpected, structInputRow)

// test UnsafeArray-backed data
val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
- val arrayExpected = new GenericArrayData(
-   Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
+ val arrayExpected = GenericArrayData.allocate(
+   Array(GenericArrayData.allocate(Array(1, 2)), GenericArrayData.allocate(Array(3, 4))))
checkEvalutionWithUnsafeProjection(
arrayEncoder.serializer.head, arrayExpected, arrayInputRow)

// test UnsafeMap-backed data
val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
val mapInputRow = InternalRow.fromSeq(Seq(Array(
Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
- val mapExpected = new GenericArrayData(Seq(
+ val mapExpected = GenericArrayData.allocate(Seq(
new ArrayBasedMapData(
-     new GenericArrayData(Array(1, 2)),
-     new GenericArrayData(Array(100, 200))),
+     GenericArrayData.allocate(Array(1, 2)),
+     GenericArrayData.allocate(Array(100, 200))),
new ArrayBasedMapData(
-     new GenericArrayData(Array(3, 4)),
-     new GenericArrayData(Array(300, 400)))))
+     GenericArrayData.allocate(Array(3, 4)),
+     GenericArrayData.allocate(Array(300, 400)))))
checkEvalutionWithUnsafeProjection(
mapEncoder.serializer.head, mapExpected, mapInputRow)
}
@@ -229,137 +229,6 @@ private[jdbc] class JDBCRDD(
}
}

// A `JDBCValueSetter` is responsible for converting and setting a value from `ResultSet`
// into a field for `MutableRow`. The last argument `Int` means the index for the
// value to be set in the row and also used for the value to retrieve from `ResultSet`.
private type JDBCValueSetter = (ResultSet, MutableRow, Int) => Unit

/**
* Creates `JDBCValueSetter`s according to [[StructType]], which can set
* each value from `ResultSet` to each field of [[MutableRow]] correctly.
*/
def makeSetters(schema: StructType): Array[JDBCValueSetter] =
schema.fields.map(sf => makeSetter(sf.dataType, sf.metadata))

private def makeSetter(dt: DataType, metadata: Metadata): JDBCValueSetter = dt match {
case BooleanType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.setBoolean(pos, rs.getBoolean(pos + 1))

case DateType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos + 1)
if (dateVal != null) {
row.setInt(pos, DateTimeUtils.fromJavaDate(dateVal))
} else {
row.update(pos, null)
}

// When connecting with Oracle DB through JDBC, the precision and scale of BigDecimal
// object returned by ResultSet.getBigDecimal is not correctly matched to the table
// schema reported by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale.
// If inserting values like 19999 into a column with NUMBER(12, 2) type, you get through
// a BigDecimal object with scale as 0. But the dataframe schema has correct type as
// DecimalType(12, 2). Thus, after saving the dataframe into parquet file and then
// retrieve it, you will get wrong result 199.99.
// So it is needed to set precision and scale for Decimal based on JDBC metadata.
case DecimalType.Fixed(p, s) =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
val decimal =
nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
row.update(pos, decimal)

case DoubleType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.setDouble(pos, rs.getDouble(pos + 1))

case FloatType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.setFloat(pos, rs.getFloat(pos + 1))

case IntegerType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.setInt(pos, rs.getInt(pos + 1))

case LongType if metadata.contains("binarylong") =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
val bytes = rs.getBytes(pos + 1)
var ans = 0L
var j = 0
while (j < bytes.size) {
ans = 256 * ans + (255 & bytes(j))
j = j + 1
}
row.setLong(pos, ans)

case LongType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.setLong(pos, rs.getLong(pos + 1))

case StringType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))

case TimestampType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
} else {
row.update(pos, null)
}

case BinaryType =>
(rs: ResultSet, row: MutableRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))

case ArrayType(et, _) =>
val elementConversion = et match {
case TimestampType =>
(array: Object) =>
array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
}

case StringType =>
(array: Object) =>
array.asInstanceOf[Array[java.lang.String]]
.map(UTF8String.fromString)

case DateType =>
(array: Object) =>
array.asInstanceOf[Array[java.sql.Date]].map { date =>
nullSafeConvert(date, DateTimeUtils.fromJavaDate)
}

case dt: DecimalType =>
(array: Object) =>
array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
nullSafeConvert[java.math.BigDecimal](
decimal, d => Decimal(d, dt.precision, dt.scale))
}

case LongType if metadata.contains("binarylong") =>
throw new IllegalArgumentException(s"Unsupported array element " +
s"type ${dt.simpleString} based on binary")

case ArrayType(_, _) =>
throw new IllegalArgumentException("Nested arrays unsupported")

case _ => (array: Object) => array.asInstanceOf[Array[Any]]
}

(rs: ResultSet, row: MutableRow, pos: Int) =>
val array = nullSafeConvert[Object](
rs.getArray(pos + 1).getArray,
array => GenericArrayData.allocate(elementConversion.apply(array)))
row.update(pos, array)

case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
}

/**
* Runs the SQL query against the JDBC driver.
*
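Both the removed JDBCRDD setters above and the JdbcUtils hunk that follows rely on a `nullSafeConvert` helper that the diff references but never shows. Here is a minimal sketch of what such a helper presumably does, with the signature assumed from the call sites (`nullSafeConvert[T](value, f)`):

```scala
// Sketch of the null-guarding helper implied by the call sites: run the
// conversion only when the JDBC driver returned a non-null value, otherwise
// keep null so the row column stays null. Signature assumed from usage.
object NullSafeConvertSketch {
  def nullSafeConvert[T](input: T, f: T => Any): Any =
    if (input == null) null else f(input)
}

// Hypothetical usage, mirroring the timestamp element conversion above:
//   nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
```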
@@ -425,7 +425,7 @@ object JdbcUtils extends Logging {
(rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[Object](
rs.getArray(pos + 1).getArray,
-       array => new GenericArrayData(elementConversion.apply(array)))
+       array => GenericArrayData.allocate(elementConversion.apply(array)))
row.update(pos, array)

case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")