Commits (25)

03c3bd9  Refactors Parquet read path to implement backwards-compatibility rules (liancheng, Jul 5, 2015)
0525346  Removes old Parquet record converters (liancheng, Jul 5, 2015)
a74fb2c  More comments (liancheng, Jul 5, 2015)
bcac49f  Removes the 16-byte restriction of decimals (liancheng, Jul 5, 2015)
6437d4b  Assembles requested schema from Parquet file schema (liancheng, Jul 5, 2015)
1781dff  Adds test case for SPARK-8811 (liancheng, Jul 5, 2015)
7fb21f1  Reverts an unnecessary debugging change (liancheng, Jul 5, 2015)
38fe1e7  Adds explicit return type (liancheng, Jul 6, 2015)
802cbd7  Fixes bugs related to schema merging and empty requested columns (liancheng, Jul 6, 2015)
884d3e6  Fixes styling issue and reverts unnecessary changes (liancheng, Jul 6, 2015)
0cc1b37  Fixes MiMa checks (liancheng, Jul 6, 2015)
a099d3e  More comments (liancheng, Jul 6, 2015)
06cfe9d  Adds comments about TimestampType handling (liancheng, Jul 6, 2015)
13b9121  Adds ParquetAvroCompatibilitySuite (liancheng, Jul 7, 2015)
440f7b3  Adds generated files to .rat-excludes (liancheng, Jul 7, 2015)
1d390aa  Adds parquet-thrift compatibility test (liancheng, Jul 7, 2015)
f2208cd  Adds README.md for Thrift/Avro code generation (liancheng, Jul 7, 2015)
a8f13bb  Using Parquet writer API to do compatibility tests (liancheng, Jul 7, 2015)
3d7ab36  Fixes .rat-excludes (liancheng, Jul 7, 2015)
7946ee1  Fixes Scala styling issues (liancheng, Jul 7, 2015)
926af87  Simplifies Parquet compatibility test suites (liancheng, Jul 8, 2015)
598c3e8  Adds extra Maven repo for hadoop-lzo, which is a transitive dependenc… (liancheng, Jul 8, 2015)
b8c1295  Excludes the whole parquet package from MiMa (liancheng, Jul 8, 2015)
c6fbc06  Removes WIP file committed by mistake (liancheng, Jul 8, 2015)
360fe18  Adds ParquetHiveCompatibilitySuite (liancheng, Jul 8, 2015)

Commit 360fe18a61538b03cac05da1c6d258e124df6feb
Adds ParquetHiveCompatibilitySuite
liancheng committed Jul 8, 2015

ParquetAvroCompatibilitySuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
   import ParquetCompatibilityTest._

-  override def sqlContext: SQLContext = TestSQLContext
+  override val sqlContext: SQLContext = TestSQLContext

   override protected def beforeAll(): Unit = {
     super.beforeAll()
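
Aside for reviewers: the `def` → `val` switch above leans on the Scala rule that a stable `val` may override a `def`. A minimal, self-contained sketch of that rule (illustrative names only, not from the PR):

trait WithContext {
  def context: String // abstract member; each concrete suite binds it
}

class AvroSuite extends WithContext {
  // A `val` can legally override a `def`: it is evaluated once and then
  // stays stable for the lifetime of the suite, instead of being
  // re-evaluated on every access.
  override val context: String = "TestSQLContext"
}

object Demo extends App {
  println(new AvroSuite().context) // prints "TestSQLContext"
}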

ParquetCompatibilityTest.scala
@@ -25,13 +25,10 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 import org.scalatest.BeforeAndAfterAll

-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.util.Utils

 abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
-  override def sqlContext: SQLContext = TestSQLContext
-
   protected var parquetStore: File = _

   override protected def beforeAll(): Unit = {
@@ -45,10 +42,9 @@ abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with

   def readParquetSchema(path: String): MessageType = {
     val fsPath = new Path(path)
-    val footers =
-      ParquetFileReader.readAllFootersInParallel(
-        configuration, fsPath.getFileSystem(configuration).listStatus(fsPath).toSeq, true)
-
+    val fs = fsPath.getFileSystem(configuration)
+    val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot(_.getPath.getName.startsWith("_"))
+    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
     footers.head.getParquetMetadata.getFileMetaData.getSchema
   }
 }
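
The rewritten `readParquetSchema` above now also skips files whose names start with an underscore before reading footers. Directories produced by Hive and Hadoop jobs commonly contain marker files such as `_SUCCESS` (and summary files such as `_metadata`) that are not Parquet data files, and feeding them to `readAllFootersInParallel` can fail. A tiny standalone sketch of just the filtering rule, with made-up file names:

// Hypothetical directory listing; the names are illustrative only.
val listing = Seq("_SUCCESS", "_metadata", "part-r-00000.parquet", "part-r-00001.parquet")

// Same predicate the patch applies to the FileStatus entries:
val dataFiles = listing.filterNot(_.startsWith("_"))

assert(dataFiles == Seq("part-r-00000.parquet", "part-r-00001.parquet"))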

ParquetThriftCompatibilitySuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
   import ParquetCompatibilityTest._

-  override def sqlContext: SQLContext = TestSQLContext
+  override val sqlContext: SQLContext = TestSQLContext

   override protected def beforeAll(): Unit = {
     super.beforeAll()

ParquetHiveCompatibilitySuite.scala (new file)
@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
  import ParquetCompatibilityTest.makeNullable

  override val sqlContext: SQLContext = TestHive

  override protected def beforeAll(): Unit = {
    super.beforeAll()

    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
      withTempTable("data") {
        sqlContext.sql(
          s"""CREATE TABLE parquet_compat(
             |  bool_column BOOLEAN,
             |  byte_column TINYINT,
             |  short_column SMALLINT,
             |  int_column INT,
             |  long_column BIGINT,
             |  float_column FLOAT,
             |  double_column DOUBLE,
             |
             |  strings_column ARRAY<STRING>,
             |  int_to_string_column MAP<INT, STRING>
             |)
             |STORED AS PARQUET
             |LOCATION '${parquetStore.getCanonicalPath}'
           """.stripMargin)

        val schema = sqlContext.table("parquet_compat").schema
        val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
        sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
      }
    }
  }

  override protected def afterAll(): Unit = {
    sqlContext.sql("DROP TABLE parquet_compat")
  }

  test("Read Parquet file generated by parquet-hive") {
    logInfo(
      s"""Schema of the Parquet file written by parquet-hive:
         |${readParquetSchema(parquetStore.getCanonicalPath)}
       """.stripMargin)

    // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
    // Have to assume all BINARY values are strings here.
    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
      checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
    }
  }

  def makeRows: Seq[Row] = {
    (0 until 10).map { i =>
      def nullable[T <: AnyRef]: (=> T) => T = makeNullable[T](i)

      Row(
        nullable(i % 2 == 0: java.lang.Boolean),
        nullable(i.toByte: java.lang.Byte),
        nullable((i + 1).toShort: java.lang.Short),
        nullable(i + 2: Integer),
        nullable(i.toLong * 10: java.lang.Long),
        nullable(i.toFloat + 0.1f: java.lang.Float),
        nullable(i.toDouble + 0.2d: java.lang.Double),
        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
    }
  }
}
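
One helper worth calling out: `makeNullable`, imported from the `ParquetCompatibilityTest` companion object, is not part of this commit's diff. Judging only from its call site (`def nullable[T <: AnyRef]: (=> T) => T = makeNullable[T](i)`), a plausible shape is sketched below; the signature is inferred and the exact null pattern (every third row here) is an assumption, not taken from the PR:

// Hedged reconstruction from the call site only. Partially applying the first
// parameter list yields the `(=> T) => T` function the suite binds to `nullable`.
object ParquetCompatibilityTestSketch {
  def makeNullable[T <: AnyRef](i: Int)(f: => T): T =
    if (i % 3 == 0) null.asInstanceOf[T] else f // which rows become null is assumed
}

With something of this shape, every column in `makeRows` periodically produces `null`, so the `checkAnswer` round-trip exercises null handling for each Hive column type as well as the plain values.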