-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configurations don't take effect on hive table writing #20087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
9bbfe6e
48cf108
5dbd3ed
5124f1b
6907a3e
67e40d4
e2526ca
8ae86ee
94ac716
43e041f
ee0c558
e9f705d
d3aa7a0
5244aaf
b96a213
a05e85e
b962488
27c949d
79f7263
a51212b
f51c8fd
1860a43
a7cfd6b
eb99b8a
1f5e354
bcfeef5
cd92913
bc4bef4
2ab4012
84707f0
ea9da61
158f7e6
145820b
5b524cc
f9dcdbc
fd4e304
0a30e93
d1f422c
55afac4
bf85301
3e3e938
7236914
e6449e8
0377755
b66700a
f9e7b0c
285d342
bd1a80a
584cdc2
5b150bc
2337edd
43e7eb5
4b89b44
6cf32e0
365c5bf
99271d6
2b9dfbe
5b5e1df
118f788
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Fix test issue
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,24 +102,24 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo | |
| val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" else "" | ||
| sql( | ||
| s""" | ||
| |CREATE TABLE $tableName(a int) | ||
| |$partitionCreate | ||
| |STORED AS $format | ||
| |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' | ||
| |$tblProperties | ||
| """.stripMargin) | ||
| |CREATE TABLE $tableName(a int) | ||
| |$partitionCreate | ||
| |STORED AS $format | ||
| |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' | ||
| |$tblProperties | ||
| """.stripMargin) | ||
| } | ||
|
|
||
| private def writeDataToTable( | ||
| tableName: String, | ||
| partition: Option[String]): Unit = { | ||
| val partitionInsert = partition.map(p => s"partition ($p)").mkString | ||
| val partitionInsert = partition.map(p => s"partition (p='$p')").mkString | ||
| sql( | ||
| s""" | ||
| |INSERT INTO TABLE $tableName | ||
| |$partitionInsert | ||
| |SELECT * FROM table_source | ||
| """.stripMargin) | ||
| |INSERT INTO TABLE $tableName | ||
| |$partitionInsert | ||
| |SELECT * FROM table_source | ||
| """.stripMargin) | ||
| } | ||
|
|
||
| private def getTableSize(path: String): Long = { | ||
|
|
@@ -128,6 +128,11 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo | |
| files.map(_.length()).sum | ||
| } | ||
|
|
||
| private def getTablePartitionPath(dir: File, tableName: String, partition: Option[String]) = { | ||
| val partitionPath = partition.map(p => s"p=$p").mkString | ||
| s"${dir.getPath.stripSuffix("/")}/$tableName/$partitionPath" | ||
| } | ||
|
|
||
| private def getUncompressedDataSizeByFormat( | ||
| format: String, isPartitioned: Boolean): Long = { | ||
| var totalSize = 0L | ||
|
|
@@ -137,9 +142,9 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo | |
| withTempDir { tmpDir => | ||
| withTable(tableName) { | ||
| createTable(tmpDir, tableName, isPartitioned, format, Option(codecName)) | ||
| val partition = if (isPartitioned) Some("p='test'") else None | ||
| val partition = if (isPartitioned) Some("test") else None | ||
|
||
| writeDataToTable(tableName, partition) | ||
| val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName/${partition.mkString}" | ||
| val path = getTablePartitionPath(tmpDir, tableName, partition) | ||
| totalSize = getTableSize(path) | ||
| } | ||
| } | ||
|
|
@@ -157,9 +162,9 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo | |
| withTempDir { tmpDir => | ||
| withTable(tableName) { | ||
| createTable(tmpDir, tableName, isPartitioned, format, compressionCodec) | ||
| val partition = if (isPartitioned) Some("p='test'") else None | ||
| val partition = if (isPartitioned) Some("test") else None | ||
|
||
| writeDataToTable(tableName, partition) | ||
| val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName/${partition.mkString}" | ||
| val path = getTablePartitionPath(tmpDir, tableName, partition) | ||
| val relCompressionCodecs = getTableCompressionCodec(path, format) | ||
| assert(relCompressionCodecs.length == 1) | ||
| val tableSize = getTableSize(path) | ||
|
|
@@ -289,12 +294,12 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo | |
| createTable(tmpDir, tableName, isPartitioned, format, None) | ||
| withTable(tableName) { | ||
| compressCodecs.foreach { compressionCodec => | ||
| val partition = if (isPartitioned) Some(s"p='$compressionCodec'") else None | ||
| val partition = if (isPartitioned) Some(compressionCodec) else None | ||
|
||
| withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString, | ||
| getSparkCompressionConfName(format) -> compressionCodec | ||
| ) { writeDataToTable(tableName, partition) } | ||
| } | ||
| val tablePath = s"${tmpDir.getPath.stripSuffix("/")}/$tableName" | ||
| val tablePath = getTablePartitionPath(tmpDir, tableName, None) | ||
| val relCompressionCodecs = | ||
| if (isPartitioned) compressCodecs.flatMap { codec => | ||
| getTableCompressionCodec(s"$tablePath/p=$codec", format) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is INSERT after CREATE TABLE. We also need to test/fix another common case: CTAS [CREATE TABLE AS SELECT].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A CTAS statement is not allowed to create a partitioned table using Hive's file formats. So I use the syntax
CREATE TABLE tableName USING ... OPTIONS (...) PARTITIONED BY ... to create a table. However, it seems to behave differently from a non-partitioned hive table when convertMetastore is true. For a non-partitioned hive table, the session-level setting takes effect, but for a table created by CTAS, the table-level setting takes effect.
And if I merge the code of your PR(#20120), they would be consistent, table-level compression will take effect.
Should I fix it after your PR is closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can merge this PR first. Will ping you when my PR is fixed. Thanks!