@@ -363,6 +363,11 @@ object SQLConf {
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
.createWithDefault("snappy")

val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
Contributor: How about spark.sql.orc.useNewVersion? Also, let's make it an internal config and enable it by default.

Member Author: Sure!

.doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.")
Contributor: The description should include the major difference between these two ORC versions.

Member Author: Yep. I'll elaborate more.

.booleanConf
Member (suggested change):
.checkValues(Set("hive", "native"))
.createWithDefault("native")

Member Author: Yep.

.createWithDefault(false)
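Taking the review comments above together (make it internal, use values that name the implementation, and spell out the difference between the two readers in the doc string), one possible final shape of this config is sketched below. The name `spark.sql.orc.impl` and the exact wording are illustrative only, not part of this commit.

```scala
// Hedged sketch: an internal, string-valued switch instead of a boolean flag.
// The config name and doc wording are hypothetical.
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
  .doc("Selects the ORC implementation: 'native' uses the new OrcFileFormat in the " +
    "sql/core module, while 'hive' uses the OrcFileFormat in the sql/hive module.")
  .internal()
  .stringConf
  .checkValues(Set("hive", "native"))
  .createWithDefault("native")
```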

val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(sparkSession, source)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)

@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

assertNotBucketed("save")

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(df.sparkSession, source)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
cls.newInstance() match {
case ds: WriteSupport =>
@@ -190,7 +190,7 @@ case class AlterTableAddColumnsCommand(
colsToAdd: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)
val catalogTable = verifyAlterTableAddColumn(sparkSession, catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
@@ -216,6 +216,7 @@ case class AlterTableAddColumnsCommand(
* For datasource table, it currently only supports parquet, json, csv.
*/
private def verifyAlterTableAddColumn(
sparkSession: SparkSession,
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
Expand All @@ -229,7 +230,7 @@ case class AlterTableAddColumnsCommand(
}

if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match {
// For datasource table, this command can only support the following File format.
// TextFileFormat only default to one column "value"
// Hive type is already considered as hive serde table, so the logic will not
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,7 @@ case class DataSource(

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val providingClass: Class[_] = DataSource.lookupDataSource(sparkSession, className)
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -568,8 +570,13 @@ object DataSource extends Logging {
"org.apache.spark.Logging")

/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
Contributor: Instead of passing the SparkSession, I think we only need SQLConf.

Member Author: Yep.

var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
Member: Instead of using a var, you can use a pattern match.

Member: Also add the mappings for the new ORC format to backwardCompatibilityMap.

Member Author: Thanks. Sure.

if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
Contributor: "org.apache.spark.sql.hive.orc.OrcFileFormat" should still point to the old implementation.

Member Author: I see.

sparkSession.conf.get(SQLConf.ORC_ENABLED)) {
Contributor: Shouldn't we get the conf from sessionState?

Member Author: It's done.

logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}")
provider1 = classOf[OrcFileFormat].getCanonicalName
}
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
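Putting the reviewers' points together (accept SQLConf rather than the whole SparkSession, replace the var with a pattern match, and let only the short name "orc" be rerouted while the fully-qualified org.apache.spark.sql.hive.orc.OrcFileFormat keeps pointing at the old implementation), the provider resolution could look roughly like the sketch below. The helper name resolveProvider is hypothetical and not part of this commit.

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.internal.SQLConf

// Hedged sketch of the shape the review asks for: no var, pattern match,
// SQLConf-only dependency. Assumes the backwardCompatibilityMap lookup has
// already been applied to `provider`, as in the surrounding code.
private def resolveProvider(provider: String, conf: SQLConf): String =
  provider.toLowerCase match {
    case "orc" if conf.getConf(SQLConf.ORC_ENABLED) =>
      // Route only the short name to the new sql/core OrcFileFormat.
      classOf[OrcFileFormat].getCanonicalName
    case _ => provider
  }
```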
@@ -108,8 +108,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
val existingProvider = DataSource.lookupDataSource(sparkSession, existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(sparkSession, tableDesc.provider.get)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
21 changes: 21 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(spark.read.format(orc).load(path).collect().length == 2)
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
withSQLConf(SQLConf.ORC_ENABLED.key -> "false") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}

withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(classOf[OrcFileFormat]))
}
}
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources

import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
.load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
}

test("should fail to load ORC without Hive Support") {
val e = intercept[AnalysisException] {
spark.read.format("orc").load()
test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") {
Member Author:
Ur, those tests cover different cases.

  • In this test: true -> Use new OrcFileFormat, false -> Throw Exception (the existing behavior)
  • In that test: true -> Use new OrcFileFormat, false -> Use old OrcFileFormat (the existing behavior).

Member Author:
Oh, I confused with SQLQuerySuite.scala in hive. Sorry, I'll remove this.

Seq(
(true, "Unable to infer schema for ORC. It must be specified manually"),
(false, "The ORC data source must be used with Hive support")).foreach { case (value, m) =>
withSQLConf(SQLConf.ORC_ENABLED.key -> s"$value") {
val e = intercept[AnalysisException] {
spark.read.format("orc").load()
}
assert(e.message.contains(m))
}
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}
}

@@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}


test("resolve default source") {
spark.read
.format("org.apache.spark.sql.test")
@@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
}

/**
* This only tests whether API compiles, but does not run it as orc()
* cannot be run without Hive classes.
*/
ignore("orc - API") {
// Reader, with user specified schema
// Refer to csv-specific test suites for behavior without user specified schema
spark.read.schema(userSchema).orc()
spark.read.schema(userSchema).orc(dir)
spark.read.schema(userSchema).orc(dir, dir, dir)
spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
Option(dir).map(spark.read.schema(userSchema).orc)
test("orc - API and behavior regarding schema") {
withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
// Writer
spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
val df = spark.read.orc(dir)
checkAnswer(df, spark.createDataset(data).toDF())
val schema = df.schema

// Writer
spark.range(10).write.orc(dir)
// Reader, without user specified schema
intercept[AnalysisException] {
testRead(spark.read.orc(), Seq.empty, schema)
}
testRead(spark.read.orc(dir), data, schema)
testRead(spark.read.orc(dir, dir), data ++ data, schema)
testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
// Test explicit calls to single arg method - SPARK-16009
testRead(Option(dir).map(spark.read.orc).get, data, schema)

// Reader, with user specified schema, data should be nulls as schema in file different
// from user schema
val expData = Seq[String](null, null, null)
testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema)
testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema)
testRead(
spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema)
}
}

test("column nullability and comment - write and then read") {
Seq("json", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
Seq("json", "orc", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}
}
}
@@ -195,8 +195,18 @@ case class RelationConversions(
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
if (conf.getConf(SQLConf.ORC_ENABLED)) {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], "orc")
Member: Indents.

Member Author: Done.

}
}
}
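One way to address the indentation comment is to select the format class first and make a single convertToLogicalRelation call. The sketch below is illustrative only; it assumes FileFormat is imported in this file and that relation, options, conf, and sessionCatalog are in scope as in the surrounding method.

```scala
// Hedged sketch of a tidier shape for the branch above; not the committed code.
val orcFileFormatClass: Class[_ <: FileFormat] =
  if (conf.getConf(SQLConf.ORC_ENABLED)) {
    // New implementation in sql/core.
    classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]
  } else {
    // Existing implementation in sql/hive.
    classOf[org.apache.spark.sql.hive.orc.OrcFileFormat]
  }
sessionCatalog.metastoreCatalog
  .convertToLogicalRelation(relation, options, orcFileFormatClass, "orc")
```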

@@ -2153,4 +2153,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
Member: Move it to OrcQuerySuite.

Member Author: Yep.

Seq(
(true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
(false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) =>

withSQLConf(SQLConf.ORC_ENABLED.key -> s"$v") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(format))
}
}
}
}
}