Skip to content

Commit 1b416a0

Browse files
wangyumcloud-fan
authored andcommitted
[SPARK-27592][SQL] Set the bucketed data source table SerDe correctly
## What changes were proposed in this pull request? Hive using incorrect **InputFormat**(`org.apache.hadoop.mapred.SequenceFileInputFormat`) to read Spark's **Parquet** bucketed data source table. Spark side: ```sql spark-sql> CREATE TABLE t (c1 INT, c2 INT) USING parquet CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS; 2019-04-29 17:52:05 WARN HiveExternalCatalog:66 - Persisting bucketed data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. spark-sql> DESC FORMATTED t; c1 int NULL c2 int NULL # Detailed Table Information Database default Table t Owner yumwang Created Time Mon Apr 29 17:52:05 CST 2019 Last Access Thu Jan 01 08:00:00 CST 1970 Created By Spark 2.4.0 Type MANAGED Provider parquet Num Buckets 2 Bucket Columns [`c1`] Sort Columns [`c1`] Table Properties [transient_lastDdlTime=1556531525] Location file:/user/hive/warehouse/t Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat Storage Properties [serialization.format=1] ``` Hive side: ```sql hive> DESC FORMATTED t; OK # col_name data_type comment c1 int c2 int # Detailed Table Information Database: default Owner: root CreateTime: Wed May 08 03:38:46 GMT-07:00 2019 LastAccessTime: UNKNOWN Retention: 0 Location: file:/user/hive/warehouse/t Table Type: MANAGED_TABLE Table Parameters: bucketing_version spark spark.sql.create.version 3.0.0-SNAPSHOT spark.sql.sources.provider parquet spark.sql.sources.schema.bucketCol.0 c1 spark.sql.sources.schema.numBucketCols 1 spark.sql.sources.schema.numBuckets 2 spark.sql.sources.schema.numParts 1 spark.sql.sources.schema.numSortCols 1 spark.sql.sources.schema.part.0 {\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]} spark.sql.sources.schema.sortCol.0 c1 transient_lastDdlTime 1557311926 # Storage Information SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: path file:/user/hive/warehouse/t serialization.format 1 ``` So it's non-bucketed table at Hive side. This pr set the `SerDe` correctly so Hive can read these tables. Related code: https://github.com/apache/spark/blob/33f3c48cac087e079b9c7e342c2e58b16eaaa681/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L976-L990 https://github.com/apache/spark/blob/f9776e389215255dc61efaa2eddd92a1fa754b48/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L444-L459 ## How was this patch tested? unit tests Closes #24486 from wangyum/SPARK-27592. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent a493031 commit 1b416a0

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,11 +363,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
363363
(None, message)
364364

365365
// our bucketing is un-compatible with hive(different hash function)
366-
case _ if table.bucketSpec.nonEmpty =>
366+
case Some(serde) if table.bucketSpec.nonEmpty =>
367367
val message =
368368
s"Persisting bucketed data source table $qualifiedTableName into " +
369-
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
370-
(None, message)
369+
"Hive metastore in Spark SQL specific format, which is NOT compatible with " +
370+
"Hive bucketed table. But Hive can read this table as a non-bucketed table."
371+
(Some(newHiveCompatibleMetastoreTable(serde)), message)
371372

372373
case Some(serde) =>
373374
val message =

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
2323
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
2424
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
2525
import org.apache.spark.sql.hive.test.TestHiveSingleton
26-
import org.apache.spark.sql.internal.SQLConf
26+
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
2727
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
2828
import org.apache.spark.sql.types._
2929

@@ -284,4 +284,40 @@ class DataSourceWithHiveMetastoreCatalogSuite
284284
}
285285

286286
}
287+
288+
test("SPARK-27592 set the bucketed data source table SerDe correctly") {
289+
val provider = "parquet"
290+
withTable("t") {
291+
spark.sql(
292+
s"""
293+
|CREATE TABLE t
294+
|USING $provider
295+
|CLUSTERED BY (c1)
296+
|SORTED BY (c1)
297+
|INTO 2 BUCKETS
298+
|AS SELECT 1 AS c1, 2 AS c2
299+
""".stripMargin)
300+
301+
val metadata = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
302+
303+
val hiveSerDe = HiveSerDe.sourceToSerDe(provider).get
304+
assert(metadata.storage.serde === hiveSerDe.serde)
305+
assert(metadata.storage.inputFormat === hiveSerDe.inputFormat)
306+
assert(metadata.storage.outputFormat === hiveSerDe.outputFormat)
307+
308+
// It's a bucketed table at Spark side
309+
assert(sql("DESC FORMATTED t").collect().containsSlice(
310+
Seq(Row("Num Buckets", "2", ""), Row("Bucket Columns", "[`c1`]", ""))
311+
))
312+
checkAnswer(table("t"), Row(1, 2))
313+
314+
// It's not a bucketed table at Hive side
315+
val hiveSide = sparkSession.metadataHive.runSqlHive("DESC FORMATTED t")
316+
assert(hiveSide.contains("Num Buckets: \t-1 \t "))
317+
assert(hiveSide.contains("Bucket Columns: \t[] \t "))
318+
assert(hiveSide.contains("\tspark.sql.sources.schema.numBuckets\t2 "))
319+
assert(hiveSide.contains("\tspark.sql.sources.schema.bucketCol.0\tc1 "))
320+
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\t2"))
321+
}
322+
}
287323
}

0 commit comments

Comments
 (0)