@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -309,7 +309,7 @@ case class AlterTableChangeColumnCommand(
     columnName: String,
     newColumn: StructField) extends RunnableCommand {
 
-  // TODO: support change column name/dataType/metadata/position.
+  // TODO: support change column name/metadata/position.
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -318,18 +318,25 @@
 
     // Find the origin column from dataSchema by column name.
     val originColumn = findColumnByName(table.dataSchema, columnName, resolver)
-    // Throw an AnalysisException if the column name/dataType is changed.
-    if (!columnEqual(originColumn, newColumn, resolver)) {
+    // Throw an AnalysisException if the column name is changed or the type change is incompatible.
+    if (!columnCheck(originColumn, newColumn, resolver)) {
       throw new AnalysisException(
         "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
           s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
           s"'${newColumn.name}' with type '${newColumn.dataType}'")
     }
 
+    val typeChanged = originColumn.dataType != newColumn.dataType
+
     val newDataSchema = table.dataSchema.fields.map { field =>
       if (field.name == originColumn.name) {
-        // Create a new column from the origin column with the new comment.
-        addComment(field, newColumn.getComment)
+        // Add the new comment to the column; if the comment is empty, keep the original column.
+        val newField = newColumn.getComment().map(field.withComment).getOrElse(field)
+        if (typeChanged) {
+          newField.copy(dataType = newColumn.dataType)
+        } else {
+          newField
+        }
       } else {
         field
       }
@@ -350,16 +357,11 @@
         s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
   }
 
-  // Add the comment to a column, if comment is empty, return the original column.
-  private def addComment(column: StructField, comment: Option[String]): StructField = {
-    comment.map(column.withComment(_)).getOrElse(column)
-  }

Member
What happens if we need data conversion (e.g., from int to double) in binary formats (Parquet and ORC)? Also, what happens if we get incompatible type changes?

Member Author
Thanks for the advice; I should also check type compatibility. Added in ef65c4d.

Member
Probably we need to comply with the Hive behaviour. Is the current fix (by casting) the same as Hive's?

Member Author
Thanks for your question; that is also what I considered while writing the compatibility check. Hive does this column type change work in HiveAlterHandler, and the detailed compatibility check is in ColumnType. As you can see from the ColumnType check, it actually uses the canCast semantics to judge compatibility.
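
As a quick illustration of that canCast-based compatibility check (a spark-shell style sketch added for clarity; this is Spark's Cast.canCast, not Hive's ColumnType logic itself):

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

Cast.canCast(IntegerType, StringType)                                      // true: int -> string is allowed
Cast.canCast(IntegerType, DoubleType)                                      // true: widening numeric change
Cast.canCast(StructType(Seq(StructField("a", IntegerType))), IntegerType)  // false: incompatible, rejected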

Member
Ah, OK. Thanks for the check. BTW, have you checked whether this works correctly?


sql("""CREATE TABLE t(a INT, b STRING, c INT) using parquet""")
sql("""INSERT INTO t VALUES (1, 'a', 3)""")
sql("""ALTER TABLE t CHANGE a a STRING""")
spark.table("t").show
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///Users/maropu/Repositories/spark/spark-master/spark-warehouse/t/part-00000-93ddfd05-690a-480c-8cc5-fd0981206fc3-c000.snappy.parquet. Column: [a], Expected: string, Found: INT32
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:192)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.s
    ...

Member
In my opinion, in this PR we need additional logic to cast the input data to the changed catalog type when reading...
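
A minimal sketch of that cast-on-read idea (not part of this PR; the helper name and where it would be wired into relation resolution are assumptions): read with the file schema, then project casts up to the types recorded in the catalog.

import org.apache.spark.sql.catalyst.expressions.{Alias, Cast}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType

// Wrap a scan that produces the file schema with casts up to the catalog schema.
def castToCatalogSchema(scan: LogicalPlan, catalogSchema: StructType): LogicalPlan = {
  val projectList = scan.output.map { attr =>
    val targetType = catalogSchema(attr.name).dataType
    if (attr.dataType == targetType) {
      attr // column type unchanged, keep the attribute as-is
    } else {
      // cast the file-schema attribute to the type recorded in the catalog
      Alias(Cast(attr, targetType), attr.name)()
    }
  }
  Project(projectList, scan)
}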

Member Author
Thanks for your advice!
I have looked into this over the last few days. With the current implementation, the behaviour complies with Hive: type changes are supported, non-binary formats work well, and binary formats such as ORC and Parquet throw an exception. Is it OK to add a config to constrain this?
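
For illustration only, such a guard flag could look roughly like the following SQLConf entry; the config name and default are invented here and are not part of this PR.

// Hypothetical config entry (name and default are illustrative), as it would
// appear inside Spark's SQLConf object.
val ALTER_TABLE_CHANGE_COLUMN_TYPE_ENABLED =
  buildConf("spark.sql.alterTable.changeColumn.typeChange.enabled")
    .doc("When true, ALTER TABLE CHANGE COLUMN may change a column's data type to any " +
      "castable type. Binary sources such as Parquet and ORC may still fail at read time.")
    .booleanConf
    .createWithDefault(false)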

Adding logic to cast the input data to the changed catalog type would require modifying four code paths: the row and vectorized readers for both Parquet and ORC, summarized in the table below. If we don't agree with the current behaviour, I'll keep following up on these.

| Item | Behavior |
| --- | --- |
| Parquet row reader | ClassCastException in SpecificInternalRow.set${Type} |
| Parquet vectorized reader | SchemaColumnConvertNotSupportedException in VectorizedColumnReader.read${Type}Batch |
| ORC row reader | ClassCastException in OrcDeserializer.newWriter |
| ORC vectorized reader | NullPointerException in the OrcColumnVector get-value-by-type methods |
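
As an illustration of the behaviour described above (assuming this patch is applied; the table names are made up): a text-based source re-parses values against the new catalog schema, while a binary source keeps its physical type and fails at scan time.

sql("CREATE TABLE t_text(a INT) USING csv")
sql("INSERT INTO t_text VALUES (1)")
sql("ALTER TABLE t_text CHANGE a a STRING")
spark.table("t_text").show()   // works: 'a' is read back as a string

sql("CREATE TABLE t_bin(a INT) USING parquet")
sql("INSERT INTO t_bin VALUES (1)")
sql("ALTER TABLE t_bin CHANGE a a STRING")
spark.table("t_bin").show()    // fails at read time: "Parquet column cannot be converted ..."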

Member
Thanks for the check! I think we don't always need to comply with the Hive behaviour; an understandable behaviour for users is best.

Member
Thank you for pinging me, @maropu.

   // Compare a [[StructField]] to another, return true if they have the same column
-  // name(by resolver) and dataType.
-  private def columnEqual(
+  // name (by resolver) and the old dataType can be cast to the new one.
+  private def columnCheck(
       field: StructField, other: StructField, resolver: Resolver): Boolean = {
-    resolver(field.name, other.name) && field.dataType == other.dataType
+    resolver(field.name, other.name) && Cast.canCast(field.dataType, other.dataType)
   }
 }

@@ -6,7 +6,7 @@ DESC test_change;
 ALTER TABLE test_change CHANGE a a1 INT;
 DESC test_change;
 
--- Change column dataType (not supported yet)
+-- Change column dataType
 ALTER TABLE test_change CHANGE a a STRING;
 DESC test_change;

@@ -44,16 +44,15 @@ ALTER TABLE test_change CHANGE a a STRING
 -- !query 4 schema
 struct<>
 -- !query 4 output
-org.apache.spark.sql.AnalysisException
-ALTER TABLE CHANGE COLUMN is not supported for changing column 'a' with type 'IntegerType' to 'a' with type 'StringType';
+
 
 
 -- !query 5
 DESC test_change
 -- !query 5 schema
 struct<col_name:string,data_type:string,comment:string>
 -- !query 5 output
-a int
+a string
 b string
 c int

@@ -91,7 +90,7 @@ DESC test_change
 -- !query 8 schema
 struct<col_name:string,data_type:string,comment:string>
 -- !query 8 output
-a int
+a string
 b string
 c int

@@ -1697,6 +1697,27 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'")
     assert(getMetadata("col1").getString("key") == "value")
     assert(getMetadata("col1").getString("comment") == "this is col1")
+
+    // Ensure that changing the column type takes effect
+    sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING")
+    val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1")
+    assert(column.get.dataType == StringType)
+
+    // Ensure that changing a partition column's type throws an exception
+    var msg = intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING")
+    }
+    assert(msg.getMessage.startsWith(
+      "Can't find column `a` given table data columns"))
+
+    withTable("t") {
+      sql("CREATE TABLE t(s STRUCT<a:INT, b:STRING>) USING PARQUET")
+      msg = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN s s INT")
+      }
+      assert(msg.getMessage.startsWith(
+        "ALTER TABLE CHANGE COLUMN is not supported for changing column "))
+    }
   }
 
   test("drop build-in function") {