Changes from 1 commit. Commits (27):
1e647ee - Share code to check column name duplication (maropu, Apr 25, 2017)
4467077 - Apply reviews (maropu, Jun 13, 2017)
33ab217 - Make code more consistent (maropu, Jun 13, 2017)
d8efb9d - Apply review comments (maropu, Jun 16, 2017)
11d1818 - Apply xiao's reviews (maropu, Jun 16, 2017)
22e1e4f - Apply more xiao's reviews (maropu, Jun 17, 2017)
743a069 - Replace map with foreach (maropu, Jun 20, 2017)
f6eab2d - Add tests for data schema + partition schema (maropu, Jun 20, 2017)
09da8d6 - Drop name duplication checks in HiveMetastoreCatalog.scala (maropu, Jun 20, 2017)
6d03f31 - Modify exception messages (maropu, Jun 20, 2017)
a0b9b05 - Revert logic to check name duplication (maropu, Jun 20, 2017)
91b6424 - Add tests for write paths (maropu, Jun 21, 2017)
37ad3f3 - Add tests for stream sink paths (maropu, Jun 21, 2017)
d0d9d3e - Brush up code and add more tests (maropu, Jun 25, 2017)
cbe9c71 - Apply reviews (maropu, Jun 26, 2017)
c69270f - Apply more comments (maropu, Jun 27, 2017)
af959f6 - Add more tests in create.sql (maropu, Jun 27, 2017)
8d3e10a - Move duplication checks in constructor (maropu, Jun 29, 2017)
9b386d5 - Brush up code (maropu, Jun 30, 2017)
a878510 - [WIP] Add DataSourceValidator trait to validate schema in write path (maropu, Jul 3, 2017)
be20127 - Revert "Brush up code" (maropu, Jul 3, 2017)
f41bf80 - Fix more issues (maropu, Jul 4, 2017)
0526391 - Revert DataSourceValidator (maropu, Jul 4, 2017)
9e199bc - Add the check for external relation providers (maropu, Jul 4, 2017)
1ae132d - [WIP] Handle DataSource name duplication in one place (maropu, Jul 5, 2017)
5c29a75 - Fix more (maropu, Jul 6, 2017)
5ed2c0d - Move some tests to DDLSuite (maropu, Jul 7, 2017)
Commit 1e647ee9c779d5d60fcadfe70a3dac05d48d8ccc: Share code to check column name duplication
maropu committed Jul 6, 2017
@@ -188,6 +188,9 @@ case class DataSource(
(dataSchema ++ partitionSchema).map(_.name), "in the data schema and the partition schema",
sparkSession.sessionState.conf.caseSensitiveAnalysis)

SchemaUtils.checkSchemaColumnNameDuplication(
dataSchema, "datasource", sparkSession.sessionState.conf.caseSensitiveAnalysis)

(dataSchema, partitionSchema)
}

@@ -334,6 +337,9 @@ case class DataSource(
"It must be specified manually")
}

SchemaUtils.checkSchemaColumnNameDuplication(
  dataSchema, "datasource", sparkSession.sessionState.conf.caseSensitiveAnalysis)

Contributor: what are we checking here? both user-specified schema and inferred schema should have been checked
Member Author (maropu): yea, I'll remove it (I wrongly reverted this entry, too...)
Member: The same here.

HadoopFsRelation(
fileCatalog,
partitionSchema = fileCatalog.partitionSchema,
@@ -59,9 +59,7 @@ abstract class JsonDataSource extends Serializable {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): Option[StructType] = {
if (inputPaths.nonEmpty) {
val jsonSchema = infer(sparkSession, inputPaths, parsedOptions)
checkConstraints(jsonSchema)
Some(jsonSchema)
Some(infer(sparkSession, inputPaths, parsedOptions))

Contributor: don't need to check duplication here?
Member Author (maropu): I think we do not need to check the duplication in each datasource because it will be checked in DataSource: https://github.com/apache/spark/pull/17758/files#diff-7a6cb188d2ae31eb3347b5629a679cecR187

} else {
None
}
@@ -71,17 +69,6 @@
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType

/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {
if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}.mkString(", ")
throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
s"cannot save to JSON format")
}
}
}

object JsonDataSource {
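To make the author's point concrete, the following sketch (not part of the PR; the session setup and temp path are hypothetical) shows that a duplicate user-specified schema on a JSON read is still rejected even though checkConstraints is gone from JsonDataSource - the shared check in DataSource now raises the error, with the message the tests later in this PR assert on:

import java.nio.file.Files
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object DuplicateJsonSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // A user-specified schema with two columns named "a".
    val dupSchema = StructType(
      StructField("a", IntegerType) ::
      StructField("a", IntegerType) :: Nil)

    // Write one small JSON file so the read path actually resolves.
    val dir = Files.createTempDirectory("dup-json")
    Files.write(dir.resolve("data.json"), """{"a": 1}""".getBytes("UTF-8"))

    try {
      // JsonDataSource no longer checks duplication itself; the shared
      // check in DataSource rejects the schema during relation resolution.
      spark.read.schema(dupSchema).json(dir.toString)
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // Found duplicate column(s) in datasource: "a"
    }
  }
}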
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.spark.sql.util.SchemaUtils

/**
* Tries to replace [[UnresolvedRelation]]s if the plan is for a direct query on files.
@@ -222,12 +223,10 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
schema.map(_.name)
} else {
schema.map(_.name.toLowerCase)
}
checkDuplication(columnNames, "table definition of " + table.identifier)
SchemaUtils.checkSchemaColumnNameDuplication(
schema,
"table definition of " + table.identifier,
sparkSession.sessionState.conf.caseSensitiveAnalysis)

val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)
@@ -253,7 +252,10 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
partCols = table.partitionColumnNames,
resolver = sparkSession.sessionState.conf.resolver)

checkDuplication(normalizedPartitionCols, "partition")
SchemaUtils.checkColumnNameDuplication(
normalizedPartitionCols,
"partition",
sparkSession.sessionState.conf.caseSensitiveAnalysis)

if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
if (DDLUtils.isHiveTable(table)) {
@@ -283,8 +285,16 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
tableCols = schema.map(_.name),
bucketSpec = bucketSpec,
resolver = sparkSession.sessionState.conf.resolver)
checkDuplication(normalizedBucketSpec.bucketColumnNames, "bucket")
checkDuplication(normalizedBucketSpec.sortColumnNames, "sort")

val caseSensitiveAnalysis = sparkSession.sessionState.conf.caseSensitiveAnalysis
SchemaUtils.checkColumnNameDuplication(
normalizedBucketSpec.bucketColumnNames,
"bucket",
caseSensitiveAnalysis)
SchemaUtils.checkColumnNameDuplication(
normalizedBucketSpec.sortColumnNames,
"sort",
caseSensitiveAnalysis)

normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
case dt if RowOrdering.isOrderable(dt) => // OK
@@ -297,15 +307,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}
}

private def checkDuplication(colNames: Seq[String], colType: String): Unit = {
if (colNames.distinct.length != colNames.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => x
}
failAnalysis(s"Found duplicate column(s) in $colType: ${duplicateColumns.mkString(", ")}")
}
}

private def failAnalysis(msg: String) = throw new AnalysisException(msg)
}

51 changes: 51 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/util/SchemaUtil.scala
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType


/**
* Utils for handling schemas.
*
* TODO: Merge this file with [[org.apache.spark.ml.util.SchemaUtils]].
*/
private[spark] object SchemaUtils {

Contributor: Rename the file (SchemaUtil) also as SchemaUtils?

def checkSchemaColumnNameDuplication(
schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
}

def checkColumnNameDuplication(
names: Seq[String], colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
val colNames = if (caseSensitiveAnalysis) {

Contributor (@wzhfy, Jun 11, 2017): We also need to add tests for case sensitivity.

names
} else {
names.map(_.toLowerCase)
}
if (colNames.distinct.length != colNames.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}
throw new AnalysisException(s"Found duplicate column(s) in $colType: " +
duplicateColumns.mkString(", "))
}
}
}
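A quick illustration of the helper's contract (a hypothetical snippet, grounded only in the two methods above; note that SchemaUtils is private[spark], so code like this would live inside Spark itself, e.g. in a test suite):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils

// Default (case-insensitive) mode: "a" and "A" collide after lowercasing.
try {
  SchemaUtils.checkColumnNameDuplication(Seq("a", "b", "A"), "test data")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // Found duplicate column(s) in test data: "a"
}

// Case-sensitive mode: "a" and "A" are distinct names, so no exception.
SchemaUtils.checkColumnNameDuplication(
  Seq("a", "b", "A"), "test data", caseSensitiveAnalysis = true)

// The StructType overload simply delegates on the field names:
val schema = StructType(
  StructField("c", IntegerType) :: StructField("c", IntegerType) :: Nil)
// would throw: Found duplicate column(s) in table definition: "c"
// SchemaUtils.checkSchemaColumnNameDuplication(schema, "table definition")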
5 changes: 5 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/create.sql
@@ -0,0 +1,5 @@
-- Check name duplication in a regular case
CREATE TABLE t (c STRING, c INT) USING parquet;

Contributor (@wzhfy, Jun 11, 2017): In addition to end-to-end tests, we can also add a suite for SchemaUtils and put the case sensitivity cases in it.
Member Author (maropu): Added SchemaUtilsSuite.

-- Check multiple name duplication
CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet;
20 changes: 20 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/create.sql.out
@@ -0,0 +1,20 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 2


-- !query 0
CREATE TABLE t (c STRING, c INT) USING parquet
-- !query 0 schema
struct<>
-- !query 0 output
org.apache.spark.sql.AnalysisException
Found duplicate column(s) in table definition of `t`: "c";


-- !query 1
CREATE TABLE t (c0 STRING, c1 INT, c1 DOUBLE, c0 INT) USING parquet
-- !query 1 schema
struct<>
-- !query 1 output
org.apache.spark.sql.AnalysisException
Found duplicate column(s) in table definition of `t`: "c1", "c0";
@@ -439,13 +439,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val e = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int, a string) USING json")
}
assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
assert(e.message == """Found duplicate column(s) in table definition of `tbl`: "a"""")

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val e2 = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int, A string) USING json")
}
assert(e2.message == "Found duplicate column(s) in table definition of `tbl`: a")
assert(e2.message == """Found duplicate column(s) in table definition of `tbl`: "a"""")
}
}

@@ -469,14 +469,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val e = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
}
assert(e.message == "Found duplicate column(s) in partition: a")
assert(e.message == """Found duplicate column(s) in partition: "a"""")
}

test("create table - column repeated in bucket columns") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
}
assert(e.message == "Found duplicate column(s) in bucket: a")
assert(e.message == """Found duplicate column(s) in bucket: "a"""")
}

test("Refresh table after changing the data source table partitioning") {
@@ -687,4 +687,46 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
testRead(spark.read.schema(userSchemaString).text(dir, dir), data ++ data, userSchema)
testRead(spark.read.schema(userSchemaString).text(Seq(dir, dir): _*), data ++ data, userSchema)
}

test("SPARK-20460 Check name duplication in schema") {
withTempDir { src =>
val columnDuplicateSchema = StructType(
StructField("a", IntegerType) ::
StructField("a", IntegerType) ::
Nil)

// Check CSV format
Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text(src.toString)
val e1 = intercept[AnalysisException] {
spark.read.format("csv").schema(columnDuplicateSchema).option("header", false)
.load(src.toString)
}

Contributor: Is header option necessary?
assert(e1.getMessage.contains("""Found duplicate column(s) in datasource: "a";"""))

// If `inferSchema` is true, the CSV format is duplicate-safe (see SPARK-16896)
val df = spark.read.format("csv").option("inferSchema", true).option("header", true)
.load(src.toString)
checkAnswer(df, Row(1, 1))

// Check JSON format
Seq("""{"a":1, "a":1}"""""").toDF().coalesce(1).write.mode("overwrite").text(src.toString)

Contributor: nit: duplicate """

val e2 = intercept[AnalysisException] {
spark.read.format("json").schema(columnDuplicateSchema).option("header", false)
.load(src.toString)
}
assert(e2.getMessage.contains("""Found duplicate column(s) in datasource: "a";"""))

val e3 = intercept[AnalysisException] {
spark.read.format("json").option("inferSchema", true).load(src.toString)
}
assert(e3.getMessage.contains("""Found duplicate column(s) in datasource: "a";"""))

// Check Parquet format
Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(src.toString)
val e4 = intercept[AnalysisException] {
spark.read.format("parquet").schema(columnDuplicateSchema).load(src.toString)
}
assert(e4.getMessage.contains("""Found duplicate column(s) in datasource: "a";"""))
}
}
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils

/**
* Legacy catalog for interacting with the Hive metastore.
@@ -248,6 +249,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
if (inferenceMode == INFER_AND_SAVE) {
updateCatalogSchema(relation.tableMeta.identifier, schema)
}

SchemaUtils.checkSchemaColumnNameDuplication(
  schema, "hive serde table", sparkSession.sessionState.conf.caseSensitiveAnalysis)

Member: Could you please double check whether Hive allows duplicate column names?
Member Author (maropu): ok
Member Author (maropu): I checked in hive-v2.0.1:

hive> CREATE TABLE t(a INT, b INT, a INT);
FAILED: SemanticException [Error 10036]: Duplicate column name: a
hive> CREATE TABLE t(a INT, b INT, A INT);
FAILED: SemanticException [Error 10036]: Duplicate column name: a

Also, the Hive doc says "Table names and column names are case insensitive but SerDe and property names are case sensitive."
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable

Member: Do we have a test case to cover this check?
Member Author (maropu): I checked the code again and it seems no column duplication can happen here as long as the Hive catalog itself has no name duplication: mergeWithMetastoreSchema merges a catalog schema with an inferred one on the catalog names, so IIUC this does not lead to duplication. So, we could safely remove this check?

(schema, relation.tableMeta.copy(schema = schema))
case None =>
logWarning(s"Unable to infer schema for table $tableName from file format " +
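For a Spark-side analogue of the hive-shell experiment above (an illustrative sketch, assuming a Hive-enabled SparkSession; with this PR the duplicate is caught during table-creation preprocessing, consistent with the HiveDDLSuite expectation below):

import org.apache.spark.sql.{AnalysisException, SparkSession}

object HiveDuplicateColumnSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; requires Hive support on the classpath.
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // With case-insensitive analysis (the default), `a` and `A` collide,
      // mirroring Hive's own SemanticException shown in the thread above.
      spark.sql("CREATE TABLE t(a INT, b INT, A INT) STORED AS parquet")
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
  }
}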
@@ -345,7 +345,7 @@ class HiveDDLSuite
val e = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
}
assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
assert(e.message == """Found duplicate column(s) in table definition of `default`.`tbl`: "a"""")
}

test("add/drop partition with location - managed table") {