Commit 5d80461

fixing one problem with nested structs and breaking up files
1 parent 1b1b3d6 commit 5d80461

File tree

4 files changed (+341, -18 lines)
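For context, a minimal sketch (mine, not part of the commit) of the kind of schema this change is about: a multi-field struct nested inside another struct. Before this fix, a non-repeated Parquet group with several fields was read back as an array of structs, so a schema like the hypothetical one below could not round-trip as a plain nested struct. Field names and types are made up; this assumes the 2014-era org.apache.spark.sql.catalyst.types API used throughout the diff.

import org.apache.spark.sql.catalyst.types._

// A nested, non-repeated struct: "point" is a single struct field,
// not an array of structs.
val point = StructType(Seq(
  StructField("x", DoubleType, nullable = false),
  StructField("y", DoubleType, nullable = false)))

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("point", point, nullable = true)))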
Lines changed: 282 additions & 0 deletions
@@ -0,0 +1,282 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.parquet

import scala.collection.mutable.{Buffer, ArrayBuffer}

import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute}
import org.apache.spark.sql.parquet.CatalystConverter.FieldType

object CatalystConverter {
  // The type internally used for fields
  type FieldType = StructField

  // Note: repeated primitive fields that form an array (together with
  // their surrounding group) need to have this name in the schema
  // TODO: "values" is a generic name but without it the Parquet column path would
  // be incomplete and values may be silently dropped; better would be to give
  // primitive-type array elements a name of some sort
  val ARRAY_ELEMENTS_SCHEMA_NAME = "values"

  protected[parquet] def createConverter(
      field: FieldType,
      fieldIndex: Int,
      parent: CatalystConverter): Converter = {
    val fieldType: DataType = field.dataType
    fieldType match {
      case ArrayType(elementType: DataType) => {
        elementType match {
          case StructType(fields) =>
            if (fields.size > 1) new CatalystGroupConverter(fields, fieldIndex, parent)
            else new CatalystArrayConverter(fields(0).dataType, fieldIndex, parent)
          case _ => new CatalystArrayConverter(elementType, fieldIndex, parent)
        }
      }
      case StructType(fields: Seq[StructField]) =>
        new CatalystGroupConverter(fields, fieldIndex, parent)
      case ctype: NativeType =>
        // note: for some reason matching for StringType fails so use this ugly if instead
        if (ctype == StringType) new CatalystPrimitiveStringConverter(parent, fieldIndex)
        else new CatalystPrimitiveConverter(parent, fieldIndex)
      case _ => throw new RuntimeException(
        s"unable to convert datatype ${field.dataType.toString} in CatalystGroupConverter")
    }
  }
}

trait CatalystConverter {
  // the number of fields this group has
  protected[parquet] val size: Int

  // the index of this converter in the parent
  protected[parquet] val index: Int

  // the parent converter
  protected[parquet] val parent: CatalystConverter

  // for child converters to update upstream values
  protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit

  // TODO: in the future consider using specific methods to avoid autoboxing
  protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
    updateField(fieldIndex, value)

  protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
    updateField(fieldIndex, value)

  protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
    updateField(fieldIndex, value)

  protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
    updateField(fieldIndex, value)

  protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
    updateField(fieldIndex, value)

  protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
    updateField(fieldIndex, value.getBytes)

  protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
    updateField(fieldIndex, value.toStringUsingUTF8)

  protected[parquet] def isRootConverter: Boolean = parent == null

  protected[parquet] def clearBuffer(): Unit
}

/**
 * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
 * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object.
 *
 * @param schema The corresponding Catalyst schema in the form of a list of attributes.
 */
class CatalystGroupConverter(
    private[parquet] val schema: Seq[FieldType],
    protected[parquet] val index: Int,
    protected[parquet] val parent: CatalystConverter,
    protected[parquet] var current: ArrayBuffer[Any],
    protected[parquet] var buffer: ArrayBuffer[ArrayBuffer[Any]])
  extends GroupConverter with CatalystConverter {

  def this(schema: Seq[FieldType], index: Int, parent: CatalystConverter) =
    this(
      schema,
      index,
      parent,
      current = null,
      buffer = new ArrayBuffer[ArrayBuffer[Any]](
        CatalystArrayConverter.INITIAL_ARRAY_SIZE))

  // This constructor is used for the root converter only
  def this(attributes: Seq[Attribute]) =
    this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)

  protected[parquet] val converters: Array[Converter] =
    schema.map(field =>
      CatalystConverter.createConverter(field, schema.indexOf(field), this))
    .toArray

  override val size = schema.size

  // Should only be called on the root group converter!
  def getCurrentRecord: Row = {
    assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
    new GenericRow {
      override val values: Array[Any] = current.toArray
    }
  }

  override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)

  // for child converters to update upstream values
  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
    current.update(fieldIndex, value)
  }

  override protected[parquet] def clearBuffer(): Unit = {
    // TODO: reuse buffer?
    buffer = new ArrayBuffer[ArrayBuffer[Any]](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
  }

  override def start(): Unit = {
    // TODO: reuse buffer?
    // Allocate a new array in the root converter (clearBuffer() is called on the others)
    current = ArrayBuffer.fill(schema.length)(null)
    converters.foreach {
      converter => if (!converter.isPrimitive) {
        converter.asInstanceOf[CatalystConverter].clearBuffer()
      }
    }
  }

  // TODO: think about reusing the buffer
  override def end(): Unit = {
    if (!isRootConverter) {
      assert(current != null) // there should be no empty groups
      buffer.append(current)
      parent.updateField(index, buffer)
    }
  }
}

/**
 * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
 *
 * @param parent The parent group converter.
 * @param fieldIndex The index inside the record.
 */
class CatalystPrimitiveConverter(
    parent: CatalystConverter,
    fieldIndex: Int) extends PrimitiveConverter {
  // TODO: consider refactoring these together with ParquetTypesConverter
  override def addBinary(value: Binary): Unit =
    parent.updateBinary(fieldIndex, value)

  override def addBoolean(value: Boolean): Unit =
    parent.updateBoolean(fieldIndex, value)

  override def addDouble(value: Double): Unit =
    parent.updateDouble(fieldIndex, value)

  override def addFloat(value: Float): Unit =
    parent.updateFloat(fieldIndex, value)

  override def addInt(value: Int): Unit =
    parent.updateInt(fieldIndex, value)

  override def addLong(value: Long): Unit =
    parent.updateLong(fieldIndex, value)
}

/**
 * A `parquet.io.api.PrimitiveConverter` that converts Parquet strings (fixed-length byte arrays)
 * into Catalyst Strings.
 *
 * @param parent The parent group converter.
 * @param fieldIndex The index inside the record.
 */
class CatalystPrimitiveStringConverter(
    parent: CatalystConverter,
    fieldIndex: Int)
  extends CatalystPrimitiveConverter(parent, fieldIndex) {
  override def addBinary(value: Binary): Unit =
    parent.updateString(fieldIndex, value)
}

object CatalystArrayConverter {
  val INITIAL_ARRAY_SIZE = 20
}

// this is for single-element groups of primitive or complex types
// Note: AvroParquet only uses arrays for primitive types (?)
class CatalystArrayConverter(
    val elementType: DataType,
    val index: Int,
    protected[parquet] val parent: CatalystConverter,
    protected[parquet] var buffer: Buffer[Any])
  extends GroupConverter with CatalystConverter {
  // TODO: In the future consider using native arrays instead of buffer for
  // primitive types for performance reasons

  def this(elementType: DataType, index: Int, parent: CatalystConverter) =
    this(
      elementType,
      index,
      parent,
      new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))

  protected[parquet] val converter: Converter = CatalystConverter.createConverter(
    new CatalystConverter.FieldType(
      CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
      elementType,
      false),
    fieldIndex = 0,
    parent = this)

  override def getConverter(fieldIndex: Int): Converter = converter

  // arrays have only one (repeated) field, which holds their elements
  override val size = 1

  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
    buffer += value
  }

  override protected[parquet] def clearBuffer(): Unit = {
    // TODO: reuse buffer?
    buffer = new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)
  }

  override def start(): Unit = {
    if (!converter.isPrimitive) {
      converter.asInstanceOf[CatalystConverter].clearBuffer()
    }
  }

  // TODO: think about reusing the buffer
  override def end(): Unit = {
    if (parent != null) parent.updateField(index, buffer)
  }
}

// TODO: add MapConverter
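A rough usage sketch (not from the commit) of how this converter tree is driven: Parquet's record materializer calls start() and end() around each record and pushes primitive values through the child converters obtained from getConverter; the root converter then exposes the assembled Row. The two-column schema below is hypothetical, and this assumes the era's parquet.io.api.Converter methods (isPrimitive, asPrimitiveConverter) plus Binary.fromByteArray as used elsewhere in this commit.

import parquet.io.api.Binary
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types._

// Hypothetical schema: (name: String, age: Int)
val attributes = Seq(
  AttributeReference("name", StringType, nullable = true)(),
  AttributeReference("age", IntegerType, nullable = false)())
val root = new CatalystGroupConverter(attributes)

// Roughly what the Parquet record reader does for one record:
root.start()
root.getConverter(0).asPrimitiveConverter().addBinary(
  Binary.fromByteArray("Ada".getBytes("UTF-8")))
root.getConverter(1).asPrimitiveConverter().addInt(36)
root.end()

val row = root.getCurrentRecord // Row("Ada", 36)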

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 31 additions & 11 deletions
@@ -228,7 +228,11 @@ private[parquet] object ParquetTypesConverter {
         if (groupType.getFieldCount == 1) { // single field, either optional or required
           new StructType(fields)
         } else { // multi field repeated group, which we map into an array of structs
+          if (parquetType.getRepetition == Repetition.REPEATED) {
             new ArrayType(StructType(fields))
+          } else {
+            new StructType(fields)
+          }
         }
       }
     }
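The effect of this hunk, sketched against a hand-written Parquet schema (my example; it assumes parquet-mr's parquet.schema.MessageTypeParser): a multi-field group that is not repeated now maps back to a struct, and only a repeated group becomes an array of structs.

import parquet.schema.MessageTypeParser

// Hypothetical message type: "point" is a required, non-repeated group.
val messageType = MessageTypeParser.parseMessageType(
  """message root {
    |  required group point {
    |    required double x;
    |    required double y;
    |  }
    |}""".stripMargin)

// Before this fix: toDataType(messageType.getType("point")) yielded
// ArrayType(StructType(x, y)); after it, StructType(x, y). A `repeated group`
// with the same fields still maps to ArrayType(StructType(...)).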
@@ -247,7 +251,11 @@ private[parquet] object ParquetTypesConverter {
     case _ => None
   }
 
-  def fromDataType(ctype: DataType, name: String, nullable: Boolean = true, inArray: Boolean = false): ParquetType = {
+  def fromDataType(
+      ctype: DataType,
+      name: String,
+      nullable: Boolean = true,
+      inArray: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) Repetition.REPEATED
       else {
@@ -262,16 +270,17 @@ private[parquet] object ParquetTypesConverter {
       case ArrayType(elementType: DataType) => {
         elementType match {
           case StructType(fields) => { // first case: array of structs
-            val parquetFieldTypes = fields.map(f => fromDataType(f.dataType, f.name, f.nullable, false))
+            val parquetFieldTypes = fields.map(
+              f => fromDataType(f.dataType, f.name, f.nullable, false))
             new ParquetGroupType(repetition, name, ParquetOriginalType.LIST, parquetFieldTypes)
             //ConversionPatterns.listType(Repetition.REPEATED, name, parquetFieldTypes)
           }
           case _ => { // second case: array of primitive types
-            // TODO: "values" is a generic name but without it the Parquet column path would
-            // be incomplete and values may be silently dropped; better would be to give
-            // Array elements a name of some sort (and specify whether they are nullable),
-            // as in StructField
-            val parquetElementType = fromDataType(elementType, "values", nullable=false, inArray=true)
+            val parquetElementType = fromDataType(
+              elementType,
+              CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+              nullable = false,
+              inArray = true)
             ConversionPatterns.listType(repetition, name, parquetElementType)
           }
         }
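The rewritten branch above is why ARRAY_ELEMENTS_SCHEMA_NAME exists: the writer and the converter side must agree on the element field's name. A hypothetical call (illustrative only; ParquetTypesConverter is private[parquet], so this would only compile inside that package):

import org.apache.spark.sql.catalyst.types._

// An array of ints named "scores":
val scores = ParquetTypesConverter.fromDataType(
  ArrayType(IntegerType), "scores", nullable = false)

// Expected shape (roughly), via ConversionPatterns.listType:
//   required group scores (LIST) {
//     repeated int32 values;  // "values" == ARRAY_ELEMENTS_SCHEMA_NAME
//   }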
@@ -281,14 +290,18 @@ private[parquet] object ParquetTypesConverter {
           val fields = structFields.map {
             field => fromDataType(field.dataType, field.name, field.nullable)
           }
-          new ParquetGroupType(Repetition.REPEATED, name, fields)
+          new ParquetGroupType(repetition, name, fields)
         }
         case _ => sys.error(s"Unsupported datatype $ctype")
       }
     }
   }
 
-  def consumeType(consumer: RecordConsumer, ctype: DataType, record: Row, index: Int): Unit = {
+  def consumeType(
+      consumer: RecordConsumer,
+      ctype: DataType,
+      record: Row,
+      index: Int): Unit = {
     ctype match {
       case StringType => consumer.addBinary(
         Binary.fromByteArray(
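The one-line repetition fix above has a matching write-side consequence; a sketch with made-up fields, assuming the elided repetition logic picks OPTIONAL for nullable fields: a nested struct is no longer hard-coded to `repeated`.

import org.apache.spark.sql.catalyst.types._

val pointType = StructType(Seq(
  StructField("x", DoubleType, nullable = false),
  StructField("y", DoubleType, nullable = false)))

// Before: always `repeated group point { ... }`, read back as an array.
// After (presumably): `optional group point { ... }` since nullable = true.
val group = ParquetTypesConverter.fromDataType(pointType, "point", nullable = true)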
@@ -311,11 +324,18 @@ private[parquet] object ParquetTypesConverter {
     parquetSchema
       .asGroupType()
       .getFields
-      .map(field => new AttributeReference(field.getName, toDataType(field), field.getRepetition != Repetition.REQUIRED)())
+      .map(
+        field =>
+          new AttributeReference(
+            field.getName,
+            toDataType(field),
+            field.getRepetition != Repetition.REQUIRED)())
   }
 
   def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val fields = attributes.map(attribute => fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+    val fields = attributes.map(
+      attribute =>
+        fromDataType(attribute.dataType, attribute.name, attribute.nullable))
     new MessageType("root", fields)
   }
