[SPARK-20460][SQL] Make it more consistent to handle column name duplication #17758
Changes from 26 commits
SchemaUtils.scala (org.apache.spark.sql.util)

```diff
@@ -17,37 +17,73 @@
 package org.apache.spark.sql.util

-import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.types.StructType

 /**
  * Utils for handling schemas.
  *
  * TODO: Merge this file with [[org.apache.spark.ml.util.SchemaUtils]].
  */
-private[spark] object SchemaUtils extends Logging {
+private[spark] object SchemaUtils {

   /**
-   * Checks if input column names have duplicate identifiers. Prints a warning message if
-   * the duplication exists.
+   * Checks if an input schema has duplicate column names. This throws an exception if the
+   * duplication exists.
+   *
+   * @param schema schema to check
+   * @param colType column type name, used in an exception message
+   * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
+   */
+  def checkSchemaColumnNameDuplication(
+      schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
+    checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
+  }
+
+  // Returns true if a given resolver is case-sensitive
+  private def isCaseSensitiveAnalysis(resolver: Resolver): Boolean = {
+    if (resolver == caseSensitiveResolution) {
+      true
+    } else if (resolver == caseInsensitiveResolution) {
+      false
+    } else {
+      sys.error("A resolver to check if two identifiers are equal must be " +
+        "`caseSensitiveResolution` or `caseInsensitiveResolution` in o.a.s.sql.catalyst.")
+    }
+  }
+
+  /**
+   * Checks if input column names have duplicate identifiers. This throws an exception if
+   * the duplication exists.
+   *
+   * @param columnNames column names to check
+   * @param colType column type name, used in an exception message
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkColumnNameDuplication(
+      columnNames: Seq[String], colType: String, resolver: Resolver): Unit = {
+    checkColumnNameDuplication(columnNames, colType, isCaseSensitiveAnalysis(resolver))
+  }
+
+  /**
+   * Checks if input column names have duplicate identifiers. This throws an exception if
+   * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in a warning message
+   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
   def checkColumnNameDuplication(
       columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
-    val names = if (caseSensitiveAnalysis) {
-      columnNames
-    } else {
-      columnNames.map(_.toLowerCase)
-    }
+    val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
     if (names.distinct.length != names.length) {
       val duplicateColumns = names.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => s"`$x`"
       }
-      logWarning(s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}. " +
-        "You might need to assign different column names.")
+      throw new AnalysisException(
+        s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
     }
   }
 }
```
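To make the new contract concrete, here is a minimal, hypothetical usage sketch. It assumes a build that contains this patch; the column names and the `"in the example schema"` tag are illustrative, and the lowercased `` `aa` `` in the message reflects the case-insensitive normalization above:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaUtils

object SchemaUtilsExample {
  def main(args: Array[String]): Unit = {
    // Distinct names: the check returns normally.
    SchemaUtils.checkColumnNameDuplication(
      Seq("a", "b", "c"), "in the example schema", caseSensitiveAnalysis = false)

    // `Aa` vs `aA` passes under case-sensitive analysis...
    SchemaUtils.checkSchemaColumnNameDuplication(
      StructType.fromDDL("Aa INT, aA INT"), "in the example schema",
      caseSensitiveAnalysis = true)

    // ...but the case-insensitive resolver treats them as duplicates.
    try {
      SchemaUtils.checkColumnNameDuplication(
        Seq("Aa", "aA"), "in the example schema", caseInsensitiveResolution)
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // Found duplicate column(s) in the example schema: `aa`
    }
  }
}
```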
SchemaUtilsSuite.scala (org.apache.spark.sql.util, new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.types.StructType

class SchemaUtilsSuite extends SparkFunSuite {

  private def resolver(caseSensitiveAnalysis: Boolean): Resolver = {
    if (caseSensitiveAnalysis) {
      caseSensitiveResolution
    } else {
      caseInsensitiveResolution
    }
  }

  Seq((true, ("a", "a"), ("b", "b")), (false, ("a", "A"), ("b", "B"))).foreach {
    case (caseSensitive, (a0, a1), (b0, b1)) =>
      val testType = if (caseSensitive) "case-sensitive" else "case-insensitive"
      test(s"Check column name duplication in $testType cases") {
        def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
          val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
            duplicatedColumns.map(c => s"`${c.toLowerCase}`").mkString(", ")
          val schema = StructType.fromDDL(schemaStr)
          var msg = intercept[AnalysisException] {
            SchemaUtils.checkSchemaColumnNameDuplication(
              schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
          }.getMessage
          assert(msg.contains(expectedErrorMsg))
          msg = intercept[AnalysisException] {
            SchemaUtils.checkColumnNameDuplication(
              schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
          }.getMessage
          assert(msg.contains(expectedErrorMsg))
          msg = intercept[AnalysisException] {
            SchemaUtils.checkColumnNameDuplication(
              schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
          }.getMessage
          assert(msg.contains(expectedErrorMsg))
        }

        checkExceptionCases(s"$a0 INT, b INT, $a1 INT", a0 :: Nil)
        checkExceptionCases(s"$a0 INT, b INT, $a1 INT, $a0 INT", a0 :: Nil)
        checkExceptionCases(s"$a0 INT, $b0 INT, $a1 INT, $a0 INT, $b1 INT", b0 :: a0 :: Nil)
      }
  }

  test("Check no exception thrown for valid schemas") {
    def checkNoExceptionCases(schemaStr: String, caseSensitive: Boolean): Unit = {
      val schema = StructType.fromDDL(schemaStr)
      SchemaUtils.checkSchemaColumnNameDuplication(
        schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
      SchemaUtils.checkColumnNameDuplication(
        schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
      SchemaUtils.checkColumnNameDuplication(
        schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
    }

    checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = true)
    checkNoExceptionCases("Aa INT, b INT, aA INT", caseSensitive = true)

    checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
  }
}
```
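As background for the `Resolver`-based overload exercised above: in the catalyst analysis package of this era, a resolver is just a string comparator. Roughly the following (a paraphrase of the `org.apache.spark.sql.catalyst.analysis` package object, not part of this diff):

```scala
// Paraphrased from the catalyst analysis package object (not part of this PR):
type Resolver = (String, String) => Boolean

val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)
```

This is why `isCaseSensitiveAnalysis` compares the incoming resolver by reference and rejects anything other than these two built-ins with `sys.error`.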
DataSource.scala (org.apache.spark.sql.execution.datasources)

```diff
@@ -87,6 +87,14 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
+  private val equality = sparkSession.sessionState.conf.resolver
+
+  bucketSpec.map { bucket =>
+    SchemaUtils.checkColumnNameDuplication(
+      bucket.bucketColumnNames, "in the bucket column(s)", equality)
+    SchemaUtils.checkColumnNameDuplication(
+      bucket.sortColumnNames, "in the sort column(s)", equality)
+  }

   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
```
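With the eager checks in the `DataSource` constructor above, duplicate bucketing or sort columns fail before any job is launched. A hypothetical session (the DataFrame and table name are illustrative):

```scala
// Hypothetical: duplicate bucket columns are now rejected up front.
val df = spark.range(10).selectExpr("id AS a", "id % 2 AS b")

// Expected to throw:
//   org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the bucket column(s): `a`
df.write.bucketBy(8, "a", "a").sortBy("b").saveAsTable("bucketed_table")
```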
The now-shared `equality` val replaces the local copies scattered through `getOrInferFileFormatSchema`:

```diff
@@ -132,7 +140,6 @@ case class DataSource(
     // Try to infer partitioning, because no DataSource in the read path provides the partitioning
     // columns properly unless it is a Hive DataSource
     val resolved = tempFileIndex.partitionSchema.map { partitionField =>
-      val equality = sparkSession.sessionState.conf.resolver
       // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
       userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
         partitionField)
@@ -146,7 +153,6 @@ case class DataSource(
       inferredPartitions
     } else {
       val partitionFields = partitionColumns.map { partitionColumn =>
-        val equality = sparkSession.sessionState.conf.resolver
         userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
           val inferredPartitions = tempFileIndex.partitionSchema
           val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
@@ -172,7 +178,6 @@ case class DataSource(
     }

     val dataSchema = userSpecifiedSchema.map { schema =>
-      val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
     }.orElse {
       format.inferSchema(
```
```diff
@@ -184,9 +189,18 @@ case class DataSource(
         s"Unable to infer schema for $format. It must be specified manually.")
     }

-    SchemaUtils.checkColumnNameDuplication(
-      (dataSchema ++ partitionSchema).map(_.name), "in the data schema and the partition schema",
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    // We just print a warning message if the data schema and partition schema have the duplicate
+    // columns. This is because we allow users to do so in the previous Spark releases and
+    // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
+    // See SPARK-18108 and SPARK-21144 for related discussions.
+    try {
+      SchemaUtils.checkColumnNameDuplication(
+        (dataSchema ++ partitionSchema).map(_.name),
+        "in the data schema and the partition column(s)",
+        equality)
+    } catch {
+      case e: AnalysisException => logWarning(e.getMessage)
+    }

     (dataSchema, partitionSchema)
   }
```
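The try/catch above deliberately downgrades one case to a warning: when the inferred data schema and the partition columns overlap (the SPARK-18108 situation), the read still succeeds. A hypothetical illustration (the path and file layout are made up):

```scala
// Hypothetical layout: files under /tmp/t/a=1/ also contain a physical
// column named `a`, so the inferred data schema overlaps the partition
// schema. The read succeeds and only a warning is logged:
//   WARN  Found duplicate column(s) in the data schema and the
//         partition column(s): `a`
val df = spark.read.parquet("/tmp/t")
```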
```diff
@@ -391,6 +405,23 @@ case class DataSource(
         s"$className is not a valid Spark SQL Data Source.")
     }

+    relation match {
+      case hs: HadoopFsRelation =>
+        SchemaUtils.checkColumnNameDuplication(
+          hs.dataSchema.map(_.name),
+          "in the data schema",
+          equality)
+        SchemaUtils.checkColumnNameDuplication(
+          hs.partitionSchema.map(_.name),
+          "in the partition column(s)",
+          equality)
+      case _ =>
+        SchemaUtils.checkColumnNameDuplication(
+          relation.schema.map(_.name),
+          "in the data schema",
+          equality)
+    }
+
     relation
   }
```
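The `resolveRelation` check above also covers sources whose inferred schema itself contains duplicates. A hypothetical JSON example (the path and input are illustrative, assuming schema inference keeps the repeated field):

```scala
// Hypothetical input /tmp/dup.json containing: {"a": 1, "a": 2}
// If inference keeps both fields, the read now fails with:
//   org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the data schema: `a`
val df = spark.read.json("/tmp/dup.json")
```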
InsertIntoHadoopFsRelationCommand.scala (org.apache.spark.sql.execution.datasources)

```diff
@@ -21,7 +21,6 @@ import java.io.IOException

 import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.util.SchemaUtils

 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -64,13 +63,10 @@ case class InsertIntoHadoopFsRelationCommand(
     assert(children.length == 1)

-    // Most formats don't do well with duplicate columns, so lets not allow that
-    if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
-      val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
-        "cannot save to file.")
-    }
+    SchemaUtils.checkSchemaColumnNameDuplication(
+      query.schema,
+      s"when inserting into $outputPath",
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)

     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     val fs = outputPath.getFileSystem(hadoopConf)
```
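End to end, the replacement unifies the error message on the write path as well. A hypothetical example of the new failure mode (the output path is illustrative):

```scala
// Hypothetical: a query with duplicate output columns can no longer be
// written out; the old message "Duplicate column(s): ... found, cannot
// save to file." becomes the unified one:
//   org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) when inserting into file:/tmp/out: `a`
val df = spark.range(1).selectExpr("id AS a", "id AS a")
df.write.parquet("/tmp/out")
```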
Review comment: Could you add the function description and parameter description?

Author reply: ok