Commits (48)
aa688fe
Adding conversion of nested Parquet schemas
AndreSchumacher Mar 26, 2014
4d4892a
First commit nested Parquet read converters
AndreSchumacher Mar 27, 2014
6125c75
First working nested Parquet record input
AndreSchumacher Mar 27, 2014
745a42b
Completing testcase for nested data (Addressbook)
AndreSchumacher Apr 1, 2014
ddb40d2
Extending tests for nested Parquet data
AndreSchumacher Apr 1, 2014
1b1b3d6
Fixing one problem with nested arrays
AndreSchumacher Apr 2, 2014
5d80461
fixing one problem with nested structs and breaking up files
AndreSchumacher Apr 2, 2014
98219cf
added struct converter
AndreSchumacher Apr 2, 2014
ee70125
fixing one problem with arrayconverter
AndreSchumacher Apr 3, 2014
b7fcc35
Documenting conversions, bugfix, wrappers of Rows
AndreSchumacher Apr 4, 2014
6dbc9b7
Fixing some problems introduced during rebase
AndreSchumacher Apr 6, 2014
f8f8911
For primitive rows fall back to more efficient converter, code reorg
AndreSchumacher Apr 6, 2014
4e25fcb
Adding resolution of complex ArrayTypes
AndreSchumacher Apr 8, 2014
a594aed
Scalastyle
AndreSchumacher Apr 8, 2014
b539fde
First commit for MapType
AndreSchumacher Apr 11, 2014
824500c
Adding attribute resolution for MapType
AndreSchumacher Apr 11, 2014
f777b4b
Scalastyle
AndreSchumacher Apr 11, 2014
d1911dc
Simplifying ArrayType conversion
AndreSchumacher Apr 12, 2014
1dc5ac9
First version of WriteSupport for nested types
AndreSchumacher Apr 12, 2014
e99cc51
Fixing nested WriteSupport and adding tests
AndreSchumacher Apr 13, 2014
adc1258
Optimizing imports
AndreSchumacher Apr 13, 2014
f466ff0
Added ParquetAvro tests and revised Array conversion
AndreSchumacher Apr 13, 2014
79d81d5
Replacing field names for array and map in WriteSupport
AndreSchumacher Apr 13, 2014
619c397
Completing Map testcase
AndreSchumacher Apr 14, 2014
c52ff2c
Adding native-array converter
AndreSchumacher Apr 19, 2014
431f00f
Fixing problems introduced during rebase
AndreSchumacher Apr 19, 2014
a6b4f05
Cleaning up ArrayConverter, moving classTag to NativeType, adding Nat…
AndreSchumacher Apr 20, 2014
0ae9376
Doc strings and simplifying ParquetConverter.scala
AndreSchumacher May 10, 2014
32229c7
Removing Row nested values and placing by generic types
AndreSchumacher May 11, 2014
cbb5793
Code review feedback
AndreSchumacher May 11, 2014
191bc0d
Changing to Seq for ArrayType, refactoring SQLParser for nested field…
AndreSchumacher May 24, 2014
2f5a805
Removing stripMargin from test schemas
AndreSchumacher May 24, 2014
de02538
Cleaning up ParquetTestData
AndreSchumacher May 24, 2014
31465d6
Scalastyle: fixing commented out bottom
AndreSchumacher May 24, 2014
3c6b25f
Trying to reduce no-op changes wrt master
AndreSchumacher Jun 1, 2014
3104886
Nested Rows should be Rows, not Seqs.
marmbrus Jun 3, 2014
f7aeba3
[SPARK-1982] Support for ByteType and ShortType.
marmbrus Jun 3, 2014
3e1456c
WIP: Directly serialize catalyst attributes.
marmbrus Jun 4, 2014
14c3fd8
Attempting to fix Spark-Parquet schema conversion
AndreSchumacher Jun 4, 2014
37e0a0a
Cleaning up
AndreSchumacher Jun 4, 2014
88e6bdb
Attempting to fix loss of schema
AndreSchumacher Jun 4, 2014
63d1b57
Cleaning up and Scalastyle
AndreSchumacher Jun 8, 2014
b8a8b9a
More fixes to short and byte conversion
AndreSchumacher Jun 8, 2014
403061f
Fixing some issues with tests and schema metadata
AndreSchumacher Jun 8, 2014
94eea3a
Scalastyle
AndreSchumacher Jun 8, 2014
7eceb67
Review feedback
AndreSchumacher Jun 19, 2014
95c1367
Changes to ParquetRelation and its metadata
AndreSchumacher Jun 19, 2014
30708c8
Taking out AvroParquet test for now to remove Avro dependency
AndreSchumacher Jun 20, 2014
Adding resolution of complex ArrayTypes
AndreSchumacher committed Jun 19, 2014
commit 4e25fcb420088b86e8f7cc7668b4d98d01c2fb4d
@@ -90,7 +90,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
| failure("illegal character")
)

-  override def identChar = letter | elem('.') | elem('_')
+  override def identChar = letter | elem('.') | elem('_') | elem('[') | elem(']')

override def whitespace: Parser[Any] = rep(
whitespaceChar
@@ -390,6 +390,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
FALSE ^^^ Literal(false, BooleanType) |
cast |
"(" ~> expression <~ ")" |
"[" ~> literal <~ "]" |
function |
"-" ~> literal ^^ UnaryMinus |
ident ^^ UnresolvedAttribute |
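
The identChar change widens what the lexer treats as an identifier character, so a path expression like contacts[1].name now survives lexing as a single token that attribute resolution can pick apart later. Below is a minimal sketch of the effect, using a plain StdLexical subclass as a hypothetical stand-in for Spark's lexer (BracketLexical and LexDemo are illustration names, not part of the patch):

import scala.util.parsing.combinator.lexical.StdLexical

// Stand-in lexer: brackets count as identifier characters, like '.' and '_'.
object BracketLexical extends StdLexical {
  override def identChar = letter | elem('.') | elem('_') | elem('[') | elem(']')
}

object LexDemo extends App {
  // Scan "contacts[1].name" and print the token stream: with the override
  // it comes out as a single identifier token instead of being split
  // (or rejected) at the brackets.
  var in: scala.util.parsing.input.Reader[BracketLexical.Token] =
    new BracketLexical.Scanner("contacts[1].name")
  while (!in.atEnd) {
    println(in.first)
    in = in.rest
  }
}
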
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.{DataType, ArrayType, StructType}
import org.apache.spark.sql.catalyst.trees

abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -54,9 +54,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
/**
 * Optionally resolves the given string to a
 * [[catalyst.expressions.NamedExpression NamedExpression]]. The attribute is expressed
- * as a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
+ * as a string in the following form: `[scope].AttributeName.[nested].[fields]...`. Fields
+ * can contain ordinal expressions, such as `field[i][j][k]...`.
 */
def resolve(name: String): Option[NamedExpression] = {
def expandFunc(expType: (Expression, DataType), field: String): (Expression, DataType) = {
[Review comment — Contributor]
What do you think of #518 instead of changing the resolver? I am not a parser expert, but I think this is closer to the way hive (and probably optiq which we hope to use eventually) work.

[Review comment — Contributor, Author]
Thanks, I will have a look. One question: does this also handle maps and nested fields inside arrays, like struct.array1[1].field1.map1["key1"].array2[0]? I don't know Optiq but I will still check out how Hive does this. Since there was (is?) no support for nested Parquet types in Hive that may be a dead end though.

[Review comment — Contributor]
Hm, you are right. That case is probably not handled yet. However, I think the version here doesn't handle things like array[1 + 1]. So for that reason I think we really need to do all the parsing in the parser. I'd propose we defer the parsing problem for now. If we need to do really complex things for unit tests we can hand construct the necessary expressions.

      val (exp, t) = expType
      val ordinalRegExp = """(\[(\d+)\])""".r
      val fieldName = if (field.matches("\\w*(\\[\\d+\\])+")) {
        field.substring(0, field.indexOf("["))
      } else {
        field
      }
      t match {
        case ArrayType(elementType) =>
          val ordinals = ordinalRegExp.findAllIn(field).matchData.map(_.group(2))
          (ordinals.foldLeft(exp)((v1: Expression, v2: String) =>
            GetItem(v1, Literal(v2.toInt))), elementType)
        case StructType(fields) =>
          // Note: this only works if we are not on the top level!
          val structField = fields.find(_.name == fieldName)
          if (!structField.isDefined) {
            throw new TreeNodeException(
              this, s"Trying to resolve Attribute but field ${fieldName} is not defined")
          }
          structField.get.dataType match {
            case ArrayType(elementType) =>
              val ordinals = ordinalRegExp.findAllIn(field).matchData.map(_.group(2))
              (ordinals.foldLeft(GetField(exp, fieldName).asInstanceOf[Expression])(
                (v1: Expression, v2: String) => GetItem(v1, Literal(v2.toInt))), elementType)
            case _ =>
              (GetField(exp, fieldName), structField.get.dataType)
          }
        case _ =>
          expType
      }
    }

val parts = name.split("\\.")
// Collect all attributes that are output by this node's children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
@@ -67,16 +99,40 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
val remainingParts =
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
// TODO from rebase!
/*val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts
val relevantRemaining =
if (remainingParts.head.matches("\\w*\\[(\\d+)\\]")) { // array field name
remainingParts.head.substring(0, remainingParts.head.indexOf("["))
} else {
remainingParts.head
}
if (option.name == relevantRemaining) (option, remainingParts.tail.toList) :: Nil else Nil*/
}

options.distinct match {
-      case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
+      case (a, Nil) :: Nil => {
+        a.dataType match {
+          case ArrayType(elementType) =>
+            val expression = expandFunc((a: Expression, a.dataType), name)._1
+            Some(Alias(expression, name)())
+          case _ => Some(a)
+        }
+      } // One match, no nested fields, use it.
       // One match, but we also need to extract the requested nested field.
       case (a, nestedFields) :: Nil =>
         a.dataType match {
           case StructType(fields) =>
-            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
-          case _ => None // Don't know how to resolve these field references
+            // this is for compatibility with earlier code! TODO: why only nestedFields and not parts?
+            if ((parts(0) :: nestedFields).forall(!_.matches("\\w*(\\[\\d+\\])+"))) {
+              // not nested arrays, only fields
+              Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
+            } else {
+              val expression = parts.foldLeft((a: Expression, a.dataType))(expandFunc)._1
+              Some(Alias(expression, nestedFields.last)())
+            }
+          case _ =>
+            val expression = parts.foldLeft((a: Expression, a.dataType))(expandFunc)._1
+            Some(Alias(expression, nestedFields.last)())
         }
case Nil => None // No matches.
case ambiguousReferences =>
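
To see what the new resolution logic builds, here is a self-contained toy model of the expandFunc fold. The case classes are simplified stand-ins for the catalyst ones (AttributeRef in particular is an illustration name), and the ordinal handling is reduced to a single [i] per path segment; resolving person.addresses[0].city threads the element type through the fold while growing a GetField/GetItem chain:

sealed trait DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

sealed trait Expression
case class AttributeRef(name: String) extends Expression
case class GetField(child: Expression, field: String) extends Expression
case class GetItem(child: Expression, ordinal: Int) extends Expression

object ResolveDemo extends App {
  // One fold step: consume one dotted path segment, possibly carrying a
  // single [i] ordinal, and extend the expression tree accordingly.
  def expand(state: (Expression, DataType), part: String): (Expression, DataType) = {
    val (exp, t) = state
    val WithOrdinal = """(\w+)\[(\d+)\]""".r
    (part, t) match {
      case (WithOrdinal(name, i), StructType(fields)) =>
        val ArrayType(elem) = fields.find(_.name == name).get.dataType
        (GetItem(GetField(exp, name), i.toInt), elem)
      case (name, StructType(fields)) =>
        val f = fields.find(_.name == name).get
        (GetField(exp, name), f.dataType)
      case _ => state
    }
  }

  val schema: DataType = StructType(Seq(
    StructField("addresses", ArrayType(StructType(Seq(
      StructField("city", StringType)))))))

  val (tree, resultType) =
    Seq("addresses[0]", "city")
      .foldLeft((AttributeRef("person"): Expression, schema))(expand)

  println(tree)       // GetField(GetItem(GetField(AttributeRef(person),addresses),0),city)
  println(resultType) // StringType
}

As the review thread above notes, this regex-driven approach only handles literal ordinals; something like array[1 + 1] needs real parser support.
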
@@ -29,11 +29,15 @@ abstract class DataType {
case e: Expression if e.dataType == this => true
case _ => false
}

def isPrimitive(): Boolean = false
[Review comment — Contributor]
No ().

}

case object NullType extends DataType

-trait PrimitiveType
+trait PrimitiveType extends DataType {
+  override def isPrimitive() = true
+}

abstract class NativeType extends DataType {
type JvmType
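
With PrimitiveType now extending DataType, every type that mixes in the trait reports isPrimitive = true without a per-type override. A tiny sketch of the pattern, using stand-in demo types rather than the real catalyst ones:

abstract class DemoDataType {
  def isPrimitive(): Boolean = false
}

trait DemoPrimitiveType extends DemoDataType {
  override def isPrimitive() = true
}

case object DemoNullType extends DemoDataType
case object DemoIntType extends DemoDataType with DemoPrimitiveType

object PrimitiveDemo extends App {
  println(DemoIntType.isPrimitive())  // true, via the mixed-in trait
  println(DemoNullType.isPrimitive()) // false, the base-class default
}
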
@@ -139,7 +139,7 @@ private[sql] object ParquetTestData {
|optional group longs {
|repeated int64 values;
|}
-|required group booleanNumberPairs {
+|repeated group entries {
|required double value;
|optional boolean truth;
|}
@@ -153,8 +153,23 @@
|}
""".stripMargin

val testNestedSchema3 =
"""
|message TestNested3 {
|required int32 x;
|repeated group booleanNumberPairs {
|required int32 key;
|repeated group value {
|required double nestedValue;
|optional boolean truth;
|}
|}
|}
""".stripMargin

val testNestedDir1 = Utils.createTempDir()
val testNestedDir2 = Utils.createTempDir()
val testNestedDir3 = Utils.createTempDir()

lazy val testNestedData1 = new ParquetRelation(testNestedDir1.toURI.toString)
lazy val testNestedData2 = new ParquetRelation(testNestedDir2.toURI.toString)
@@ -285,6 +300,32 @@
writer.close()
}

def writeNestedFile3() {
testNestedDir3.delete()
val path: Path = testNestedDir3
val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema3)

val r1 = new SimpleGroup(schema)
r1.add(0, 1)
val g1 = r1.addGroup(1)
g1.add(0, 1)
val ng1 = g1.addGroup(1)
ng1.add(0, 1.5)
ng1.add(1, false)
val ng2 = g1.addGroup(1)
ng2.add(0, 2.5)
ng2.add(1, true)
val g2 = r1.addGroup(1)
g2.add(0, 2)
val ng3 = g2.addGroup(1)
ng3.add(0, 3.5)
ng3.add(1, false)

val writeSupport = new TestGroupWriteSupport(schema)
val writer = new ParquetWriter[Group](path, writeSupport)
writer.write(r1)
writer.close()
}
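
For reference, the single record written by writeNestedFile3 has the following logical shape. This is an illustration only; the add/addGroup indices 0 and 1 above refer to the field positions x/booleanNumberPairs, key/value, and nestedValue/truth in testNestedSchema3:

// Rendered as a JSON-like string purely for readability; nothing in the
// test writes this value.
val record3Shape = """
{ "x": 1,
  "booleanNumberPairs": [
    { "key": 1,
      "value": [ { "nestedValue": 1.5, "truth": false },
                 { "nestedValue": 2.5, "truth": true } ] },
    { "key": 2,
      "value": [ { "nestedValue": 3.5, "truth": false } ] } ] }
"""

This is also the record that the "nested structs" test below queries with booleanNumberPairs[i].value[j].truth.
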

def readNestedFile(path: File, schemaString: String): Unit = {
val configuration = new Configuration()
@@ -34,6 +34,11 @@ import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
import org.apache.spark.sql.{parquet, SchemaRDD}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import scala.Tuple2
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Implicits
import org.apache.spark.sql.test.TestSQLContext._
@@ -432,9 +437,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(result(0)(2)(0) === (1.toLong << 32))
assert(result(0)(2)(1) === (1.toLong << 33))
assert(result(0)(2)(2) === (1.toLong << 34))
-assert(result(0)(3).size === 2)
-assert(result(0)(3)(0) === 2.5)
-assert(result(0)(3)(1) === false)
+assert(result(0)(3)(0).size === 2)
+assert(result(0)(3)(0)(0) === 2.5)
+assert(result(0)(3)(0)(1) === false)
assert(result(0)(4).size === 2)
assert(result(0)(4)(0).size === 2)
assert(result(0)(4)(1).size === 1)
@@ -452,23 +457,61 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(tmp(0)(0) === "Julien Le Dem")
}

test("Projection in addressbook") {
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
val data = TestSQLContext
.parquetFile(ParquetTestData.testNestedDir1.toString)
.toSchemaRDD
data.registerAsTable("data")
val tmp = sql("SELECT owner, contacts[1].name FROM data").collect()
assert(tmp.size === 2)
assert(tmp(0).size === 2)
assert(tmp(0)(0) === "Julien Le Dem")
assert(tmp(0)(1) === "Chris Aniszczyk")
assert(tmp(1)(0) === "A. Nonymous")
assert(tmp(1)(1) === null)
}

test("Simple query on nested int data") {
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
val data = TestSQLContext
.parquetFile(ParquetTestData.testNestedDir2.toString)
.toSchemaRDD
data.registerAsTable("data")
val tmp = sql("SELECT booleanNumberPairs.value, booleanNumberPairs.truth FROM data").collect()
assert(tmp(0)(0) === 2.5)
assert(tmp(0)(1) === false)
val result = sql("SELECT outerouter FROM data").collect()
// TODO: why does this not work?
//val result = sql("SELECT outerouter.values FROM data").collect()
// TODO: .. or this:
// val result = sql("SELECT outerouter[0] FROM data").collect()
assert(result(0)(0)(0)(0)(0) === 7)
assert(result(0)(0)(0)(1)(0) === 8)
assert(result(0)(0)(1)(0)(0) === 9)
val result1 = sql("SELECT entries[0].value FROM data").collect()
assert(result1.size === 1)
assert(result1(0).size === 1)
assert(result1(0)(0) === 2.5)
val result2 = sql("SELECT entries[0] FROM data").collect()
assert(result2.size === 1)
assert(result2(0)(0).size === 2)
assert(result2(0)(0)(0) === 2.5)
assert(result2(0)(0)(1) === false)
val result3 = sql("SELECT outerouter FROM data").collect()
assert(result3(0)(0)(0)(0)(0) === 7)
assert(result3(0)(0)(0)(1)(0) === 8)
assert(result3(0)(0)(1)(0)(0) === 9)
}

test("nested structs") {
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
ParquetTestData.writeNestedFile3()
val data = TestSQLContext
.parquetFile(ParquetTestData.testNestedDir3.toString)
.toSchemaRDD
data.registerAsTable("data")
val result1 = sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
assert(result1.size === 1)
assert(result1(0).size === 1)
assert(result1(0)(0) === false)
val result2 = sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
assert(result2.size === 1)
assert(result2(0).size === 1)
assert(result2(0)(0) === true)
val result3 = sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
assert(result3.size === 1)
assert(result3(0).size === 1)
assert(result3(0)(0) === false)
}

/**
Expand Down