Closed
Changes from 1 commit
Commits
34 commits
52ca902
alter_add_col: initial changes
xwu0226 Nov 21, 2016
f498fa6
add testcases
xwu0226 Dec 1, 2016
522443e
negative testcases
xwu0226 Dec 1, 2016
1af2654
remove unsupported testcase
xwu0226 Dec 5, 2016
ec57ee9
fix testcase
xwu0226 Dec 5, 2016
ec74849
update testcases
xwu0226 Dec 7, 2016
8fca889
update testcases
xwu0226 Dec 7, 2016
4a17529
update testcases
xwu0226 Jan 13, 2017
9699128
comments for command case class
xwu0226 Jan 20, 2017
9860e5c
update comments based on review
xwu0226 Jan 21, 2017
dfff364
SPARK-19261: update to support datasource table and add new testcases
xwu0226 Feb 3, 2017
9f23254
remove workaround for parquet issue since parquet-1.8.2 is now supported
xwu0226 Feb 4, 2017
180092f
SPARK-19261: using white list for datasource table types that support…
xwu0226 Feb 7, 2017
5a8aa80
fix code style
xwu0226 Feb 7, 2017
d3860e6
fix coding style
xwu0226 Feb 7, 2017
55577aa
update upon review
xwu0226 Feb 24, 2017
6fa913a
refactor code from alterTable function
xwu0226 Feb 25, 2017
7231efe
rebase and resolve conflict
xwu0226 Mar 6, 2017
e4e9ecf
resolve conflicts
xwu0226 Mar 9, 2017
75e7441
using ExternalCatalog.alterTableSchema
xwu0226 Mar 14, 2017
9847030
add InMemoryCatalog testcases
xwu0226 Mar 15, 2017
1a383bb
revert change in HiveExternalCatalog.scala
xwu0226 Mar 15, 2017
f994ce9
update upon review
xwu0226 Mar 16, 2017
5bf7360
add checking for duplicate column names
xwu0226 Mar 16, 2017
599c45e
add case sensitivity for duplicate name checking and new testcases
xwu0226 Mar 16, 2017
b3edfea
typo
xwu0226 Mar 16, 2017
7d8a515
resolve conflicts and modify testcases
xwu0226 Mar 17, 2017
e895278
update testcases
xwu0226 Mar 17, 2017
e171ac4
move checkduplicate and schema arrangement to SessionCatalog.alterTab…
xwu0226 Mar 17, 2017
4391edd
change SessionCatalog.alterTableAddColumn back to alterTableSchema
xwu0226 Mar 18, 2017
a3fef12
update upon review comments
xwu0226 Mar 18, 2017
1eb7cd3
some minor updates upon review comments
xwu0226 Mar 19, 2017
04ce8f4
update based on review
xwu0226 Mar 21, 2017
7d8437d
update on minor comments
xwu0226 Mar 21, 2017
revert change in HiveExternalCatalog.scala
xwu0226 committed Mar 19, 2017
commit 1a383bb82d9b037d910867d36614d44c24fdbbf3
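For orientation before the diff: the hunks below revert an earlier refactoring, moving the statistics-to-properties conversion and the data source storage handling back inline into alterTable. The long comment in the second hunk explains why a data source table's location is carried as a "path" entry in the storage properties rather than only in `locationUri`. The following is a minimal sketch of that mapping only; the Storage case class and every name in it are hypothetical stand-ins for illustration, not Spark's actual types.

// Toy illustration of mirroring a table location into a "path" storage property,
// as described in the comment block in the diff below. All types here are invented
// for the sketch and are not part of Spark's API.
case class Storage(locationUri: Option[String], properties: Map[String, String])

object PathOptionSketch {
  def withPathOption(storage: Storage): Storage = {
    // Copy locationUri into the "path" property so the persisted metadata matches
    // the internal format used for data source tables.
    val withPath = storage.properties ++ storage.locationUri.map("path" -> _)
    storage.copy(properties = withPath)
  }

  def main(args: Array[String]): Unit = {
    val external = Storage(Some("/path/to/file"), Map.empty)
    // Prints: Storage(Some(/path/to/file),Map(path -> /path/to/file))
    println(withPathOption(external))
  }
}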
@@ -22,7 +22,6 @@ import java.lang.reflect.InvocationTargetException
import java.util

import scala.collection.mutable
import scala.collection.Map
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -47,6 +46,7 @@ import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}


/**
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
@@ -511,57 +511,94 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* Note: As of now, this doesn't support altering table schema, partition column names and bucket
* specification. We will ignore them even if users do specify different values for these fields.
*/
override def alterTable(newTableDefinition: CatalogTable): Unit = withClient {
assert(newTableDefinition.identifier.database.isDefined)
val db = newTableDefinition.identifier.database.get
requireTableExists(db, newTableDefinition.identifier.table)
verifyTableProperties(newTableDefinition)
override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
verifyTableProperties(tableDefinition)

// convert table statistics to properties so that we can persist them through hive api
val withStatsProps = populateStatsProps(newTableDefinition)
val withStatsProps = if (tableDefinition.stats.isDefined) {
val stats = tableDefinition.stats.get
var statsProperties: Map[String, String] =
Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
if (stats.rowCount.isDefined) {
statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
}
stats.colStats.foreach { case (colName, colStat) =>
colStat.toMap.foreach { case (k, v) =>
statsProperties += (columnStatKeyPropName(colName, k) -> v)
}
}
tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
} else {
tableDefinition
}

if (newTableDefinition.tableType == VIEW) {
if (tableDefinition.tableType == VIEW) {
client.alterTable(withStatsProps)
} else {
val oldRawTableDef = getRawTable(db, newTableDefinition.identifier.table)

// restore the table metadata in spark sql format for comparing with the input
// table metadata that is also in spark sql format
val oldRestoredTableDef = restoreTableMetadata(oldRawTableDef)

val (newSchema, dataSourceProps) =
if (!oldRestoredTableDef.schema.equals(newTableDefinition.schema)) {
val props =
tableMetaToTableProps(newTableDefinition).filter(_._1.startsWith(DATASOURCE_PREFIX))
if (newTableDefinition.provider.isDefined &&
newTableDefinition.provider.get.toLowerCase != DDLUtils.HIVE_PROVIDER) {
// we only need to populate non-hive provider to the tableprops
props.put(DATASOURCE_PROVIDER, newTableDefinition.provider.get)
}
(newTableDefinition.schema, props)
val oldTableDef = getRawTable(db, withStatsProps.identifier.table)

val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
tableDefinition.storage
} else {
// We can't alter the table storage of data source table directly for 2 reasons:
// 1. internally we use path option in storage properties to store the value of table
// location, but the given `tableDefinition` is from outside and doesn't have the path
// option, we need to add it manually.
// 2. this data source table may be created on a file, not a directory, then we can't set
// the `locationUri` field and save it to Hive metastore, because Hive only allows
// directory as table location.
//
// For example, an external data source table is created with a single file '/path/to/file'.
// Internally, we will add a path option with value '/path/to/file' to storage properties,
// and set the `locationUri` to a special value due to SPARK-15269(please see
// `saveTableIntoHive` for more details). When users try to get the table metadata back, we
// will restore the `locationUri` field from the path option and remove the path option from
// storage properties. When users try to alter the table storage, the given
// `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path
// option is not set.
//
// Here we need 2 extra steps:
// 1. add path option to storage properties, to match the internal format, i.e. using path
// option to store the value of table location.
// 2. set the `locationUri` field back to the old one from the existing table metadata,
// if users don't want to alter the table location. This step is necessary as the
// `locationUri` is not always same with the path option, e.g. in the above example
// `locationUri` is a special value and we should respect it. Note that, if users
// want to alter the table location to a file path, we will fail. This should be fixed
// in the future.

val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_))
val storageWithPathOption = tableDefinition.storage.copy(
properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))

val oldLocation = getLocationFromStorageProps(oldTableDef)
if (oldLocation == newLocation) {
storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
} else {
// maintain the original format of the table schema
(oldRawTableDef.schema,
oldRawTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX)))
storageWithPathOption
}
}

val partitionProviderProp = if (newTableDefinition.tracksPartitionsInCatalog) {
val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
} else {
TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
}

// Sets the `partitionColumnNames` and `bucketSpec` from the old table definition,
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
val newTableProps =
dataSourceProps ++ withStatsProps.properties + partitionProviderProp
val newDef = oldRestoredTableDef.copy(
storage = newStorageForAlterTable(newTableDefinition, oldRawTableDef),
schema = newSchema,
partitionColumnNames = oldRawTableDef.partitionColumnNames,
bucketSpec = oldRawTableDef.bucketSpec,
properties = newTableProps.toMap)
val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
val newDef = withStatsProps.copy(
storage = newStorage,
schema = oldTableDef.schema,
partitionColumnNames = oldTableDef.partitionColumnNames,
bucketSpec = oldTableDef.bucketSpec,
properties = newTableProps)

client.alterTable(newDef)
}
@@ -586,72 +623,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

private def newStorageForAlterTable(
newTableDef: CatalogTable,
oldRawTableDef: CatalogTable): CatalogStorageFormat = {
if (DDLUtils.isHiveTable(newTableDef)) {
newTableDef.storage
} else {
// We can't alter the table storage of data source table directly for 2 reasons:
// 1. internally we use path option in storage properties to store the value of table
// location, but the given `tableDefinition` is from outside and doesn't have the path
// option, we need to add it manually.
// 2. this data source table may be created on a file, not a directory, then we can't set
// the `locationUri` field and save it to Hive metastore, because Hive only allows
// directory as table location.
//
// For example, an external data source table is created with a single file '/path/to/file'.
// Internally, we will add a path option with value '/path/to/file' to storage properties,
// and set the `locationUri` to a special value due to SPARK-15269(please see
// `saveTableIntoHive` for more details). When users try to get the table metadata back, we
// will restore the `locationUri` field from the path option and remove the path option from
// storage properties. When users try to alter the table storage, the given
// `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path
// option is not set.
//
// Here we need 2 extra steps:
// 1. add path option to storage properties, to match the internal format, i.e. using path
// option to store the value of table location.
// 2. set the `locationUri` field back to the old one from the existing table metadata,
// if users don't want to alter the table location. This step is necessary as the
// `locationUri` is not always same with the path option, e.g. in the above example
// `locationUri` is a special value and we should respect it. Note that, if users
// want to alter the table location to a file path, we will fail. This should be fixed
// in the future.

val newLocation = newTableDef.storage.locationUri.map(CatalogUtils.URIToString(_))
val storageWithPathOption = newTableDef.storage.copy(
properties = newTableDef.storage.properties ++ newLocation.map("path" -> _))

val oldLocation = getLocationFromStorageProps(oldRawTableDef)
if (oldLocation == newLocation) {
storageWithPathOption.copy(locationUri = oldRawTableDef.storage.locationUri)
} else {
storageWithPathOption
}
}
}

private def populateStatsProps (newTableDefinition: CatalogTable): CatalogTable = {
// convert table statistics to properties so that we can persist them through hive api
if (newTableDefinition.stats.isDefined) {
val stats = newTableDefinition.stats.get
var statsProperties: Map[String, String] =
Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
if (stats.rowCount.isDefined) {
statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
}
stats.colStats.foreach { case (colName, colStat) =>
colStat.toMap.foreach { case (k, v) =>
statsProperties += (columnStatKeyPropName(colName, k) -> v)
}
}
newTableDefinition.copy(properties = newTableDefinition.properties ++ statsProperties)
} else {
newTableDefinition
}
}

override def getTable(db: String, table: String): CatalogTable = withClient {
restoreTableMetadata(getRawTable(db, table))
}
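For context on the feature this commit belongs to: the commit list above ties the change to SPARK-19261, which adds ALTER TABLE ... ADD COLUMNS support, including for data source tables. A minimal usage sketch follows; it assumes a Spark build that includes this pull request and a session with Hive support, and the table and column names are made up for illustration.

import org.apache.spark.sql.SparkSession

object AddColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("alter-table-add-columns-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Create a data source table, then append a column to its schema.
    spark.sql("CREATE TABLE t1 (id INT, name STRING) USING parquet")
    spark.sql("ALTER TABLE t1 ADD COLUMNS (age INT)")

    // The appended column should now appear in the catalog schema.
    spark.table("t1").printSchema()

    spark.stop()
  }
}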