Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb9a42d
add two implementations (sparse and dense) for UnsafeArrayData
kiszk Jun 14, 2016
d931428
fix failures of testsuite
kiszk Jun 15, 2016
9777a2d
fix errors of unit tests
kiszk Jun 15, 2016
000eda4
fix failures of unit tests
kiszk Jun 15, 2016
804f081
make DenseID public
kiszk Jun 23, 2016
e6fb261
Use one implementation approach
kiszk Jun 25, 2016
a313084
fix test failures
kiszk Jun 25, 2016
68d92f7
fix test failures
kiszk Jun 25, 2016
7f2da14
update test suite
kiszk Jun 25, 2016
2f26f6f
fix scala style error
kiszk Jun 25, 2016
ccef63c
revert changes
kiszk Jun 25, 2016
c4f1b5e
addressed comments
kiszk Jun 28, 2016
34a5c6a
add benchmark
kiszk Jun 28, 2016
7a77b20
fix scala style error
kiszk Jun 28, 2016
7b0d4da
addressed comments
kiszk Jul 1, 2016
b4eac29
addressed comments
kiszk Jul 2, 2016
eecf6bd
fix parameters of Platform.OFFSET
kiszk Jul 3, 2016
d88a25a
update benchmark results
kiszk Jul 3, 2016
db15432
add test cases
kiszk Jul 3, 2016
3fa7052
addressed comments
kiszk Jul 4, 2016
4c094c2
addressed comments
kiszk Jul 6, 2016
9887171
update test cases
kiszk Jul 6, 2016
9fe7ad0
address comments
kiszk Jul 7, 2016
e4b4b52
address comments for test cases and benchmark
kiszk Jul 7, 2016
585ca7b
addressed comments
kiszk Jul 8, 2016
9933a06
addressed review comments
kiszk Aug 6, 2016
919e832
fixed test failures
kiszk Aug 7, 2016
0886e3a
update test suites
kiszk Aug 9, 2016
c385bf4
align each of variable length elements to 8 bytes
kiszk Aug 18, 2016
c8813db
fixed test failures
kiszk Aug 20, 2016
aa7cfdb
fixed test failures
kiszk Sep 9, 2016
0b7867b
address review comments
kiszk Sep 20, 2016
ab9a16a
address review comments
kiszk Sep 20, 2016
515701b
address review comments
kiszk Sep 20, 2016
8169abd
change benchmark size
kiszk Sep 26, 2016
e356a79
addressed comments
kiszk Sep 26, 2016
2ef6e3b
update performance results
kiszk Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments for test cases and benchmark
  • Loading branch information
kiszk committed Sep 26, 2016
commit e4b4b52eb5c227ddddd603486e136538551be575
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class UnsafeArraySuite extends SparkFunSuite {
val longArray = Array(1.toLong, 10.toLong, 100.toLong)
val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
val doubleArray = Array(1.1, 2.2, 3.3)
val stringArray = Array("1", "10", "100")

val intMultiDimArray = Array(Array(1, 10), Array(2, 20, 200), Array(3, 30, 300, 3000))
val doubleMultiDimArray = Array(
Expand Down Expand Up @@ -84,6 +85,14 @@ class UnsafeArraySuite extends SparkFunSuite {
assert(unsafeDouble.getDouble(i) == e)
}

val unsafeString = ExpressionEncoder[Array[String]].resolveAndBind().
toRow(stringArray).getArray(0)
assert(unsafeString.isInstanceOf[UnsafeArrayData])
assert(unsafeString.numElements == stringArray.length)
stringArray.zipWithIndex.map { case (e, i) =>
assert(unsafeString.getUTF8String(i).toString().equals(e))
}

val unsafeMultiDimInt = ExpressionEncoder[Array[Array[Int]]].resolveAndBind().
toRow(intMultiDimArray).getArray(0)
assert(unsafeMultiDimInt.isInstanceOf[UnsafeArrayData])
Expand Down Expand Up @@ -131,13 +140,9 @@ class UnsafeArraySuite extends SparkFunSuite {

test("to primitive array") {
val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
val intInternalRow = intEncoder.toRow(intArray)
val intUnsafeArray = intInternalRow.getArray(0)
assert(intUnsafeArray.toIntArray.sameElements(intArray))
assert(intEncoder.toRow(intArray).getArray(0).toIntArray.sameElements(intArray))

val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
val doubleInternalRow = doubleEncoder.toRow(doubleArray)
val doubleUnsafeArray = doubleInternalRow.getArray(0)
assert(doubleUnsafeArray.toDoubleArray.sameElements(doubleArray))
assert(doubleEncoder.toRow(doubleArray).getArray(0).toDoubleArray.sameElements(doubleArray))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,52 +35,44 @@ import org.apache.spark.util.Benchmark
class UnsafeArrayDataBenchmark extends BenchmarkBase {

def calculateHeaderPortionInBytes(count: Int) : Int = {
// Use this assignment for SPARK-15962
// val size = 4 + 4 * count
val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
size
/* 4 + 4 * count // Use this expression for SPARK-15962 */
UnsafeArrayData.calculateHeaderPortionInBytes(count)
}

def readUnsafeArray(iters: Int): Unit = {
val count = 1024 * 1024 * 16
val rand = new Random(42)

var intResult: Int = 0
val intBuffer = Array.fill[Int](count) { rand.nextInt }
val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
val intInternalRow = intEncoder.toRow(intBuffer)
val intUnsafeArray = intInternalRow.getArray(0)
val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
val readIntArray = { i: Int =>
var n = 0
while (n < iters) {
val len = intUnsafeArray.numElements
var sum = 0.toInt
var sum = 0
var i = 0
while (i < len) {
sum += intUnsafeArray.getInt(i)
i += 1
}
intResult = sum
n += 1
}
}

var doubleResult: Double = 0
val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
val doubleUnsafeArray = doubleInternalRow.getArray(0)
val doubleUnsafeArray = doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
val readDoubleArray = { i: Int =>
var n = 0
while (n < iters) {
val len = doubleUnsafeArray.numElements
var sum = 0.toDouble
var sum = 0.0
var i = 0
while (i < len) {
sum += doubleUnsafeArray.getDouble(i)
i += 1
}
doubleResult = sum
n += 1
}
}
Expand All @@ -102,43 +94,26 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {

def writeUnsafeArray(iters: Int): Unit = {
val count = 1024 * 1024 * 16
val rand = new Random(42)

val intUnsafeRow = new UnsafeRow(1)
val intUnsafeArrayWriter = new UnsafeArrayWriter
val intBufferHolder = new BufferHolder(intUnsafeRow, 64)
intBufferHolder.reset()
intUnsafeArrayWriter.initialize(intBufferHolder, count, 4)
val intCursor = intBufferHolder.cursor
var intTotalLength: Int = 0
val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
val writeIntArray = { i: Int =>
var n = 0
while (n < iters) {
intBufferHolder.cursor = intCursor
val len = count
var i = 0
while (i < len) {
intUnsafeArrayWriter.write(i, 0.toInt)
i += 1
}
intTotalLength += intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
n += 1
}
}

val doubleUnsafeRow = new UnsafeRow(1)
val doubleUnsafeArrayWriter = new UnsafeArrayWriter
val doubleBufferHolder = new BufferHolder(doubleUnsafeRow, 64)
doubleBufferHolder.reset()
doubleUnsafeArrayWriter.initialize(doubleBufferHolder, count, 8)
val doubleCursor = doubleBufferHolder.cursor
var doubleTotalLength: Int = 0
val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
val writeDoubleArray = { i: Int =>
var n = 0
while (n < iters) {
doubleBufferHolder.cursor = doubleCursor
val len = count
var i = 0
while (i < len) {
doubleUnsafeArrayWriter.write(i, 0.toDouble)
i += 1
}
doubleTotalLength += doubleEncoder.toRow(doublePrimitiveArray).getArray(0).numElements()
n += 1
}
}
Expand All @@ -147,26 +122,16 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
benchmark.addCase("Int")(writeIntArray)
benchmark.addCase("Double")(writeDoubleArray)
benchmark.run
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz

Write UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 79 / 86 2124.2 0.5 1.0X
Double 140 / 147 1201.0 0.8 0.6X
*/
}

def getPrimitiveArray(iters: Int): Unit = {
val count = 1024 * 1024 * 12
val rand = new Random(42)

var intTotalLength: Int = 0
val intBuffer = Array.fill[Int](count) { rand.nextInt }
val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
val intInternalRow = intEncoder.toRow(intBuffer)
val intUnsafeArray = intInternalRow.getArray(0)
val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
val readIntArray = { i: Int =>
var n = 0
while (n < iters) {
Expand All @@ -176,10 +141,9 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
}

var doubleTotalLength: Int = 0
val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
val doubleUnsafeArray = doubleInternalRow.getArray(0)
val doubleUnsafeArray = doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
val readDoubleArray = { i: Int =>
var n = 0
while (n < iters) {
Expand Down