Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.spark.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.{Strategy, execution}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.spark.sql.columnar
package org.apache.spark.sql.execution.columnar

import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.types._

/**
Expand All @@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
* a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
* for primitive values provided by [[MutableRow]].
*/
private[sql] trait ColumnAccessor {
private[columnar] trait ColumnAccessor {
initialize()

protected def initialize()
Expand All @@ -41,7 +41,7 @@ private[sql] trait ColumnAccessor {
protected def underlyingBuffer: ByteBuffer
}

private[sql] abstract class BasicColumnAccessor[JvmType](
private[columnar] abstract class BasicColumnAccessor[JvmType](
protected val buffer: ByteBuffer,
protected val columnType: ColumnType[JvmType])
extends ColumnAccessor {
Expand All @@ -61,65 +61,65 @@ private[sql] abstract class BasicColumnAccessor[JvmType](
protected def underlyingBuffer = buffer
}

private[sql] class NullColumnAccessor(buffer: ByteBuffer)
private[columnar] class NullColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[Any](buffer, NULL)
with NullableColumnAccessor

private[sql] abstract class NativeColumnAccessor[T <: AtomicType](
private[columnar] abstract class NativeColumnAccessor[T <: AtomicType](
override protected val buffer: ByteBuffer,
override protected val columnType: NativeColumnType[T])
extends BasicColumnAccessor(buffer, columnType)
with NullableColumnAccessor
with CompressibleColumnAccessor[T]

private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
private[columnar] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN)

private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
private[columnar] class ByteColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BYTE)

private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
private[columnar] class ShortColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, SHORT)

private[sql] class IntColumnAccessor(buffer: ByteBuffer)
private[columnar] class IntColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, INT)

private[sql] class LongColumnAccessor(buffer: ByteBuffer)
private[columnar] class LongColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, LONG)

private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
private[columnar] class FloatColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, FLOAT)

private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
private[columnar] class DoubleColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, DOUBLE)

private[sql] class StringColumnAccessor(buffer: ByteBuffer)
private[columnar] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING)

private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
private[columnar] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[Array[Byte]](buffer, BINARY)
with NullableColumnAccessor

private[sql] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
private[columnar] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType))

private[sql] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
private[columnar] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType)
extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType))
with NullableColumnAccessor

private[sql] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType)
private[columnar] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType)
extends BasicColumnAccessor[UnsafeRow](buffer, STRUCT(dataType))
with NullableColumnAccessor

private[sql] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType)
private[columnar] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType)
extends BasicColumnAccessor[UnsafeArrayData](buffer, ARRAY(dataType))
with NullableColumnAccessor

private[sql] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
private[columnar] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType))
with NullableColumnAccessor

private[sql] object ColumnAccessor {
private[columnar] object ColumnAccessor {
def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
val buf = buffer.order(ByteOrder.nativeOrder)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.sql.columnar
package org.apache.spark.sql.execution.columnar

import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.columnar.ColumnBuilder._
import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
import org.apache.spark.sql.execution.columnar.ColumnBuilder._
import org.apache.spark.sql.execution.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
import org.apache.spark.sql.types._

private[sql] trait ColumnBuilder {
private[columnar] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
Expand All @@ -46,7 +46,7 @@ private[sql] trait ColumnBuilder {
def build(): ByteBuffer
}

private[sql] class BasicColumnBuilder[JvmType](
private[columnar] class BasicColumnBuilder[JvmType](
val columnStats: ColumnStats,
val columnType: ColumnType[JvmType])
extends ColumnBuilder {
Expand Down Expand Up @@ -84,58 +84,63 @@ private[sql] class BasicColumnBuilder[JvmType](
}
}

private[sql] class NullColumnBuilder
private[columnar] class NullColumnBuilder
extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL)
with NullableColumnBuilder

private[sql] abstract class ComplexColumnBuilder[JvmType](
private[columnar] abstract class ComplexColumnBuilder[JvmType](
columnStats: ColumnStats,
columnType: ColumnType[JvmType])
extends BasicColumnBuilder[JvmType](columnStats, columnType)
with NullableColumnBuilder

private[sql] abstract class NativeColumnBuilder[T <: AtomicType](
private[columnar] abstract class NativeColumnBuilder[T <: AtomicType](
override val columnStats: ColumnStats,
override val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T#InternalType](columnStats, columnType)
with NullableColumnBuilder
with AllCompressionSchemes
with CompressibleColumnBuilder[T]

private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
private[columnar]
class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)
private[columnar]
class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)

private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)
private[columnar] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)

private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
private[columnar] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)
private[columnar] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)

private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
private[columnar] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)

private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)
private[columnar]
class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)

private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
private[columnar]
class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)

private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)
private[columnar]
class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)

private[sql] class CompactDecimalColumnBuilder(dataType: DecimalType)
private[columnar] class CompactDecimalColumnBuilder(dataType: DecimalType)
extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType))

private[sql] class DecimalColumnBuilder(dataType: DecimalType)
private[columnar] class DecimalColumnBuilder(dataType: DecimalType)
extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType))

private[sql] class StructColumnBuilder(dataType: StructType)
private[columnar] class StructColumnBuilder(dataType: StructType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType))

private[sql] class ArrayColumnBuilder(dataType: ArrayType)
private[columnar] class ArrayColumnBuilder(dataType: ArrayType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType))

private[sql] class MapColumnBuilder(dataType: MapType)
private[columnar] class MapColumnBuilder(dataType: MapType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))

private[sql] object ColumnBuilder {
private[columnar] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.sql.columnar
package org.apache.spark.sql.execution.columnar

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)()
val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
Expand All @@ -32,7 +32,7 @@ private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
}

private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
val (forAttribute, schema) = {
val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
(AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _))
Expand All @@ -45,10 +45,10 @@ private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Seri
* NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
* brings significant performance penalty.
*/
private[sql] sealed trait ColumnStats extends Serializable {
private[columnar] sealed trait ColumnStats extends Serializable {
protected var count = 0
protected var nullCount = 0
private[sql] var sizeInBytes = 0L
private[columnar] var sizeInBytes = 0L

/**
* Gathers statistics information from `row(ordinal)`.
Expand All @@ -72,14 +72,14 @@ private[sql] sealed trait ColumnStats extends Serializable {
/**
* A no-op ColumnStats only used for testing purposes.
*/
private[sql] class NoopColumnStats extends ColumnStats {
private[columnar] class NoopColumnStats extends ColumnStats {
override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal)

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L))
}

private[sql] class BooleanColumnStats extends ColumnStats {
private[columnar] class BooleanColumnStats extends ColumnStats {
protected var upper = false
protected var lower = true

Expand All @@ -97,7 +97,7 @@ private[sql] class BooleanColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class ByteColumnStats extends ColumnStats {
private[columnar] class ByteColumnStats extends ColumnStats {
protected var upper = Byte.MinValue
protected var lower = Byte.MaxValue

Expand All @@ -115,7 +115,7 @@ private[sql] class ByteColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class ShortColumnStats extends ColumnStats {
private[columnar] class ShortColumnStats extends ColumnStats {
protected var upper = Short.MinValue
protected var lower = Short.MaxValue

Expand All @@ -133,7 +133,7 @@ private[sql] class ShortColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class IntColumnStats extends ColumnStats {
private[columnar] class IntColumnStats extends ColumnStats {
protected var upper = Int.MinValue
protected var lower = Int.MaxValue

Expand All @@ -151,7 +151,7 @@ private[sql] class IntColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class LongColumnStats extends ColumnStats {
private[columnar] class LongColumnStats extends ColumnStats {
protected var upper = Long.MinValue
protected var lower = Long.MaxValue

Expand All @@ -169,7 +169,7 @@ private[sql] class LongColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class FloatColumnStats extends ColumnStats {
private[columnar] class FloatColumnStats extends ColumnStats {
protected var upper = Float.MinValue
protected var lower = Float.MaxValue

Expand All @@ -187,7 +187,7 @@ private[sql] class FloatColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class DoubleColumnStats extends ColumnStats {
private[columnar] class DoubleColumnStats extends ColumnStats {
protected var upper = Double.MinValue
protected var lower = Double.MaxValue

Expand All @@ -205,7 +205,7 @@ private[sql] class DoubleColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class StringColumnStats extends ColumnStats {
private[columnar] class StringColumnStats extends ColumnStats {
protected var upper: UTF8String = null
protected var lower: UTF8String = null

Expand All @@ -223,7 +223,7 @@ private[sql] class StringColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class BinaryColumnStats extends ColumnStats {
private[columnar] class BinaryColumnStats extends ColumnStats {
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
Expand All @@ -235,7 +235,7 @@ private[sql] class BinaryColumnStats extends ColumnStats {
new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
}

private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
private[columnar] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
def this(dt: DecimalType) = this(dt.precision, dt.scale)

protected var upper: Decimal = null
Expand All @@ -256,7 +256,7 @@ private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends Column
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
}

private[sql] class ObjectColumnStats(dataType: DataType) extends ColumnStats {
private[columnar] class ObjectColumnStats(dataType: DataType) extends ColumnStats {
val columnType = ColumnType(dataType)

override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
Expand Down
Loading