fix
dongjoon-hyun committed Nov 29, 2017
commit 8e0d392602ac30e938a06a7414ea2058e046fedc
@@ -38,13 +38,14 @@ private[orc] class OrcDeserializer(

private[this] val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))

private[this] val unwrappers = requiredSchema.fields.map(f => unwrapperFor(f.dataType))
private[this] val length = requiredSchema.length

private[this] val unwrappers = requiredSchema.map(_.dataType).map(unwrapperFor).toArray

def deserialize(orcStruct: OrcStruct): InternalRow = {
var i = 0
val len = requiredSchema.length
val names = orcStruct.getSchema.getFieldNames
while (i < len) {
while (i < length) {
val name = requiredSchema(i).name
val writable = if (missingColumnNames.contains(name)) {
null
@@ -65,6 +66,46 @@ private[orc] class OrcDeserializer(
mutableRow
}

private[this] def unwrapperFor(dataType: DataType): (Any, InternalRow, Int) => Unit =
dataType match {
case NullType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setNullAt(ordinal)

case BooleanType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)

case ByteType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setByte(ordinal, value.asInstanceOf[ByteWritable].get)

case ShortType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setShort(ordinal, value.asInstanceOf[ShortWritable].get)

case IntegerType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setInt(ordinal, value.asInstanceOf[IntWritable].get)

case LongType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setLong(ordinal, value.asInstanceOf[LongWritable].get)

case FloatType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)

case DoubleType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)

case _ =>
val unwrapper = getValueUnwrapper(dataType)
(value: Any, row: InternalRow, ordinal: Int) =>
row(ordinal) = unwrapper(value)
Contributor:

how about

case s: StructType =>
  val types = s.map(_.dataType)
  val unwrappers = types.map(unwrapperFor)
  val mutableRow = new SpecificInternalRow(types)
  (value: Any, row: InternalRow, ordinal: Int) => {
    row(ordinal) = mutableRow
    val orcStruct = value.asInstanceOf[OrcStruct]
    var i = 0
    while (i < unwrappers.length) {
      val fieldValue = orcStruct.getFieldValue(s(i).name)
      if (fieldValue == null) {
        mutableRow.setNullAt(i)
      } else {
        unwrappers(i)(fieldValue, mutableRow, i)
      }
      i += 1
    }
  }

Contributor:

Also add array and map types here; then we can remove convertOrcStructToInternalRow and getValueUnwrapper.

Contributor:

Oops, this approach doesn't work for array and map types. I think we should follow ParquetRowConverter and introduce a trait OrcDataUpdater; then we implement StructDataUpdater, ArrayDataUpdater, and MapDataUpdater, and the returned function should be (Any, OrcDataUpdater, Int) => Unit.
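
For concreteness, a minimal sketch of what that updater hierarchy might look like, assuming the surrounding OrcDeserializer scope; the trait name and function shape follow the comment above, but the rest is an illustration, not the final implementation:

// Hypothetical sketch of the proposed OrcDataUpdater abstraction. The updater
// hides *where* a decoded value lands (a row, an array buffer, a map buffer),
// so one set of unwrappers can serve struct, array, and map types alike.
trait OrcDataUpdater {
  def set(ordinal: Int, value: Any): Unit
  def setNullAt(ordinal: Int): Unit
}

// Updater backed by an InternalRow, for struct fields.
class StructDataUpdater(row: InternalRow) extends OrcDataUpdater {
  override def set(ordinal: Int, value: Any): Unit = row(ordinal) = value
  override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
}

// Unwrappers would then take the shape (Any, OrcDataUpdater, Int) => Unit, e.g.:
// case IntegerType =>
//   (value: Any, updater: OrcDataUpdater, ordinal: Int) =>
//     updater.set(ordinal, value.asInstanceOf[IntWritable].get)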

Member Author:

@cloud-fan. The current way is the old ORC way. Do we need to introduce the Parquet way for performance reasons?

Member Author:

And, for example, if the mappings look like the following, do we need to refactor some of the shared patterns between Parquet and ORC?

  • ArrayDataUpdater <= ParquetArrayConverter
  • MapDataUpdater <= ParquetMapConverter

Contributor:

I feel Updater is more precise here; let's not follow the Parquet naming.

Contributor:

It's not only for performance; it also removes duplicated code.

Member Author:

I see. Thanks.

}

/**
* Convert Apache ORC OrcStruct to Apache Spark InternalRow.
* If internalRow is defined, fill it in. Otherwise, create a new SpecificInternalRow and use it.
@@ -173,43 +214,4 @@ private[orc] class OrcDeserializer(
case _ =>
throw new UnsupportedOperationException(s"$dataType is not supported yet.")
}

private[this] def unwrapperFor(dataType: DataType): (Any, InternalRow, Int) => Unit =
dataType match {
case NullType =>
(value: Any, row: InternalRow, ordinal: Int) => row.setNullAt(ordinal)

case BooleanType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)

case ByteType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setByte(ordinal, value.asInstanceOf[ByteWritable].get)

case ShortType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setShort(ordinal, value.asInstanceOf[ShortWritable].get)

case IntegerType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setInt(ordinal, value.asInstanceOf[IntWritable].get)

case LongType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setLong(ordinal, value.asInstanceOf[LongWritable].get)

case FloatType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)

case DoubleType =>
(value: Any, row: InternalRow, ordinal: Int) =>
row.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)

case _ =>
val unwrapper = getValueUnwrapper(dataType)
(value: Any, row: InternalRow, ordinal: Int) =>
row(ordinal) = unwrapper(value)
}
}
@@ -23,22 +23,70 @@ import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.orc.OrcUtils.{getTypeDescription, withNullSafe}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

private[orc] class OrcSerializer(dataSchema: StructType) {

private[this] lazy val orcStruct: OrcStruct =
createOrcValue(dataSchema).asInstanceOf[OrcStruct]
private[this] lazy val orcStruct: OrcStruct = createOrcValue(dataSchema).asInstanceOf[OrcStruct]

private[this] val writableWrappers =
dataSchema.fields.map(f => getWritableWrapper(f.dataType))
private[this] lazy val length = dataSchema.length

private[this] val writers = dataSchema.map(_.dataType).map(makeWriter).toArray

def serialize(row: InternalRow): OrcStruct = {
convertInternalRowToOrcStruct(row, dataSchema, Some(writableWrappers), Some(orcStruct))
var i = 0
while (i < length) {
if (row.isNullAt(i)) {
orcStruct.setFieldValue(i, null)
} else {
writers(i)(row, i)
}
i += 1
}
orcStruct
}

private[this] def makeWriter(dataType: DataType): (SpecializedGetters, Int) => Unit = {
dataType match {
case BooleanType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new BooleanWritable(row.getBoolean(ordinal)))

case ByteType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new ByteWritable(row.getByte(ordinal)))

case ShortType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new ShortWritable(row.getShort(ordinal)))

case IntegerType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new IntWritable(row.getInt(ordinal)))

case LongType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new LongWritable(row.getLong(ordinal)))

case FloatType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new FloatWritable(row.getFloat(ordinal)))

case DoubleType =>
(row: SpecializedGetters, ordinal: Int) =>
orcStruct.setFieldValue(ordinal, new DoubleWritable(row.getDouble(ordinal)))

case _ =>
val wrapper = getWritableWrapper(dataType)
Member:

In getWritableWrapper, the above data types are also handled. Isn't this duplicated?

Member Author:

Please see @cloud-fan's previous comment. This function is added to avoid a boxing issue.
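
For context, a minimal illustration of the boxing concern, distilled from the diff and assuming the surrounding makeWriter scope (row, ordinal, and wrapper as defined there): the generic fallback reads the value as Any, boxing primitives on every call, while the specialized cases read primitives directly:

// Generic path: row.get(...) returns Any, so each Int is boxed to a
// java.lang.Integer before the wrapper ever sees it.
val boxed = wrapper(row.get(ordinal, IntegerType))

// Specialized path: row.getInt returns a primitive Int, so the only
// allocation is the IntWritable itself.
val unboxed = new IntWritable(row.getInt(ordinal))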

(row: SpecializedGetters, ordinal: Int) => {
val value = wrapper(row.get(ordinal, dataType)).asInstanceOf[WritableComparable[_]]
orcStruct.setFieldValue(ordinal, value)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

case s: StructType =>
  val writers = s.map(_.dataType).map(makeWriter).toArray
  val result = createOrcValue(s).asInstanceOf[OrcStruct]
  (row: SpecializedGetters, ordinal: Int) => {
    val struct = row.getStruct(ordinal, s.length)
    var i = 0
    while (i < s.length) {
      if (struct.isNullAt(i)) {
        result.setFieldValue(i, null)
      } else {
        writers(i)(struct, i)
      }
      i += 1
    }
    orcStruct.setFieldValue(ordinal, result)
  }

Contributor:

the writer should be (SpecializedGetters, Int, OrcValue) => Unit
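
A sketch of that revised shape, with OrcStruct standing in for the OrcValue mentioned above (an assumption, not the final type): threading the destination through as a parameter lets the nested case write into its own result instead of closing over the top-level orcStruct:

// Hypothetical revision of makeWriter: the destination struct is a parameter.
private[this] def makeWriter(dataType: DataType): (SpecializedGetters, Int, OrcStruct) => Unit =
  dataType match {
    case IntegerType =>
      (row: SpecializedGetters, ordinal: Int, target: OrcStruct) =>
        target.setFieldValue(ordinal, new IntWritable(row.getInt(ordinal)))

    case s: StructType =>
      val writers = s.map(_.dataType).map(makeWriter).toArray
      (row: SpecializedGetters, ordinal: Int, target: OrcStruct) => {
        val result = createOrcValue(s).asInstanceOf[OrcStruct]
        val struct = row.getStruct(ordinal, s.length)
        var i = 0
        while (i < s.length) {
          if (struct.isNullAt(i)) {
            result.setFieldValue(i, null)
          } else {
            writers(i)(struct, i, result)
          }
          i += 1
        }
        target.setFieldValue(ordinal, result)
      }

    // ... remaining cases elided
  }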

}
}

/**
@@ -50,24 +98,22 @@ private[orc] class OrcSerializer(dataSchema: StructType) {
/**
* Convert Apache Spark InternalRow to Apache ORC OrcStruct.
*/
private[this] def convertInternalRowToOrcStruct(
row: InternalRow,
schema: StructType,
valueWrappers: Option[Seq[Any => Any]] = None,
struct: Option[OrcStruct] = None): OrcStruct = {
val wrappers =
valueWrappers.getOrElse(schema.fields.map(_.dataType).map(getWritableWrapper).toSeq)
val orcStruct = struct.getOrElse(createOrcValue(schema).asInstanceOf[OrcStruct])

for (schemaIndex <- 0 until schema.length) {
val fieldType = schema(schemaIndex).dataType
if (row.isNullAt(schemaIndex)) {
orcStruct.setFieldValue(schemaIndex, null)
private[this] def convertInternalRowToOrcStruct(row: InternalRow, schema: StructType) = {
val wrappers = schema.map(_.dataType).map(getWritableWrapper).toArray
val orcStruct = createOrcValue(schema).asInstanceOf[OrcStruct]

var i = 0
val length = schema.length
while (i < length) {
val fieldType = schema(i).dataType
if (row.isNullAt(i)) {
orcStruct.setFieldValue(i, null)
} else {
val field = row.get(schemaIndex, fieldType)
val fieldValue = wrappers(schemaIndex)(field).asInstanceOf[WritableComparable[_]]
orcStruct.setFieldValue(schemaIndex, fieldValue)
val field = row.get(i, fieldType)
val fieldValue = wrappers(i)(field).asInstanceOf[WritableComparable[_]]
orcStruct.setFieldValue(i, fieldValue)
}
i += 1
}
orcStruct
}