
Commit 267160b

gengliangwang authored and HyukjinKwon committed
[SPARK-27269][SQL] File source v2 should validate data schema only
## What changes were proposed in this pull request?

Currently, File source v2 allows each data source to specify the supported data types by implementing the method `supportsDataType` in `FileScan` and `FileWriteBuilder`. However, in the read path the validation checks all the data types in `readSchema`, which might contain partition columns. This is actually a regression. For example, the Text data source only supports the String data type, while its partition columns can still be of Integer type, since partition columns are processed by Spark itself.

This PR:
1. Refactors the schema validation to check the data schema only.
2. Filters the partition columns out of the data schema when a user-specified schema is provided.

## How was this patch tested?

Unit test

Closes apache#24203 from gengliangwang/schemaValidation.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 5624bfb commit 267160b
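
For context, a minimal repro sketch of the regression described above. This is not part of the patch: the path, column names, and the use of an active `spark` SparkSession (spark-shell style) are illustrative assumptions.

// Text source only supports StringType for data columns, but partition columns
// are handled by Spark itself, so an Int partition column must not fail validation.
val df = spark.range(10)
  .selectExpr("CAST(id AS STRING) AS value", "CAST(id % 2 AS INT) AS part")
df.write.partitionBy("part").text("/tmp/spark-27269/text")

// Before this change the v2 read path validated the whole readSchema (data +
// partition columns) and rejected the Int partition column; with this change
// only the data schema (value: string) is checked.
spark.read.text("/tmp/spark-27269/text").show()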

File tree

12 files changed, +166 −107 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 3 additions & 27 deletions
@@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 abstract class FileScan(
@@ -37,22 +37,6 @@ abstract class FileScan(
     false
   }
 
-  /**
-   * Returns whether this format supports the given [[DataType]] in write path.
-   * By default all data types are supported.
-   */
-  def supportsDataType(dataType: DataType): Boolean = true
-
-  /**
-   * The string that represents the format that this data source provider uses. This is
-   * overridden by children to provide a nice alias for the data source. For example:
-   *
-   * {{{
-   *   override def formatName(): String = "ORC"
-   * }}}
-   */
-  def formatName: String
-
   protected def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
@@ -76,13 +60,5 @@ abstract class FileScan(
     partitions.toArray
   }
 
-  override def toBatch: Batch = {
-    readSchema.foreach { field =>
-      if (!supportsDataType(field.dataType)) {
-        throw new AnalysisException(
-          s"$formatName data source does not support ${field.dataType.catalogString} data type.")
-      }
-    }
-    this
-  }
+  override def toBatch: Batch = this
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 28 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -46,7 +46,11 @@ abstract class FileTable(
       sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
+  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
+    val partitionSchema = fileIndex.partitionSchema
+    val resolver = sparkSession.sessionState.conf.resolver
+    StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+  }.orElse {
     inferSchema(fileIndex.allFiles())
   }.getOrElse {
     throw new AnalysisException(
@@ -57,6 +61,12 @@ abstract class FileTable(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
       "in the data schema", caseSensitive)
+    dataSchema.foreach { field =>
+      if (!supportsDataType(field.dataType)) {
+        throw new AnalysisException(
+          s"$formatName data source does not support ${field.dataType.catalogString} data type.")
+      }
+    }
     val partitionSchema = fileIndex.partitionSchema
     SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
       "in the partition schema", caseSensitive)
@@ -72,6 +82,22 @@ abstract class FileTable(
    * Spark will require that user specify the schema manually.
    */
  def inferSchema(files: Seq[FileStatus]): Option[StructType]
+
+  /**
+   * Returns whether this format supports the given [[DataType]] in read/write path.
+   * By default all data types are supported.
+   */
+  def supportsDataType(dataType: DataType): Boolean = true
+
+  /**
+   * The string that represents the format that this data source provider uses. This is
+   * overridden by children to provide a nice alias for the data source. For example:
+   *
+   * {{{
+   *   override def formatName(): String = "ORC"
+   * }}}
+   */
+  def formatName: String
 }
 
 object FileTable {
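
The new `dataSchema` above drops partition columns from a user-specified schema before the per-field `supportsDataType` check. A small sketch of the resulting behavior, assuming the same illustrative layout as the repro earlier and an active `spark` session:

import org.apache.spark.sql.types._

// User-specified schema that also lists the partition column "part".
val userSchema = StructType(Seq(
  StructField("value", StringType),
  StructField("part", IntegerType)))

// "part" is matched against fileIndex.partitionSchema (using the session's resolver)
// and filtered out of dataSchema, so only "value" reaches the text source's
// supportsDataType check.
spark.read.schema(userSchema).format("text").load("/tmp/spark-27269/text").show()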

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala

Lines changed: 5 additions & 17 deletions
@@ -39,7 +39,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.SerializableConfiguration
 
-abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
+abstract class FileWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
   extends WriteBuilder with SupportsSaveMode {
   private var schema: StructType = _
   private var queryId: String = _
@@ -108,22 +112,6 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory
 
-  /**
-   * Returns whether this format supports the given [[DataType]] in write path.
-   * By default all data types are supported.
-   */
-  def supportsDataType(dataType: DataType): Boolean = true
-
-  /**
-   * The string that represents the format that this data source provider uses. This is
-   * overridden by children to provide a nice alias for the data source. For example:
-   *
-   * {{{
-   *   override def formatName(): String = "ORC"
-   * }}}
-   */
-  def formatName: String
-
   private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala

Lines changed: 1 addition & 11 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.v2.Table
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CSVDataSourceV2 extends FileDataSourceV2 {
@@ -41,13 +41,3 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
     CSVTable(tableName, sparkSession, options, paths, Some(schema))
   }
 }
-
-object CSVDataSourceV2 {
-  def supportsDataType(dataType: DataType): Boolean = dataType match {
-    case _: AtomicType => true
-
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
-
-    case _ => false
-  }
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala

Lines changed: 0 additions & 6 deletions
@@ -75,10 +75,4 @@ case class CSVScan(
     CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
       dataSchema, fileIndex.partitionSchema, readSchema, parsedOptions)
   }
-
-  override def supportsDataType(dataType: DataType): Boolean = {
-    CSVDataSourceV2.supportsDataType(dataType)
-  }
-
-  override def formatName: String = "CSV"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala

Lines changed: 12 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class CSVTable(
@@ -48,5 +48,15 @@ case class CSVTable(
   }
 
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
-    new CSVWriteBuilder(options, paths)
+    new CSVWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+    case _ => false
+  }
+
+  override def formatName: String = "CSV"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala

Lines changed: 6 additions & 8 deletions
@@ -27,8 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class CSVWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
-  extends FileWriteBuilder(options, paths) {
+class CSVWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
+  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,
@@ -56,10 +60,4 @@ class CSVWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
       }
     }
   }
-
-  override def supportsDataType(dataType: DataType): Boolean = {
-    CSVDataSourceV2.supportsDataType(dataType)
-  }
-
-  override def formatName: String = "CSV"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala

Lines changed: 1 addition & 17 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.sources.v2.Table
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class OrcDataSourceV2 extends FileDataSourceV2 {
@@ -42,19 +42,3 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
   }
 }
 
-object OrcDataSourceV2 {
-  def supportsDataType(dataType: DataType): Boolean = dataType match {
-    case _: AtomicType => true
-
-    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
-
-    case ArrayType(elementType, _) => supportsDataType(elementType)
-
-    case MapType(keyType, valueType, _) =>
-      supportsDataType(keyType) && supportsDataType(valueType)
-
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
-
-    case _ => false
-  }
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala

Lines changed: 1 addition & 7 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -43,10 +43,4 @@ case class OrcScan(
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
       dataSchema, fileIndex.partitionSchema, readSchema)
   }
-
-  override def supportsDataType(dataType: DataType): Boolean = {
-    OrcDataSourceV2.supportsDataType(dataType)
-  }
-
-  override def formatName: String = "ORC"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala

Lines changed: 19 additions & 2 deletions
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class OrcTable(
@@ -40,5 +40,22 @@ case class OrcTable(
     OrcUtils.readSchema(sparkSession, files)
 
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
-    new OrcWriteBuilder(options, paths)
+    new OrcWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportsDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>
+      supportsDataType(keyType) && supportsDataType(valueType)
+
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+    case _ => false
+  }
+
+  override def formatName: String = "ORC"
 }
