
Commit 3b20b34

gengliangwang authored and HyukjinKwon committed
[SPARK-24367][SQL] Parquet: use JOB_SUMMARY_LEVEL instead of deprecated flag ENABLE_JOB_SUMMARY
## What changes were proposed in this pull request?

In the current Parquet version, the conf ENABLE_JOB_SUMMARY is deprecated. When writing to Parquet files, the warning message

```
WARN org.apache.parquet.hadoop.ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
```

keeps showing up. From https://github.com/apache/parquet-mr/blame/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L164 we can see that we should use JOB_SUMMARY_LEVEL instead.

## How was this patch tested?

Unit test.

Author: Gengliang Wang <[email protected]>

Closes #21411 from gengliangwang/summaryLevel.
1 parent 0fd68cb commit 3b20b34
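
For readers hitting the warning above, here is a minimal sketch (not part of this commit) of how a user session can opt into the non-deprecated key once this patch is applied. The master URL, app name, and output path are illustrative; the valid levels in parquet-mr are ALL, COMMON_ONLY, and NONE.

```scala
// Minimal sketch: pick a summary level via the new key instead of the
// deprecated boolean. Paths and names below are illustrative only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("summary-level-demo")
  .getOrCreate()

// "parquet.summary.metadata.level" supersedes "parquet.enable.summary-metadata";
// accepted values are ALL, COMMON_ONLY, and NONE.
spark.sparkContext.hadoopConfiguration.set("parquet.summary.metadata.level", "ALL")

spark.range(10).write.mode("overwrite").parquet("/tmp/summary-level-demo")
```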

File tree

7 files changed: +17 -10 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -395,7 +395,7 @@ object SQLConf {
     .doc("The output committer class used by Parquet. The specified class needs to be a " +
       "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
       "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata summaries" +
-      "will never be created, irrespective of the value of parquet.enable.summary-metadata")
+      "will never be created, irrespective of the value of parquet.summary.metadata.level")
     .internal()
     .stringConf
     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
```
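
As the updated doc string notes, the summary level only matters when the configured committer is a ParquetOutputCommitter subclass. A hedged sketch of wiring the two settings together; the session setup is illustrative:

```scala
// Illustrative sketch: summary files are only written when the output
// committer is a ParquetOutputCommitter subclass, so pair the two settings.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.parquet.hadoop.ParquetOutputCommitter")
  .getOrCreate()

// With a non-Parquet committer, this level would be silently ineffective
// (Spark logs a warning; see ParquetFileFormat below).
spark.sparkContext.hadoopConfiguration.set("parquet.summary.metadata.level", "ALL")
```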

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -34,6 +34,7 @@ import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
@@ -125,16 +126,17 @@ class ParquetFileFormat
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

     // SPARK-15719: Disables writing Parquet summary files by default.
-    if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
-      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+    if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null
+      && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
+      conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE)
     }

-    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+    if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE
       && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
       // output summary is requested, but the class is not a Parquet Committer
       logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
         s" create job summaries. " +
-        s"Set Parquet option ${ParquetOutputFormat.ENABLE_JOB_SUMMARY} to false.")
+        s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
     }

     new OutputWriterFactory {
```
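
The new guard checks both keys before defaulting to NONE, so an explicit user setting of either key still wins. For context, a paraphrased sketch (ours, in Scala, not the library source) of how parquet-mr resolves the two keys in ParquetOutputFormat.getJobSummaryLevel, approximated from the Java file linked in the commit message:

```scala
// Approximation of parquet-mr's key-resolution logic; not a verbatim copy.
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel

def resolveJobSummaryLevel(conf: Configuration): JobSummaryLevel = {
  val level = conf.get("parquet.summary.metadata.level")           // new key
  val deprecatedFlag = conf.get("parquet.enable.summary-metadata") // old key
  if (level != null) {
    JobSummaryLevel.valueOf(level) // the new key wins when both are set
  } else if (deprecatedFlag != null) {
    // The old boolean maps onto the two extreme levels.
    if (deprecatedFlag.toBoolean) JobSummaryLevel.ALL else JobSummaryLevel.NONE
  } else {
    JobSummaryLevel.ALL // library default when neither key is set
  }
}
```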

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -91,9 +91,14 @@ class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
       summary: Boolean,
       check: Boolean): Option[FileStatus] = {
     var result: Option[FileStatus] = None
+    val summaryLevel = if (summary) {
+      "ALL"
+    } else {
+      "NONE"
+    }
     withSQLConf(
       SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer,
-      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> summary.toString) {
+      ParquetOutputFormat.JOB_SUMMARY_LEVEL -> summaryLevel) {
       withTempPath { dest =>
         val df = spark.createDataFrame(Seq((1, "4"), (2, "2")))
         val destPath = new Path(dest.toURI)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -543,7 +543,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {

     val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)

-    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+    withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
         spark.range(1 << 16).selectExpr("(id % 4) AS i")
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -1014,7 +1014,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val path = dir.getCanonicalPath

       withSQLConf(
-        ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+        ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
         "spark.sql.sources.commitProtocolClass" ->
           classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
         spark.range(3).write.parquet(s"$path/p0=0/p1=0")
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -275,7 +275,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName,
       SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
       SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true",
-      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true"
+      ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL"
     ) {
       testSchemaMerging(2)
     }
```

sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -124,7 +124,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {

   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
     withSQLConf(
-      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+      ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
       SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
         classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
       withTempPath { dir =>
```
