Avoid boxing for primitive types.
dongjoon-hyun committed Nov 29, 2017
commit b3734957fffbe7636718d72e0ff75b5059dbcb7e
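
In short: the old code built one `Any => Any` wrapper per column, and every primitive it returned was boxed before the generic `mutableRow(i) = ...` update stored it. The new code builds `(Any, InternalRow, Int) => Unit` unwrappers that write through the row's typed setters, so primitives never box. A minimal standalone sketch of the difference for an `IntegerType` column (illustrative only, not code from this diff):

import org.apache.hadoop.io.IntWritable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.IntegerType

val row: InternalRow = new SpecificInternalRow(Seq(IntegerType))
val writable = new IntWritable(42)

// Old shape: the wrapper returns Any, so the Int extracted from the
// IntWritable is boxed to java.lang.Integer before the generic update.
val wrapper: Any => Any = (o: Any) => o.asInstanceOf[IntWritable].get
row(0) = wrapper(writable)

// New shape: the unwrapper writes through the typed setter, so the value
// stays an unboxed Int from extraction to storage.
val unwrapper: (Any, InternalRow, Int) => Unit =
  (value, r, ordinal) => r.setInt(ordinal, value.asInstanceOf[IntWritable].get)
unwrapper(writable, row, 0)
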
@@ -37,11 +37,11 @@ private[orc] class OrcDeserializer(
 
   private[this] val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
 
-  private[this] val valueWrappers = requiredSchema.fields.map(f => getValueWrapper(f.dataType))
+  private[this] val unwrappers = requiredSchema.fields.map(f => unwrapperFor(f.dataType))
 
   def deserialize(writable: OrcStruct): InternalRow = {
     convertOrcStructToInternalRow(writable, dataSchema, requiredSchema,
-      maybeMissingSchema, Some(valueWrappers), Some(mutableRow))
+      maybeMissingSchema, Some(unwrappers), Some(mutableRow))
   }
 
   /**
@@ -53,11 +53,11 @@ private[orc] class OrcDeserializer(
       dataSchema: StructType,
       requiredSchema: StructType,
       missingSchema: Option[StructType] = None,
-      valueWrappers: Option[Seq[Any => Any]] = None,
+      valueUnwrappers: Option[Seq[(Any, InternalRow, Int) => Unit]] = None,
       internalRow: Option[InternalRow] = None): InternalRow = {
     val mutableRow = internalRow.getOrElse(new SpecificInternalRow(requiredSchema.map(_.dataType)))
-    val wrappers =
-      valueWrappers.getOrElse(requiredSchema.fields.map(_.dataType).map(getValueWrapper).toSeq)
+    val unwrappers =
+      valueUnwrappers.getOrElse(requiredSchema.fields.map(_.dataType).map(unwrapperFor).toSeq)
     var i = 0
     val len = requiredSchema.length
     val names = orcStruct.getSchema.getFieldNames
@@ -75,7 +75,7 @@
       if (writable == null) {
         mutableRow.setNullAt(i)
       } else {
-        mutableRow(i) = wrappers(i)(writable)
+        unwrappers(i)(writable, mutableRow, i)
       }
       i += 1
     }
@@ -90,7 +90,7 @@
    * Builds a catalyst-value return function ahead of time according to DataType
    * to avoid pattern matching and branching costs per row.
    */
-  private[this] def getValueWrapper(dataType: DataType): Any => Any = dataType match {
+  private[this] def getValueUnwrapper(dataType: DataType): Any => Any = dataType match {
     case NullType => _ => null
 
     case BooleanType => withNullSafe(o => o.asInstanceOf[BooleanWritable].get)
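
The scaladoc above captures the hoisting trick used throughout this file: resolve the `DataType` pattern match once per column and return a specialized closure, instead of re-matching for every row. A tiny illustration with invented helper names (not code from this diff):

import org.apache.hadoop.io.IntWritable
import org.apache.spark.sql.types.{DataType, IntegerType}

// Per-row matching: the pattern match runs again for every value.
def convertSlow(dataType: DataType, o: Any): Any = dataType match {
  case IntegerType => o.asInstanceOf[IntWritable].get
  case _ => o
}

// Hoisted: match once per column, then reuse the returned closure per row.
def converterFor(dataType: DataType): Any => Any = dataType match {
  case IntegerType => (o: Any) => o.asInstanceOf[IntWritable].get
  case _ => (o: Any) => o
}
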
@@ -138,7 +138,7 @@
 
     case ArrayType(elementType, _) =>
       withNullSafe { o =>
-        val wrapper = getValueWrapper(elementType)
+        val wrapper = getValueUnwrapper(elementType)
         val data = new ArrayBuffer[Any]
         o.asInstanceOf[OrcList[WritableComparable[_]]].asScala.foreach { x =>
           data += wrapper(x)
@@ -148,8 +148,8 @@
 
     case MapType(keyType, valueType, _) =>
       withNullSafe { o =>
-        val keyWrapper = getValueWrapper(keyType)
-        val valueWrapper = getValueWrapper(valueType)
+        val keyWrapper = getValueUnwrapper(keyType)
+        val valueWrapper = getValueUnwrapper(valueType)
         val map = new java.util.TreeMap[Any, Any]
         o.asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
           .entrySet().asScala.foreach { entry =>
@@ -159,9 +159,48 @@
     }
 
     case udt: UserDefinedType[_] =>
-      withNullSafe { o => getValueWrapper(udt.sqlType)(o) }
+      withNullSafe { o => getValueUnwrapper(udt.sqlType)(o) }
 
     case _ =>
       throw new UnsupportedOperationException(s"$dataType is not supported yet.")
   }
 
+  private[this] def unwrapperFor(dataType: DataType): (Any, InternalRow, Int) => Unit =
+    dataType match {
+      case NullType =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setNullAt(ordinal)
+
+      case BooleanType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)
+
+      case ByteType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setByte(ordinal, value.asInstanceOf[ByteWritable].get)
+
+      case ShortType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setShort(ordinal, value.asInstanceOf[ShortWritable].get)
+
+      case IntegerType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setInt(ordinal, value.asInstanceOf[IntWritable].get)
+
+      case LongType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setLong(ordinal, value.asInstanceOf[LongWritable].get)
+
+      case FloatType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)
+
+      case DoubleType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)
+
+      case _ =>
+        val unwrapper = getValueUnwrapper(dataType)
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row(ordinal) = unwrapper(value)
Contributor:

How about:

case s: StructType =>
  val types = s.map(_.dataType)
  val unwrappers = types.map(unwrapperFor)
  val mutableRow = new SpecificInternalRow(types)
  (value: Any, row: InternalRow, ordinal: Int) => {
    row(ordinal) = mutableRow
    val orcStruct = value.asInstanceOf[OrcStruct]
    var i = 0
    while (i < unwrappers.length) {
      val fieldValue = orcStruct.getFieldValue(s(i).name)
      if (fieldValue == null) {
        mutableRow.setNullAt(i)
      } else {
        unwrappers(i)(fieldValue, mutableRow, i)
      }
      i += 1
    }
  }

Contributor:

Also add array and map types here; then we can remove `convertOrcStructToInternalRow` and `getValueUnwrapper`.

Contributor:

Oops, this approach doesn't work for array and map types. I think we should follow `ParquetRowConverter` and introduce a trait `OrcDataUpdater`; then we implement `StructDataUpdater`, `ArrayDataUpdater` and `MapDataUpdater`, and the returned function becomes `(Any, OrcDataUpdater, Int) => Unit`.

Member Author:

@cloud-fan. The current way is the old ORC way. Do we need to introduce the Parquet way for performance reasons?

Member Author:

And, for example, if the mappings look like the following, do we need to refactor the shared pattern between Parquet and ORC?

  • ArrayDataUpdater <= ParquetArrayConverter
  • MapDataUpdater <= ParquetMapConverter

Contributor:

I feel `Updater` is more precise here; let's not follow the Parquet naming.

Contributor:

It's not only for performance; it also removes duplicated code.

Member Author:

I see. Thanks.

+    }
 }
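
For reference, a minimal sketch of the updater shape proposed in the thread above, mirroring Parquet's converter/updater pattern; the `OrcDataUpdater` and `StructDataUpdater` names come from the comments, while the method set and bodies are assumptions, not code from this PR:

import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical updater abstraction: each catalyst container (struct row,
// array builder, map builder) gets an implementation, and the per-type
// unwrappers take (Any, OrcDataUpdater, Int) => Unit so the same function
// can fill struct fields, array elements, and map entries.
trait OrcDataUpdater {
  def setNullAt(ordinal: Int): Unit
  def set(ordinal: Int, value: Any): Unit        // fallback for complex values
  def setInt(ordinal: Int, value: Int): Unit     // one typed setter per primitive
  def setLong(ordinal: Int, value: Long): Unit
}

// Struct fields write straight into an InternalRow through typed setters,
// preserving the no-boxing property of this commit.
class StructDataUpdater(row: InternalRow) extends OrcDataUpdater {
  def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
  def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)
  def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
  def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
}
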
@@ -43,7 +43,9 @@ private[orc] object OrcFilters {
     } yield filter
 
     for {
+      // Combines all convertible filters using `And` to produce a single conjunction
       conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
+      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
       builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
Contributor:

Do you mean that even if each individual filter is convertible, the final filter (combining the filters with `And`) may be unconvertible?

Member Author:

Your previous question was about line 40:

why call this function inside a loop? Can we put it at the beginning?

+    val convertibleFilters = for {
+      filter <- filters
+      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())

Here, it seems you are asking another one.

Contributor:

Ah, you are just following the previous code:

    // First, tries to convert each filter individually to see whether it's convertible, and then
    // collect all convertible ones to build the final `SearchArgument`.
    val convertibleFilters = for {
      filter <- filters
      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
    } yield filter

    for {
      // Combines all convertible filters using `And` to produce a single conjunction
      conjunction <- convertibleFilters.reduceOption(And)
      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
      builder <- buildSearchArgument(dataTypeMap, conjunction, SearchArgumentFactory.newBuilder())
    } yield builder.build()

Can you add back those comments?

Member Author:

Sure!

     } yield builder.build()
   }
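
To see why both passes are needed: a conjunction of convertible filters is itself convertible, but one unconvertible filter anywhere in the list would make the combined predicate unconvertible, so each filter is vetted alone first. A self-contained sketch of the pattern, where `convert` is an invented stand-in for `buildSearchArgument`:

import org.apache.spark.sql.sources.{And, Filter, GreaterThan, StringContains}

// Stand-in converter that supports only GreaterThan and And, the way a
// pushdown layer supports a subset of filter types.
def convert(filter: Filter): Option[String] = filter match {
  case GreaterThan(attr, value) => Some(s"($attr > $value)")
  case And(left, right) =>
    for { l <- convert(left); r <- convert(right) } yield s"($l and $r)"
  case _ => None // e.g. StringContains is unconvertible here
}

val filters: Seq[Filter] = Seq(GreaterThan("a", 1), StringContains("b", "x"))

// Pass 1: vet each filter alone, so one unconvertible filter does not
// poison the whole set.
val convertibleFilters = for {
  filter <- filters
  _ <- convert(filter)
} yield filter // Seq(GreaterThan("a", 1))

// Pass 2: fold the survivors into one conjunction and convert that single
// predicate, mirroring reduceOption(And) + buildSearchArgument above.
val searchArgument = convertibleFilters.reduceOption(And).flatMap(convert)
// searchArgument == Some("(a > 1)")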