diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
new file mode 100644
index 000000000000..c98c0b2a756a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the data source.
+ */
+class SourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+  import SourceOptions._
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  // A flag to disable saving a data source table's metadata in a Hive-compatible way.
+  val skipHiveMetadata: Boolean = parameters
+    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)
+
+  // A flag to always respect the Spark schema restored from the table properties.
+  val respectSparkSchema: Boolean = parameters
+    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
+}
+
+
+object SourceOptions {
+
+  val SKIP_HIVE_METADATA = "skipHiveMetadata"
+  val DEFAULT_SKIP_HIVE_METADATA = false
+
+  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
+  val DEFAULT_RESPECT_SPARK_SCHEMA = false
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 547447b31f0a..0c6121c2b896 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)
 
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive metastore in" +
             "Spark SQL specific format, which is NOT compatible with Hive."
@@ -722,6 +721,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -733,7 +733,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+          options.respectSparkSchema) {
         hiveTable.copy(
           schema = reorderedSchema,
           partitionColumnNames = partColumnNames,
diff --git a/sql/hive/src/test/resources/avroDecimal/decimal.avro b/sql/hive/src/test/resources/avroDecimal/decimal.avro
new file mode 100755
index 000000000000..6da423f78661
Binary files /dev/null and b/sql/hive/src/test/resources/avroDecimal/decimal.avro differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 072e538b9ed5..cbbe86940372 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: read avro file containing decimal") {
+      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+      val location = new File(url.getFile)
+
+      val tableName = "tab1"
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": [
+          |      "null",
+          |      {
+          |        "precision": 38,
+          |        "scale": 2,
+          |        "type": "bytes",
+          |        "logicalType": "decimal"
+          |      }
+          |    ]
+          |  } ]
+          |}
+        """.stripMargin
+      withTable(tableName) {
+        versionSpark.sql(
+          s"""
+             |CREATE TABLE $tableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$location'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
+      }
+    }
+
     // TODO: add more tests.
   }
 }
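Note (not part of the patch): a minimal usage sketch of the new `SourceOptions` class added above, e.g. in the Scala REPL. Both flags default to false, and because the parameters are wrapped in `CaseInsensitiveMap`, keys are matched regardless of casing, so the `'respectSparkSchema'` serde property used in the VersionsSuite test is picked up however it is spelled:

    import org.apache.spark.sql.execution.datasources.SourceOptions

    // Construct directly from a plain Map via the auxiliary constructor.
    val opts = new SourceOptions(Map("respectsparkschema" -> "true"))
    assert(opts.respectSparkSchema)  // found despite the lower-cased key
    assert(!opts.skipHiveMetadata)   // unset, falls back to DEFAULT_SKIP_HIVE_METADATA (false)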