Keep backward-compatibility
maropu committed Jun 27, 2018
commit df1a67f8409c0c81016f5ae9adfe608f0a988273
@@ -19,23 +19,38 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.types._


object DataSourceUtils {

/**
* Verify if the schema is supported in datasource in write path.
*/
def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = {
verifySchema(format, schema, isReadPath = false)
}

/**
* Verify if the schema is supported in datasource in read path.
*/
def verifyReadSchema(format: FileFormat, schema: StructType): Unit = {
verifySchema(format, schema, isReadPath = true)
}

/**
* Verify if the schema is supported in datasource. This verification should be done
* in a driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues`
* in `FileFormat`.
*
* Unsupported data types of csv, json, orc, and parquet are as follows;
* csv -> R/W: Interval, Null, Array, Map, Struct
* json -> R/W: Interval
* orc -> R/W: Interval, Null
* json -> W: Interval
* orc -> W: Interval, Null
* parquet -> R/W: Interval, Null
*/
def verifySchema(format: FileFormat, schema: StructType): Unit = {
private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
def throwUnsupportedException(dataType: DataType): Unit = {
throw new UnsupportedOperationException(
s"$format data source does not support ${dataType.simpleString} data type.")
@@ -56,10 +71,14 @@ object DataSourceUtils
verifyType(keyType)
verifyType(valueType)

case _: CalendarIntervalType if isReadPath && format.isInstanceOf[JsonFileFormat] ||
isReadPath && format.isInstanceOf[OrcFileFormat] =>

case udt: UserDefinedType[_] => verifyType(udt.sqlType)

// For JSON backward-compatibility
case NullType if format.isInstanceOf[JsonFileFormat] =>
case NullType if format.isInstanceOf[JsonFileFormat] ||
(isReadPath && format.isInstanceOf[OrcFileFormat]) =>

case _ => throwUnsupportedException(dataType)
Member:
Write a comment above this?

// Actually we won't pass in unsupported data types, this is a safety check.

Member Author (maropu):
ok

}
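The javadoc above documents the relaxed read path. As a rough, user-level sketch of what the split into verifyWriteSchema/verifyReadSchema buys (not part of this diff; the scratch path and an active `spark` session are assumed), a user-supplied schema containing NullType is now tolerated when reading ORC, and CalendarIntervalType when reading JSON/ORC, while writing those types still fails:

// Hypothetical illustration of the backward-compatible read path above;
// assumes an active SparkSession `spark` and a writable scratch directory.
import org.apache.spark.sql.types.{NullType, StructField, StructType}

val path = "/tmp/orc-null-backcompat"
spark.range(1).write.format("orc").mode("overwrite").save(path)

// Read path: verifyReadSchema tolerates a NullType column for ORC
// (and CalendarIntervalType for JSON/ORC), so this collect succeeds.
val schema = StructType(StructField("a", NullType, nullable = true) :: Nil)
spark.read.schema(schema).format("orc").load(path).collect()

// Write path: verifyWriteSchema still throws UnsupportedOperationException
// ("orc data source does not support null data type.") for the same type.
// spark.range(1).selectExpr("null AS a").write.format("orc").save(path)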
@@ -66,7 +66,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyWriteSchema(this, dataSchema)
val conf = job.getConfiguration
val csvOptions = new CSVOptions(
options,
@@ -98,7 +98,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyReadSchema(this, dataSchema)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -65,7 +65,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyWriteSchema(this, dataSchema)

val conf = job.getConfiguration
val parsedOptions = new JSONOptions(
@@ -98,7 +98,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyReadSchema(this, dataSchema)

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -89,7 +89,7 @@ class OrcFileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyWriteSchema(this, dataSchema)

val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)

@@ -143,7 +143,7 @@ class OrcFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyReadSchema(this, dataSchema)

if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(dataSchema, filters).foreach { f =>
@@ -78,7 +78,7 @@ class ParquetFileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyWriteSchema(this, dataSchema)

val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

@@ -303,7 +303,7 @@ class ParquetFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyReadSchema(this, dataSchema)

hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
@@ -207,8 +207,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo

// Unsupported data types of csv, json, orc, and parquet are as follows;
// csv -> R/W: Interval, Null, Array, Map, Struct
// json -> R/W: Interval
// orc -> R/W: Interval, Null
// json -> W: Interval
// orc -> W: Interval, Null
// parquet -> R/W: Interval, Null
test("SPARK-24204 error handling for unsupported data types") {
withTempDir { dir =>
@@ -267,7 +267,39 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath

Seq("parquet", "orc", "json", "csv").foreach { format =>
Seq("orc", "json").foreach { format =>
// write path
var msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.contains("Cannot save interval data type into external storage."))

msg = intercept[UnsupportedOperationException] {
spark.udf.register("testType", () => new IntervalData())
sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains(s"$format data source does not support calendarinterval data type."))

// read path
// We expect the types below should be passed for backward-compatibility

// Interval type
var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()

// UDT having interval data
schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()
}
}

withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath

Seq("parquet", "csv").foreach { format =>
Member:
Nit: we can put all the write path together to reduce duplicated code

Member Author (maropu):
fixed

// write path
var msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
Expand Down Expand Up @@ -299,7 +331,36 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
.contains(s"$format data source does not support calendarinterval data type."))
}

Seq("parquet", "orc", "csv").foreach { format =>
Seq("orc").foreach { format =>
// write path
var msg = intercept[UnsupportedOperationException] {
sql("select null").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains(s"$format data source does not support null data type."))

msg = intercept[UnsupportedOperationException] {
spark.udf.register("testType", () => new NullData())
sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains(s"$format data source does not support null data type."))

// read path
// We expect the types below should be passed for backward-compatibility

// Null type
var schema = StructType(StructField("a", NullType, true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()

// UDT having null data
schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()
}

Seq("parquet", "csv").foreach { format =>
// write path
var msg = intercept[UnsupportedOperationException] {
sql("select null").write.format(format).mode("overwrite").save(tempDir)
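On the reviewer's nit above about putting the write-path checks together: a hypothetical consolidation (an illustration only, not the follow-up commit) could run the shared interval write-path assertion once across all built-in formats and keep the backward-compatible read-path checks in their own loop:

// Hypothetical deduplication sketch; names (sql, tempDir, intercept) mirror the suite above.
Seq("parquet", "orc", "json", "csv").foreach { format =>
  // write path: interval data is rejected before any format-specific check
  val msg = intercept[AnalysisException] {
    sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
  }.getMessage
  assert(msg.contains("Cannot save interval data type into external storage."))
}

Seq("orc", "json").foreach { format =>
  // read path: interval schemas are tolerated for backward-compatibility
  val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
  spark.range(1).write.format(format).mode("overwrite").save(tempDir)
  spark.read.schema(schema).format(format).load(tempDir).collect()
}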
@@ -72,7 +72,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyWriteSchema(this, dataSchema)

val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)

@@ -123,7 +123,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
DataSourceUtils.verifySchema(this, dataSchema)
DataSourceUtils.verifyReadSchema(this, dataSchema)

if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates