[SPARK-37452][SQL][BACKPORT][3.1] Char and Varchar break backward compatibility between v3.1 and v2
yaooqinn committed Nov 29, 2021
commit cb0e544aa814682bac9f174febf23cc59f47cc0e
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
+
 import java.util
 import java.util.Locale
 
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -429,12 +430,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
 
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
 
+    // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
+    // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
+    // a type mapping for them in `DataType.nameToType`.
+    // See `restoreHiveSerdeTable` for example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
     val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = schema.json
+    val schemaJsonString = newSchema.json
     // Split the JSON string.
     val parts = schemaJsonString.grouped(threshold).toSeq
     properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
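
The sketch below illustrates the write path this hunk implements: char/varchar types are flattened to string (with the raw type string preserved in field metadata) before the schema JSON is chunked across numbered table properties. It assumes a Spark 3.1 (branch-3.1) build on the classpath; CharVarcharUtils is an internal catalyst utility whose signature may differ in other versions, and the literal property keys and 4000-character threshold mirror HiveExternalCatalog's constants rather than reading them from the class.

import scala.collection.mutable

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

object SchemaToTablePropsSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", VarcharType(10))

    // Write side: char/varchar become string; the raw type string is kept in
    // each field's metadata, so no information is lost for Spark 3.1 readers.
    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
    println(newSchema("name").dataType) // StringType, which Spark 2 can parse

    // Split the (possibly very long) schema JSON across numbered properties.
    val threshold = 4000 // default of spark.sql.sources.schemaStringLengthThreshold
    val parts = newSchema.json.grouped(threshold).toSeq
    val properties = new mutable.HashMap[String, String]
    properties.put("spark.sql.sources.schema.numParts", parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      properties.put(s"spark.sql.sources.schema.part.$index", part)
    }
  }
}
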
@@ -745,7 +751,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
         // from table properties.
         if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-          table = table.copy(schema = getSchemaFromTableProperties(table))
+          val newSchema = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
+          table = table.copy(schema = newSchema)
         }
 
       // No provider in table properties, which means this is a Hive serde table.
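
On the read side the change is symmetric: getRawSchema rebuilds char/varchar types from the metadata that replaceCharVarcharWithStringInSchema tucked away. A minimal round-trip sketch, under the same branch-3.1 internal-API caveat as above:

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

object RestoreRawSchemaSketch {
  def main(args: Array[String]): Unit = {
    val original = new StructType().add("name", VarcharType(10))

    // What tableMetaToTableProps now stores in the metastore...
    val stored = CharVarcharUtils.replaceCharVarcharWithStringInSchema(original)
    assert(stored("name").dataType == StringType)

    // ...and what the restore paths above recover from it. A Spark 2 reader
    // skips this step and simply sees `string`, which is the point of the fix.
    val restored = CharVarcharUtils.getRawSchema(stored)
    assert(restored("name").dataType == VarcharType(10))
  }
}
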
@@ -796,7 +803,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      val schemaFromTableProps =
+        CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
@@ -836,7 +844,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val schemaFromTableProps = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
    val partColumnNames = getPartitionColumnsFromTableProperties(table)
    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
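
Both of these restore paths then pass the recovered schema through reorderSchema. That helper is private to HiveExternalCatalog; the hypothetical re-implementation below only illustrates its effect, namely that partition columns are moved to the end of the schema, since Hive tracks them separately from data columns:

import org.apache.spark.sql.types._

object ReorderSchemaSketch {
  // Hypothetical stand-in for HiveExternalCatalog's private helper.
  def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
    val partitionFields = partColumnNames.map(name => schema(name))
    StructType(schema.filterNot(f => partColumnNames.contains(f.name)) ++ partitionFields)
  }
}

// e.g. reorderSchema(schema of (p, a, b), Seq("p")) yields (a, b, p).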