Update test cases.
dongjoon-hyun committed Oct 2, 2017
commit 452a00376e0cf6e743da18f80cc44900390df948
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1439,75 +1440,30 @@ class HiveDDLSuite
     }
   }

-  test("create hive serde table with new syntax - orc") {
-    Seq("true", "false").foreach { value =>
-      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
-        withTable("t", "t2", "t3") {
-          withTempPath { path =>
-            sql(
-              s"""
-                |CREATE TABLE t(id int) USING hive
-                |OPTIONS(fileFormat 'orc', compression 'Zlib')
-                |LOCATION '${path.toURI}'
-              """.stripMargin)
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(DDLUtils.isHiveTable(table))
-            assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
-            assert(table.storage.properties.get("compression") == Some("Zlib"))
-            assert(spark.table("t").collect().isEmpty)
-
-            sql("INSERT INTO t SELECT 1")
-            checkAnswer(spark.table("t"), Row(1))
-            // Check if this is compressed as ZLIB.
-            val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
-            assert(maybeOrcFile.isDefined)
-            val orcFilePath = maybeOrcFile.get.toPath.toString
-            val expectedCompressionKind =
-              OrcFileOperator.getFileReader(orcFilePath).get.getCompression
-            assert("ZLIB" === expectedCompressionKind.name())
-
-            sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
-            val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
-            assert(DDLUtils.isHiveTable(table2))
-            assert(
-              table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-            checkAnswer(spark.table("t2"), Row(1, "a"))
-
-            sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)")
-            sql("INSERT INTO t3 PARTITION(p=1) SELECT 0")
-            checkAnswer(spark.table("t3"), Row(0, 1))
-          }
-        }
-      }
-    }
-  }
-
-  test("create hive serde table with new syntax - parquet") {
+  test("create hive serde table with new syntax") {
     withTable("t", "t2", "t3") {
       withTempPath { path =>
         sql(
           s"""
-            |CREATE TABLE t(id int) USING hive
-            |OPTIONS(fileFormat 'parquet', compression 'gzip')
-            |LOCATION '${path.toURI}'
-          """.stripMargin)
+            |CREATE TABLE t(id int) USING hive
+            |OPTIONS(fileFormat 'orc', compression 'Zlib')
+            |LOCATION '${path.toURI}'
+          """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(DDLUtils.isHiveTable(table))
-        assert(table.storage.serde ==
-          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
-        assert(table.storage.properties.get("compression") == Some("gzip"))
+        assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+        assert(table.storage.properties.get("compression") == Some("Zlib"))
         assert(spark.table("t").collect().isEmpty)

         sql("INSERT INTO t SELECT 1")
         checkAnswer(spark.table("t"), Row(1))
-        val maybeParquetFile = path.listFiles().find(f => f.getName.startsWith("part"))
-        assert(maybeParquetFile.isDefined)
-
-        val footer = ParquetFileReader.readFooter(
-          sparkContext.hadoopConfiguration,
-          new Path(maybeParquetFile.get.getPath),
-          NO_FILTER)
-        assert("GZIP" === footer.getBlocks.get(0).getColumns().get(0).getCodec.toString)
+        // Check if this is compressed as ZLIB.
+        val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
+        assert(maybeOrcFile.isDefined)
+        val orcFilePath = maybeOrcFile.get.toPath.toString
+        val expectedCompressionKind =
+          OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+        assert("ZLIB" === expectedCompressionKind.name())

         sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
         val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -2056,4 +2012,48 @@ class HiveDDLSuite
       }
     }
   }
+
+  private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
+    assert(maybeFile.isDefined)
+
+    val actualCompression = format match {
+      case "orc" =>
+        OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
+
+      case "parquet" =>
+        val footer = ParquetFileReader.readFooter(
+          sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
+        footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
+    }
+
+    assert(compression === actualCompression)
+  }
+
+  // Since ORC uses 'ZLIB' and Parquet uses 'SNAPPY' by default, we test with different formats.
+  Seq(("orc", "SNAPPY"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+    test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
+      withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) USING hive
+                |OPTIONS(fileFormat '$fileFormat', compression '$compression')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains(fileFormat))
+            assert(table.storage.properties.get("compression") == Some(compression))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, fileFormat, compression)
+          }
+        }
+      }
+    }
+  }
 }
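
For readers outside the test suite, here is a minimal sketch of the behavior the new SPARK-22158 test pins down, written against a local Hive-enabled SparkSession. This is not part of the patch: the session setup and the table name t_example are illustrative, and only the config keys and the CREATE TABLE syntax come from the diff above.

import org.apache.spark.sql.SparkSession

// Illustrative only: a local Hive-enabled session with metastore conversion
// turned on, mirroring the withSQLConf(...) settings in the test above.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-22158-sketch")
  .config("spark.sql.hive.convertMetastoreOrc", "true")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .enableHiveSupport()
  .getOrCreate()

// ORC defaults to ZLIB, so asking for SNAPPY only takes effect if the
// table-level 'compression' option survives the converted write path.
spark.sql(
  """CREATE TABLE t_example(id INT) USING hive
    |OPTIONS(fileFormat 'orc', compression 'SNAPPY')
  """.stripMargin)
spark.sql("INSERT INTO t_example SELECT 1")
// With the fix, the part files under the table location are SNAPPY-compressed,
// which is exactly what assertCompression verifies in the test.

The point of the test is that the per-table compression option must keep winning even when spark.sql.hive.convertMetastoreOrc / spark.sql.hive.convertMetastoreParquet route reads and writes through Spark's native file formats rather than the Hive SerDe path; choosing non-default codecs (SNAPPY for ORC, GZIP for Parquet) makes a silently ignored property show up as a test failure.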