[WIP] in-memory columnar compression support
* Added two more compression schemes (RLE & dictionary encoding)
* Moved compression support code to columnar.compression
* Various refactoring
liancheng committed Apr 1, 2014
commit 2780d6acad290277e0240599b6fe9c40a84b429a
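For readers unfamiliar with the two schemes named in the commit message: run-length encoding (RLE) collapses consecutive runs of equal values into (value, run length) pairs, while dictionary encoding replaces repeated values with small integer IDs backed by a lookup table. The sketch below only illustrates the two ideas on an in-memory sequence; all names in it are hypothetical and it does not use the `columnar.compression` API introduced by this commit.

```scala
// Minimal, self-contained sketches of the two schemes this commit adds.
// These helpers are hypothetical and independent of Spark's compression
// machinery; they only demonstrate the underlying encodings.
object EncodingSketch {
  // Run-length encoding: consecutive equal values become (value, runLength) pairs.
  def runLengthEncode[A](values: Seq[A]): Seq[(A, Int)] =
    values.foldLeft(List.empty[(A, Int)]) {
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      case (acc, x)                      => (x, 1) :: acc
    }.reverse

  // Dictionary encoding: a dictionary of distinct values plus one small ID per row.
  def dictionaryEncode[A](values: Seq[A]): (IndexedSeq[A], Seq[Int]) = {
    val dictionary = values.distinct.toIndexedSeq
    val idOf = dictionary.zipWithIndex.toMap
    (dictionary, values.map(idOf))
  }

  def main(args: Array[String]): Unit = {
    val column = Seq("a", "a", "a", "b", "b", "a")
    println(runLengthEncode(column))  // List((a,3), (b,2), (a,1))
    println(dictionaryEncode(column)) // (Vector(a, b), List(0, 0, 0, 1, 1, 0))
  }
}
```

Both schemes only pay off on repetitive or low-cardinality columns, which is why a compressible builder typically picks a scheme per column based on the statistics it gathers while appending values.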
sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -21,6 +21,7 @@ import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor

/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
@@ -53,17 +54,17 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
columnType.setField(row, ordinal, extractSingle(buffer))
}

-def extractSingle(buffer: ByteBuffer) = columnType.extract(buffer)
+def extractSingle(buffer: ByteBuffer): JvmType = columnType.extract(buffer)

protected def underlyingBuffer = buffer
}

private[sql] abstract class NativeColumnAccessor[T <: NativeType](
-buffer: ByteBuffer,
-columnType: NativeColumnType[T])
+override protected val buffer: ByteBuffer,
+override protected val columnType: NativeColumnType[T])
extends BasicColumnAccessor(buffer, columnType)
with NullableColumnAccessor
-with CompressedColumnAccessor[T]
+with CompressibleColumnAccessor[T]

private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN)
@@ -98,9 +99,8 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
with NullableColumnAccessor

private[sql] object ColumnAccessor {
-def apply(b: ByteBuffer): ColumnAccessor = {
-// The first 4 bytes in the buffer indicates the column type.
-val buffer = b.duplicate().order(ByteOrder.nativeOrder())
+def apply(buffer: ByteBuffer): ColumnAccessor = {
+// The first 4 bytes in the buffer indicate the column type.
val columnTypeId = buffer.getInt()

columnTypeId match {
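As the hunk above shows, `ColumnAccessor.apply` still dispatches on a type ID stored in the first 4 bytes of the column buffer; the change merely drops the internal `duplicate()` call, apparently leaving buffer duplication and byte-order setup to the caller. A stand-alone sketch of that header-dispatch pattern, using a made-up two-type layout rather than Spark's actual column type IDs:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical layout: [4-byte type id][values...], with id 0 = Int, 1 = Boolean.
// This only illustrates dispatch-on-header; it is not Spark's real column format.
object HeaderDispatchSketch {
  def describe(b: ByteBuffer): String = {
    // Work on a duplicate so the caller's position is untouched,
    // mirroring what the pre-change apply() did internally.
    val buffer = b.duplicate().order(ByteOrder.nativeOrder())
    buffer.getInt() match {
      case 0  => s"int column, first value = ${buffer.getInt()}"
      case 1  => s"boolean column, first value = ${buffer.get() != 0}"
      case id => sys.error(s"Unknown column type id: $id")
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
    buf.putInt(0).putInt(42)
    buf.flip()
    println(describe(buf)) // int column, first value = 42
  }
}
```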
sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.ColumnBuilder._
+import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}

private[sql] trait ColumnBuilder {
/**
@@ -30,22 +31,23 @@ private[sql] trait ColumnBuilder {
def initialize(initialSize: Int, columnName: String = "")

/**
-* Gathers statistics information from `row(ordinal)`.
+* Appends `row(ordinal)` to the column builder.
*/
-def gatherStats(row: Row, ordinal: Int) {}
+def appendFrom(row: Row, ordinal: Int)

/**
-* Appends `row(ordinal)` to the column builder.
+* Column statistics information
*/
-def appendFrom(row: Row, ordinal: Int)
+def columnStats: ColumnStats[_, _]

/**
* Returns the final columnar byte buffer.
*/
def build(): ByteBuffer
}

-private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
+private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
+val columnStats: ColumnStats[T, JvmType],
val columnType: ColumnType[T, JvmType])
extends ColumnBuilder {

@@ -74,20 +76,20 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType](
}
}

-private[sql] abstract class NativeColumnBuilder[T <: NativeType](
-protected val columnStats: ColumnStats[T],
-columnType: NativeColumnType[T])
-extends BasicColumnBuilder(columnType)
+private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
+columnType: ColumnType[T, JvmType])
+extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
with NullableColumnBuilder
-with CompressedColumnBuilder[T] {

-override def gatherStats(row: Row, ordinal: Int) {
-columnStats.gatherStats(row, ordinal)
-}
-}
+private[sql] abstract class NativeColumnBuilder[T <: NativeType](
+override val columnStats: NativeColumnStats[T],
+override val columnType: NativeColumnType[T])
+extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
+with NullableColumnBuilder
+with AllCompressionSchemes
+with CompressibleColumnBuilder[T]

-private[sql] class BooleanColumnBuilder
-extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

@@ -101,16 +103,12 @@ private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleCol

private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)

-private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStates, STRING)
+private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)

-private[sql] class BinaryColumnBuilder
-extends BasicColumnBuilder[BinaryType.type, Array[Byte]](BINARY)
-with NullableColumnBuilder
+private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)

// TODO (lian) Add support for array, struct and map
-private[sql] class GenericColumnBuilder
-extends BasicColumnBuilder[DataType, Array[Byte]](GENERIC)
-with NullableColumnBuilder
+private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)

private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
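The reworked `NativeColumnBuilder` now stacks `NullableColumnBuilder`, `AllCompressionSchemes`, and `CompressibleColumnBuilder`, which is what lets the concrete builders above shrink to one-liners. The stand-alone sketch below, with entirely hypothetical names, shows the general pattern such a compressible builder tends to follow: every candidate encoder observes the appended values, and the cheapest one is selected when the column is finally built. It is not the PR's actual `CompressibleColumnBuilder` implementation.

```scala
// Hypothetical sketch of a "compressible" builder: each candidate encoder sees
// every appended value, and build time keeps whichever promises the smallest
// output. Sizes are illustrative; this is not Spark's CompressibleColumnBuilder.
trait Encoder {
  def gather(value: Int): Unit // observe one value while it is appended
  def compressedSize: Int      // estimated encoded size in bytes
  def name: String
}

final class PassThroughEncoder extends Encoder {
  private var count = 0
  def gather(value: Int): Unit = count += 1
  def compressedSize: Int = count * 4 // 4 bytes per int, no compression
  def name = "PassThrough"
}

final class RunLengthEncoder extends Encoder {
  private var runs = 0
  private var last: Option[Int] = None
  def gather(value: Int): Unit = {
    if (!last.contains(value)) runs += 1
    last = Some(value)
  }
  def compressedSize: Int = runs * 8 // (value, runLength) = 8 bytes per run
  def name = "RunLength"
}

final class CompressibleIntBuilder {
  private val candidates = Seq(new PassThroughEncoder, new RunLengthEncoder)
  def append(value: Int): Unit = candidates.foreach(_.gather(value))
  // Once all values have been seen, pick the cheapest candidate.
  def chooseScheme(): String = candidates.minBy(_.compressedSize).name
}

object CompressibleBuilderSketch extends App {
  val builder = new CompressibleIntBuilder
  Seq(7, 7, 7, 7, 9, 9).foreach(builder.append)
  println(builder.chooseScheme()) // RunLength: 2 runs * 8 bytes < 6 values * 4 bytes
}
```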