Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
prevent replacing existing column with a different data type
- add method in `SchemaUtils` to comparte two schemas and raise an exception if columns with the same name have different data types
- enable this check in `verifyColumnsToAddReplace` to prevent `alter table replace columns` to replace  existing column with a column having a different data type
  • Loading branch information
manu-olx committed Jun 5, 2019
commit 8b6da23e3deb10f9be85566cff819fca5596faa2
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,33 @@ private[spark] object SchemaUtils {
s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
}
}

/**
* Checks if the two provided schemas have columns with matching names. If yes, also the
* data type has to match otherwise an exception is raised
*
* @param schemaOne first schema to compare
* @param schemaTwo second schema to compare
* @param resolver resolver used to determine if two identifiers are equal
*/
def checkDataTypeMatchesForSameColumnName(
schemaOne: StructType, schemaTwo: StructType, resolver: Resolver): Unit = {
checkDataTypeMatchesForSameColumnName(schemaOne, schemaTwo, isCaseSensitiveAnalysis(resolver))
}

def checkDataTypeMatchesForSameColumnName(
schemaOne: StructType, schemaTwo: StructType, caseSensitiveAnalysis: Boolean): Unit = {
for (s1 <- schemaOne; s2 <- schemaTwo) {
// scalastyle:off caselocale
val schemaOneColumName = if (caseSensitiveAnalysis) s1.name else s1.name.toLowerCase
val schemaTwoColumName = if (caseSensitiveAnalysis) s2.name else s2.name.toLowerCase
// scalastyle:on caselocale

if (schemaOneColumName == schemaTwoColumName & s1.dataType != s2.dataType) {
throw new AnalysisException(
s"Column `$schemaOneColumName` type doesn't match between schemas ($s1 <> $s2)")
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,48 @@ class SchemaUtilsSuite extends SparkFunSuite {

checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
}

test(s"Test checkDataTypeMatchesForSameColumnName") {
def compareSchemas(schema1_str: String, schema2_str: String,
caseSensitive: Boolean, shouldRaiseException: Boolean): Unit = {
val schema1 = StructType.fromDDL(schema1_str)
val schema2 = StructType.fromDDL(schema2_str)

if (shouldRaiseException) {
val msg = intercept[AnalysisException] {
SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)
}.getMessage
assert(msg.contains("type doesn't match between schemas"))
}
else SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)

}
// pass when datatype is the same
compareSchemas("a int, b string", "a int, B string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string, B int", "a int, b string, B int",
caseSensitive = true, shouldRaiseException = false)

// fail when there's at least one mismatch
compareSchemas("a int, b string", "a string, b string",
caseSensitive = false, shouldRaiseException = true)
compareSchemas("a int, b string", "a int, b string, B int",
caseSensitive = false, shouldRaiseException = true)

// work as expected when schemas structures differ
compareSchemas("a int, b string", "c string, D int, A int",
caseSensitive = true, shouldRaiseException = false)
compareSchemas("a int, b string", "b string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string", "B string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string", "a string",
caseSensitive = true, shouldRaiseException = true)
compareSchemas("a int, b string", "A string",
caseSensitive = false, shouldRaiseException = true)
compareSchemas("a int", "a int, A string",
caseSensitive = true, shouldRaiseException = false)
compareSchemas("b string", "b string, B int",
caseSensitive = false, shouldRaiseException = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ case class AlterTableRenameCommand(

abstract class AlterTableAddReplaceColumnsCommandsBase extends RunnableCommand {
/**
* Ensure the columns to add/replace meet requirements.
* Ensure the columns to add/replace meet requirements:
* - columns to add should not have conflicting names with existing columns
* - columns to replace should have distinct names
* - if a column to replace exists already in the table, the data type has to match
* - column names have to match the given datasource format specifications
*/
protected def verifyColumnsToAddReplace(
table: TableIdentifier,
Expand All @@ -188,6 +192,11 @@ abstract class AlterTableAddReplaceColumnsCommandsBase extends RunnableCommand {
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)

SchemaUtils.checkDataTypeMatchesForSameColumnName(
StructType(colsToVerify),
catalogTable.dataSchema,
conf.caseSensitiveAnalysis)

DDLUtils.checkDataColNames(catalogTable, colsToVerify.map(_.name))
}

Expand Down