Closed

Changes from 1 commit (27 commits total)
1e647ee  Share code to check column name duplication (maropu, Apr 25, 2017)
4467077  Apply reviews (maropu, Jun 13, 2017)
33ab217  Make code more consistent (maropu, Jun 13, 2017)
d8efb9d  Apply review comments (maropu, Jun 16, 2017)
11d1818  Apply xiao's reviews (maropu, Jun 16, 2017)
22e1e4f  Apply more of xiao's reviews (maropu, Jun 17, 2017)
743a069  Replace map with foreach (maropu, Jun 20, 2017)
f6eab2d  Add tests for data schema + partition schema (maropu, Jun 20, 2017)
09da8d6  Drop name duplication checks in HiveMetastoreCatalog.scala (maropu, Jun 20, 2017)
6d03f31  Modify exception messages (maropu, Jun 20, 2017)
a0b9b05  Revert logic to check name duplication (maropu, Jun 20, 2017)
91b6424  Add tests for write paths (maropu, Jun 21, 2017)
37ad3f3  Add tests for stream sink paths (maropu, Jun 21, 2017)
d0d9d3e  Brush up code and add more tests (maropu, Jun 25, 2017)
cbe9c71  Apply reviews (maropu, Jun 26, 2017)
c69270f  Apply more comments (maropu, Jun 27, 2017)
af959f6  Add more tests in create.sql (maropu, Jun 27, 2017)
8d3e10a  Move duplication checks into constructor (maropu, Jun 29, 2017)
9b386d5  Brush up code (maropu, Jun 30, 2017)
a878510  [WIP] Add DataSourceValidator trait to validate schema in write path (maropu, Jul 3, 2017)
be20127  Revert "Brush up code" (maropu, Jul 3, 2017)
f41bf80  Fix more issues (maropu, Jul 4, 2017)
0526391  Revert DataSourceValidator (maropu, Jul 4, 2017)
9e199bc  Add the check for external relation providers (maropu, Jul 4, 2017)
1ae132d  [WIP] Handle DataSource name duplication in one place (maropu, Jul 5, 2017)
5c29a75  Fix more (maropu, Jul 6, 2017)
5ed2c0d  Move some tests to DDLSuite (maropu, Jul 7, 2017)
Apply review comments
maropu committed Jul 6, 2017
commit d8efb9d17fbc75d4d756f66bd84a3d9697ee0c0b
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.util
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.types.StructType
 
 
@@ -30,22 +33,26 @@ private[spark] object SchemaUtils {

   def checkSchemaColumnNameDuplication(

Review comment (Member): Could you add the function description and parameter description?
Reply (maropu): ok

       schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
-    checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
+    val resolver = if (caseSensitiveAnalysis) {
+      caseSensitiveResolution
+    } else {
+      caseInsensitiveResolution
+    }
+    checkColumnNameDuplication(schema.map(_.name), colType, resolver)
   }
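A doc header along these lines would answer the review request above; this is a hypothetical sketch, not necessarily the wording that was eventually committed (body as in the diff, assuming the imports shown there):

```scala
/**
 * Checks whether `schema` has duplicate column names, and throws an AnalysisException
 * naming the duplicates if it does.
 *
 * @param schema schema to check for duplicate column names
 * @param colType description of the columns' role, used in the error message
 * @param caseSensitiveAnalysis whether the duplication check is case sensitive
 */
def checkSchemaColumnNameDuplication(
    schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
  val resolver = if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
  checkColumnNameDuplication(schema.map(_.name), colType, resolver)
}
```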

   def checkColumnNameDuplication(

Review comment (Member): The same here.

-      names: Seq[String], colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
-    val colNames = if (caseSensitiveAnalysis) {
-      names
-    } else {
-      names.map(_.toLowerCase)
-    }
-    if (colNames.distinct.length != colNames.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      names: Seq[String], colType: String, resolver: Resolver): Unit = {
+    val duplicateColumns = mutable.ArrayBuffer[String]()
+    names.foreach { name =>
+      val sameColNames = names.filter(resolver(_, name))
+      if (sameColNames.size > 1 && !duplicateColumns.exists(resolver(_, name))) {
+        duplicateColumns.append(name)
+      }
     }
+    if (duplicateColumns.size > 0) {
       throw new AnalysisException(s"Found duplicate column(s) in $colType: " +
-        duplicateColumns.mkString(", "))
+        duplicateColumns.map(colName => s""""$colName"""").mkString(", "))
     }
   }
 }
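For readers outside the Spark tree, here is a minimal standalone sketch of the resolver-based check introduced above. `DupCheckSketch` and `findDuplicates` are hypothetical names; the two resolvers mirror `caseSensitiveResolution` and `caseInsensitiveResolution` from `org.apache.spark.sql.catalyst.analysis`, and the sketch returns the duplicates instead of throwing:

```scala
object DupCheckSketch {
  // A Resolver decides whether two column names refer to the same column.
  type Resolver = (String, String) => Boolean
  val caseSensitiveResolution: Resolver = _ == _
  val caseInsensitiveResolution: Resolver = _.equalsIgnoreCase(_)

  // Same traversal as SchemaUtils.checkColumnNameDuplication in the diff:
  // a name is a duplicate if the resolver matches it to more than one entry,
  // and each duplicate group is reported only once.
  def findDuplicates(names: Seq[String], resolver: Resolver): List[String] = {
    val duplicateColumns = scala.collection.mutable.ArrayBuffer[String]()
    names.foreach { name =>
      val sameColNames = names.filter(resolver(_, name))
      if (sameColNames.size > 1 && !duplicateColumns.exists(resolver(_, name))) {
        duplicateColumns.append(name)
      }
    }
    duplicateColumns.toList
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("Aa", "b", "aA")
    println(findDuplicates(cols, caseSensitiveResolution))   // List()
    println(findDuplicates(cols, caseInsensitiveResolution)) // List(Aa)
  }
}
```

Note the check is quadratic in the number of columns, which is acceptable for typical schema widths.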
@@ -19,6 +19,7 @@ package org.apache.spark.sql.util
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.types.StructType
 
 class SchemaUtilsSuite extends SparkFunSuite {
@@ -33,49 +34,49 @@ class SchemaUtilsSuite extends SparkFunSuite {
     assert(msg1.contains("""Found duplicate column(s) in SchemaUtilsSuite: "a";"""))
     val msg2 = intercept[AnalysisException] {
       SchemaUtils.checkColumnNameDuplication(
-        "a" :: "b" :: "a" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = true)
+        "a" :: "b" :: "a" :: Nil, "SchemaUtilsSuite", caseSensitiveResolution)
     }.getMessage
     assert(msg2.contains("""Found duplicate column(s) in SchemaUtilsSuite: "a";"""))
 
     // Check no exception thrown
     SchemaUtils.checkSchemaColumnNameDuplication(
       StructType.fromDDL("a INT, b INT, c INT"), "SchemaUtilsSuite", caseSensitiveAnalysis = true)
     SchemaUtils.checkColumnNameDuplication(
-      "a" :: "b" :: "c" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = true)
+      "a" :: "b" :: "c" :: Nil, "SchemaUtilsSuite", caseSensitiveResolution)
     SchemaUtils.checkSchemaColumnNameDuplication(
       StructType.fromDDL("Aa INT, b INT, aA INT"), "SchemaUtilsSuite", caseSensitiveAnalysis = true)
     SchemaUtils.checkColumnNameDuplication(
-      "Aa" :: "b" :: "aA" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = true)
+      "Aa" :: "b" :: "aA" :: Nil, "SchemaUtilsSuite", caseSensitiveResolution)
 
     // Case-insensitive case
     val msg3 = intercept[AnalysisException] {
       SchemaUtils.checkSchemaColumnNameDuplication(
         StructType.fromDDL("Aa INT, b INT, Aa INT"), "SchemaUtilsSuite",
         caseSensitiveAnalysis = false)
     }.getMessage
-    assert(msg3.contains("""Found duplicate column(s) in SchemaUtilsSuite: "aa";"""))
+    assert(msg3.contains("""Found duplicate column(s) in SchemaUtilsSuite: "Aa";"""))
     val msg4 = intercept[AnalysisException] {
       SchemaUtils.checkColumnNameDuplication(
-        "Aa" :: "b" :: "Aa" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = false)
+        "Aa" :: "b" :: "Aa" :: Nil, "SchemaUtilsSuite", caseInsensitiveResolution)
     }.getMessage
-    assert(msg4.contains("""Found duplicate column(s) in SchemaUtilsSuite: "aa";"""))
+    assert(msg4.contains("""Found duplicate column(s) in SchemaUtilsSuite: "Aa";"""))
 
     val msg5 = intercept[AnalysisException] {
       SchemaUtils.checkSchemaColumnNameDuplication(
         StructType.fromDDL("a INT, bB INT, Bb INT"), "SchemaUtilsSuite",
         caseSensitiveAnalysis = false)
     }.getMessage
-    assert(msg5.contains("""Found duplicate column(s) in SchemaUtilsSuite: "bb";"""))
+    assert(msg5.contains("""Found duplicate column(s) in SchemaUtilsSuite: "bB";"""))
     val msg6 = intercept[AnalysisException] {
       SchemaUtils.checkColumnNameDuplication(
-        "a" :: "bB" :: "Bb" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = false)
+        "a" :: "bB" :: "Bb" :: Nil, "SchemaUtilsSuite", caseInsensitiveResolution)
     }.getMessage
-    assert(msg6.contains("""Found duplicate column(s) in SchemaUtilsSuite: "bb";"""))
+    assert(msg6.contains("""Found duplicate column(s) in SchemaUtilsSuite: "bB";"""))
 
     // Check no exception thrown
     SchemaUtils.checkSchemaColumnNameDuplication(
       StructType.fromDDL("a INT, b INT, c INT"), "SchemaUtilsSuite", caseSensitiveAnalysis = false)
     SchemaUtils.checkColumnNameDuplication(
-      "a" :: "b" :: "c" :: Nil, "SchemaUtilsSuite", caseSensitiveAnalysis = false)
+      "a" :: "b" :: "c" :: Nil, "SchemaUtilsSuite", caseInsensitiveResolution)
   }
 }
@@ -430,8 +430,7 @@ case class AlterTableAddPartitionCommand(
         spec,
         table.partitionColumnNames,
         table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(normalizedSpec, table.storage.copy(
         locationUri = location.map(CatalogUtils.stringToURI)))
@@ -481,15 +480,13 @@ case class AlterTableRenamePartitionCommand(
       oldPartition,
       table.partitionColumnNames,
       table.identifier.quotedString,
-      sparkSession.sessionState.conf.resolver,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      sparkSession.sessionState.conf.resolver)
 
     val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec(
       newPartition,
       table.partitionColumnNames,
       table.identifier.quotedString,
-      sparkSession.sessionState.conf.resolver,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      sparkSession.sessionState.conf.resolver)
 
     catalog.renamePartitions(
       tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition))
@@ -531,8 +528,7 @@ case class AlterTableDropPartitionCommand(
         spec,
         table.partitionColumnNames,
         table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        sparkSession.sessionState.conf.resolver)
     }
 
     catalog.dropPartitions(
@@ -449,8 +449,7 @@ case class TruncateTableCommand(
           spec,
           partCols,
           table.identifier.quotedString,
-          spark.sessionState.conf.resolver,
-          spark.sessionState.conf.caseSensitiveAnalysis)
+          spark.sessionState.conf.resolver)
       }
     val partLocations =
       catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
@@ -294,8 +294,7 @@ object PartitioningUtils {
       partitionSpec: Map[String, T],
       partColNames: Seq[String],
       tblName: String,
-      resolver: Resolver,
-      caseSensitiveAnalysis: Boolean): Map[String, T] = {
+      resolver: Resolver): Map[String, T] = {
     val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
       val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
         throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
@@ -304,7 +303,7 @@
     }
 
     SchemaUtils.checkColumnNameDuplication(
-      normalizedPartSpec.map(_._1), "partition specification", caseSensitiveAnalysis)
+      normalizedPartSpec.map(_._1), "partition specification", resolver)
 
     normalizedPartSpec.toMap
   }
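To make the simplified signature concrete, here is a standalone sketch of the normalize-then-check flow. `NormalizeSpecSketch` is a hypothetical name, and `sys.error`/`require` stand in for Spark's `AnalysisException` and `SchemaUtils.checkColumnNameDuplication`:

```scala
object NormalizeSpecSketch {
  type Resolver = (String, String) => Boolean
  val caseInsensitiveResolution: Resolver = _.equalsIgnoreCase(_)

  def normalizePartitionSpec[T](
      partitionSpec: Map[String, T],
      partColNames: Seq[String],
      tblName: String,
      resolver: Resolver): Map[String, T] = {
    // Map each user-supplied key onto the declared partition column it resolves to.
    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
        sys.error(s"$key is not a valid partition column in table $tblName.")
      }
      normalizedKey -> value
    }
    // After normalization the keys are canonical, so a plain distinct-check suffices here.
    val keys = normalizedPartSpec.map(_._1)
    require(keys.distinct.size == keys.size, s"Found duplicate keys: $keys")
    normalizedPartSpec.toMap
  }

  def main(args: Array[String]): Unit = {
    // "YEAR" resolves to the declared column "year" under case-insensitive analysis.
    val spec = normalizePartitionSpec(
      Map("YEAR" -> "2017"), Seq("year", "month"), "t", caseInsensitiveResolution)
    println(spec) // Map(year -> 2017)
  }
}
```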
@@ -750,7 +750,8 @@ object JdbcUtils extends Logging {
     val nameEquality = df.sparkSession.sessionState.conf.resolver
 
     // checks duplicate columns in the user specified column types.
-    SchemaUtils.checkSchemaColumnNameDuplication(userSchema, "createTableColumnTypes option value")
+    SchemaUtils.checkColumnNameDuplication(
+      userSchema.map(_.name), "createTableColumnTypes option value", nameEquality)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
@@ -255,7 +255,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       SchemaUtils.checkColumnNameDuplication(
         normalizedPartitionCols,
         "partition",
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        sparkSession.sessionState.conf.resolver)
 
       if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
         if (DDLUtils.isHiveTable(table)) {
@@ -286,15 +286,14 @@
         bucketSpec = bucketSpec,
         resolver = sparkSession.sessionState.conf.resolver)
 
-      val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
       SchemaUtils.checkColumnNameDuplication(
         normalizedBucketSpec.bucketColumnNames,
         "bucket",
-        caseSensitiveAnalysis)
+        sparkSession.sessionState.conf.resolver)
       SchemaUtils.checkColumnNameDuplication(
         normalizedBucketSpec.sortColumnNames,
         "sort",
-        caseSensitiveAnalysis)
+        sparkSession.sessionState.conf.resolver)
 
       normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
         case dt if RowOrdering.isOrderable(dt) => // OK
@@ -323,7 +322,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
       partColNames: Seq[String]): InsertIntoTable = {
 
     val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
-      insert.partition, partColNames, tblName, conf.resolver, conf.caseSensitiveAnalysis)
+      insert.partition, partColNames, tblName, conf.resolver)
 
     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
     val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
8 changes: 5 additions & 3 deletions sql/core/src/test/resources/sql-tests/inputs/create.sql
@@ -1,9 +1,11 @@
--- Check name duplication in a regular case
-CREATE TABLE t (c STRING, c INT) USING parquet;
+-- Catch case-sensitive name duplication
+SET spark.sql.caseSensitive=true;
 
 -- Check multiple name duplication
 CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet;
 
 -- Catch case-insensitive name duplication
 SET spark.sql.caseSensitive=false;
 
+CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet;
+
 CREATE TABLE t (ab STRING, cd INT, ef DOUBLE, Ab INT) USING parquet;
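Through the public API, the new case-insensitive test case behaves as below. This is a sketch assuming a local SparkSession with the spark-sql artifact on the classpath; `CreateTableDupDemo` is a hypothetical name, not part of the PR:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object CreateTableDupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dup-demo").getOrCreate()
    spark.sql("SET spark.sql.caseSensitive=false")
    try {
      // "ab" and "Ab" collide once analysis is case-insensitive.
      spark.sql("CREATE TABLE t (ab STRING, cd INT, ef DOUBLE, Ab INT) USING parquet")
    } catch {
      // Expected: Found duplicate column(s) in table definition of `t`: "ab";
      case e: AnalysisException => println(e.getMessage)
    }
    spark.stop()
  }
}
```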
22 changes: 15 additions & 7 deletions sql/core/src/test/resources/sql-tests/results/create.sql.out
@@ -1,14 +1,13 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 4
+-- Number of queries: 5
 
 
 -- !query 0
-CREATE TABLE t (c STRING, c INT) USING parquet
+SET spark.sql.caseSensitive=true
 -- !query 0 schema
-struct<>
+struct<key:string,value:string>
 -- !query 0 output
-org.apache.spark.sql.AnalysisException
-Found duplicate column(s) in table definition of `t`: "c";
+spark.sql.caseSensitive true
 
 
 -- !query 1
@@ -17,7 +16,7 @@ CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet
 struct<>
 -- !query 1 output
 org.apache.spark.sql.AnalysisException
-Found duplicate column(s) in table definition of `t`: "c1", "c0";
+Found duplicate column(s) in table definition of `t`: "c0", "c1";
 
 
 -- !query 2
@@ -29,9 +28,18 @@ spark.sql.caseSensitive false
 
 
 -- !query 3
-CREATE TABLE t (ab STRING, cd INT, ef DOUBLE, Ab INT) USING parquet
+CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet
 -- !query 3 schema
 struct<>
 -- !query 3 output
 org.apache.spark.sql.AnalysisException
-Found duplicate column(s) in table definition of `t`: "ab";
+Found duplicate column(s) in table definition of `t`: "c0", "c1";
+
+
+-- !query 4
+CREATE TABLE t (ab STRING, cd INT, ef DOUBLE, Ab INT) USING parquet
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Found duplicate column(s) in table definition of `t`: "ab";