Refactored CompressionScheme, added 3 more compression schemes.
New schemes: BooleanBitSet, IntDelta and LongDelta
liancheng committed Apr 5, 2014
commit 44fe4b25c44fa6dcadcf3a58de9be3e9bac58500
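
The two delta schemes store each value as its difference from the predecessor, which pays off for gently varying data such as timestamps, while BooleanBitSet (as its name suggests) packs boolean columns into the bits of a word. Below is a rough standalone sketch of the delta idea, not the PR's actual IntDelta/LongDelta code, which integrates with the ByteBuffer-based column builders:

```scala
import java.nio.ByteBuffer

// Toy delta codec: the first value is stored verbatim, every later value
// as the difference from its predecessor. A real delta scheme only pays
// off because small deltas can be stored in fewer bytes than raw values;
// full 4-byte deltas are used here purely to keep the arithmetic visible.
object DeltaSketch {
  def compress(values: Array[Int]): ByteBuffer = {
    val buf = ByteBuffer.allocate(values.length * 4)
    var prev = 0
    for (i <- values.indices) {
      buf.putInt(if (i == 0) values(i) else values(i) - prev)
      prev = values(i)
    }
    buf.flip()
    buf
  }

  def decompress(buf: ByteBuffer, count: Int): Array[Int] = {
    val out = new Array[Int](count)
    var acc = 0
    for (i <- 0 until count) {
      acc += buf.getInt()  // prefix sum restores the original values
      out(i) = acc
    }
    out
  }
}
```

For a sorted column like `[1000, 1001, 1003]` the stored stream becomes `[1000, 1, 2]`, which is where the compression win comes from once deltas are narrowed to a byte or two.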
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
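The NOTE in the new comment above is about boxing: a call through a generic `Ordering[T]` typically boxes primitive column values into wrapper objects on every comparison, whereas a comparison written against the concrete primitive type stays unboxed. An illustrative pair of call shapes (these helpers are not from the patch):

```scala
// Generic path: each ord.lt call typically boxes both Int arguments into
// java.lang.Integer before comparing, once per row of the column.
def minGeneric[A](values: Array[A])(implicit ord: Ordering[A]): A =
  values.reduceLeft((a, b) => if (ord.lt(a, b)) a else b)

// Monomorphic path: the comparison stays on unboxed primitives and
// compiles down to a single integer-compare instruction.
def minInt(values: Array[Int]): Int = {
  var min = values(0)
  var i = 1
  while (i < values.length) {
    if (values(i) < min) min = values(i)
    i += 1
  }
  min
}
```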
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
 
-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }
 
@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
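The change above only threads the column's type parameter through to the chosen encoder; the selection policy itself is unchanged: every applicable encoder estimates its compression ratio while values are appended, `build()` takes the best one, and `PassThrough` wins unless compression saves at least 20%. A self-contained model of that decision, using toy types rather than the PR's traits:

```scala
// Toy model of the selection policy in build(): pick the candidate with
// the lowest estimated compression ratio, but fall back to pass-through
// unless compression saves at least 20%.
case class Candidate(name: String, compressionRatio: Double)

val passThrough = Candidate("PassThrough", 1.0)

def chooseEncoder(candidates: Seq[Candidate]): Candidate = {
  val best = candidates.minBy(_.compressionRatio)
  if (best.compressionRatio < 0.8) best else passThrough
}

// RunLengthEncoding wins here; DictionaryEncoding alone would lose to
// PassThrough because 0.9 does not clear the 0.8 threshold.
chooseEncoder(Seq(
  Candidate("RunLengthEncoding", 0.35),
  Candidate("DictionaryEncoding", 0.9)))
```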
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 
-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
 
   def compressedSize: Int
 
@@ -35,10 +33,7 @@ private[sql] trait Encoder {
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }
 
-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }
 
 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
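The main refactoring in this file moves the type parameter from each method up to the `Encoder` trait itself, so an encoder instance is bound to one column type for its whole life and can carry typed state between the stats-gathering and compress phases. A sketch of what implementing the new shape looks like, written against simplified stand-in traits (the real code uses `T <: NativeType` and `T#JvmType`; these mini traits are not Spark's):

```scala
import java.nio.ByteBuffer

// Simplified stand-ins for Spark's column-type machinery, just to show
// the shape of the new interface; these are not the real traits.
trait MiniColumnType[JvmType] {
  def extract(buffer: ByteBuffer): JvmType
  def append(v: JvmType, buffer: ByteBuffer): Unit
}

trait MiniEncoder[JvmType] {
  // Stats gathering is a no-op by default, like the trait above.
  def gatherCompressibilityStats(value: JvmType, columnType: MiniColumnType[JvmType]): Unit = {}
  def compress(from: ByteBuffer, to: ByteBuffer, columnType: MiniColumnType[JvmType]): ByteBuffer
}

// With the type parameter on the trait, one encoder instance is tied to a
// single column type, so it can keep typed state across the stats and
// compress phases (a dictionary, a previous value for delta, etc.).
class MiniPassThrough[JvmType] extends MiniEncoder[JvmType] {
  override def compress(from: ByteBuffer, to: ByteBuffer,
      columnType: MiniColumnType[JvmType]): ByteBuffer = {
    to.put(from)  // raw bytes, copied unchanged
    to.flip()
    to
  }
}
```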
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {
 
   def supports(columnType: ColumnType[_, _]): Boolean
 
-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]
 
   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
 }
 
 private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }
 
   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
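The rewritten `apply` replaces a match expression that only knew about `PassThrough` with a lookup table derived once from `CompressionScheme.all`, so every scheme registered in `all` automatically becomes resolvable from its serialized type ID, and adding a scheme is a one-line change. A small standalone model of the pattern (the numeric IDs below are made up for the example):

```scala
// Toy model of the map-based registry: unknown IDs fail fast with a
// message that names the offending ID instead of a bare exception.
case class Scheme(typeId: Int, name: String)

val all = Seq(Scheme(0, "PassThrough"), Scheme(1, "RunLengthEncoding"))
val typeIdToScheme = all.map(s => s.typeId -> s).toMap

def schemeFor(typeId: Int): Scheme =
  typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
    s"Unrecognized compression scheme type ID: $typeId"))

println(schemeFor(1))  // Scheme(1,RunLengthEncoding)
// schemeFor(42) would throw: Unrecognized compression scheme type ID: 42
```

Note that `getOrElse` takes its default by name, so the throw is only evaluated on a miss; presumably the numeric dispatch exists because the type ID is what gets written into the compressed buffer's column header (see `copyColumnHeader` above).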