Commits (31)
677541b
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 13, 2017
4e70fff
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 14, 2017
3f022f9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
6d77bf9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
42aca3d
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
5cbe999
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
732266c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
c7ff62c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
384ee04
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
8c92074
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
dd5060a
Merge branch 'master' into master
fjh100456 Sep 20, 2017
d427df5
Update InsertSuite.scala
fjh100456 Sep 20, 2017
35cfa01
Update InsertSuite.scala
fjh100456 Sep 20, 2017
5387497
Fix test problems
fjh100456 Sep 20, 2017
676d6a7
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 27, 2017
ae1da8f
Fix scala style issue
fjh100456 Sep 27, 2017
fd73145
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
7615939
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
90cbcb3
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
dd6d635
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
4fe8170
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 12, 2017
aa31261
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dfb36d9
Merge branch 'master' into master
fjh100456 Oct 16, 2017
c4801f6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
105e129
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dc12038
Merge pull request #1 from apache/master
fjh100456 Dec 18, 2017
d779ee6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 19, 2017
0cb7b7a
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 20, 2017
78e0403
Resume the changing, and change it in another pr later.
fjh100456 Dec 23, 2017
7804f60
Change to public
fjh100456 Dec 23, 2017
52cdd75
Fix the code with gatorsmile's suggestion.
fjh100456 Dec 23, 2017
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.Locale

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -27,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
/**
* Options for the Parquet data source.
*/
private[parquet] class ParquetOptions(
class ParquetOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
extends Serializable {
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodecClassName: String = {
val codecName = parameters.getOrElse("compression",
sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
// `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and
// `spark.sql.parquet.compression.codec`
// are in order of precedence from highest to lowest.
val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
val codecName = parameters
.get("compression")
.orElse(parquetCompressionConf)
Member:
Is this new? Do we support parquet.compression before this PR?

Contributor Author:
Yes, it's new. I guess ParquetOptions was not used when writing Hive tables before, because it was invisible to Hive. I changed it to public.

Member:
Could we keep the old behavior and add this later? We do not want to mix multiple issues in the same PR.

Contributor Author:
If so, Parquet's table-level compression may be overwritten in this PR, and that may not be what we want.
Shall I fix it first in another PR?

Member:
Yeah, we can submit a separate PR for that issue. The behavior change needs to be documented in the Spark SQL docs.

.getOrElse(sqlConf.parquetCompressionCodec)
.toLowerCase(Locale.ROOT)
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs =
shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
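To illustrate the precedence above on the Parquet data source write path, a minimal usage sketch (the output path and session setup are assumptions, not part of this patch): the compression write option should win over parquet.compression, which in turn wins over spark.sql.parquet.compression.codec.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("codec-precedence-sketch").getOrCreate()

    // Lowest precedence: the session-level conf.
    spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

    spark.range(10).write
      // parquet.compression (ParquetOutputFormat.COMPRESSION) overrides the session conf...
      .option("parquet.compression", "gzip")
      // ...and the short `compression` option overrides both.
      .option("compression", "none")
      .parquet("/tmp/codec_precedence_sketch") // hypothetical path; files are expected to be uncompressed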
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution

import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
Member:
Is FileSinkDesc still needed?

Contributor Author:
I will remove it.

import org.apache.orc.OrcConf.COMPRESS
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf

/**
* Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
@@ -102,4 +111,18 @@ object HiveOptions {
"collectionDelim" -> "colelction.delim",
"mapkeyDelim" -> "mapkey.delim",
"lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
tableInfo.getOutputFileFormatClassName.toLowerCase match {
case formatName if formatName.endsWith("parquetoutputformat") =>
val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
sqlConf).compressionCodecClassName
Member:
We normally do not split the code like this. We prefer the following way:

    val tableProps = tableInfo.getProperties.asScala.toMap
    tableInfo.getOutputFileFormatClassName.toLowerCase match {
      case formatName if formatName.endsWith("parquetoutputformat") =>
        val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
...

Contributor Author (fjh100456, Dec 22, 2017):
Yes, it looks better. I will change it.

Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
case formatName if formatName.endsWith("orcoutputformat") =>
val compressionCodec = new OrcOptions(tableInfo.getProperties.asScala.toMap,
sqlConf).compressionCodec
Member:
Also update OrcOptions's compressionCodec to compressionCodecClassName.

Contributor Author:
compressionCodec is used in several places; do you mean I should fix them all?

Member:
Yeah, just to make it consistent.

Option((COMPRESS.getAttribute, compressionCodec))
case _ => None
}
}
}
@@ -68,6 +68,12 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
.get("mapreduce.output.fileoutputformat.compress.type"))
}

// Set compression by priority
HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
.foreach{ case (compression, codec) =>
hadoopConf.set(compression, codec)
}
Member:
.foreach { case (compression, codec) => hadoopConf.set(compression, codec) }


val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
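As a hedged end-to-end sketch of the priority wired up here when writing through the Hive path (the table name is hypothetical, a Hive-enabled session is assumed, and the expected result mirrors the tests added in InsertSuite below rather than a claim about every configuration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // With conversion disabled, inserts go through the Hive SerDe path patched above.
    spark.sql("SET spark.sql.hive.convertMetastoreParquet=false")
    // Session-level default codec.
    spark.sql("SET spark.sql.parquet.compression.codec=snappy")

    // A table-level codec set via TBLPROPERTIES is expected to take precedence.
    spark.sql(
      """CREATE TABLE demo_parquet_tbl(a INT) STORED AS PARQUET
        |TBLPROPERTIES('parquet.compression'='GZIP')""".stripMargin)

    // The files produced by this insert are expected to be GZIP-compressed, not SNAPPY.
    spark.sql("INSERT OVERWRITE TABLE demo_parquet_tbl SELECT CAST(id AS INT) FROM range(10)")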
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala (261 changes: 258 additions & 3 deletions)
@@ -19,14 +19,18 @@ package org.apache.spark.sql.hive

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
case class ThreeCloumntable(key: Int, value: String, key1: String)

class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
with SQLTestUtils {
with ParquetTest {
Member:
This is the insert suite; we are unable to do this here.
Could you create a separate suite in the current package org.apache.spark.sql.hive? The suite name can be CompressionCodecSuite.

Member:
Please also check whether the compression takes effect. Compare whether the size is smaller than the original size without compression.

Contributor Author:
OK, I will do it.

Contributor Author:
It seems a compressed table is not always smaller than an uncompressed one.
The SNAPPY-compressed size may be bigger than the uncompressed size when the amount of data is small, so I'd like to check that the sizes are not equal when the compression codecs are different.

Member:
Fine to me. Thanks!

import spark.implicits._

override lazy val testData = spark.sparkContext.parallelize(
@@ -750,4 +754,255 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}
}

private def getConvertMetastoreConfName(format: String): String = format match {
case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
}

private def getSparkCompressionConfName(format: String): String = format match {
case "parquet" => SQLConf.PARQUET_COMPRESSION.key
case "orc" => SQLConf.ORC_COMPRESSION.key
}

private def getHiveCompressPropName(format: String): String = {
format.toLowerCase match {
case "parquet" => "parquet.compression"
case "orc" => "orc.compress"
}
}

private def getTableCompressionCodec(path: String, format: String): String = {
Member:
This logic reads the compression codec from the files, so the getTable prefix looks misleading to me.

Contributor Author:
Change it to getHiveCompressPropName; is that appropriate?

val hadoopConf = spark.sessionState.newHadoopConf()
val codecs = format match {
case "parquet" => for {
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
case "orc" => new File(path).listFiles().filter{ file =>
file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
}.map { orcFile =>
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
}.toSeq
}

assert(codecs.distinct.length == 1)
codecs.head
}

private def writeDataToTable(
rootDir: File,
tableName: String,
isPartitioned: Boolean,
format: String,
compressionCodec: Option[String]) {
val tblProperties = compressionCodec match {
case Some(prop) => s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
case _ => ""
}
val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else ""
sql(
s"""
|CREATE TABLE $tableName(a int)
|$partitionCreate
|STORED AS $format
|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
|$tblProperties
""".stripMargin)

val partitionInsert = if (isPartitioned) s"partition (p=10000)" else ""
sql(
s"""
|INSERT OVERWRITE TABLE $tableName
|$partitionInsert
|SELECT * FROM table_source
""".stripMargin)
}

private def checkCompressionCodecForTable(
format: String,
isPartitioned: Boolean,
compressionCodec: Option[String])(assertion: String => Unit): Unit = {
val tableName = s"tbl_$format${isPartitioned}"
withTempDir { tmpDir =>
withTable(tableName) {
writeDataToTable(tmpDir, tableName, isPartitioned, format, compressionCodec)
val partition = if (isPartitioned) "p=10000" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/${tableName}/$partition"
assertion(getTableCompressionCodec(path, format))
}
}
}

private def checkTableCompressionCodecForCodecs(
format: String,
isPartitioned: Boolean,
convertMetastore: Boolean,
compressionCodecs: List[String],
tableCompressionCodecs: List[String])
(assertion: (Option[String], String, String) => Unit): Unit = {
withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString) {
tableCompressionCodecs.foreach { tableCompression =>
compressionCodecs.foreach { sessionCompressionCodec =>
withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) {
// 'tableCompression = null' means no table-level compression
val compression = if (tableCompression == null) None else Some(tableCompression)
checkCompressionCodecForTable(format, isPartitioned, compression) {
case realCompressionCodec => assertion(compression,
sessionCompressionCodec, realCompressionCodec)
}
}
}
}
}
}

private def testCompressionCodec(testCondition: String)(f: => Unit): Unit = {
test("[SPARK-21786] - Check the priority between table-level compression and " +
s"session-level compression $testCondition") {
withTempView("table_source") {
(0 until 100000).toDF("a").createOrReplaceTempView("table_source")
f
}
}
}

testCompressionCodec("when table-level and session-level compression are both configured and " +
"convertMetastore is false") {
def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = {
// For tables with table-level compression property, when
// 'spark.sql.hive.convertMetastore[Parquet|Orc]' was set to 'false', partitioned tables
// and non-partitioned tables will always take the table-level compression
// configuration first and ignore session compression configuration.
// Check for partitioned table, when convertMetastore is false
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = true,
convertMetastore = false,
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect table-level take effect
assert(tableCompressionCodec.get == realCompressionCodec)
}

// Check for non-partitioned table, when convertMetastoreParquet is false
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = false,
convertMetastore = false,
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect table-level take effect
assert(tableCompressionCodec.get == realCompressionCodec)
}
}

checkForTableWithCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
checkForTableWithCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
}

testCompressionCodec("when there's no table-level compression and convertMetastore is false") {
def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = {
// For tables without table-level compression property, session-level compression
// configuration will take effect.
// Check for partitioned table, when convertMetastore is false
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = true,
convertMetastore = false,
compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
}

// Check for non-partitioned table, when convertMetastore is false
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = false,
convertMetastore = false,
compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
}
}

checkForTableWithoutCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
checkForTableWithoutCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
}

testCompressionCodec("when table-level and session-level compression are both configured and " +
"convertMetastore is true") {
def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = {
// For tables with table-level compression property, when
// 'spark.sql.hive.convertMetastore[Parquet|Orc]' was set to 'true', partitioned tables
// will always take the table-level compression configuration first, but non-partitioned
// tables will take the session-level compression configuration.
// Check for partitioned table, when convertMetastore is true
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = true,
convertMetastore = true,
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect table-level take effect
assert(tableCompressionCodec.get == realCompressionCodec)
}

// Check for non-partitioned table, when convertMetastore is true
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = false,
convertMetastore = true,
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
}
}

checkForTableWithCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
checkForTableWithCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
}

testCompressionCodec("when there's no table-level compression and convertMetastore is true") {
def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = {
// For tables without table-level compression property, session-level compression
// configuration will take effect.
// Check for partitioned table, when convertMetastore is true
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = true,
convertMetastore = true,
compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
}

// Check for non-partitioned table, when convertMetastore is true
checkTableCompressionCodecForCodecs(
format = format,
isPartitioned = false,
convertMetastore = true,
compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) =>
// Expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
}
}

checkForTableWithoutCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
checkForTableWithoutCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
}
}