@@ -39,7 +39,12 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
readContext: ReadContext): RecordMaterializer[InternalRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

val toCatalyst = new CatalystSchemaConverter(conf)
val toCatalyst = if (keyValueMetaData.containsKey("parquet.proto.class")) {
new ProtobufCatalystSchemaConverter(conf)
} else {
new CatalystSchemaConverter(conf)
}

val parquetRequestedSchema = readContext.getRequestedSchema

val catalystRequestedSchema =
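For context, a minimal sketch of the marker this check relies on: parquet-protobuf records the writer's message class in the file footer's key-value metadata under the key "parquet.proto.class", and its presence is what routes reads to ProtobufCatalystSchemaConverter. The file path below and the standalone use of ParquetFileReader are illustrative assumptions, not part of this change.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Read one footer and look for the marker that parquet-protobuf writes.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/proto-events.parquet"))
val keyValueMetaData = footer.getFileMetaData.getKeyValueMetaData

if (keyValueMetaData.containsKey("parquet.proto.class")) {
  // Written by parquet-protobuf: the Protobuf-aware schema converter is chosen.
  println(s"Protobuf writer class: ${keyValueMetaData.get("parquet.proto.class")}")
} else {
  // No marker: the existing CatalystSchemaConverter path is unchanged.
  println("Not written by parquet-protobuf")
}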
@@ -20,12 +20,12 @@ package org.apache.spark.sql.parquet
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder


import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.io.api._
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}

@@ -103,16 +103,22 @@ private[parquet] class CatalystRowConverter(
}.toArray
}

private val needsArrayReset =
fieldConverters.filter(converter => converter.isInstanceOf[NeedsResetArray])
override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

override def end(): Unit = updater.set(currentRow)


override def start(): Unit = {
var i = 0
while (i < currentRow.numFields) {
currentRow.setNullAt(i)
i += 1
}
needsArrayReset.foreach {
converter => converter.asInstanceOf[NeedsResetArray].resetArray()
}
}

/**
@@ -171,8 +177,20 @@
}
}

case t: ArrayType =>
new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
case t: ArrayType =>
  if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
    t match {
      case ArrayType(elementType: StructType, _) =>
        new CatalystRepeatedStructConverter(parquetType.asGroupType(), elementType, updater)
      case ArrayType(elementType: DataType, _) =>
        new CatalystRepeatedPrimitiveConverter(parquetType, elementType, updater)
    }
  } else {
    new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
  }

case t: MapType =>
new CatalystMapConverter(parquetType.asGroupType(), t, updater)
@@ -378,6 +396,66 @@ private[parquet] class CatalystRowConverter(
}
}

/**
 * Supports Protobuf-native repeated fields. parquet-protobuf maps a repeated field
 * one-to-one, e.g. `repeated int32 myInt;` becomes a bare repeated primitive column
 * rather than a LIST-annotated group.
 * @param parquetType the repeated Parquet type being converted
 * @param myCatalystType the Catalyst element type of the resulting array
 * @param updater the parent updater that receives the assembled array
 */
private final class CatalystRepeatedPrimitiveConverter(
parquetType: Type,
myCatalystType: DataType,
updater: ParentContainerUpdater)
extends PrimitiveConverter with NeedsResetArray {

private var elements: Int = 0
private var buffer: ArrayBuffer[Any] = ArrayBuffer.empty[Any]

private val stringConverter = new CatalystStringConverter(new ParentContainerUpdater {
override def set(value: Any): Unit = addValue(value)
})

override def hasDictionarySupport: Boolean = true

override def addValueFromDictionary(dictionaryId: Int): Unit =
stringConverter.addValueFromDictionary(dictionaryId)

override def setDictionary(dictionary: Dictionary): Unit =
stringConverter.setDictionary(dictionary)

private def addValue(value: Any): Unit = {
buffer += value
elements += 1
updater.set(new GenericArrayData(buffer.slice(0, elements).toArray))
}

override def addBinary(value: Binary): Unit =
myCatalystType match {
case StringType => stringConverter.addBinary(value)
case _ => addValue(value)
}


override def addDouble(value: Double): Unit = addValue(value)

override def addInt(value: Int): Unit = {
addValue(value)
}

override def addBoolean(value: Boolean): Unit = addValue(value)

override def addFloat(value: Float): Unit = addValue(value)

override def addLong(value: Long): Unit = addValue(value)

override def resetArray(): Unit = {
buffer.clear()
elements = 0
}
}


/** Parquet converter for maps */
private final class CatalystMapConverter(
parquetType: GroupType,
@@ -446,4 +524,32 @@
}
}
}

trait NeedsResetArray {
def resetArray(): Unit
}

private class CatalystRepeatedStructConverter(
groupType: GroupType,
structType: StructType,
updater: ParentContainerUpdater)
extends CatalystRowConverter(groupType, structType, updater)
with NeedsResetArray {

private var rowBuffer: ArrayBuffer[Any] = ArrayBuffer.empty
private var elements = 0

override def end(): Unit = {
rowBuffer += currentRow.copy()
elements += 1
updater.set(new GenericArrayData(rowBuffer.slice(0, elements).toArray))
}

def resetArray(): Unit = {
rowBuffer.clear()
elements = 0
}

}

}
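To make the two record shapes concrete, here is a sketch with made-up schemas (not taken from this diff). parquet-protobuf writes a repeated proto field as a bare repeated column, which the new CatalystRepeatedPrimitiveConverter and CatalystRepeatedStructConverter handle, while Spark's usual writers nest elements inside a LIST-annotated group that CatalystArrayConverter already covers.

import org.apache.parquet.schema.MessageTypeParser

// parquet-protobuf layout for `repeated int32 myInt;`: a flat repeated leaf.
val protoStyle = MessageTypeParser.parseMessageType(
  """message ProtoRecord {
    |  repeated int32 myInt;
    |}""".stripMargin)

// Conventional LIST layout for the same logical array.
val listStyle = MessageTypeParser.parseMessageType(
  """message SparkRecord {
    |  optional group myInt (LIST) {
    |    repeated group list {
    |      optional int32 element;
    |    }
    |  }
    |}""".stripMargin)

println(protoStyle)
println(listStyle)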
@@ -56,7 +56,7 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
private[parquet] class CatalystSchemaConverter(
private val assumeBinaryIsString: Boolean,
private val assumeInt96IsTimestamp: Boolean,
private val followParquetFormatSpec: Boolean) {
private val followParquetFormatSpec: Boolean) extends SchemaConverter {

// Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
// which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
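The SchemaConverter trait that CatalystSchemaConverter (and, presumably, ProtobufCatalystSchemaConverter) now extends does not appear in this excerpt. Judging from how readSchemaFromFooter uses it below, it likely abstracts the shared convert method; the sketch here is an assumed shape, not the actual definition from this change.

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.StructType

// Assumed interface only: both schema converters can turn a Parquet MessageType
// into a Catalyst StructType, so readSchemaFromFooter can accept either one.
trait SchemaConverter {
  def convert(parquetSchema: MessageType): StructType
}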
@@ -705,8 +705,22 @@ private[sql] object ParquetRelation extends Logging {
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
followParquetFormatSpec = followParquetFormatSpec)

footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
val protobufConverter =
new ProtobufCatalystSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
followParquetFormatSpec = followParquetFormatSpec)

footers.map { footer => {
val myConverter =
if (footer.getParquetMetadata.getFileMetaData.
getKeyValueMetaData.containsKey("parquet.proto.class")) {
protobufConverter
} else {
converter
}
ParquetRelation.readSchemaFromFooter(footer, myConverter)
}
}.reduceOption(_ merge _).iterator
}.collect()

@@ -719,7 +733,7 @@
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
footer: Footer, converter: CatalystSchemaConverter): StructType = {
footer: Footer, converter: SchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
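Finally, a hypothetical end-to-end check, assuming a directory written by a parquet-protobuf job whose message declares `repeated int32 myInt;` (the path and column name are made up). With this change such files should load directly, with repeated proto fields surfacing as array columns.

// Spark 1.5-era API: sqlContext is the usual entry point.
val df = sqlContext.read.parquet("/data/proto-events")

df.printSchema()                                      // myInt should show up as array<int>
df.selectExpr("explode(myInt) AS myIntValue").show()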