
Commit 4c7349f

fix.
1 parent 7880909 commit 4c7349f

File tree

4 files changed: 97 additions & 5 deletions

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the data source.
+ */
+class SourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+  import SourceOptions._
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  // A flag to disable saving a data source table's metadata in a Hive-compatible way.
+  val skipHiveMetadata: Boolean = parameters
+    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)
+
+  // A flag to always respect the Spark schema restored from the table properties.
+  val respectSparkSchema: Boolean = parameters
+    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
+}
+
+object SourceOptions {
+
+  val SKIP_HIVE_METADATA = "skipHiveMetadata"
+  val DEFAULT_SKIP_HIVE_METADATA = false
+
+  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
+  val DEFAULT_RESPECT_SPARK_SCHEMA = false
+
+}
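These options ride along in the table's storage properties. A minimal sketch of how a caller could set one, assuming a SparkSession named `spark` with Hive support enabled (the DataFrame and the table name `t` are illustrative, not part of this commit):

    // Writer options for a data source table are persisted as storage properties,
    // which HiveExternalCatalog later reads back through SourceOptions.
    val df = spark.range(10).toDF("id")
    df.write
      .format("parquet")
      .option("skipHiveMetadata", "true") // keep metadata in Spark-specific format only
      .saveAsTable("t")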

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 6 additions & 5 deletions

@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // A data source table always has a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)
 
     // To work around some Hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra work before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive metastore in " +
             "Spark SQL specific format, which is NOT compatible with Hive."
@@ -722,6 +721,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -733,7 +733,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-    if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+    if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+        options.respectSparkSchema) {
       hiveTable.copy(
         schema = reorderedSchema,
         partitionColumnNames = partColumnNames,
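The new `||` branch is what `respectSparkSchema` buys: even when the schema restored from the Spark table properties does not match the metastore's view of the table, the Spark-side schema is used. For illustration only (the types below are assumed, not taken from this diff), consider a serde whose type mapping diverges from Spark's, such as an avro decimal that the metastore reports as a binary column:

    import org.apache.spark.sql.types._

    val schemaFromTableProps = StructType(Seq(StructField("f0", DecimalType(38, 2))))
    val metastoreSchema = StructType(Seq(StructField("f0", BinaryType)))

    // The two disagree, so the equality check alone would fall back to the
    // metastore schema; with respectSparkSchema = true, the decimal schema
    // from the table properties wins.
    schemaFromTableProps == metastoreSchema  // false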
Binary file (203 Bytes) not shown.

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 41 additions & 0 deletions

@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: read avro file containing decimal") {
+      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+      val location = new File(url.getFile)
+
+      val tableName = "tab1"
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": [
+          |      "null",
+          |      {
+          |        "precision": 38,
+          |        "scale": 2,
+          |        "type": "bytes",
+          |        "logicalType": "decimal"
+          |      }
+          |    ]
+          |  } ]
+          |}
+        """.stripMargin
+      withTable(tableName) {
+        versionSpark.sql(
+          s"""
+             |CREATE TABLE $tableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$location'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
+      }
+    }
+
     // TODO: add more tests.
   }
 }
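Given the assertion above, reading the table back should yield a single row holding the decimal value 1.30, decoded through the Spark schema rather than the serde's raw bytes. A sketch of the expected output (illustrative only; the test asserts the collected rows, not this rendering):

    versionSpark.table("tab1").show()
    // +----+
    // |  f0|
    // +----+
    // |1.30|
    // +----+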
