@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -429,12 +429,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     val properties = new mutable.HashMap[String, String]
 
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
 
+    // This is for backward compatibility, so that Spark 2 can read tables with char/varchar
+    // columns created by Spark 3.1. On the read side we restore a table schema from its
+    // properties, so we need to replace `varchar(n)` and `char(n)` with `string`, as Spark 2
+    // has no type mapping for them in `DataType.nameToType`.
+    // See `restoreHiveSerdeTable` for an example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
     val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = schema.json
+    val schemaJsonString = newSchema.json
     // Split the JSON string.
     val parts = schemaJsonString.grouped(threshold).toSeq
     properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
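
For context, a minimal sketch of the round trip this change relies on. CharVarcharUtils is an internal catalyst utility, so exact signatures can vary between Spark versions; the example schema and printed output below are illustrative assumptions, not part of this patch:

    import org.apache.spark.sql.catalyst.util.CharVarcharUtils
    import org.apache.spark.sql.types._

    // A schema as a Spark 3.1 user might declare it, with native char/varchar types.
    val raw = new StructType()
      .add("id", IntegerType)
      .add("code", CharType(5))
      .add("name", VarcharType(100))

    // Write side: char(5)/varchar(100) are downgraded to string so that Spark 2 can
    // parse the schema JSON via DataType.nameToType; the original type string is
    // preserved in each field's metadata.
    val stored = CharVarcharUtils.replaceCharVarcharWithStringInSchema(raw)
    stored.foreach(f => println(s"${f.name}: ${f.dataType}"))   // code/name become StringType

    // Read side: getRawSchema reads that metadata back and restores the raw types.
    val restored = CharVarcharUtils.getRawSchema(stored)
    restored.foreach(f => println(s"${f.name}: ${f.dataType}")) // CharType(5), VarcharType(100)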
@@ -745,7 +750,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
     // from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      table = table.copy(schema = getSchemaFromTableProperties(table))
+      val newSchema = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
+      table = table.copy(schema = newSchema)
     }
 
     // No provider in table properties, which means this is a Hive serde table.
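
The split/restore mechanism around these call sites is easier to follow end to end. A sketch under the assumption that the property keys follow the `spark.sql.sources.schema.*` layout that `DATASOURCE_SCHEMA_NUMPARTS` belongs to; `storeSchema` and `loadSchemaJson` are hypothetical names, not Spark's:

    // Illustrative property keys; HiveExternalCatalog derives the real ones from
    // DATASOURCE_SCHEMA_NUMPARTS and a per-part prefix.
    val numPartsKey = "spark.sql.sources.schema.numParts"
    def partKey(i: Int): String = s"spark.sql.sources.schema.part.$i"

    // Write side: chunk the schema JSON so no single value exceeds the
    // SCHEMA_STRING_LENGTH_THRESHOLD imposed by the metastore.
    def storeSchema(json: String, threshold: Int): Map[String, String] = {
      val parts = json.grouped(threshold).toSeq
      Map(numPartsKey -> parts.size.toString) ++
        parts.zipWithIndex.map { case (part, i) => partKey(i) -> part }
    }

    // Read side: concatenate the parts in order, which is what
    // getSchemaFromTableProperties effectively does before parsing the JSON.
    def loadSchemaJson(props: Map[String, String]): String = {
      val numParts = props(numPartsKey).toInt
      (0 until numParts).map(i => props(partKey(i))).mkString
    }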
@@ -796,7 +802,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      val schemaFromTableProps =
+        CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

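`reorderSchema` is needed because the Hive metastore tracks partition columns separately from data columns and expects them at the end of the schema. A simplified sketch of the reordering this private helper plausibly performs (the real implementation's error handling differs):

    import org.apache.spark.sql.types.{StructField, StructType}

    // Keep data columns in their stored order and move the partition columns,
    // in their declared order, to the end of the schema.
    def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
      val partitionFields: Seq[StructField] = partColumnNames.map { name =>
        schema.find(_.name == name).getOrElse(
          sys.error(s"Partition column $name not found in ${schema.catalogString}"))
      }
      StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
    }
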
@@ -836,7 +843,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val schemaFromTableProps = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table))
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
