1717
1818package org .apache .spark .sql .parquet
1919
20- import java .io .IOException
20+ import java .io .IOException ,
2121
2222import org .apache .hadoop .conf .Configuration
2323import org .apache .hadoop .fs .{FileSystem , Path }
@@ -26,9 +26,10 @@ import org.apache.hadoop.mapreduce.Job
2626
2727import parquet .hadoop .util .ContextUtil
2828import parquet .hadoop .{ParquetOutputFormat , Footer , ParquetFileWriter , ParquetFileReader }
29+
2930import parquet .hadoop .metadata .{CompressionCodecName , FileMetaData , ParquetMetadata }
3031import parquet .io .api .{Binary , RecordConsumer }
31- import parquet .schema .{Type => ParquetType , PrimitiveType => ParquetPrimitiveType , MessageType , MessageTypeParser }
32+ import parquet .schema .{Type => ParquetType , PrimitiveType => ParquetPrimitiveType , MessageType , MessageTypeParser , GroupType => ParquetGroupType , OriginalType => ParquetOriginalType , ConversionPatterns }
3233import parquet .schema .PrimitiveType .{PrimitiveTypeName => ParquetPrimitiveTypeName }
3334import parquet .schema .Type .Repetition
3435
@@ -172,7 +173,7 @@ private[sql] object ParquetRelation {
172173}
173174
174175private [parquet] object ParquetTypesConverter {
175- def toDataType (parquetType : ParquetPrimitiveTypeName ): DataType = parquetType match {
176+ def toPrimitiveDataType (parquetType : ParquetPrimitiveTypeName ): DataType = parquetType match {
176177 // for now map binary to string type
177178 // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
178179 case ParquetPrimitiveTypeName .BINARY => StringType
@@ -190,15 +191,61 @@ private[parquet] object ParquetTypesConverter {
190191 s " Unsupported parquet datatype $parquetType" )
191192 }
192193
193- def fromDataType (ctype : DataType ): ParquetPrimitiveTypeName = ctype match {
194- case StringType => ParquetPrimitiveTypeName .BINARY
195- case BooleanType => ParquetPrimitiveTypeName .BOOLEAN
196- case DoubleType => ParquetPrimitiveTypeName .DOUBLE
197- case ArrayType (ByteType ) => ParquetPrimitiveTypeName .FIXED_LEN_BYTE_ARRAY
198- case FloatType => ParquetPrimitiveTypeName .FLOAT
199- case IntegerType => ParquetPrimitiveTypeName .INT32
200- case LongType => ParquetPrimitiveTypeName .INT64
201- case _ => sys.error(s " Unsupported datatype $ctype" )
194+ def toDataType (parquetType : ParquetType ): DataType = {
195+ if (parquetType.isPrimitive) toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
196+ else {
197+ val groupType = parquetType.asGroupType()
198+ parquetType.getOriginalType match {
199+ case ParquetOriginalType .LIST | ParquetOriginalType .ENUM => {
200+ val fields = groupType.getFields.map(toDataType(_))
201+ new ArrayType (fields.apply(0 )) // array fields should have the same type
202+ }
203+ case _ => { // everything else nested becomes a Struct
204+ val fields = groupType
205+ .getFields
206+ .map(ptype => new StructField (
207+ ptype.getName,
208+ toDataType(ptype),
209+ ptype.getRepetition != Repetition .REQUIRED ))
210+ new StructType (fields)
211+ }
212+ }
213+ }
214+ }
215+
216+ def fromPrimitiveDataType (ctype : DataType ): Option [ParquetPrimitiveTypeName ] = ctype match {
217+ case StringType => Some (ParquetPrimitiveTypeName .BINARY )
218+ case BooleanType => Some (ParquetPrimitiveTypeName .BOOLEAN )
219+ case DoubleType => Some (ParquetPrimitiveTypeName .DOUBLE )
220+ case ArrayType (ByteType ) => Some (ParquetPrimitiveTypeName .FIXED_LEN_BYTE_ARRAY )
221+ case FloatType => Some (ParquetPrimitiveTypeName .FLOAT )
222+ case IntegerType => Some (ParquetPrimitiveTypeName .INT32 )
223+ case LongType => Some (ParquetPrimitiveTypeName .INT64 )
224+ case _ => None
225+ }
226+
227+ def fromComplexDataType (ctype : DataType , name : String , nullable : Boolean = true ): ParquetType = {
228+ val repetition =
229+ if (nullable) Repetition .OPTIONAL
230+ else Repetition .REQUIRED
231+ val primitiveType = fromPrimitiveDataType(ctype)
232+ if (primitiveType.isDefined) {
233+ new ParquetPrimitiveType (repetition, primitiveType.get, name)
234+ } else {
235+ ctype match {
236+ case ArrayType (elementType : DataType ) => {
237+ val parquetElementType = fromComplexDataType(elementType, name + " _values" , false )
238+ new ParquetGroupType (repetition, name, parquetElementType)
239+ }
240+ case StructType (structFields) => {
241+ val fields = structFields.map {
242+ field => fromComplexDataType(field.dataType, field.name, false )
243+ }
244+ new ParquetGroupType (repetition, name, fields)
245+ }
246+ case _ => sys.error(s " Unsupported datatype $ctype" )
247+ }
248+ }
202249 }
203250
204251 def consumeType (consumer : RecordConsumer , ctype : DataType , record : Row , index : Int ): Unit = {
@@ -217,23 +264,18 @@ private[parquet] object ParquetTypesConverter {
217264 }
218265 }
219266
220- def getSchema (schemaString : String ) : MessageType =
267+ def getSchema (schemaString : String ) : MessageType =
221268 MessageTypeParser .parseMessageType(schemaString)
222269
223- def convertToAttributes (parquetSchema : MessageType ) : Seq [Attribute ] = {
224- parquetSchema.getColumns.map {
225- case (desc) =>
226- val ctype = toDataType(desc.getType)
227- val name : String = desc.getPath.mkString(" ." )
228- new AttributeReference (name, ctype, false )()
229- }
270+ def convertToAttributes (parquetSchema : ParquetType ): Seq [Attribute ] = {
271+ parquetSchema
272+ .asGroupType()
273+ .getFields
274+ .map(field => new AttributeReference (field.getName, toDataType(field), field.getRepetition != Repetition .REQUIRED )())
230275 }
231276
232- // TODO: allow nesting?
233277 def convertFromAttributes (attributes : Seq [Attribute ]): MessageType = {
234- val fields : Seq [ParquetType ] = attributes.map {
235- a => new ParquetPrimitiveType (Repetition .OPTIONAL , fromDataType(a.dataType), a.name)
236- }
278+ val fields = attributes.map(attribute => fromComplexDataType(attribute.dataType, attribute.name, attribute.nullable))
237279 new MessageType (" root" , fields)
238280 }
239281
0 commit comments