Closed

Changes from 1 commit (of 31)
677541b
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 13, 2017
4e70fff
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 14, 2017
3f022f9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
6d77bf9
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
42aca3d
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 15, 2017
5cbe999
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
732266c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
c7ff62c
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur…
fjh100456 Sep 16, 2017
384ee04
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
8c92074
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 20, 2017
dd5060a
Merge branch 'master' into master
fjh100456 Sep 20, 2017
d427df5
Update InsertSuite.scala
fjh100456 Sep 20, 2017
35cfa01
Update InsertSuite.scala
fjh100456 Sep 20, 2017
5387497
Fix test problems
fjh100456 Sep 20, 2017
676d6a7
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 27, 2017
ae1da8f
Fix scala style issue
fjh100456 Sep 27, 2017
fd73145
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
7615939
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Sep 28, 2017
90cbcb3
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
dd6d635
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 10, 2017
4fe8170
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 12, 2017
aa31261
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dfb36d9
Merge branch 'master' into master
fjh100456 Oct 16, 2017
c4801f6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
105e129
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Oct 16, 2017
dc12038
Merge pull request #1 from apache/master
fjh100456 Dec 18, 2017
d779ee6
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 19, 2017
0cb7b7a
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa…
fjh100456 Dec 20, 2017
78e0403
Resume the changing, and change it in another pr later.
fjh100456 Dec 23, 2017
7804f60
Change to public
fjh100456 Dec 23, 2017
52cdd75
Fix the code with gatorsmile's suggestion.
fjh100456 Dec 23, 2017
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

Fix some issue

fjh100456 committed Oct 10, 2017
commit 90cbcb3c58e115995eaa58f61a9cc818d2f17cdf
@@ -69,18 +69,22 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
}

fileSinkConf.tableInfo.getOutputFileFormatClassName match {
case formatName if formatName.endsWith("ParquetOutputFormat") =>
case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
val compressionConf = "parquet.compression"
Member:
"parquet.compression" -> ParquetOutputFormat.COMPRESSION

val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
sparkSession.sessionState.conf.parquetCompressionCodec) match {
val compressionCodec = getCompressionByPriority(
fileSinkConf,
compressionConf,
default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
case "NONE" => "UNCOMPRESSED"
case _@x => x
Member:
The same here.

}
hadoopConf.set(compressionConf, compressionCodec)
case formatName if formatName.endsWith("OrcOutputFormat") =>
Member:
Should this be `case formatName if formatName.toLowerCase.endsWith("orcoutputformat") =>`?
Or you could write `fileSinkConf.tableInfo.getOutputFileFormatClassName.toLowerCase match {`, so that each case doesn't need its own lower-case conversion.
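The restructuring the reviewer hints at can be sketched roughly like this (a standalone sketch, not the actual Spark code; `resolveCompressionKey` is a hypothetical helper name introduced here for illustration):

```scala
// Hypothetical helper sketching the reviewer's suggestion: lower-case the
// output-format class name once, so each case matches a lower-case literal
// and no case needs its own toLowerCase call.
def resolveCompressionKey(outputFormatClassName: String): Option[String] =
  outputFormatClassName.toLowerCase match {
    case name if name.endsWith("parquetoutputformat") => Some("parquet.compression")
    case name if name.endsWith("orcoutputformat")     => Some("orc.compress")
    case _ => None // not a format this code path configures
  }
```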

val compressionConf = "orc.compress"
Member:
-> OrcRelation.ORC_COMPRESSION

val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
sparkSession.sessionState.conf.orcCompressionCodec) match {
val compressionCodec = getCompressionByPriority(
fileSinkConf,
compressionConf,
default = sparkSession.sessionState.conf.orcCompressionCodec) match {
Member:
I suggest adding normalization logic for both ORC and Parquet.

Check ParquetOptions.shortParquetCompressionCodecNames.
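A normalization table along the lines the reviewer suggests could look roughly like this (a hedged sketch: the two maps and the `normalize` helper are illustrative and mirror the style of `shortParquetCompressionCodecNames`, they are not Spark's actual tables):

```scala
// Illustrative per-format normalization maps: case-insensitive user input on
// the left, the canonical codec name each format expects on the right.
// Note the asymmetry the review thread discusses: Parquet's canonical name
// for "no compression" is "UNCOMPRESSED", while ORC's is "NONE".
val parquetCodecNames: Map[String, String] = Map(
  "none"         -> "UNCOMPRESSED",
  "uncompressed" -> "UNCOMPRESSED",
  "snappy"       -> "SNAPPY",
  "gzip"         -> "GZIP",
  "lzo"          -> "LZO")

val orcCodecNames: Map[String, String] = Map(
  "none"         -> "NONE",
  "uncompressed" -> "NONE",
  "snappy"       -> "SNAPPY",
  "zlib"         -> "ZLIB",
  "lzo"          -> "LZO")

// Look up the canonical name, rejecting codecs the format doesn't support.
def normalize(codecNames: Map[String, String], codec: String): String =
  codecNames.getOrElse(codec.toLowerCase,
    throw new IllegalArgumentException(s"Unsupported codec: $codec"))
```

With a table like this, the `"NONE" => "UNCOMPRESSED"` and `"UNCOMPRESSED" => "NONE"` special cases in the hunk above collapse into one shared lookup per format.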

case "UNCOMPRESSED" => "NONE"
Member:
Why ORC and Parquet are different?

Contributor (Author):
Yes, they are different: the parameter names and the parameter values follow different conventions in Parquet and ORC, so this is a Parquet/ORC inconsistency rather than something introduced by this change.

Member:
Why always make it upper case? This looks buggy.

case _@x => x
Member:
case x => x?

Contributor (Author):
In fact, the downstream process already validates this value, and because `OrcOptions` is not accessible here, I had to add the "UNCOMPRESSED" => "NONE" conversion myself.
Do you have any advice?

Member:
case o => o

}
Member:
Move the whole determination logic into object HiveOptions; you can then call it from SaveAsHiveFile.scala.

@@ -106,8 +110,13 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
options = Map.empty)
}

// Because compression configurations can come in a variety of ways,
// we choose the compression configuration in this order:
// For parquet: `compression` > `parquet.compression` > `spark.sql.parquet.compression.codec`
// For orc: `compression` > `orc.compress` > `spark.sql.orc.compression.codec`
Member:
Could this priority order be documented somewhere, e.g. in the Spark SQL guide? https://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
compressionConf: String, default: String): String = {
Member:
private def getCompressionByPriority(
    fileSinkConf: FileSinkDesc,
    compressionConf: String,
    default: String): String = {

Member:
Could you add a description explaining the priority order?

// The variable `default` comes from the Spark SQL conf.
val props = fileSinkConf.tableInfo.getProperties
val priorities = List("compression", compressionConf)
priorities.find(props.getProperty(_, null) != null)
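Putting the pieces of this hunk together, the priority lookup under review can be sketched as follows (an illustrative reconstruction under the priority order stated in the comment above — table-level `compression` > format-specific key > session default — not the exact Spark source):

```scala
// Sketch of the priority resolution: try the table properties in priority
// order (`compression` first, then the format-specific key such as
// "parquet.compression" or "orc.compress"), falling back to the session
// conf value passed in as `default`.
def getCompressionByPriority(
    tableProps: java.util.Properties,
    compressionConf: String,
    default: String): String = {
  val priorities = List("compression", compressionConf)
  priorities
    .flatMap(key => Option(tableProps.getProperty(key))) // drop unset keys
    .headOption                                          // first hit wins
    .getOrElse(default)
}
```

The real method takes a `FileSinkDesc` and reads `fileSinkConf.tableInfo.getProperties`; a plain `java.util.Properties` stands in for that here to keep the sketch self-contained.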