Closed
Changes from 1 commit
Commits
31 commits
677541b
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 13, 2017
4e70fff
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 14, 2017
3f022f9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
6d77bf9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
42aca3d
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
5cbe999
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
732266c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
c7ff62c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
384ee04
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
8c92074
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
dd5060a
Merge branch 'master' into master
fjh100456 Sep 20, 2017
d427df5
Update InsertSuite.scala
fjh100456 Sep 20, 2017
35cfa01
Update InsertSuite.scala
fjh100456 Sep 20, 2017
5387497
Fix test problems
fjh100456 Sep 20, 2017
676d6a7
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 27, 2017
ae1da8f
Fix scala style issue
fjh100456 Sep 27, 2017
fd73145
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
7615939
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
90cbcb3
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
dd6d635
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
4fe8170
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 12, 2017
aa31261
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dfb36d9
Merge branch 'master' into master
fjh100456 Oct 16, 2017
c4801f6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
105e129
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dc12038
Merge pull request #1 from apache/master
fjh100456 Dec 18, 2017
d779ee6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 19, 2017
0cb7b7a
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 20, 2017
78e0403
Resume the changing, and change it in another pr later.
fjh100456 Dec 23, 2017
7804f60
Change to public
fjh100456 Dec 23, 2017
52cdd75
Fix the code with gatorsmile's suggestion.
fjh100456 Dec 23, 2017
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

Fix the test case issue

fjh100456 committed Sep 27, 2017
commit 676d6a7a64e137ca05981fae6cebea27b2d9d401
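
For context, a minimal sketch of the two codec settings this change wires up for Hive table writes: the session-level spark.sql.*.compression.codec configurations and a table-level codec in TBLPROPERTIES. The table names and spark-shell style below are illustrative assumptions, not part of the PR; the expected precedence follows the comments in the test further down.

// Hypothetical spark-shell snippet; table names are made up for illustration.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip") // session-level conf for Parquet
spark.conf.set("spark.sql.orc.compression.codec", "zlib")     // session-level conf for ORC

// A Hive table with no table-level codec: with this fix, the INSERT should honor the session conf.
spark.sql("CREATE TABLE tbl_no_props (a INT) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE tbl_no_props SELECT CAST(id AS INT) AS a FROM range(100)")

// A Hive table with a table-level codec: the TBLPROPERTIES value takes precedence over the session conf.
spark.sql(
  """CREATE TABLE tbl_with_props (a INT) STORED AS PARQUET
    |TBLPROPERTIES ('parquet.compression'='SNAPPY')""".stripMargin)
spark.sql("INSERT OVERWRITE TABLE tbl_with_props SELECT CAST(id AS INT) AS a FROM range(100)")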
217 changes: 144 additions & 73 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -21,12 +21,16 @@ import java.io.File

import org.scalatest.BeforeAndAfter

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
case class ThreeCloumntable(key: Int, value: String, key1: String)

class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
with SQLTestUtils {
with ParquetTest {
Member:

This is the insert suite. We are unable to do this.

Could you create a separate suite in the current package org.apache.spark.sql.hive? The suite name can be CompressionCodecSuite
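
A minimal sketch of what such a standalone suite could look like; the name comes from this comment, and the mixins are assumptions based on the ones used in this PR.

// Hypothetical skeleton only; the actual tests would move here out of InsertSuite.
package org.apache.spark.sql.hive

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.hive.test.TestHiveSingleton

class CompressionCodecSuite extends QueryTest with TestHiveSingleton with ParquetTest {
  // compression-codec tests for Parquet and ORC Hive tables would live here
}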

Member:

Please also check whether the compression actually takes effect. Could you compare the sizes and verify the compressed output is smaller than the original size without compression?

Contributor Author:

Ok, I will do it.

Contributor Author:

It seems a compressed table is not always smaller than an uncompressed one.
SNAPPY-compressed output can be larger than uncompressed output when the amount of data is small. So I'd like to check that the sizes are not equal when the compression codecs are different.

Member:

Fine to me. Thanks!
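
To illustrate the size point above, a small hypothetical spark-shell check (paths are throwaway assumptions): with a tiny dataset, the SNAPPY output can be as large as, or larger than, the uncompressed output, which is why the test below reads the codec back from the files instead of comparing sizes.

// Hypothetical demo only: write the same tiny dataset with two codecs and compare on-disk sizes.
import java.io.File

def dataSize(dir: String): Long =
  new File(dir).listFiles()
    .filter(f => f.isFile && f.getName.startsWith("part-"))
    .map(_.length()).sum

Seq("uncompressed", "snappy").foreach { codec =>
  val path = s"/tmp/codec_size_demo_$codec" // throwaway location, an assumption
  spark.range(10).toDF("a").write.option("compression", codec).mode("overwrite").parquet(path)
  println(s"$codec -> ${dataSize(path)} bytes")
}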

import spark.implicits._

override lazy val testData = spark.sparkContext.parallelize(
@@ -730,118 +734,185 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}

test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
Member:

Could you split the whole test case into multiple smaller, independent unit test cases?

"and 'spark.sql.parquet.compression.codec' taking effect on hive table writing") {
case class CompressionConf(name: String, codeC: String)
"and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {

val hadoopConf = spark.sessionState.newHadoopConf()

val partitionStr = "p=10000"

case class TableCompressionConf(name: String, codeC: String)

case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
Member:

Use a function?

compressionConf: Option[CompressionConf]) {
compressionConf: Option[TableCompressionConf]) {
def createTable(rootDir: File): Unit = {
val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
sql(
s"""
|CREATE TABLE $tableName(a int)
|${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
|${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
Member:

Please do not embed it. Could you create a parameter above this instead?

|STORED AS $format
|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
|${ if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else "" }
|${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""}
""".stripMargin)
}

def insertOverwriteTable(): Unit = {
sql(
s"""
|INSERT OVERWRITE TABLE $tableName
|${ if (isPartitioned) "partition (p=10000)" else "" }
|${if (isPartitioned) s"partition ($partitionStr)" else ""}
|SELECT * from table_source
""".stripMargin)
}
}

def getTableCompressionCodec(path: String, format: String): String = {
val codecs = format match {
case "parquet" => for {
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
case "orc" => new File(path).listFiles()
.filter(file => file.isFile && !file.getName.endsWith(".crc")).map {
orcFile =>
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
}.toSeq
}

def getDirFiles(file: File): List[File] = {
if (!file.exists()) Nil
else if (file.isFile) List(file)
else {
file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
.groupBy(_.isFile).flatMap {
case (isFile, files) if isFile => files.toList
case (_, dirs) => dirs.flatMap(getDirFiles)
}.toList
assert(codecs.distinct.length == 1)
codecs.head
}

def checkCompressionCodecForTable(format: String, isPartitioned: Boolean,
compressionConf: Option[TableCompressionConf])(assertion: String => Boolean): Unit = {
val table = TableDefine(s"tbl_$format${isPartitioned}",
isPartitioned, format, compressionConf)
withTempDir { tmpDir =>
withTable(table.tableName) {
table.createTable(tmpDir)
table.insertOverwriteTable()
val partition = if (table.isPartitioned) partitionStr else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
assertion(getTableCompressionCodec(path, table.format))
}
}
}

def getConvertMetastoreConfName(format: String): String = format match {
case "parquet" => "spark.sql.hive.convertMetastoreParquet"
case "orc" => "spark.sql.hive.convertMetastoreOrc"
}

def getTableSize: Long = {
var totalSize = 0L
withTempDir { tmpDir =>
withTable(tableName) {
createTable(tmpDir)
insertOverwriteTable()
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
val dir = new File(path)
val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
totalSize = files.map(_.length()).sum
def getSparkCompressionConfName(format: String): String = format match {
case "parquet" => "spark.sql.parquet.compression.codec"
case "orc" => "spark.sql.orc.compression.codec"
}

def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean,
convertMetastore: Boolean, compressionCodecs: List[String],
tableCompressionConf: List[TableCompressionConf])
Member:

Could you update the indents for all of them in this PR? See the link: https://github.com/databricks/scala-style-guide#indent

(assertion: (Option[TableCompressionConf], String, String) => Boolean): Unit = {
withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString) {
tableCompressionConf.foreach { tableCompression =>
compressionCodecs.foreach { sessionCompressionCodec =>
withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) {
val compression = if (tableCompression == null) None else Some(tableCompression)
checkCompressionCodecForTable(format, isPartitioned, compression) {
case realCompressionCodec => assertion(compression,
sessionCompressionCodec, realCompressionCodec)
}
}
}
}
totalSize
}
}

def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
Some(CompressionConf("parquet.compression", tableCodec)))
val tableOrgSize = tableOrg.getTableSize
def checkTableCompressionCodec(format: String, compressionCodecs: List[String],
tableCompressionConf: List[TableCompressionConf]): Unit = {
// For tables with a table-level compression property, when
// 'spark.sql.hive.convertMetastoreParquet' is set to 'false', both partitioned and
// non-partitioned parquet tables always take the table-level compression
// configuration first and ignore the session compression configuration.
checkTableCompressionCodecForCodecs(format = format, isPartitioned = true,
convertMetastore = false, compressionCodecs, tableCompressionConf) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the table-level codec takes effect
tableCompressionCodec.get.codeC == realCompressionCodec
}

checkTableCompressionCodecForCodecs(format = format, isPartitioned = false,
convertMetastore = false, compressionCodecs, tableCompressionConf) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the table-level codec takes effect
tableCompressionCodec.get.codeC == realCompressionCodec
}

withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
// priority check, when table-level compression conf was set, expecting
// table-level compression conf is not affected by the session conf, and table-level
// compression conf takes precedence even the two conf of codec is different
val tableOrgSessionConfSize = tableOrg.getTableSize
assert(tableOrgSize == tableOrgSessionConfSize)
// For tables with a table-level compression property, when
// 'spark.sql.hive.convertMetastoreParquet' is set to 'true', partitioned parquet tables
// always take the table-level compression configuration first, but non-partitioned tables
// take the session-level compression configuration.
checkTableCompressionCodecForCodecs(format = format, isPartitioned = true,
convertMetastore = true, compressionCodecs, tableCompressionConf) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the table-level codec takes effect
tableCompressionCodec.get.codeC == realCompressionCodec
}

// check session conf of compression codec taking effect
val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", None)
assert(f(tableOrg.getTableSize, table.getTableSize))
checkTableCompressionCodecForCodecs(format = format, isPartitioned = false,
convertMetastore = true, compressionCodecs, tableCompressionConf) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the session-level codec takes effect
sessionCompressionCodec == realCompressionCodec
}
}

def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
Some(CompressionConf("orc.compress", tableCodec)))
val tableOrgSize = tableOrg.getTableSize
// For tables without a table-level compression property, the session-level compression
// configuration takes effect.
checkTableCompressionCodecForCodecs(format = format, isPartitioned = true,
convertMetastore = true, compressionCodecs, List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the session-level codec takes effect
sessionCompressionCodec == realCompressionCodec
}

checkTableCompressionCodecForCodecs(format = format, isPartitioned = false,
convertMetastore = true, compressionCodecs, List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the session-level codec takes effect
sessionCompressionCodec == realCompressionCodec
}

withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
// priority check, when table-level compression conf was set, expecting
// table-level compression conf is not affected by the session conf, and table-level
// compression conf takes precedence even the two conf of codec is different
val tableOrgSessionConfSize = tableOrg.getTableSize
assert(tableOrgSize == tableOrgSessionConfSize)
checkTableCompressionCodecForCodecs(format = format, isPartitioned = true,
convertMetastore = false, compressionCodecs, List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the session-level codec takes effect
sessionCompressionCodec == realCompressionCodec
}

// check session conf of compression codec taking effect
val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
assert(f(tableOrg.getTableSize, table.getTableSize))
checkTableCompressionCodecForCodecs(format = format, isPartitioned = false,
convertMetastore = false, compressionCodecs, List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// the session-level codec takes effect
sessionCompressionCodec == realCompressionCodec
}
}

withTempView("table_source") {
(0 until 100000).toDF("a").createOrReplaceTempView("table_source")

checkParquetCompressionCodec(true, "UNCOMPRESSED", "UNCOMPRESSED")
checkParquetCompressionCodec(true, "GZIP", "GZIP")
checkParquetCompressionCodec(true, "GZIP", "UNCOMPRESSED", _ < _)

withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
checkParquetCompressionCodec(false, "UNCOMPRESSED", "UNCOMPRESSED")
checkParquetCompressionCodec(false, "GZIP", "GZIP")
checkParquetCompressionCodec(false, "GZIP", "UNCOMPRESSED", _ < _)
val parquetCompressionCodec = List("UNCOMPRESSED", "SNAPPY", "GZIP")
val tableCompressionConf = parquetCompressionCodec.map { tableCodec =>
TableCompressionConf("parquet.compression", tableCodec)
}
checkTableCompressionCodec("parquet", parquetCompressionCodec, tableCompressionConf)
}

checkOrcCompressionCodec(true, "NONE", "NONE")
checkOrcCompressionCodec(true, "ZLIB", "ZLIB")
checkOrcCompressionCodec(true, "ZLIB", "NONE", _ < _)

checkOrcCompressionCodec(false, "NONE", "NONE")
checkOrcCompressionCodec(false, "ZLIB", "ZLIB")
checkOrcCompressionCodec(false, "ZLIB", "NONE", _ < _)
withTempView("table_source") {
(0 until 100000).toDF("a").createOrReplaceTempView("table_source")
val orcCompressionCodec = List("NONE", "SNAPPY", "ZLIB")
val tableCompressionConf = orcCompressionCodec.map { tableCodec =>
TableCompressionConf("parquet.compression", tableCodec)
}
checkTableCompressionCodec("orc", orcCompressionCodec, tableCompressionConf)
}
}
}