Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ public final int appendFloats(int count, float v) {
return result;
}

public final int appendFloats(int length, float[] src, int offset) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really use this API and also appendDoubles? I scan the codes but didn't find anywhere they are used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. That's the reason why this is missing until now. We need to use them in order to ColumnarBatch independently from Parquet.

reserve(elementsAppended + length);
int result = elementsAppended;
putFloats(elementsAppended, length, src, offset);
elementsAppended += length;
return result;
}

public final int appendDouble(double v) {
reserve(elementsAppended + 1);
putDouble(elementsAppended, v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,58 @@ class ColumnarBatchSuite extends SparkFunSuite {
val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNotNull()
reference += false
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNotNulls(3)
(1 to 3).foreach(_ => reference += false)
assert(column.anyNullsSet() == false)
assert(column.numNulls() == 0)

column.appendNull()
reference += true
assert(column.anyNullsSet())
assert(column.numNulls() == 1)

column.appendNulls(3)
(1 to 3).foreach(_ => reference += true)
assert(column.anyNullsSet())
assert(column.numNulls() == 4)

idx = column.elementsAppended

column.putNotNull(idx)
reference += false
idx += 1
assert(column.anyNullsSet() == false)
assert(column.anyNullsSet())
assert(column.numNulls() == 4)

column.putNull(idx)
reference += true
idx += 1
assert(column.anyNullsSet() == true)
assert(column.numNulls() == 1)
assert(column.anyNullsSet())
assert(column.numNulls() == 5)

column.putNulls(idx, 3)
reference += true
reference += true
reference += true
idx += 3
assert(column.anyNullsSet() == true)
assert(column.anyNullsSet())
assert(column.numNulls() == 8)

column.putNotNulls(idx, 4)
reference += false
reference += false
reference += false
reference += false
idx += 4
assert(column.anyNullsSet() == true)
assert(column.numNulls() == 4)
assert(column.anyNullsSet())
assert(column.numNulls() == 8)

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.isNullAt(v._2))
Expand All @@ -85,9 +110,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Byte]

val column = ColumnVector.allocate(1024, ByteType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
column.appendBytes(2, values, 0)
reference += 10.toByte
reference += 20.toByte

column.appendBytes(3, values, 2)
reference += 30.toByte
reference += 40.toByte
reference += 50.toByte

column.appendBytes(6, 60.toByte)
(1 to 6).foreach(_ => reference += 60.toByte)

column.appendByte(70.toByte)
reference += 70.toByte

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
column.putBytes(idx, 2, values, 0)
reference += 1
reference += 2
Expand Down Expand Up @@ -126,9 +168,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Short]

val column = ColumnVector.allocate(1024, ShortType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray
column.appendShorts(2, values, 0)
reference += 10.toShort
reference += 20.toShort

column.appendShorts(3, values, 2)
reference += 30.toShort
reference += 40.toShort
reference += 50.toShort

column.appendShorts(6, 60.toShort)
(1 to 6).foreach(_ => reference += 60.toShort)

column.appendShort(70.toShort)
reference += 70.toShort

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray
column.putShorts(idx, 2, values, 0)
reference += 1
reference += 2
Expand Down Expand Up @@ -189,9 +248,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Int]

val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0

val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray
column.appendInts(2, values, 0)
reference += 10
reference += 20

column.appendInts(3, values, 2)
reference += 30
reference += 40
reference += 50

column.appendInts(6, 60)
(1 to 6).foreach(_ => reference += 60)

column.appendInt(70)
reference += 70

var idx = column.elementsAppended

values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
column.putInts(idx, 2, values, 0)
reference += 1
reference += 2
Expand Down Expand Up @@ -257,9 +333,26 @@ class ColumnarBatchSuite extends SparkFunSuite {
val reference = mutable.ArrayBuffer.empty[Long]

val column = ColumnVector.allocate(1024, LongType, memMode)
var idx = 0

val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray
column.appendLongs(2, values, 0)
reference += 10L
reference += 20L

column.appendLongs(3, values, 2)
reference += 30L
reference += 40L
reference += 50L

column.appendLongs(6, 60L)
(1 to 6).foreach(_ => reference += 60L)

column.appendLong(70L)
reference += 70L

var idx = column.elementsAppended

values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
column.putLongs(idx, 2, values, 0)
reference += 1
reference += 2
Expand Down Expand Up @@ -320,16 +413,124 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}

test("Float APIs") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the added test, do we really test the added appendFloats?

(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Float]

val column = ColumnVector.allocate(1024, FloatType, memMode)

var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray
column.appendFloats(2, values, 0)
reference += .1f
reference += .2f

column.appendFloats(3, values, 2)
reference += .3f
reference += .4f
reference += .5f

column.appendFloats(6, .6f)
(1 to 6).foreach(_ => reference += .6f)

column.appendFloat(.7f)
reference += .7f

var idx = column.elementsAppended

values = (1.0f :: 2.0f :: 3.0f :: 4.0f :: 5.0f :: Nil).toArray
column.putFloats(idx, 2, values, 0)
reference += 1.0f
reference += 2.0f
idx += 2

column.putFloats(idx, 3, values, 2)
reference += 3.0f
reference += 4.0f
reference += 5.0f
idx += 3

val buffer = new Array[Byte](8)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Little Endian floats
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
}

column.putFloats(idx, 1, buffer, 4)
column.putFloats(idx + 1, 1, buffer, 0)
reference += 1.123f
reference += 2.234f
idx += 2

column.putFloats(idx, 2, buffer, 0)
reference += 2.234f
reference += 1.123f
idx += 2

while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
val v = random.nextFloat()
column.putFloat(idx, v)
reference += v
idx += 1
} else {
val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
val v = random.nextFloat()
column.putFloats(idx, n, v)
var i = 0
while (i < n) {
reference += v
i += 1
}
idx += n
}
}

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.valuesNativeAddress()
assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
}
}
column.close
}}
}

test("Double APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Double]

val column = ColumnVector.allocate(1024, DoubleType, memMode)
var idx = 0

val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray
column.appendDoubles(2, values, 0)
reference += .1
reference += .2

column.appendDoubles(3, values, 2)
reference += .3
reference += .4
reference += .5

column.appendDoubles(6, .6)
(1 to 6).foreach(_ => reference += .6)

column.appendDouble(.7)
reference += .7

var idx = column.elementsAppended

values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
column.putDoubles(idx, 2, values, 0)
reference += 1.0
reference += 2.0
Expand All @@ -346,8 +547,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Liitle Endian doubles
var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
// Ensure array contains Little Endian doubles
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
}
Expand Down Expand Up @@ -400,40 +601,47 @@ class ColumnarBatchSuite extends SparkFunSuite {

val column = ColumnVector.allocate(6, BinaryType, memMode)
assert(column.arrayData().elementsAppended == 0)
var idx = 0

val str = "string"
column.appendByteArray(str.getBytes(StandardCharsets.UTF_8),
0, str.getBytes(StandardCharsets.UTF_8).length)
reference += str
assert(column.arrayData().elementsAppended == 6)

var idx = column.elementsAppended

val values = ("Hello" :: "abc" :: Nil).toArray
column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8),
0, values(0).getBytes(StandardCharsets.UTF_8).length)
reference += values(0)
idx += 1
assert(column.arrayData().elementsAppended == 5)
assert(column.arrayData().elementsAppended == 11)

column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8),
0, values(1).getBytes(StandardCharsets.UTF_8).length)
reference += values(1)
idx += 1
assert(column.arrayData().elementsAppended == 8)
assert(column.arrayData().elementsAppended == 14)

// Just put llo
val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8),
2, values(0).getBytes(StandardCharsets.UTF_8).length - 2)
reference += "llo"
idx += 1
assert(column.arrayData().elementsAppended == 11)
assert(column.arrayData().elementsAppended == 17)

// Put the same "ll" at offset. This should not allocate more memory in the column.
column.putArray(idx, offset, 2)
reference += "ll"
idx += 1
assert(column.arrayData().elementsAppended == 11)
assert(column.arrayData().elementsAppended == 17)

// Put a long string
val s = "abcdefghijklmnopqrstuvwxyz"
column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8))
reference += (s + s)
idx += 1
assert(column.arrayData().elementsAppended == 11 + (s + s).length)
assert(column.arrayData().elementsAppended == 17 + (s + s).length)

reference.zipWithIndex.foreach { v =>
assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
Expand Down