diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 019718cc53a9..1e43f2cc62ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -429,12 +429,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
-
+    // This is for backward compatibility with Spark 2: at the read side we restore a table
+    // schema from its properties, and Spark 2 has no type mapping for `char(n)`/`varchar(n)` in
+    // `DataType.nameToType`, so tables created with those types by Spark 3.1 would be unreadable.
+    // We therefore replace char/varchar with `string` before storing the schema JSON.
+    // See `restoreHiveSerdeTable` for an example of the read side.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
     val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = schema.json
+    val schemaJsonString = newSchema.json
     // Split the JSON string.
     val parts = schemaJsonString.grouped(threshold).toSeq
     properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -745,7 +750,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
     // from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      table = table.copy(schema = getSchemaFromTableProperties(table))
+      val newSchema = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
+      table = table.copy(schema = newSchema)
     }
 
     // No provider in table properties, which means this is a Hive serde table.
@@ -796,7 +802,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      val schemaFromTableProps =
+        CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
@@ -836,7 +843,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val schemaFromTableProps = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
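
The patch works because `replaceCharVarcharWithStringInSchema` rewrites each `char(n)`/`varchar(n)` field to `string` while recording the original type string in the field's metadata, and `getRawSchema` reverses that by reading the metadata back. The standalone sketch below illustrates the round trip; it is not part of the patch. It assumes a Spark 3.1 classpath (where `getRawSchema` takes a single schema argument, matching its use above), it reaches into the internal `org.apache.spark.sql.catalyst.util` package, which is not a stable public API, and the object name `CharVarcharRoundTrip` is invented for the example.

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

object CharVarcharRoundTrip {
  def main(args: Array[String]): Unit = {
    // A schema as Spark 3.1 sees it when a table is created with char/varchar columns.
    val raw = new StructType()
      .add("id", IntegerType)
      .add("name", VarcharType(50))
      .add("code", CharType(8))

    // Write side (the first hunk above): char/varchar fields are downgraded to
    // `string`, with the original type kept in each field's metadata, so the JSON
    // stored in the metastore table properties uses only types Spark 2 can parse.
    val stored = CharVarcharUtils.replaceCharVarcharWithStringInSchema(raw)
    println(stored.json)

    // Read side (e.g. `restoreHiveSerdeTable`): the raw types come back from metadata.
    val restored = CharVarcharUtils.getRawSchema(stored)
    println(restored("name").dataType) // VarcharType(50)
    println(restored("code").dataType) // CharType(8)
  }
}

A Spark 2 reader parses the stored JSON as plain `string` columns and simply carries the extra metadata key along without interpreting it, which is why the patch only needs to undo the replacement on the Spark 3.1 read paths shown above.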