Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
aa688fe
Adding conversion of nested Parquet schemas
AndreSchumacher Mar 26, 2014
4d4892a
First commit nested Parquet read converters
AndreSchumacher Mar 27, 2014
6125c75
First working nested Parquet record input
AndreSchumacher Mar 27, 2014
745a42b
Completing testcase for nested data (Addressbook(
AndreSchumacher Apr 1, 2014
ddb40d2
Extending tests for nested Parquet data
AndreSchumacher Apr 1, 2014
1b1b3d6
Fixing one problem with nested arrays
AndreSchumacher Apr 2, 2014
5d80461
fixing one problem with nested structs and breaking up files
AndreSchumacher Apr 2, 2014
98219cf
added struct converter
AndreSchumacher Apr 2, 2014
ee70125
fixing one problem with arrayconverter
AndreSchumacher Apr 3, 2014
b7fcc35
Documenting conversions, bugfix, wrappers of Rows
AndreSchumacher Apr 4, 2014
6dbc9b7
Fixing some problems introduced during rebase
AndreSchumacher Apr 6, 2014
f8f8911
For primitive rows fall back to more efficient converter, code reorg
AndreSchumacher Apr 6, 2014
4e25fcb
Adding resolution of complex ArrayTypes
AndreSchumacher Apr 8, 2014
a594aed
Scalastyle
AndreSchumacher Apr 8, 2014
b539fde
First commit for MapType
AndreSchumacher Apr 11, 2014
824500c
Adding attribute resolution for MapType
AndreSchumacher Apr 11, 2014
f777b4b
Scalastyle
AndreSchumacher Apr 11, 2014
d1911dc
Simplifying ArrayType conversion
AndreSchumacher Apr 12, 2014
1dc5ac9
First version of WriteSupport for nested types
AndreSchumacher Apr 12, 2014
e99cc51
Fixing nested WriteSupport and adding tests
AndreSchumacher Apr 13, 2014
adc1258
Optimizing imports
AndreSchumacher Apr 13, 2014
f466ff0
Added ParquetAvro tests and revised Array conversion
AndreSchumacher Apr 13, 2014
79d81d5
Replacing field names for array and map in WriteSupport
AndreSchumacher Apr 13, 2014
619c397
Completing Map testcase
AndreSchumacher Apr 14, 2014
c52ff2c
Adding native-array converter
AndreSchumacher Apr 19, 2014
431f00f
Fixing problems introduced during rebase
AndreSchumacher Apr 19, 2014
a6b4f05
Cleaning up ArrayConverter, moving classTag to NativeType, adding Nat…
AndreSchumacher Apr 20, 2014
0ae9376
Doc strings and simplifying ParquetConverter.scala
AndreSchumacher May 10, 2014
32229c7
Removing Row nested values and placing by generic types
AndreSchumacher May 11, 2014
cbb5793
Code review feedback
AndreSchumacher May 11, 2014
191bc0d
Changing to Seq for ArrayType, refactoring SQLParser for nested field…
AndreSchumacher May 24, 2014
2f5a805
Removing stripMargin from test schemas
AndreSchumacher May 24, 2014
de02538
Cleaning up ParquetTestData
AndreSchumacher May 24, 2014
31465d6
Scalastyle: fixing commented out bottom
AndreSchumacher May 24, 2014
3c6b25f
Trying to reduce no-op changes wrt master
AndreSchumacher Jun 1, 2014
3104886
Nested Rows should be Rows, not Seqs.
marmbrus Jun 3, 2014
f7aeba3
[SPARK-1982] Support for ByteType and ShortType.
marmbrus Jun 3, 2014
3e1456c
WIP: Directly serialize catalyst attributes.
marmbrus Jun 4, 2014
14c3fd8
Attempting to fix Spark-Parquet schema conversion
AndreSchumacher Jun 4, 2014
37e0a0a
Cleaning up
AndreSchumacher Jun 4, 2014
88e6bdb
Attempting to fix loss of schema
AndreSchumacher Jun 4, 2014
63d1b57
Cleaning up and Scalastyle
AndreSchumacher Jun 8, 2014
b8a8b9a
More fixes to short and byte conversion
AndreSchumacher Jun 8, 2014
403061f
Fixing some issues with tests and schema metadata
AndreSchumacher Jun 8, 2014
94eea3a
Scalastyle
AndreSchumacher Jun 8, 2014
7eceb67
Review feedback
AndreSchumacher Jun 19, 2014
95c1367
Changes to ParquetRelation and its metadata
AndreSchumacher Jun 19, 2014
30708c8
Taking out AvroParquet test for now to remove Avro dependency
AndreSchumacher Jun 20, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Simplifying ArrayType conversion
  • Loading branch information
AndreSchumacher committed Jun 19, 2014
commit d1911dcc19a7a3c5a83823a4938f1ce0bbe3efbd
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
.findAllIn(field)
.matchData
.map(_.group(2))
// TODO: we should recover the JVM type of valueType to match the
// TODO: we should recover the JVM type of keyType to match the
// actual type of the key?! should we restrict ourselves to NativeType?
(ordinals.foldLeft(exp)((v1: Expression, v2: String) =>
GetItem(v1, Literal(v2, keyType))), valueType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ private[parquet] object CatalystConverter {
// The type internally used for fields
type FieldType = StructField

// Note: repeated primitive fields that form an array (together with
// their surrounding group) need to have this name in the schema
// TODO: "values" is a generic name but without it the Parquet column path would
// be incomplete and values may be silently dropped; better would be to give
// primitive-type array elements a name of some sort
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`)
val ARRAY_ELEMENTS_SCHEMA_NAME = "values"
val MAP_KEY_SCHEMA_NAME = "key"
val MAP_VALUE_SCHEMA_NAME = "value"
val MAP_SCHEMA_NAME = "map"

protected[parquet] def createConverter(
field: FieldType,
Expand All @@ -46,21 +45,16 @@ private[parquet] object CatalystConverter {
val fieldType: DataType = field.dataType
fieldType match {
case ArrayType(elementType: DataType) => {
elementType match {
case StructType(fields) =>
if (fields.size > 1) new CatalystGroupConverter(fields, fieldIndex, parent)
else new CatalystArrayConverter(fields(0).dataType, fieldIndex, parent)
case _ => new CatalystArrayConverter(elementType, fieldIndex, parent)
}
new CatalystArrayConverter(elementType, fieldIndex, parent)
}
case StructType(fields: Seq[StructField]) => {
new CatalystStructConverter(fields, fieldIndex, parent)
}
case MapType(keyType: DataType, valueType: DataType) => {
new CatalystMapConverter(
Seq(
new FieldType("key", keyType, false),
new FieldType("value", valueType, true)),
new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
fieldIndex,
parent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ private[sql] object ParquetTestData {
|optional group ownerPhoneNumbers {
|repeated binary values;
|}
|repeated group contacts {
|required binary name;
|optional binary phoneNumber;
|optional group contacts {
|repeated group values {
|required binary name;
|optional binary phoneNumber;
|}
|}
|}
""".stripMargin
Expand All @@ -139,9 +141,11 @@ private[sql] object ParquetTestData {
|optional group longs {
|repeated int64 values;
|}
|repeated group entries {
|required double value;
|optional boolean truth;
|required group entries {
|repeated group values {
|required double value;
|optional boolean truth;
|}
|}
|optional group outerouter {
|repeated group values {
Expand All @@ -156,12 +160,16 @@ private[sql] object ParquetTestData {
val testNestedSchema3 =
"""
|message TestNested3 {
|required int32 x;
|repeated group booleanNumberPairs {
|required int32 key;
|repeated group value {
|required double nestedValue;
|optional boolean truth;
|required int32 x;
|optional group booleanNumberPairs {
|repeated group values {
|required int32 key;
|optional group value {
|repeated group values {
|required double nestedValue;
|optional boolean truth;
|}
|}
|}
|}
|}
Expand Down Expand Up @@ -268,12 +276,11 @@ private[sql] object ParquetTestData {
.append("values", "555 123 4567")
.append("values", "555 666 1337")
.append("values", "XXX XXX XXXX")
r1.addGroup(2)
// .addGroup(0)
val contacts = r1.addGroup(2)
contacts.addGroup(0)
.append("name", "Dmitriy Ryaboy")
.append("phoneNumber", "555 987 6543")
r1.addGroup(2)
// .addGroup(0)
contacts.addGroup(0)
.append("name", "Chris Aniszczyk")

val r2 = new SimpleGroup(schema)
Expand All @@ -298,9 +305,9 @@ private[sql] object ParquetTestData {
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME , 1.toLong << 32)
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 33)
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 34)
val booleanNumberPairs = r1.addGroup(3)
booleanNumberPairs.add("value", 2.5)
booleanNumberPairs.add("truth", false)
val booleanNumberPair = r1.addGroup(3).addGroup(0)
booleanNumberPair.add("value", 2.5)
booleanNumberPair.add("truth", false)
val top_level = r1.addGroup(4)
val second_level_a = top_level.addGroup(0)
val second_level_b = top_level.addGroup(0)
Expand Down Expand Up @@ -330,17 +337,20 @@ private[sql] object ParquetTestData {

val r1 = new SimpleGroup(schema)
r1.add(0, 1)
val g1 = r1.addGroup(1)
val booleanNumberPairs = r1.addGroup(1)
val g1 = booleanNumberPairs.addGroup(0)
g1.add(0, 1)
val ng1 = g1.addGroup(1)
val nested1 = g1.addGroup(1)
val ng1 = nested1.addGroup(0)
ng1.add(0, 1.5)
ng1.add(1, false)
val ng2 = g1.addGroup(1)
val ng2 = nested1.addGroup(0)
ng2.add(0, 2.5)
ng2.add(1, true)
val g2 = r1.addGroup(1)
val g2 = booleanNumberPairs.addGroup(0)
g2.add(0, 2)
val ng3 = g2.addGroup(1)
.addGroup(0)
ng3.add(0, 3.5)
ng3.add(1, false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,15 @@ private[parquet] object ParquetTypesConverter {
* <ul>
* <li> Primitive types are converter to the corresponding primitive type.</li>
* <li> Group types that have a single field that is itself a group, which has repetition
* level `REPEATED` and two fields (named `key` and `value`), are converted to
* a [[MapType]] with the corresponding key and value (value possibly complex)
* as element type.</li>
* <li> Other group types are converted as follows:<ul>
* <li> Group types that have a single field with repetition `REPEATED` or themselves
* have repetition level `REPEATED` are converted to an [[ArrayType]] with the
* corresponding field type (possibly primitive) as element type.</li>
* <li> Other groups with a single field are converted into a [[StructType]] with
* the corresponding field type.</li>
* <li> If groups have more than one field and repetition level `REPEATED` they are
* converted into an [[ArrayType]] with the corresponding [[StructType]] as complex
* element type.</li>
* <li> Otherwise they are converted into a [[StructType]] with the corresponding
* field types.</li></ul></li>
* level `REPEATED` are treated as follows:<ul>
* <li> If the nested group has name `values` and repetition level `REPEATED`, the
* surrounding group is converted into an [[ArrayType]] with the
* corresponding field type (primitive or complex) as element type.</li>
* <li> If the nested group has name `map`, repetition level `REPEATED` and two fields
* (named `key` and `value`), the surrounding group is converted into a [[MapType]]
* with the corresponding key and value (value possibly complex) types.</li>
* <li> Other group types are converted into a [[StructType]] with the corresponding
* field types.</li></ul></li>
* </ul>
* Note that fields are determined to be `nullable` if and only if their Parquet repetition
* level is not `REQUIRED`.
Expand All @@ -93,15 +88,16 @@ private[parquet] object ParquetTypesConverter {
// This mostly follows the convention in ``parquet.schema.ConversionPatterns``
val keyValueGroup = groupType.getFields.apply(0).asGroupType()
keyValueGroup.getRepetition == Repetition.REPEATED &&
keyValueGroup.getName == "map" &&
keyValueGroup.getFields.apply(0).getName == "key" &&
keyValueGroup.getFields.apply(1).getName == "value"
keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
keyValueGroup.getFieldCount == 2 &&
keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
}
}
def correspondsToArray(groupType: ParquetGroupType): Boolean = {
groupType.getFieldCount == 1 &&
(groupType.getFields.apply(0).getRepetition == Repetition.REPEATED ||
groupType.getRepetition == Repetition.REPEATED)
groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
}

if (parquetType.isPrimitive) {
Expand All @@ -112,17 +108,9 @@ private[parquet] object ParquetTypesConverter {
// if the schema was constructed programmatically there may be hints how to convert
// it inside the metadata via the OriginalType field
case ParquetOriginalType.LIST => { // TODO: check enums!
val fields = groupType.getFields.map {
field => new StructField(
field.getName,
toDataType(field),
field.getRepetition != Repetition.REQUIRED)
}
if (fields.size == 1) {
new ArrayType(fields.apply(0).dataType)
} else {
new ArrayType(StructType(fields))
}
assert(groupType.getFieldCount == 1)
val field = groupType.getFields.apply(0)
new ArrayType(toDataType(field))
}
case ParquetOriginalType.MAP => {
assert(
Expand Down Expand Up @@ -153,16 +141,7 @@ private[parquet] object ParquetTypesConverter {
ptype.getName,
toDataType(ptype),
ptype.getRepetition != Repetition.REQUIRED))

if (groupType.getFieldCount == 1) {
new StructType(fields)
} else {
if (parquetType.getRepetition == Repetition.REPEATED) {
new ArrayType(StructType(fields))
} else {
new StructType(fields)
}
}
new StructType(fields)
}
}
}
Expand Down Expand Up @@ -199,17 +178,17 @@ private[parquet] object ParquetTypesConverter {
* <li> Primitive types are converted into Parquet's primitive types.</li>
* <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
* into Parquet's `GroupType` with the corresponding field types.</li>
* <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are converted
* into a 2-level nested group, where the outer group has the inner
* group as sole field. The inner group has name `values` and
* repetition level `REPEATED` and has the element type of
* the array as schema. We use Parquet's `ConversionPatterns` for this
* purpose.</li>
* <li> [[org.apache.spark.sql.catalyst.types.MapType]]s are converted
* into a nested (2-level) Parquet `GroupType` with two fields: a key type and
* a value type. The nested group has repetition level `REPEATED`.</li>
* <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are handled as follows:<ul>
* <li> If their element is complex, that is of type
* [[org.apache.spark.sql.catalyst.types.StructType]], they are converted
* into a `GroupType` with the corresponding field types of the struct and
* original type of the `GroupType` is set to `LIST`.</li>
* <li> Otherwise, that is they contain a primitive they are converted into a `GroupType`
* that is also a list but has only a single field of the type corresponding to
* the element type.</li></ul></li>
* into a nested (2-level) Parquet `GroupType` with two fields: a key
* type and a value type. The nested group has repetition level
* `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
* for this purpose.</li>
* </ul>
* Parquet's repetition level is generally set according to the following rule:
* <ul>
Expand All @@ -218,11 +197,8 @@ private[parquet] object ParquetTypesConverter {
* <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
* type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
* </ul>
* The single exception to this rule is an [[org.apache.spark.sql.catalyst.types.ArrayType]]
* that contains a [[org.apache.spark.sql.catalyst.types.StructType]], whose repetition level
* is always set to `REPEATED`.
*
* @param ctype The type to convert.
* @param ctype The type to convert.
* @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
* whose type is converted
* @param nullable When true indicates that the attribute is nullable
Expand All @@ -245,43 +221,38 @@ private[parquet] object ParquetTypesConverter {
new ParquetPrimitiveType(repetition, primitiveType.get, name)
} else {
ctype match {
case ArrayType(elementType: DataType) => {
elementType match {
case StructType(fields) => { // first case: array of structs
val parquetFieldTypes = fields.map(
f => fromDataType(f.dataType, f.name, f.nullable, inArray = false))
assert(
fields.size > 1,
"Found struct inside array with a single field.. error parsing Catalyst schema")
new ParquetGroupType(
Repetition.REPEATED,
name,
ParquetOriginalType.LIST,
parquetFieldTypes)
}
case _ => { // second case: array of primitive types
val parquetElementType = fromDataType(
elementType,
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
nullable = false,
inArray = true)
ConversionPatterns.listType(repetition, name, parquetElementType)
}
}
case ArrayType(elementType) => {
val parquetElementType = fromDataType(
elementType,
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
nullable = false,
inArray = true)
ConversionPatterns.listType(repetition, name, parquetElementType)
}
// TODO: test structs inside arrays
case StructType(structFields) => {
val fields = structFields.map {
field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
}
new ParquetGroupType(repetition, name, fields)
}
case MapType(keyType, valueType) => {
val parquetKeyType =
fromDataType(
keyType,
CatalystConverter.MAP_KEY_SCHEMA_NAME,
false,
inArray = false)
val parquetValueType =
fromDataType(
valueType,
CatalystConverter.MAP_VALUE_SCHEMA_NAME,
true,
inArray = false)
ConversionPatterns.mapType(
repetition,
name,
fromDataType(keyType, "key", false, inArray = false),
fromDataType(valueType, "value", true, inArray = false))
parquetKeyType,
parquetValueType)
}
case _ => sys.error(s"Unsupported datatype $ctype")
}
Expand Down