Changes from 1 commit

Commits (48)
aa688fe
Adding conversion of nested Parquet schemas
AndreSchumacher Mar 26, 2014
4d4892a
First commit nested Parquet read converters
AndreSchumacher Mar 27, 2014
6125c75
First working nested Parquet record input
AndreSchumacher Mar 27, 2014
745a42b
Completing testcase for nested data (Addressbook(
AndreSchumacher Apr 1, 2014
ddb40d2
Extending tests for nested Parquet data
AndreSchumacher Apr 1, 2014
1b1b3d6
Fixing one problem with nested arrays
AndreSchumacher Apr 2, 2014
5d80461
fixing one problem with nested structs and breaking up files
AndreSchumacher Apr 2, 2014
98219cf
added struct converter
AndreSchumacher Apr 2, 2014
ee70125
fixing one problem with arrayconverter
AndreSchumacher Apr 3, 2014
b7fcc35
Documenting conversions, bugfix, wrappers of Rows
AndreSchumacher Apr 4, 2014
6dbc9b7
Fixing some problems intruduced during rebase
AndreSchumacher Apr 6, 2014
f8f8911
For primitive rows fall back to more efficient converter, code reorg
AndreSchumacher Apr 6, 2014
4e25fcb
Adding resolution of complex ArrayTypes
AndreSchumacher Apr 8, 2014
a594aed
Scalastyle
AndreSchumacher Apr 8, 2014
b539fde
First commit for MapType
AndreSchumacher Apr 11, 2014
824500c
Adding attribute resolution for MapType
AndreSchumacher Apr 11, 2014
f777b4b
Scalastyle
AndreSchumacher Apr 11, 2014
d1911dc
Simplifying ArrayType conversion
AndreSchumacher Apr 12, 2014
1dc5ac9
First version of WriteSupport for nested types
AndreSchumacher Apr 12, 2014
e99cc51
Fixing nested WriteSupport and adding tests
AndreSchumacher Apr 13, 2014
adc1258
Optimizing imports
AndreSchumacher Apr 13, 2014
f466ff0
Added ParquetAvro tests and revised Array conversion
AndreSchumacher Apr 13, 2014
79d81d5
Replacing field names for array and map in WriteSupport
AndreSchumacher Apr 13, 2014
619c397
Completing Map testcase
AndreSchumacher Apr 14, 2014
c52ff2c
Adding native-array converter
AndreSchumacher Apr 19, 2014
431f00f
Fixing problems introduced during rebase
AndreSchumacher Apr 19, 2014
a6b4f05
Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
AndreSchumacher Apr 20, 2014
0ae9376
Doc strings and simplifying ParquetConverter.scala
AndreSchumacher May 10, 2014
32229c7
Removing Row nested values and placing by generic types
AndreSchumacher May 11, 2014
cbb5793
Code review feedback
AndreSchumacher May 11, 2014
191bc0d
Changing to Seq for ArrayType, refactoring SQLParser for nested field…
AndreSchumacher May 24, 2014
2f5a805
Removing stripMargin from test schemas
AndreSchumacher May 24, 2014
de02538
Cleaning up ParquetTestData
AndreSchumacher May 24, 2014
31465d6
Scalastyle: fixing commented out bottom
AndreSchumacher May 24, 2014
3c6b25f
Trying to reduce no-op changes wrt master
AndreSchumacher Jun 1, 2014
3104886
Nested Rows should be Rows, not Seqs.
marmbrus Jun 3, 2014
f7aeba3
[SPARK-1982] Support for ByteType and ShortType.
marmbrus Jun 3, 2014
3e1456c
WIP: Directly serialize catalyst attributes.
marmbrus Jun 4, 2014
14c3fd8
Attempting to fix Spark-Parquet schema conversion
AndreSchumacher Jun 4, 2014
37e0a0a
Cleaning up
AndreSchumacher Jun 4, 2014
88e6bdb
Attempting to fix loss of schema
AndreSchumacher Jun 4, 2014
63d1b57
Cleaning up and Scalastyle
AndreSchumacher Jun 8, 2014
b8a8b9a
More fixes to short and byte conversion
AndreSchumacher Jun 8, 2014
403061f
Fixing some issues with tests and schema metadata
AndreSchumacher Jun 8, 2014
94eea3a
Scalastyle
AndreSchumacher Jun 8, 2014
7eceb67
Review feedback
AndreSchumacher Jun 19, 2014
95c1367
Changes to ParquetRelation and its metadata
AndreSchumacher Jun 19, 2014
30708c8
Taking out AvroParquet test for now to remove Avro dependency
AndreSchumacher Jun 20, 2014
Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow

AndreSchumacher committed Jun 19, 2014
commit a6b4f050c02e18409e052ae9c9e2489deac09b0d
@@ -206,6 +206,67 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
override def copy() = new GenericRow(values.clone())
}

// TODO: this is an awful lot of code duplication. If values would be covariant we could reuse
// much of GenericRow
class NativeRow[T](protected[catalyst] val values: Array[T]) extends Row {
Contributor (marmbrus) commented:
Do we need this class? Arrays don't need to be Rows inside of the execution engine; they only need to be of type Seq, and even that requirement should probably be removed. Instead of NativeRow can we just call toSeq on the Array?

Contributor Author (AndreSchumacher) replied:
@marmbrus Good question. I think I added that because GetField wants to get a Row when it calls eval on its children. I will have another look at that.
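Below is a minimal sketch, not part of this change, of the alternative being discussed: handing the filled prefix of the converter's native buffer to the engine as a Seq instead of wrapping it in a Row. The helper name bufferAsSeq is invented for illustration.

// Hypothetical helper illustrating the "call toSeq on the Array" suggestion.
// slice copies only the elements that have actually been filled in; toSeq
// then exposes the result as a Seq for the execution engine.
def bufferAsSeq[T](buffer: Array[T], elements: Int): Seq[T] =
  buffer.slice(0, elements).toSeq

Whether this would be sufficient depends on the GetField concern raised in the reply above.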


/** No-arg constructor for serialization. */
def this() = this(null)

def this(elementType: NativeType, size: Int) =
this(elementType.classTag.newArray(size).asInstanceOf[Array[T]])

def iterator = values.iterator

def length = values.length

def apply(i: Int) = values(i)

def isNullAt(i: Int) = values(i) == null

def getInt(i: Int): Int = {
if (values(i) == null) sys.error("Failed to check null bit for primitive int value.")
values(i).asInstanceOf[Int]
}

def getLong(i: Int): Long = {
if (values(i) == null) sys.error("Failed to check null bit for primitive long value.")
values(i).asInstanceOf[Long]
}

def getDouble(i: Int): Double = {
if (values(i) == null) sys.error("Failed to check null bit for primitive double value.")
values(i).asInstanceOf[Double]
}

def getFloat(i: Int): Float = {
if (values(i) == null) sys.error("Failed to check null bit for primitive float value.")
values(i).asInstanceOf[Float]
}

def getBoolean(i: Int): Boolean = {
if (values(i) == null) sys.error("Failed to check null bit for primitive boolean value.")
values(i).asInstanceOf[Boolean]
}

def getShort(i: Int): Short = {
if (values(i) == null) sys.error("Failed to check null bit for primitive short value.")
values(i).asInstanceOf[Short]
}

def getByte(i: Int): Byte = {
if (values(i) == null) sys.error("Failed to check null bit for primitive byte value.")
values(i).asInstanceOf[Byte]
}

def getString(i: Int): String = {
if (values(i) == null) sys.error("Failed to check null bit for primitive String value.")
values(i).asInstanceOf[String]
}

def copy() = this
}
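The code-duplication TODO at the top of NativeRow comes down to array invariance; the following standalone snippet, independent of Spark, illustrates why an Array[T] cannot simply stand in for GenericRow's Array[Any] without boxing and copying.

// Scala arrays are invariant: an Array[Int] is not an Array[Any].
val ints: Array[Int] = Array(1, 2, 3)
// val broken: Array[Any] = ints                       // does not compile
val boxed: Array[Any] = ints.map(_.asInstanceOf[Any])  // compiles, but boxes and copies every element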


class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
@@ -19,9 +19,11 @@ package org.apache.spark.sql.catalyst.types

import java.sql.Timestamp

import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.util.Utils

abstract class DataType {
/** Matches any expression that evaluates to this DataType */
@@ -43,6 +45,11 @@ abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]

@transient val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}
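The classTag added here lets callers allocate arrays of the concrete JVM type behind a NativeType. As a standalone illustration, assuming the plain context class loader rather than Utils.getSparkClassLoader, deriving a ClassTag from a TypeTag and using it to allocate an unboxed array looks roughly like this:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{runtimeMirror, typeTag, TypeTag}

// Derive a ClassTag for T from its TypeTag, then allocate an array of the
// underlying JVM type: arrayOf[Int](4) yields an int[4], not an array of
// boxed java.lang.Integers.
def arrayOf[T: TypeTag](size: Int): Array[T] = {
  val mirror = runtimeMirror(getClass.getClassLoader)
  val tag = ClassTag[T](mirror.runtimeClass(typeTag[T].tpe))
  tag.newArray(size)
}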

case object StringType extends NativeType with PrimitiveType {
@@ -18,16 +18,13 @@
package org.apache.spark.sql.parquet

import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.runtimeMirror

import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
import org.apache.spark.sql.catalyst.expressions.{NativeRow, GenericRow, Row, Attribute}
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.util.Utils

private[parquet] object CatalystConverter {
// The type internally used for fields
@@ -83,7 +80,7 @@ private[parquet] object CatalystConverter {
val attributes = ParquetTypesConverter.convertToAttributes(parquetSchema)
// For non-nested types we use the optimized Row converter
if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
new MutableRowGroupConverter(attributes)
new PrimitiveRowGroupConverter(attributes)
} else {
new CatalystGroupConverter(attributes)
}
@@ -170,6 +167,9 @@ private[parquet] class CatalystGroupConverter(
def getCurrentRecord: Row = {
assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
// TODO: use iterators if possible
// Note: this will only ever be called in the root converter when the record has been
// fully processed. Therefore it will be difficult to use mutable rows instead, since
// any non-root converter would never be sure when it would be safe to re-use the buffer.
new GenericRow(current.toArray)
}

@@ -180,14 +180,9 @@
current.update(fieldIndex, value)
}

override protected[parquet] def clearBuffer(): Unit = {
// TODO: reuse buffer?
buffer = new ArrayBuffer[Row](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
}
override protected[parquet] def clearBuffer(): Unit = buffer.clear()

override def start(): Unit = {
// TODO: reuse buffer?
// Allocate new array in the root converter (others will be called clearBuffer() on)
current = ArrayBuffer.fill(schema.length)(null)
converters.foreach {
converter => if (!converter.isPrimitive) {
@@ -196,12 +191,10 @@
}
}

// TODO: think about reusing the buffer
override def end(): Unit = {
if (!isRootConverter) {
assert(current!=null) // there should be no empty groups
buffer.append(new GenericRow(current.toArray))
// TODO: use iterators if possible, avoid Row wrapping
parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]]))
}
}
@@ -212,7 +205,7 @@
to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that this
* converter is optimized for rows of primitive types (non-nested records).
*/
private[parquet] class MutableRowGroupConverter(
private[parquet] class PrimitiveRowGroupConverter(
protected[parquet] val schema: Seq[FieldType],
protected[parquet] var current: ParquetRelation.RowType)
extends GroupConverter with CatalystConverter {
@@ -334,7 +327,7 @@ object CatalystArrayConverter {
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.catalyst.types.ArrayType]].
*
* @param elementType The type of the array elements
* @param elementType The type of the array elements (complex or primitive)
* @param index The position of this (array) field inside its parent converter
* @param parent The parent converter
* @param buffer A data buffer
@@ -345,8 +338,6 @@ private[parquet] class CatalystArrayConverter(
protected[parquet] val parent: CatalystConverter,
protected[parquet] var buffer: Buffer[Any])
extends GroupConverter with CatalystConverter {
// TODO: In the future consider using native arrays instead of buffer for
// primitive types for performance reasons

def this(elementType: DataType, index: Int, parent: CatalystConverter) =
this(
@@ -374,8 +365,7 @@
}

override protected[parquet] def clearBuffer(): Unit = {
// TODO: reuse buffer?
buffer = new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
buffer.clear()
}

override def start(): Unit = {
@@ -384,10 +374,8 @@
}
}

// TODO: think about reusing the buffer
override def end(): Unit = {
assert(parent != null)
// TODO: use iterators if possible, avoid Row wrapping
parent.updateField(index, new GenericRow(buffer.toArray))
clearBuffer()
}
@@ -396,20 +384,27 @@
override def getCurrentRecord: Row = throw new UnsupportedOperationException
}

private[parquet] class CatalystNativeArrayConverter[T <: NativeType](
/**
* A `parquet.io.api.GroupConverter` that converts single-element groups that
* match the characteristics of an array (see
* [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
* [[org.apache.spark.sql.catalyst.types.ArrayType]].
*
* @param elementType The type of the array elements (native)
* @param index The position of this (array) field inside its parent converter
* @param parent The parent converter
* @param capacity The (initial) capacity of the buffer
*/
private[parquet] class CatalystNativeArrayConverter(
val elementType: NativeType,
val index: Int,
protected[parquet] val parent: CatalystConverter,
protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
extends GroupConverter with CatalystConverter {

// similar comment as in [[Decoder]]: this should probably be in NativeType
private val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[T#JvmType](mirror.runtimeClass(elementType.tag.tpe))
}
type nativeType = elementType.JvmType

private var buffer: Array[T#JvmType] = classTag.newArray(capacity)
private var buffer: Array[nativeType] = elementType.classTag.newArray(capacity)

private var elements: Int = 0

@@ -432,43 +427,43 @@ private[parquet] class CatalystNativeArrayConverter[T <: NativeType](
// Overridden here to avoid auto-boxing for primitive types
override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[T#JvmType]
buffer(elements) = value.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[T#JvmType]
buffer(elements) = value.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[T#JvmType]
buffer(elements) = value.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[T#JvmType]
buffer(elements) = value.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = {
checkGrowBuffer()
buffer(elements) = value.asInstanceOf[T#JvmType]
buffer(elements) = value.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = {
checkGrowBuffer()
buffer(elements) = value.getBytes.asInstanceOf[T#JvmType]
buffer(elements) = value.getBytes.asInstanceOf[nativeType]
elements += 1
}

override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
checkGrowBuffer()
buffer(elements) = value.toStringUsingUTF8.asInstanceOf[T#JvmType]
buffer(elements) = value.toStringUsingUTF8.asInstanceOf[nativeType]
elements += 1
}

@@ -482,12 +477,7 @@ private[parquet] class CatalystNativeArrayConverter[T <: NativeType](
assert(parent != null)
parent.updateField(
index,
new GenericRow {
// TODO: it would be much nicer to use a view here but GenericRow requires an Array
// TODO: we should avoid using GenericRow as a wrapper but [[GetField]] current
// requires that
override val values = buffer.slice(0, elements).map(_.asInstanceOf[Any])
})
new NativeRow[nativeType](buffer.slice(0, elements)))
clearBuffer()
}

@@ -497,7 +487,7 @@ private[parquet] class CatalystNativeArrayConverter[T <: NativeType](
private def checkGrowBuffer(): Unit = {
if (elements >= capacity) {
val newCapacity = 2 * capacity
val tmp: Array[T#JvmType] = classTag.newArray(newCapacity)
val tmp: Array[nativeType] = elementType.classTag.newArray(newCapacity)
Array.copy(buffer, 0, tmp, 0, capacity)
buffer = tmp
capacity = newCapacity