Test suites refactored
liancheng committed Apr 1, 2014
commit c298b7666c3c920b0fd30ea86a4fe047e283a086
@@ -17,68 +17,32 @@

package org.apache.spark.sql.columnar

import scala.reflect.ClassTag
import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types._

class ColumnStatsSuite extends FunSuite {
testColumnStats[BooleanType.type, BooleanColumnStats] {
Random.nextBoolean()
}

testColumnStats[IntegerType.type, IntColumnStats] {
Random.nextInt()
}

testColumnStats[LongType.type, LongColumnStats] {
Random.nextLong()
}

testColumnStats[ShortType.type, ShortColumnStats] {
(Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
}

testColumnStats[ByteType.type, ByteColumnStats] {
(Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
}

testColumnStats[DoubleType.type, DoubleColumnStats] {
Random.nextDouble()
}

testColumnStats[FloatType.type, FloatColumnStats] {
Random.nextFloat()
}

testColumnStats[StringType.type, StringColumnStats] {
Random.nextString(Random.nextInt(32))
}
testColumnStats(classOf[IntColumnStats], INT)

def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]: ClassTag](
mkRandomValue: => U#JvmType) {
def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
columnStatsClass: Class[U],
columnType: NativeColumnType[T]) {

val columnStatsClass = implicitly[ClassTag[U]].runtimeClass
val columnStatsName = columnStatsClass.getSimpleName

test(s"$columnStatsName: empty") {
val columnStats = columnStatsClass.newInstance().asInstanceOf[U]
val columnStats = columnStatsClass.newInstance()
assert((columnStats.lowerBound, columnStats.upperBound) === columnStats.initialBounds)
}

test(s"$columnStatsName: non-empty") {
val columnStats = columnStatsClass.newInstance().asInstanceOf[U]
val values = Seq.fill[U#JvmType](10)(mkRandomValue)
val row = new GenericMutableRow(1)
import ColumnarTestData._

values.foreach { value =>
row(0) = value
columnStats.gatherStats(row, 0)
}
val columnStats = columnStatsClass.newInstance()
val rows = Seq.fill(10)(makeRandomRow(columnType))
rows.foreach(columnStats.gatherStats(_, 0))

val values = rows.map(_.head.asInstanceOf[T#JvmType])
assert(columnStats.lowerBound === values.min(columnStats.ordering))
assert(columnStats.upperBound === values.max(columnStats.ordering))
}
@@ -19,14 +19,15 @@ package org.apache.spark.sql.columnar

import java.nio.ByteBuffer

import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.ColumnarTestData._
import org.apache.spark.sql.execution.SparkSqlSerializer

class ColumnTypeSuite extends FunSuite {
val DEFAULT_BUFFER_SIZE = 512

val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)

test("defaultSize") {
@@ -55,116 +56,69 @@ class ColumnTypeSuite extends FunSuite {
}
Contributor:

Minor existing comment: I find this style of testing produces very cryptic failures. When something breaks, all you are going to get is "4 did not equal 8". Furthermore, because the failure is in a loop, the stack trace also won't be helpful in figuring out which data type is wrong. Finally, the correct answer for GENERIC is 10 lines away from the check, making it unnecessarily hard to read the test and see what the expected answers are.

I think something like this would be clearer, and the same number of lines of code:

def checkActualSize[A](t: ColumnType[A], v: A, expectedSize: Int) =
  if (t.actualSize(v) != expectedSize) {
    fail(s"Wrong actualSize for $t, actual: ${t.actualSize(v)}, expected: $expectedSize")
  }

checkActualSize(INT, Int.MaxValue, 4)
...

Contributor Author:

I found that expectResult is equivalent to this and more concise. Updated all occurrences where I think it is appropriate.

Contributor:

Oh cool, I did not know about that. Much clearer!
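
For context, a minimal sketch of how that reads with expectResult (assuming the ScalaTest version in use here; the suite name is illustrative, and the class is placed in the columnar package only so that INT resolves):

package org.apache.spark.sql.columnar

import org.scalatest.FunSuite

class ActualSizeExampleSuite extends FunSuite {
  test("INT.actualSize") {
    // On failure this reports both values ("Expected 4, but got ..."),
    // rather than the bare "4 did not equal 8" from assert(a === b).
    expectResult(4) {
      INT.actualSize(Int.MaxValue)
    }
  }
}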

}

testNumericColumnType[BooleanType.type, Boolean](
testNativeColumnStats[BooleanType.type](
BOOLEAN,
Array.fill(4)(Random.nextBoolean()),
ByteBuffer.allocate(32),
(buffer: ByteBuffer, v: Boolean) => {
buffer.put((if (v) 1 else 0).toByte)
},
(buffer: ByteBuffer) => {
buffer.get() == 1
})

testNumericColumnType[IntegerType.type, Int](
testNativeColumnStats[IntegerType.type](
INT,
Array.fill(4)(Random.nextInt()),
ByteBuffer.allocate(32),
(_: ByteBuffer).putInt(_),
Contributor:

Nit: I believe ByteBuffer is inferred from the function signature so this can just be _.putInt(_). Same below.
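
A standalone sketch of that inference point (the object and value names are illustrative, not part of the patch): once the expected function type fixes the parameter types, the explicit ByteBuffer annotation on the lambda can be dropped.

import java.nio.ByteBuffer

object PutterInferenceSketch extends App {
  // (_: ByteBuffer).putInt(_) and the bare _.putInt(_) expand to the same
  // function here, because the declared type supplies the parameter types.
  val putInt: (ByteBuffer, Int) => Unit = _.putInt(_)
  val getInt: ByteBuffer => Int = _.getInt

  val buffer = ByteBuffer.allocate(4)
  putInt(buffer, 42)
  buffer.rewind()
  assert(getInt(buffer) == 42)
}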

(_: ByteBuffer).getInt)

testNumericColumnType[ShortType.type, Short](
testNativeColumnStats[ShortType.type](
SHORT,
Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]),
ByteBuffer.allocate(32),
(_: ByteBuffer).putShort(_),
(_: ByteBuffer).getShort)

testNumericColumnType[LongType.type, Long](
testNativeColumnStats[LongType.type](
LONG,
Array.fill(4)(Random.nextLong()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putLong(_),
(_: ByteBuffer).getLong)

testNumericColumnType[ByteType.type, Byte](
testNativeColumnStats[ByteType.type](
BYTE,
Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]),
ByteBuffer.allocate(64),
(_: ByteBuffer).put(_),
(_: ByteBuffer).get)

testNumericColumnType[DoubleType.type, Double](
testNativeColumnStats[DoubleType.type](
DOUBLE,
Array.fill(4)(Random.nextDouble()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putDouble(_),
(_: ByteBuffer).getDouble)

testNumericColumnType[FloatType.type, Float](
testNativeColumnStats[FloatType.type](
FLOAT,
Array.fill(4)(Random.nextFloat()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putFloat(_),
(_: ByteBuffer).getFloat)

test("STRING") {
val buffer = ByteBuffer.allocate(128)
val seq = Array("hello", "world", "spark", "sql")

seq.map(_.getBytes).foreach { bytes: Array[Byte] =>
buffer.putInt(bytes.length).put(bytes)
}

buffer.rewind()
seq.foreach { s =>
assert(s === STRING.extract(buffer))
}

buffer.rewind()
seq.foreach(STRING.append(_, buffer))

buffer.rewind()
seq.foreach { s =>
val length = buffer.getInt
assert(length === s.getBytes.length)

testNativeColumnStats[StringType.type](
STRING,
(buffer: ByteBuffer, string: String) => {
val bytes = string.getBytes()
buffer.putInt(bytes.length).put(string.getBytes)
},
(buffer: ByteBuffer) => {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(s === new String(bytes))
}
}

test("BINARY") {
val buffer = ByteBuffer.allocate(128)
val seq = Array.fill(4) {
val bytes = new Array[Byte](4)
Random.nextBytes(bytes)
bytes
}
new String(bytes)
})

seq.foreach { bytes =>
testColumnStats[BinaryType.type, Array[Byte]](
BINARY,
(buffer: ByteBuffer, bytes: Array[Byte]) => {
buffer.putInt(bytes.length).put(bytes)
}

buffer.rewind()
seq.foreach { b =>
assert(b === BINARY.extract(buffer))
}

buffer.rewind()
seq.foreach(BINARY.append(_, buffer))

buffer.rewind()
seq.foreach { b =>
val length = buffer.getInt
assert(length === b.length)

},
(buffer: ByteBuffer) => {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(b === bytes)
}
}
bytes
})

test("GENERIC") {
val buffer = ByteBuffer.allocate(512)
@@ -188,14 +142,22 @@ class ColumnTypeSuite extends FunSuite {
assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
}

def testNumericColumnType[T <: DataType, JvmType](
def testNativeColumnStats[T <: NativeType](
columnType: NativeColumnType[T],
putter: (ByteBuffer, T#JvmType) => Unit,
getter: (ByteBuffer) => T#JvmType) {

testColumnStats[T, T#JvmType](columnType, putter, getter)
}

def testColumnStats[T <: DataType, JvmType](
columnType: ColumnType[T, JvmType],
seq: Seq[JvmType],
buffer: ByteBuffer,
putter: (ByteBuffer, JvmType) => Unit,
getter: (ByteBuffer) => JvmType) {

val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)
val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
val seq = (0 until 4).map(_ => makeRandomValue(columnType))

test(s"$columnTypeName.extract") {
buffer.rewind()
@@ -19,37 +19,57 @@ package org.apache.spark.sql.columnar

import scala.util.Random

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.DataType

// TODO Enrich test data
object ColumnarTestData {
object GenericMutableRow {
def apply(values: Any*) = {
val row = new GenericMutableRow(values.length)
row.indices.foreach { i =>
row(i) = values(i)
}
row
def makeNullRow(length: Int) = {
val row = new GenericMutableRow(length)
(0 until length).foreach(row.setNullAt)
row
}

def makeRandomValue[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]): JvmType = {
def randomBytes(length: Int) = {
val bytes = new Array[Byte](length)
Random.nextBytes(bytes)
bytes
}

(columnType match {
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
case INT => Random.nextInt()
case LONG => Random.nextLong()
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case STRING => Random.nextString(Random.nextInt(32))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case _ =>
// Using a random one-element map instead of an arbitrary object
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
}).asInstanceOf[JvmType]
}

def randomBytes(length: Int) = {
val bytes = new Array[Byte](length)
Random.nextBytes(bytes)
bytes
def makeRandomValues(
head: ColumnType[_ <: DataType, _],
tail: ColumnType[_ <: DataType, _]*): Seq[Any] = makeRandomValues(Seq(head) ++ tail)

def makeRandomValues(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Seq[Any] = {
columnTypes.map(makeRandomValue(_))
}

val nonNullRandomRow = GenericMutableRow(
Random.nextInt(),
Random.nextLong(),
Random.nextFloat(),
Random.nextDouble(),
Random.nextBoolean(),
Random.nextInt(Byte.MaxValue).asInstanceOf[Byte],
Random.nextInt(Short.MaxValue).asInstanceOf[Short],
Random.nextString(Random.nextInt(64)),
randomBytes(Random.nextInt(64)),
Map(Random.nextInt() -> Random.nextString(4)))

val nullRow = GenericMutableRow(Seq.fill(10)(null): _*)
def makeRandomRow(
head: ColumnType[_ <: DataType, _],
tail: ColumnType[_ <: DataType, _]*): Row = makeRandomRow(Seq(head) ++ tail)

def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Row = {
val row = new GenericMutableRow(columnTypes.length)
makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
row(index) = value
}
row
}
}
@@ -47,27 +47,29 @@ class NullableColumnAccessorSuite extends FunSuite {

def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val nullRow = makeNullRow(1)

test(s"$typeName accessor: empty column") {
test(s"Nullable $typeName accessor: empty column") {
val builder = TestNullableColumnBuilder(columnType)
val accessor = TestNullableColumnAccessor(builder.build(), columnType)
assert(!accessor.hasNext)
}

test(s"$typeName accessor: access null values") {
test(s"Nullable $typeName accessor: access null values") {
val builder = TestNullableColumnBuilder(columnType)
val randomRow = makeRandomRow(columnType)

(0 until 4).foreach { _ =>
builder.appendFrom(nonNullRandomRow, columnType.typeId)
builder.appendFrom(nullRow, columnType.typeId)
builder.appendFrom(randomRow, 0)
builder.appendFrom(nullRow, 0)
}

val accessor = TestNullableColumnAccessor(builder.build(), columnType)
val row = new GenericMutableRow(1)

(0 until 4).foreach { _ =>
accessor.extractTo(row, 0)
assert(row(0) === nonNullRandomRow(columnType.typeId))
assert(row(0) === randomRow(0))

accessor.extractTo(row, 0)
assert(row(0) === null)