Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added test suite for BooleanBitSet, refactored other test suites
  • Loading branch information
liancheng committed Apr 5, 2014
commit 3c1ad7afc74179536ef2898627e0192a7b6c68cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
override val typeId = 2

// 32K unique values allowed
private val MAX_DICT_SIZE = Short.MaxValue - 1
val MAX_DICT_SIZE = Short.MaxValue

override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer, columnType)
Expand Down Expand Up @@ -272,9 +272,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
private[sql] case object BooleanBitSet extends CompressionScheme {
override val typeId = 3

private val BITS_PER_LONG = 64

private var _uncompressedSize = 0
val BITS_PER_LONG = 64

override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
Expand All @@ -285,11 +283,13 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN

class Encoder extends compression.Encoder[BooleanType.type] {
private var _uncompressedSize = 0

override def gatherCompressibilityStats(
value: Boolean,
columnType: NativeColumnType[BooleanType.type]) {

_uncompressedSize += columnType.actualSize(value)
_uncompressedSize += BOOLEAN.defaultSize
}

override def compress(
Expand All @@ -301,7 +301,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
// Total element count (1 byte per Boolean value)
.putInt(from.remaining)

while (from.remaining > BITS_PER_LONG) {
while (from.remaining >= BITS_PER_LONG) {
var word = 0: Long
var i = 0

Expand Down Expand Up @@ -344,7 +344,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
private val count = buffer.getInt()

private var currentWord = if (count > 0) buffer.getLong() else 0: Long
private var currentWord = 0: Long

private var visited: Int = 0

Expand All @@ -356,7 +356,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
currentWord = buffer.getLong()
}

((currentWord >> bit) & 1) > 0
((currentWord >> bit) & 1) != 0
}

override def hasNext: Boolean = visited < count
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar.compression

import org.scalatest.FunSuite

import org.apache.spark.sql.Row
import org.apache.spark.sql.columnar.{BOOLEAN, BooleanColumnStats}
import org.apache.spark.sql.columnar.ColumnarTestUtils._

class BooleanBitSetSuite extends FunSuite {
import BooleanBitSet._

def skeleton(count: Int) {
// -------------
// Tests encoder
// -------------

val builder = TestCompressibleColumnBuilder(new BooleanColumnStats, BOOLEAN, BooleanBitSet)
val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
val values = rows.map(_.head)

rows.foreach(builder.appendFrom(_, 0))
val buffer = builder.build()

// Column type ID + null count + null positions
val headerSize = CompressionScheme.columnHeaderSize(buffer)

// Compression scheme ID + element count + bitset words
val compressedSize = 4 + 4 + {
val extra = if (count % BITS_PER_LONG == 0) 0 else 1
(count / BITS_PER_LONG + extra) * 8
}

// 4 extra bytes for compression scheme type ID
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
expectResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
expectResult(count, "Wrong element count")(buffer.getInt())

var word = 0: Long
for (i <- 0 until count) {
val bit = i % BITS_PER_LONG
word = if (bit == 0) buffer.getLong() else word
expectResult(values(i), s"Wrong value in compressed buffer, index=$i") {
(word & ((1: Long) << bit)) != 0
}
}

// -------------
// Tests decoder
// -------------

// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
assert(!decoder.hasNext)
}

test(s"$BooleanBitSet: empty") {
skeleton(0)
}

test(s"$BooleanBitSet: less than 1 word") {
skeleton(BITS_PER_LONG - 1)
}

test(s"$BooleanBitSet: exactly 1 word") {
skeleton(BITS_PER_LONG)
}

test(s"$BooleanBitSet: multiple whole words") {
skeleton(BITS_PER_LONG * 2)
}

test(s"$BooleanBitSet: multiple words and 1 more bit") {
skeleton(BITS_PER_LONG * 2 + 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

class DictionaryEncodingSuite extends FunSuite {
testDictionaryEncoding(new IntColumnStats, INT)
Expand All @@ -41,73 +40,82 @@ class DictionaryEncodingSuite extends FunSuite {
(0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
}

test(s"$DictionaryEncoding with $typeName: simple case") {
def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) {
Seq.empty
} else {
seq.head +: seq.tail.filterNot(_ == seq.head)
}

def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
// -------------
// Tests encoder
// -------------

val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)

builder.initialize(0)
builder.appendFrom(rows(0), 0)
builder.appendFrom(rows(1), 0)
builder.appendFrom(rows(0), 0)
builder.appendFrom(rows(1), 0)

val buffer = builder.build()
val headerSize = CompressionScheme.columnHeaderSize(buffer)
// 4 extra bytes for dictionary size
val dictionarySize = 4 + values.map(columnType.actualSize).sum
// 4 `Short`s, 2 bytes each
val compressedSize = dictionarySize + 2 * 4
// 4 extra bytes for compression scheme type ID
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())

val dictionary = buildDictionary(buffer)
Array[Short](0, 1).foreach { i =>
expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
}

Array[Short](0, 1, 0, 1).foreach {
expectResult(_, "Wrong column element value")(buffer.getShort())
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
val dictValues = stableDistinct(inputSeq)

inputSeq.foreach(i => builder.appendFrom(rows(i), 0))

if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
withClue("Dictionary overflowed, compression should fail") {
intercept[Throwable] {
builder.build()
}
}
} else {
val buffer = builder.build()
val headerSize = CompressionScheme.columnHeaderSize(buffer)
// 4 extra bytes for dictionary size
val dictionarySize = 4 + values.map(columnType.actualSize).sum
// 2 bytes for each `Short`
val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
// 4 extra bytes for compression scheme type ID
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())

val dictionary = buildDictionary(buffer).toMap

dictValues.foreach { i =>
expectResult(i, "Wrong dictionary entry") {
dictionary(values(i))
}
}

inputSeq.foreach { i =>
expectResult(i.toShort, "Wrong column element value")(buffer.getShort())
}

// -------------
// Tests decoder
// -------------

// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = DictionaryEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
}

assert(!decoder.hasNext)
}

// -------------
// Tests decoder
// -------------

// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)

Array[Short](0, 1, 0, 1).foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
}

assert(!decoder.hasNext)
}
}

test(s"$DictionaryEncoding: overflow") {
val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
builder.initialize(0)
test(s"$DictionaryEncoding with $typeName: empty") {
skeleton(0, Seq.empty)
}

(0 to Short.MaxValue).foreach { n =>
val row = new GenericMutableRow(1)
row.setInt(0, n)
builder.appendFrom(row, 0)
test(s"$DictionaryEncoding with $typeName: simple case") {
skeleton(2, Seq(0, 1, 0, 1))
}

withClue("Dictionary overflowed, encoding should fail") {
intercept[Throwable] {
builder.build()
}
test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ class RunLengthEncodingSuite extends FunSuite {
// -------------

val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
builder.initialize(0)

val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
val inputSeq = inputRuns.flatMap { case (index, run) =>
Seq.fill(run)(index)
Expand All @@ -56,13 +54,14 @@ class RunLengthEncodingSuite extends FunSuite {
// Column type ID + null count + null positions
val headerSize = CompressionScheme.columnHeaderSize(buffer)

// 4 extra bytes each run for run length
val compressedSize = inputRuns.map { case (index, _) =>
// Compression scheme ID + compressed contents
val compressedSize = 4 + inputRuns.map { case (index, _) =>
// 4 extra bytes each run for run length
columnType.actualSize(values(index)) + 4
}.sum

// 4 extra bytes for compression scheme type ID
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

// Skips column header
buffer.position(headerSize)
Expand All @@ -80,7 +79,7 @@ class RunLengthEncodingSuite extends FunSuite {
// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)

val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
val decoder = RunLengthEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
Expand All @@ -89,6 +88,10 @@ class RunLengthEncodingSuite extends FunSuite {
assert(!decoder.hasNext)
}

test(s"$RunLengthEncoding with $typeName: empty column") {
skeleton(0, Seq.empty)
}

test(s"$RunLengthEncoding with $typeName: simple case") {
skeleton(2, Seq(0 -> 2, 1 ->2))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ object TestCompressibleColumnBuilder {
columnType: NativeColumnType[T],
scheme: CompressionScheme) = {

new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
builder.initialize(0)
builder
}
}